30 #include "splash/version.hpp"    31 #include "splash/ParallelDataCollector.hpp"    32 #include "splash/AttributeInfo.hpp"    33 #include "splash/basetypes/basetypes.hpp"    34 #include "splash/core/DCParallelDataSet.hpp"    35 #include "splash/core/DCAttribute.hpp"    36 #include "splash/core/DCParallelGroup.hpp"    37 #include "splash/core/logging.hpp"    38 #include "splash/core/H5IdWrapper.hpp"    47     void ParallelDataCollector::setFileAccessParams(hid_t& fileAccProperties)
    // Builds the HDF5 file-access property list: creates it and attaches the
    // MPI-IO driver using the communicator and info object stored in options.
    // NOTE(review): none of the H5P* results in this function are checked.
    49         fileAccProperties = H5Pcreate(H5P_FILE_ACCESS);
    50         H5Pset_fapl_mpio(fileAccProperties, options.mpiComm, options.mpiInfo);
    // Holders for the cache configuration that is queried below.
    52         int metaCacheElements = 0;
    53         size_t rawCacheElements = 0;
    54         size_t rawCacheSize = 0;
    // Query the current cache settings so that only the raw-data cache size is
    // replaced; every other value is written back exactly as it was read.
    65         H5Pget_cache(fileAccProperties, &metaCacheElements, &rawCacheElements, &rawCacheSize, &policy);
    // 256 * 1024 * 1024 = 256 MiB (the log line below divides by 1024 to print KiB).
    66         rawCacheSize = 256 * 1024 * 1024;
    67         H5Pset_cache(fileAccProperties, metaCacheElements, rawCacheElements, rawCacheSize, policy);
    69         log_msg(3, 
"Raw Data Cache (File) = %llu KiB", (
long long unsigned) (rawCacheSize / 1024));
    72     std::string ParallelDataCollector::getExceptionString(std::string func, std::string msg,
    75         std::stringstream full_msg;
    76         full_msg << 
"Exception for ParallelDataCollector::" << func <<
    80             full_msg << 
" (" << info << 
")";
    82         return full_msg.str();
    85     void ParallelDataCollector::indexToPos(
int index, Dimensions mpiSize, Dimensions &mpiPos)
    87         mpiPos[2] = index / (mpiSize[0] * mpiSize[1]);
    88         mpiPos[1] = (index % (mpiSize[0] * mpiSize[1])) / mpiSize[0];
    89         mpiPos[0] = index % mpiSize[0];
    // Scans the directory part of baseFilename for entries that start with the
    // file-name part and end in ".h5", and parses the decimal number that
    // follows the last '_' in each such entry.
    // Presumably every successfully parsed number is added to ids; the line
    // doing so is not visible in this listing - TODO confirm.
    // Throws DCException if the directory cannot be opened.
    92     void ParallelDataCollector::listFilesInDir(
const std::string baseFilename, std::set<int32_t> &ids)
    95         log_msg(2, 
"listing files for %s", baseFilename.c_str());
   // Split baseFilename at the last '/': everything before it is the
   // directory, everything after it the name prefix. Without a '/' the
   // current directory "." is searched.
   101         std::string dir_path, name;
   102         std::string::size_type pos = baseFilename.find_last_of(
'/');
   103         if (pos == std::string::npos)
   105             dir_path.assign(
".");
   106             name.assign(baseFilename);
   109             dir_path.assign(baseFilename.c_str(), baseFilename.c_str() + pos);
   110             name.assign(baseFilename.c_str() + pos + 1);
   117         dirp = opendir(dir_path.c_str());
   120             throw DCException(getExceptionString(
"listFilesInDir",
   121                     "Failed to open directory", dir_path.c_str()));
   124         while ((dp = readdir(dirp)) != NULL)
   // strstr() returning the start of d_name means the entry begins with name.
   127             if (strstr(dp->d_name, name.c_str()) == dp->d_name)
   130                 fname.assign(dp->d_name);
   // 3 is the length of ".h5"; the extension must sit at the very end.
   // NOTE(review): fname.size() - 3 is unsigned and wraps for entries shorter
   // than three characters; for a two-character entry it wraps to npos, which
   // is also what rfind() returns when nothing is found, so the comparison
   // below would treat such an entry as ending in ".h5". Confirm whether a
   // length guard is needed.
   133                 const size_t fileNameLength = fname.size() - 3;
   134                 if (fname.rfind(
".h5") != fileNameLength)
   139                 std::string idStr = fname.substr(fname.rfind(
"_") + 1,
   140                         fileNameLength - name.size());
   // NOTE(review): strtol() returns long; the result is narrowed to int32_t
   // without a range check.
   142                 int32_t 
id = strtol(idStr.c_str(), &endPtr, 10);
   // Accept the number only if strtol() consumed the whole string.
   143                 if (endPtr && *endPtr == 0L) {
   145                     log_msg(3, 
"found file %s with ID %d", fname.c_str(), id);
   // The result of closedir() is deliberately discarded.
   149         (void) closedir(dirp);
   157             const Dimensions topology, uint32_t maxFileHandles) :
   158     handles(maxFileHandles, HandleMgr::FNS_ITERATIONS),
   159     fileStatus(FST_CLOSED)
   163         if (MPI_Comm_dup(comm, &(options.mpiComm)) != MPI_SUCCESS)
   164             throw DCException(getExceptionString(
"ParallelDataCollector",
   165                 "failed to duplicate MPI communicator"));
   167         MPI_Comm_rank(options.mpiComm, &(options.mpiRank));
   168         options.enableCompression = 
false;
   169         options.mpiInfo = info;
   171         options.mpiTopology.set(topology);
   177             throw DCException(getExceptionString(
"ParallelDataCollector",
   178                 "failed to initialize/open HDF5 library"));
   180 #ifndef SPLASH_VERBOSE_HDF5   182         if (H5Eset_auto2(H5E_DEFAULT, NULL, NULL) < 0)
   183             throw DCException(getExceptionString(
"ParallelDataCollector",
   184                 "failed to disable error printing"));
   188         setFileAccessParams(fileAccProperties);
   190         handles.registerFileCreate(fileCreateCallback, &options);
   191         handles.registerFileOpen(fileOpenCallback, &options);
   193         indexToPos(options.mpiRank, options.mpiTopology, options.mpiPos);
   199         H5Pclose(fileAccProperties);
   205         if (options.mpiComm != MPI_COMM_NULL)
   207             log_msg(1, 
"finalizing data collector");
   208             MPI_Comm_free(&options.mpiComm);
   209             options.mpiComm = MPI_COMM_NULL;
   216         log_msg(1, 
"opening parallel data collector");
   218         if (filename == NULL)
   219             throw DCException(getExceptionString(
"open", 
"filename must not be null"));
   221         if (fileStatus != FST_CLOSED)
   222             throw DCException(getExceptionString(
"open", 
"this access is not permitted"));
   224         this->baseFilename.assign(filename);
   226         switch (attr.fileAccType)
   229             case FAT_READ_MERGED:
   230                 openRead(filename, attr);
   233                 openWrite(filename, attr);
   236                 openCreate(filename, attr);
   243         if (fileStatus == FST_CLOSED)
   246         log_msg(1, 
"closing parallel data collector");
   253         fileStatus = FST_CLOSED;
   258         std::set<int32_t> ids;
   259         listFilesInDir(this->baseFilename, ids);
   262             options.maxID = *(ids.rbegin());
   264         return options.maxID;
   269         mpiSize.
set(options.mpiTopology);
   275         std::set<int32_t> file_ids;
   276         listFilesInDir(this->baseFilename, file_ids);
   279             *count = file_ids.size();
   284             for (std::set<int32_t>::const_iterator iter = file_ids.begin();
   285                     iter != file_ids.end(); ++iter)
   297         std::stringstream group_id_name;
   298         group_id_name << SDC_GROUP_DATA << 
"/" << id;
   301         DCParallelGroup group;
   302         group.open(handles.get(
id), group_id_name.str());
   304         DCParallelGroup::VisitObjCBType param;
   306         param.entries = entries;
   308         DCParallelGroup::getEntriesInternal(group.getHandle(), group_id_name.str(), 
"", ¶m);
   311             *count = param.count;
   321             throw DCException(getExceptionString(
"readGlobalAttributeInfo", 
"a parameter was null"));
   323         if (fileStatus == FST_CLOSED || fileStatus == FST_CREATING)
   324             throw DCException(getExceptionString(
"readGlobalAttributeInfo", 
"this access is not permitted"));
   326         DCParallelGroup group_custom;
   327         group_custom.open(handles.get(
id), PDC_GROUP_CUSTOM);
   331             return DCAttribute::readAttributeInfo(name, group_custom.getHandle());
   334             log_msg(0, 
"Exception: %s", e.what());
   344         if (name == NULL || data == NULL)
   345             throw DCException(getExceptionString(
"readGlobalAttribute", 
"a parameter was null"));
   347         if (fileStatus == FST_CLOSED || fileStatus == FST_CREATING)
   348             throw DCException(getExceptionString(
"readGlobalAttribute", 
"this access is not permitted"));
   350         DCParallelGroup group_custom;
   351         group_custom.open(handles.get(
id), PDC_GROUP_CUSTOM);
   355             DCAttribute::readAttribute(name, group_custom.getHandle(), data);
   358             log_msg(0, 
"Exception: %s", e.what());
   359             throw DCException(getExceptionString(
"readGlobalAttribute", 
"failed to open attribute", name));
   381         if (name == NULL || data == NULL)
   382             throw DCException(getExceptionString(
"writeGlobalAttribute", 
"a parameter was null"));
   384         if (fileStatus == FST_CLOSED || fileStatus == FST_READING)
   385             throw DCException(getExceptionString(
"writeGlobalAttribute", 
"this access is not permitted"));
   387         if (ndims < 1u || ndims > DSP_DIM_MAX)
   388             throw DCException(getExceptionString(
"writeGlobalAttribute", 
"maximum dimension `ndims` is invalid"));
   390         DCParallelGroup group_custom;
   391         group_custom.open(handles.get(
id), PDC_GROUP_CUSTOM);
   395             DCAttribute::writeAttribute(name, type.
getDataType(), group_custom.getHandle(), ndims, dims, data);
   398             log_msg(0, 
"Exception: %s", e.what());
   399             throw DCException(getExceptionString(
"writeGlobalAttribute", 
"failed to write attribute", name));
   // Opens the group that belongs to (id, dataName) into `group` and, further
   // down, opens the named object inside it with H5Oopen().
   // A NULL dataName is allowed and is treated as an empty name; an empty
   // (non-NULL) string is rejected. Throws DCException for an empty name, a
   // closed collector, or an object that cannot be found.
   // NOTE(review): every error raised here is tagged "readAttribute" although
   // this helper is also called from readAttributeInfo(), so messages can name
   // the wrong caller.
   // NOTE(review): dynamic exception specifications such as throw (DCException)
   // were deprecated in C++11 and removed in C++17.
   403     hid_t ParallelDataCollector::openGroup(DCGroup& group, int32_t 
id, 
const char* dataName) 
throw (
DCException)
   407         if (dataName && strlen(dataName) == 0)
   408             throw DCException(getExceptionString(
"readAttribute", 
"empty dataset name"));
   410         if (fileStatus == FST_CLOSED)
   411             throw DCException(getExceptionString(
"readAttribute", 
"this access is not permitted"));
   413         std::string group_path, obj_name;
   414         std::string dataNameInternal = 
"";
   416             dataNameInternal.assign(dataName);
   // Resolve the data name into the path of its group and the name of the
   // object inside that group.
   417         DCDataSet::getFullDataPath(dataNameInternal, SDC_GROUP_DATA, 
id, group_path, obj_name);
   419         group.open(handles.get(
id), group_path);
   // Callers wrap the returned id in an H5ObjectId; presumably that wrapper
   // closes it - TODO confirm.
   424             hid_t obj_id = H5Oopen(group.getHandle(), obj_name.c_str(), H5P_DEFAULT);
   427                 throw DCException(getExceptionString(
"readAttribute",
   428                         "dataset not found", obj_name.c_str()));
   436             const char *dataName,
   437             const char *attrName,
   442         if (attrName == NULL)
   443             throw DCException(getExceptionString(
"readAttributeInfo", 
"a parameter was null"));
   445         if (strlen(attrName) == 0)
   446             throw DCException(getExceptionString(
"readAttributeInfo", 
"empty attribute name"));
   448         DCParallelGroup group;
   449         H5ObjectId objId(openGroup(group, 
id, dataName));
   452             return DCAttribute::readAttributeInfo(attrName, group.getHandle());
   454             return DCAttribute::readAttributeInfo(attrName, objId);
   458             const char *dataName,
   459             const char *attrName,
   465         if (attrName == NULL || data == NULL)
   466             throw DCException(getExceptionString(
"readAttribute", 
"a parameter was null"));
   468         if (strlen(attrName) == 0)
   469             throw DCException(getExceptionString(
"readAttribute", 
"empty attribute name"));
   471         DCParallelGroup group;
   472         H5ObjectId objId(openGroup(group, 
id, dataName));
   475             DCAttribute::readAttribute(attrName, group.getHandle(), data);
   477             DCAttribute::readAttribute(attrName, objId, data);
   482             const char *dataName,
   483             const char *attrName,
   494             const char *dataName,
   495             const char *attrName,
   501         if (attrName == NULL || data == NULL)
   502             throw DCException(getExceptionString(
"writeAttribute", 
"a parameter was null"));
   505         if (dataName && strlen(dataName) == 0)
   506             throw DCException(getExceptionString(
"writeAttribute", 
"empty dataset name"));
   508         if (strlen(attrName) == 0)
   509             throw DCException(getExceptionString(
"writeAttribute", 
"empty attribute name"));
   511         if (fileStatus == FST_CLOSED || fileStatus == FST_READING)
   512             throw DCException(getExceptionString(
"writeAttribute", 
"this access is not permitted"));
   514         if (ndims < 1u || ndims > DSP_DIM_MAX)
   515             throw DCException(getExceptionString(
"writeAttribute", 
"maximum dimension `ndims` is invalid"));
   520         std::string group_path, obj_name;
   521         std::string dataNameInternal = 
"";
   523             dataNameInternal.assign(dataName);
   524         DCDataSet::getFullDataPath(dataNameInternal, SDC_GROUP_DATA, 
id, group_path, obj_name);
   526         DCParallelGroup group;
   535             std::string pathAndName(group_path + 
"/" + obj_name);
   536             if(!DCParallelGroup::exists(handles.get(
id), pathAndName))
   537                 group.create(handles.get(
id), pathAndName);
   540             group.open(handles.get(
id), group_path);
   541             hid_t obj_id = H5Oopen(group.getHandle(), obj_name.c_str(), H5P_DEFAULT);
   544                 throw DCException(getExceptionString(
"writeAttribute",
   545                         "object not found", obj_name.c_str()));
   550                 DCAttribute::writeAttribute(attrName, type.
getDataType(), obj_id, ndims, dims, data);
   560             group.openCreate(handles.get(
id), group_path);
   561             DCAttribute::writeAttribute(attrName, type.
getDataType(), group.getHandle(), ndims, dims, data);
   582         if (fileStatus != FST_READING && fileStatus != FST_WRITING)
   583             throw DCException(getExceptionString(
"read", 
"this access is not permitted"));
   586         readCompleteDataSet(handles.get(
id), id, name, dstBuffer, dstOffset,
   597         this->
read(
id, localSize, globalOffset, name, localSize,
   610         if (fileStatus != FST_READING && fileStatus != FST_WRITING)
   611             throw DCException(getExceptionString(
"read", 
"this access is not permitted"));
   614         readDataSet(handles.get(
id), id, name, dstBuffer, dstOffset,
   615                 localSize, globalOffset, sizeRead, ndims, buf);
   624          if (fileStatus != FST_READING && fileStatus != FST_WRITING)
   625              throw DCException(getExceptionString(
"readMeta", 
"this access is not permitted"));
   628          return readDataSetMeta(handles.get(
id), id, name, dstBuffer, dstOffset,
   633             const Selection select, 
const char* name, 
const void* buf)
   637         gatherMPIWrites(ndims, select.count, globalSize, globalOffset);
   639         write(
id, globalSize, globalOffset,
   640                 type, ndims, select, name, buf);
   646             const Selection select, 
const char* name, 
const void* buf)
   649             throw DCException(getExceptionString(
"write", 
"parameter name is NULL"));
   651         if (fileStatus == FST_CLOSED || fileStatus == FST_READING)
   652             throw DCException(getExceptionString(
"write", 
"this access is not permitted"));
   654         if (ndims < 1 || ndims > DSP_DIM_MAX)
   655             throw DCException(getExceptionString(
"write", 
"maximum dimension is invalid"));
   658         std::string group_path, dset_name;
   659         DCDataSet::getFullDataPath(name, SDC_GROUP_DATA, 
id, group_path, dset_name);
   661         DCParallelGroup group;
   662         group.openCreate(handles.get(
id), group_path);
   665         writeDataSet(group.getHandle(), globalSize, globalOffset, type, ndims,
   666                 select, dset_name.c_str(), buf);
   // Argument and state validation followed by the call to reserveInternal().
   // NOTE(review): the first error is tagged "reserve", but the two checks
   // after it report themselves as "write"; this looks like a copy-paste
   // leftover that makes the resulting messages name the wrong function -
   // confirm and align the tags.
   676             throw DCException(getExceptionString(
"reserve", 
"a parameter was NULL"));
   // Reserving is refused while the collector is closed or opened read-only.
   678         if (fileStatus == FST_CLOSED || fileStatus == FST_READING)
   679             throw DCException(getExceptionString(
"write", 
"this access is not permitted"));
   681         if (ndims < 1 || ndims > DSP_DIM_MAX)
   682             throw DCException(getExceptionString(
"write", 
"maximum dimension is invalid"));
   684         reserveInternal(
id, globalSize, ndims, type, name);
   // Variant that derives the global extent itself: after validation the
   // per-rank sizes are combined by gatherMPIWrites(), the dataset is reserved
   // with the resulting global size, and the computed size/offset are copied
   // to the caller's output objects.
   // NOTE(review): as in the sibling overload, the access and dimension checks
   // are tagged "write" instead of "reserve" - confirm and align the tags.
   696             throw DCException(getExceptionString(
"reserve", 
"a parameter was NULL"));
   // Reserving is refused while the collector is closed or opened read-only.
   698         if (fileStatus == FST_CLOSED || fileStatus == FST_READING)
   699             throw DCException(getExceptionString(
"write", 
"this access is not permitted"));
   701         if (ndims < 1 || ndims > DSP_DIM_MAX)
   702             throw DCException(getExceptionString(
"write", 
"maximum dimension is invalid"));
   705         gatherMPIWrites(ndims, size, global_size, global_offset);
   707         reserveInternal(
id, global_size, ndims, type, name);
   // globalSize and globalOffset are pointers; presumably each is written only
   // when non-NULL (the guarding lines are not visible here) - TODO confirm.
   710             globalSize->set(global_size);
   713             globalOffset->set(global_offset);
   724             throw DCException(getExceptionString(
"append", 
"parameter name is NULL"));
   726         if (fileStatus == FST_CLOSED || fileStatus == FST_READING)
   727             throw DCException(getExceptionString(
"append", 
"this access is not permitted"));
   729         if (ndims < 1 || ndims > DSP_DIM_MAX)
   730             throw DCException(getExceptionString(
"append", 
"maximum dimension is invalid"));
   733         std::string group_path, dset_name;
   734         DCDataSet::getFullDataPath(name, SDC_GROUP_DATA, 
id, group_path, dset_name);
   736         DCParallelGroup group;
   737         group.open(handles.get(
id), group_path);
   740         DCParallelDataSet dataset(dset_name.c_str());
   741         dataset.setWriteIndependent();
   743         if (!dataset.open(group.getHandle()))
   746                     "Cannot open dataset (missing reserve?)", dset_name.c_str()));
   748             dataset.write(
Selection(size), globalOffset, buf);
   756         log_msg(1, 
"removing group %d", 
id);
   758         if (fileStatus == FST_CLOSED || fileStatus == FST_READING)
   759             throw DCException(getExceptionString(
"remove", 
"this access is not permitted"));
   761         std::stringstream group_id_name;
   762         group_id_name << SDC_GROUP_DATA << 
"/" << id;
   764         DCParallelGroup::remove(handles.get(
id), group_id_name.str());
   773         log_msg(1, 
"removing dataset %s from group %d", name, 
id);
   775         if (fileStatus == FST_CLOSED || fileStatus == FST_READING)
   776             throw DCException(getExceptionString(
"remove", 
"this access is not permitted"));
   779             throw DCException(getExceptionString(
"remove", 
"a parameter was NULL"));
   781         std::string group_path, dset_name;
   782         DCDataSet::getFullDataPath(name, SDC_GROUP_DATA, 
id, group_path, dset_name);
   784         DCParallelGroup group;
   785         group.open(handles.get(
id), group_path);
   787         if (H5Ldelete(group.getHandle(), dset_name.c_str(), H5P_LINK_ACCESS_DEFAULT) < 0)
   789             throw DCException(getExceptionString(
"remove", 
"failed to remove dataset", name));
   // Body of createReference(): validates the arguments and the collector
   // state, resolves the source and destination names, opens the source group
   // and dataset, and creates the destination dataset as a reference to the
   // source inside that same group.
   799         if (srcName == NULL || dstName == NULL)
   800             throw DCException(getExceptionString(
"createReference", 
"a parameter was NULL"));
   // References cannot be created while closed or opened read-only.
   802         if (fileStatus == FST_CLOSED || fileStatus == FST_READING)
   803             throw DCException(getExceptionString(
"createReference", 
"this access is not permitted"));
   806             throw DCException(getExceptionString(
"createReference",
   807                 "source and destination ID must be identical", NULL));
   // NOTE(review): srcName and dstName are pointers (they are compared with
   // NULL above), so this compares addresses, not characters. Two different
   // buffers that hold the same name pass this check - confirm whether a
   // string comparison was intended.
   809         if (srcName == dstName)
   810             throw DCException(getExceptionString(
"createReference",
   811                 "a reference must not be identical to the referenced data", srcName));
   // NOTE(review): both calls receive src_group_path as their path argument.
   // Assuming that argument is an output, the path derived from dstName
   // replaces the one derived from srcName before the group is opened -
   // confirm this is intended (there is no separate destination path).
   814         std::string src_group_path, src_dset_name, dst_dset_name;
   815         DCDataSet::getFullDataPath(srcName, SDC_GROUP_DATA, srcID, src_group_path, src_dset_name);
   816         DCDataSet::getFullDataPath(dstName, SDC_GROUP_DATA, dstID, src_group_path, dst_dset_name);
   818         DCParallelGroup src_group;
   819         src_group.open(handles.get(srcID), src_group_path);
   822         DCParallelDataSet src_dataset(src_dset_name.c_str());
   823         src_dataset.open(src_group.getHandle());
   825         DCParallelDataSet dst_dataset(dst_dset_name.c_str());
   // The source group handle is passed for both handle arguments.
   828         dst_dataset.createReference(src_group.getHandle(), src_group.getHandle(), src_dataset);
   // Callback registered with the handle manager via registerFileCreate();
   // the constructor passes &options as userData. Creates the PDC_GROUP_CUSTOM
   // and SDC_GROUP_DATA groups in the new file and then writes the header.
   838     void ParallelDataCollector::fileCreateCallback(H5Handle handle, uint32_t index, 
void *userData)
   // userData is cast back to Options without any check.
   841         Options *options = (Options*) userData;
   844         DCParallelGroup group;
   845         group.create(handle, PDC_GROUP_CUSTOM);
   // The same DCParallelGroup object is reused for the second group.
   849         group.create(handle, SDC_GROUP_DATA);
   852         writeHeader(handle, index, options->enableCompression, options->mpiTopology);
   // Callback registered with the handle manager via registerFileOpen(); the
   // constructor passes &options as userData. The file handle argument is
   // unused. Raises options->maxID to index if index is larger.
   855     void ParallelDataCollector::fileOpenCallback(H5Handle , uint32_t index, 
void *userData)
   858         Options *options = (Options*) userData;
   // index is unsigned and is cast to int32_t for the comparison.
   860         options->maxID = std::max(options->maxID, (int32_t) index);
   863     void ParallelDataCollector::writeHeader(hid_t fHandle, uint32_t 
id,
   864             bool enableCompression, 
Dimensions mpiTopology)
   868         DCParallelGroup group;
   869         group.create(fHandle, SDC_GROUP_HEADER);
   872         bool compression = enableCompression;
   873         std::stringstream splashVersion;
   874         splashVersion << SPLASH_VERSION_MAJOR << 
"."   875                       << SPLASH_VERSION_MINOR << 
"."   876                       << SPLASH_VERSION_PATCH;
   877         std::stringstream splashFormat;
   878         splashFormat << SPLASH_FILE_FORMAT_MAJOR << 
"."   879                      << SPLASH_FILE_FORMAT_MINOR;
   881         ColTypeInt32 ctInt32;
   888         DCAttribute::writeAttribute(SDC_ATTR_MAX_ID, ctInt32.getDataType(),
   889                 group.getHandle(), &index);
   891         DCAttribute::writeAttribute(SDC_ATTR_COMPRESSION, ctBool.
getDataType(),
   892                 group.getHandle(), &compression);
   894         DCAttribute::writeAttribute(SDC_ATTR_MPI_SIZE, dim_t.
getDataType(),
   897         DCAttribute::writeAttribute(SDC_ATTR_VERSION, ctStringVersion.getDataType(),
   898                 group.getHandle(), splashVersion.str().c_str());
   900         DCAttribute::writeAttribute(SDC_ATTR_FORMAT, ctStringFormat.getDataType(),
   901                 group.getHandle(), splashFormat.str().c_str());
   904     void ParallelDataCollector::openCreate(
const char *filename,
   908         this->fileStatus = FST_CREATING;
   918         handles.open(
Dimensions(1, 1, 1), filename, fileAccProperties, H5F_ACC_TRUNC);
   921     void ParallelDataCollector::openRead(
const char* filename, 
FileCreationAttr& )
   924         this->fileStatus = FST_READING;
   928         handles.open(
Dimensions(1, 1, 1), filename, fileAccProperties, H5F_ACC_RDONLY);
   931     void ParallelDataCollector::openWrite(
const char* filename, 
FileCreationAttr& )
   934         this->fileStatus = FST_WRITING;
   941         handles.open(
Dimensions(1, 1, 1), filename, fileAccProperties, H5F_ACC_RDWR);
   944     void ParallelDataCollector::readCompleteDataSet(H5Handle h5File,
   955         log_msg(2, 
"readCompleteDataSet");
   957         if (h5File < 0 || name == NULL)
   958             throw DCException(getExceptionString(
"readCompleteDataSet", 
"invalid parameters"));
   960         std::string group_path, dset_name;
   961         DCDataSet::getFullDataPath(name, SDC_GROUP_DATA, 
id, group_path, dset_name);
   963         DCParallelGroup group;
   964         group.open(h5File, group_path);
   966         DCParallelDataSet dataset(dset_name.c_str());
   967         dataset.open(group.getHandle());
   968         const Dimensions src_size(dataset.getSize() - srcOffset);
   970         dataset.read(dstBuffer, dstOffset, src_size, srcOffset, sizeRead, srcRank, dst);
   974     void ParallelDataCollector::readDataSet(H5Handle h5File,
   988         if (h5File < 0 || name == NULL)
   989             throw DCException(getExceptionString(
"readDataSet", 
"invalid parameters"));
   991         std::string group_path, dset_name;
   992         DCDataSet::getFullDataPath(name, SDC_GROUP_DATA, 
id, group_path, dset_name);
   994         DCParallelGroup group;
   995         group.open(h5File, group_path);
   997         DCParallelDataSet dataset(dset_name.c_str());
   998         dataset.open(group.getHandle());
  1000         dataset.read(dstBuffer, dstOffset, srcSize, srcOffset, sizeRead, srcRank, dst);
  // Opens the dataset addressed by (id, name), looks the name up in a list of
  // entries to find its collection type, performs a read with a NULL
  // destination buffer, and returns the entry's colType pointer.
  // Throws DCException if no entry carries the given name.
  // NOTE(review): unlike readDataSet() and readCompleteDataSet(), no check of
  // h5File or name is visible in this span although name is used to build a
  // std::string - confirm whether validation is missing.
  1004     CollectionType* ParallelDataCollector::readDataSetMeta(H5Handle h5File,
  1014         log_msg(2, 
"readDataSetMeta");
  1016         std::string group_path, dset_name;
  1017         DCDataSet::getFullDataPath(name, SDC_GROUP_DATA, 
id, group_path, dset_name);
  1020         group.open(h5File, group_path);
  // NOTE(review): the sibling read functions use DCParallelDataSet here;
  // confirm that the plain DCDataSet is intentional.
  1022         DCDataSet dataset(dset_name.c_str());
  1023         dataset.open(group.getHandle());
  1027         std::vector<DataCollector::DCEntry> entries(entrySize);
  // Linear search for the entry with a matching name; -1 means "not found".
  // std::string(name) is rebuilt on every iteration although it never changes.
  1032         int32_t entry_id = -1;
  1033         for(
size_t i = 0; i < entrySize; ++i)
  1034             if(std::string(name) == entries[i].name)
  1036                 entry_id = int32_t(i);
  1041             throw DCException(getExceptionString(
"readDataSetMeta", 
"Entry not found by name", name));
  // The source extent is whatever remains of the dataset after srcOffset.
  1043         Dimensions src_size(dataset.getSize() - srcOffset);
  // A NULL buffer is passed; presumably only sizeRead/srcDims are filled in
  // and no data is copied - TODO confirm against DCDataSet::read().
  1044         dataset.read(dstBuffer, dstOffset, src_size, srcOffset, sizeRead, srcDims, NULL);
  1047         log_msg(3, 
"Entry '%s' (%d) is of type: %s",
  1048                 entries[entry_id].name.c_str(),
  1050                 entries[entry_id].colType->toString().c_str());
  // The returned pointer is copied out of the local entries vector.
  1052         return entries[entry_id].colType;
  1055     void ParallelDataCollector::writeDataSet(H5Handle group,
  1066         DCParallelDataSet dataset(name);
  1069         dataset.create(datatype, group, globalSize, ndims,
  1070                 this->options.enableCompression, 
false);
  1071         dataset.write(srcSelect, globalOffset, data);
  // Collects the local write size of every rank over options.mpiComm and
  // accumulates, per dimension, the global size and this rank's offset.
  // Throws DCException if MPI_Allgather fails or if 2D data is written with a
  // topology whose third extent is greater than 1.
  1075     void ParallelDataCollector::gatherMPIWrites(
int ndims, 
const Dimensions localSize,
  // NOTE(review): options.mpiSize looks like a run-time value, which would
  // make this a variable-length array - a compiler extension rather than
  // standard C++. Confirm, and consider a std::vector instead.
  1079         uint64_t write_sizes[options.mpiSize * DSP_DIM_MAX];
  // NOTE(review): three initialisers are hard-coded, so DSP_DIM_MAX must be at
  // least 3 for this to compile.
  1080         uint64_t local_write_size[DSP_DIM_MAX] = {localSize[0], localSize[1], localSize[2]};
  1082         globalSize.
set(1, 1, 1);
  1083         globalOffset.
set(0, 0, 0);
  // Every rank contributes DSP_DIM_MAX values; afterwards write_sizes holds
  // them rank by rank.
  1085         if (MPI_Allgather(local_write_size, DSP_DIM_MAX, MPI_UNSIGNED_LONG_LONG,
  1086                 write_sizes, DSP_DIM_MAX, MPI_UNSIGNED_LONG_LONG, options.mpiComm) != MPI_SUCCESS)
  1087             throw DCException(getExceptionString(
"gatherMPIWrites",
  1088                 "MPI_Allgather failed", NULL));
  1090         Dimensions tmp_mpi_topology(options.mpiTopology);
  // Under a condition not visible in this listing the topology is flattened
  // to one dimension and the rank itself is used as the position.
  1094             tmp_mpi_topology.
set(options.mpiTopology.getScalarSize(), 1, 1);
  1095             tmp_mpi_pos.
set(options.mpiRank, 0, 0);
  1098         if ((ndims == 2) && (tmp_mpi_topology[2] > 1))
  1100             throw DCException(getExceptionString(
"gatherMPIWrites",
  1101                     "cannot auto-detect global size/offset for 2D data when writing with 3D topology", NULL));
  // For each data dimension, walk along the matching topology axis.
  1104         for (
int i = 0; i < ndims; ++i)
  1108             for (
size_t dim = 0; dim < tmp_mpi_topology[i]; ++dim)
  // index selects the rank at position dim on axis i: the stride is one row
  // for axis 1 and one plane for axis 2.
  1116                         index = dim * tmp_mpi_topology[0];
  1119                         index = dim * tmp_mpi_topology[0] * tmp_mpi_topology[1];
  1122                 globalSize[i] += write_sizes[index * DSP_DIM_MAX + i];
  // Only ranks positioned before this one contribute to its offset.
  1123                 if (dim < tmp_mpi_pos[i])
  1124                     globalOffset[i] += write_sizes[index * DSP_DIM_MAX + i];
  // Opens the dataset addressed by (id, name) in h5File and queries its number
  // of dimensions.
  // Throws DCException if h5File is negative or name is NULL.
  1129     size_t ParallelDataCollector::getNDims(H5Handle h5File,
  1130             int32_t 
id, 
const char* name)
  1132         if (h5File < 0 || name == NULL)
  1133             throw DCException(getExceptionString(
"getNDims", 
"invalid parameters"));
  // Resolve the data name into its group path and the dataset name inside it.
  1135         std::string group_path, dset_name;
  1136         DCDataSet::getFullDataPath(name, SDC_GROUP_DATA, 
id, group_path, dset_name);
  1138         DCParallelGroup group;
  1139         group.open(h5File, group_path);
  1143         DCParallelDataSet dataset(dset_name.c_str());
  1144         dataset.open(group.getHandle());
  // ndims is declared on a line not visible here; presumably it is the value
  // returned - TODO confirm.
  1146         ndims = dataset.getNDims();
  1153     void ParallelDataCollector::reserveInternal(int32_t 
id,
  1160         log_msg(2, 
"reserveInternal");
  1163         std::string group_path, dset_name;
  1164         DCDataSet::getFullDataPath(name, SDC_GROUP_DATA, 
id, group_path, dset_name);
  1166         DCParallelGroup group;
  1167         group.openCreate(handles.get(
id), group_path);
  1169         DCParallelDataSet dataset(dset_name.c_str());
  1171         dataset.create(type, group.getHandle(), globalSize, ndims,
  1172                 this->options.enableCompression, 
true);
  // NOTE(review): this throw and the five that follow it (listing lines
  // 1187-1225) all pass "readGlobalAttribute" as the function name. The
  // parameter fragments visible between them (for example a const void*
  // argument) suggest they belong to differently named methods, so the tag is
  // presumably a copy-paste leftover - confirm and use each method's own name.
  1180         throw DCException(getExceptionString(
"readGlobalAttribute",
  1181                 "feature currently not supported by Parallel HDF5"));
  1187         throw DCException(getExceptionString(
"readGlobalAttribute",
  1188                 "feature currently not supported by Parallel HDF5"));
  1195         throw DCException(getExceptionString(
"readGlobalAttribute",
  1196                 "feature currently not supported by Parallel HDF5"));
  1200             size_t , 
const char* , 
const void* )
  1203         throw DCException(getExceptionString(
"readGlobalAttribute",
  1204                 "feature currently not supported by Parallel HDF5"));
  1208             size_t , 
size_t , 
size_t , 
const char* ,
  1211         throw DCException(getExceptionString(
"readGlobalAttribute",
  1212                 "feature currently not supported by Parallel HDF5"));
  1224         throw DCException(getExceptionString(
"readGlobalAttribute",
  1225                 "feature currently not supported by Parallel HDF5"));
 EXTERN void setLogMpiRank(int rank)
 
void createReference(int32_t srcID, const char *srcName, int32_t dstID, const char *dstName)
 
void readAttribute(int32_t id, const char *dataName, const char *attrName, void *buf, Dimensions *mpiPosition=NULL)
 
void open(const char *filename, FileCreationAttr &attr)
 
CollectionType * readMeta(int32_t id, const char *name, const Dimensions dstBuffer, const Dimensions dstOffset, Dimensions &sizeRead)
 
void append(int32_t id, const Dimensions size, uint32_t rank, const Dimensions globalOffset, const char *name, const void *buf)
 
void set(hsize_t x, hsize_t y, hsize_t z)
 
AttributeInfo readGlobalAttributeInfo(int32_t id, const char *name, Dimensions *mpiPosition=NULL)
 
void readGlobalAttribute(int32_t id, const char *name, void *buf)
 
size_t getScalarSize() const
 
void writeGlobalAttribute(int32_t id, const CollectionType &type, const char *name, const void *buf)
 
AttributeInfo readAttributeInfo(int32_t id, const char *dataName, const char *attrName, Dimensions *mpiPosition=NULL)
 
void getEntriesForID(int32_t id, DCEntry *entries, size_t *count)
 
void getEntryIDs(int32_t *ids, size_t *count)
 
void write(int32_t id, const CollectionType &type, uint32_t rank, const Selection select, const char *name, const void *buf)
 
void read(int32_t id, const char *name, Dimensions &sizeRead, void *buf)
 
EXTERN void parseEnvVars(void)
 
const H5DataType & getDataType() const
 
EXTERN void log_msg(int level, const char *fmt,...)
 
ParallelDataCollector(MPI_Comm comm, MPI_Info info, const Dimensions topology, uint32_t maxFileHandles)
 
void reserve(int32_t id, const Dimensions globalSize, uint32_t rank, const CollectionType &type, const char *name)
 
void getMPISize(Dimensions &mpiSize)
 
void writeAttribute(int32_t id, const CollectionType &type, const char *dataName, const char *attrName, const void *buf)
 
virtual ~ParallelDataCollector()