30 #include "splash/version.hpp" 31 #include "splash/ParallelDataCollector.hpp" 32 #include "splash/AttributeInfo.hpp" 33 #include "splash/basetypes/basetypes.hpp" 34 #include "splash/core/DCParallelDataSet.hpp" 35 #include "splash/core/DCAttribute.hpp" 36 #include "splash/core/DCParallelGroup.hpp" 37 #include "splash/core/logging.hpp" 38 #include "splash/core/H5IdWrapper.hpp" 47 void ParallelDataCollector::setFileAccessParams(hid_t& fileAccProperties)
49 fileAccProperties = H5Pcreate(H5P_FILE_ACCESS);
50 H5Pset_fapl_mpio(fileAccProperties, options.mpiComm, options.mpiInfo);
52 int metaCacheElements = 0;
53 size_t rawCacheElements = 0;
54 size_t rawCacheSize = 0;
65 H5Pget_cache(fileAccProperties, &metaCacheElements, &rawCacheElements, &rawCacheSize, &policy);
66 rawCacheSize = 256 * 1024 * 1024;
67 H5Pset_cache(fileAccProperties, metaCacheElements, rawCacheElements, rawCacheSize, policy);
69 log_msg(3,
"Raw Data Cache (File) = %llu KiB", (
long long unsigned) (rawCacheSize / 1024));
72 std::string ParallelDataCollector::getExceptionString(std::string func, std::string msg,
75 std::stringstream full_msg;
76 full_msg <<
"Exception for ParallelDataCollector::" << func <<
80 full_msg <<
" (" << info <<
")";
82 return full_msg.str();
85 void ParallelDataCollector::indexToPos(
int index, Dimensions mpiSize, Dimensions &mpiPos)
87 mpiPos[2] = index / (mpiSize[0] * mpiSize[1]);
88 mpiPos[1] = (index % (mpiSize[0] * mpiSize[1])) / mpiSize[0];
89 mpiPos[0] = index % mpiSize[0];
92 void ParallelDataCollector::listFilesInDir(
const std::string baseFilename, std::set<int32_t> &ids)
95 log_msg(2,
"listing files for %s", baseFilename.c_str());
101 std::string dir_path, name;
102 std::string::size_type pos = baseFilename.find_last_of(
'/');
103 if (pos == std::string::npos)
105 dir_path.assign(
".");
106 name.assign(baseFilename);
109 dir_path.assign(baseFilename.c_str(), baseFilename.c_str() + pos);
110 name.assign(baseFilename.c_str() + pos + 1);
117 dirp = opendir(dir_path.c_str());
120 throw DCException(getExceptionString(
"listFilesInDir",
121 "Failed to open directory", dir_path.c_str()));
124 while ((dp = readdir(dirp)) != NULL)
127 if (strstr(dp->d_name, name.c_str()) == dp->d_name)
130 fname.assign(dp->d_name);
133 const size_t fileNameLength = fname.size() - 3;
134 if (fname.rfind(
".h5") != fileNameLength)
139 std::string idStr = fname.substr(fname.rfind(
"_") + 1,
140 fileNameLength - name.size());
142 int32_t
id = strtol(idStr.c_str(), &endPtr, 10);
143 if (endPtr && *endPtr == 0L) {
145 log_msg(3,
"found file %s with ID %d", fname.c_str(), id);
149 (void) closedir(dirp);
157 const Dimensions topology, uint32_t maxFileHandles) :
158 handles(maxFileHandles, HandleMgr::FNS_ITERATIONS),
159 fileStatus(FST_CLOSED)
163 if (MPI_Comm_dup(comm, &(options.mpiComm)) != MPI_SUCCESS)
164 throw DCException(getExceptionString(
"ParallelDataCollector",
165 "failed to duplicate MPI communicator"));
167 MPI_Comm_rank(options.mpiComm, &(options.mpiRank));
168 options.enableCompression =
false;
169 options.mpiInfo = info;
171 options.mpiTopology.set(topology);
177 throw DCException(getExceptionString(
"ParallelDataCollector",
178 "failed to initialize/open HDF5 library"));
180 #ifndef SPLASH_VERBOSE_HDF5 182 if (H5Eset_auto2(H5E_DEFAULT, NULL, NULL) < 0)
183 throw DCException(getExceptionString(
"ParallelDataCollector",
184 "failed to disable error printing"));
188 setFileAccessParams(fileAccProperties);
190 handles.registerFileCreate(fileCreateCallback, &options);
191 handles.registerFileOpen(fileOpenCallback, &options);
193 indexToPos(options.mpiRank, options.mpiTopology, options.mpiPos);
199 H5Pclose(fileAccProperties);
205 if (options.mpiComm != MPI_COMM_NULL)
207 log_msg(1,
"finalizing data collector");
208 MPI_Comm_free(&options.mpiComm);
209 options.mpiComm = MPI_COMM_NULL;
216 log_msg(1,
"opening parallel data collector");
218 if (filename == NULL)
219 throw DCException(getExceptionString(
"open",
"filename must not be null"));
221 if (fileStatus != FST_CLOSED)
222 throw DCException(getExceptionString(
"open",
"this access is not permitted"));
224 this->baseFilename.assign(filename);
226 switch (attr.fileAccType)
229 case FAT_READ_MERGED:
230 openRead(filename, attr);
233 openWrite(filename, attr);
236 openCreate(filename, attr);
243 if (fileStatus == FST_CLOSED)
246 log_msg(1,
"closing parallel data collector");
253 fileStatus = FST_CLOSED;
258 std::set<int32_t> ids;
259 listFilesInDir(this->baseFilename, ids);
262 options.maxID = *(ids.rbegin());
264 return options.maxID;
269 mpiSize.
set(options.mpiTopology);
275 std::set<int32_t> file_ids;
276 listFilesInDir(this->baseFilename, file_ids);
279 *count = file_ids.size();
284 for (std::set<int32_t>::const_iterator iter = file_ids.begin();
285 iter != file_ids.end(); ++iter)
297 std::stringstream group_id_name;
298 group_id_name << SDC_GROUP_DATA <<
"/" << id;
301 DCParallelGroup group;
302 group.open(handles.get(
id), group_id_name.str());
304 DCParallelGroup::VisitObjCBType param;
306 param.entries = entries;
308 DCParallelGroup::getEntriesInternal(group.getHandle(), group_id_name.str(),
"", ¶m);
311 *count = param.count;
321 throw DCException(getExceptionString(
"readGlobalAttributeInfo",
"a parameter was null"));
323 if (fileStatus == FST_CLOSED || fileStatus == FST_CREATING)
324 throw DCException(getExceptionString(
"readGlobalAttributeInfo",
"this access is not permitted"));
326 DCParallelGroup group_custom;
327 group_custom.open(handles.get(
id), PDC_GROUP_CUSTOM);
331 return DCAttribute::readAttributeInfo(name, group_custom.getHandle());
334 log_msg(0,
"Exception: %s", e.what());
344 if (name == NULL || data == NULL)
345 throw DCException(getExceptionString(
"readGlobalAttribute",
"a parameter was null"));
347 if (fileStatus == FST_CLOSED || fileStatus == FST_CREATING)
348 throw DCException(getExceptionString(
"readGlobalAttribute",
"this access is not permitted"));
350 DCParallelGroup group_custom;
351 group_custom.open(handles.get(
id), PDC_GROUP_CUSTOM);
355 DCAttribute::readAttribute(name, group_custom.getHandle(), data);
358 log_msg(0,
"Exception: %s", e.what());
359 throw DCException(getExceptionString(
"readGlobalAttribute",
"failed to open attribute", name));
381 if (name == NULL || data == NULL)
382 throw DCException(getExceptionString(
"writeGlobalAttribute",
"a parameter was null"));
384 if (fileStatus == FST_CLOSED || fileStatus == FST_READING)
385 throw DCException(getExceptionString(
"writeGlobalAttribute",
"this access is not permitted"));
387 if (ndims < 1u || ndims > DSP_DIM_MAX)
388 throw DCException(getExceptionString(
"writeGlobalAttribute",
"maximum dimension `ndims` is invalid"));
390 DCParallelGroup group_custom;
391 group_custom.open(handles.get(
id), PDC_GROUP_CUSTOM);
395 DCAttribute::writeAttribute(name, type.
getDataType(), group_custom.getHandle(), ndims, dims, data);
398 log_msg(0,
"Exception: %s", e.what());
399 throw DCException(getExceptionString(
"writeGlobalAttribute",
"failed to write attribute", name));
403 hid_t ParallelDataCollector::openGroup(DCGroup& group, int32_t
id,
const char* dataName)
throw (
DCException)
407 if (dataName && strlen(dataName) == 0)
408 throw DCException(getExceptionString(
"readAttribute",
"empty dataset name"));
410 if (fileStatus == FST_CLOSED)
411 throw DCException(getExceptionString(
"readAttribute",
"this access is not permitted"));
413 std::string group_path, obj_name;
414 std::string dataNameInternal =
"";
416 dataNameInternal.assign(dataName);
417 DCDataSet::getFullDataPath(dataNameInternal, SDC_GROUP_DATA,
id, group_path, obj_name);
419 group.open(handles.get(
id), group_path);
424 hid_t obj_id = H5Oopen(group.getHandle(), obj_name.c_str(), H5P_DEFAULT);
427 throw DCException(getExceptionString(
"readAttribute",
428 "dataset not found", obj_name.c_str()));
436 const char *dataName,
437 const char *attrName,
442 if (attrName == NULL)
443 throw DCException(getExceptionString(
"readAttributeInfo",
"a parameter was null"));
445 if (strlen(attrName) == 0)
446 throw DCException(getExceptionString(
"readAttributeInfo",
"empty attribute name"));
448 DCParallelGroup group;
449 H5ObjectId objId(openGroup(group,
id, dataName));
452 return DCAttribute::readAttributeInfo(attrName, group.getHandle());
454 return DCAttribute::readAttributeInfo(attrName, objId);
458 const char *dataName,
459 const char *attrName,
465 if (attrName == NULL || data == NULL)
466 throw DCException(getExceptionString(
"readAttribute",
"a parameter was null"));
468 if (strlen(attrName) == 0)
469 throw DCException(getExceptionString(
"readAttribute",
"empty attribute name"));
471 DCParallelGroup group;
472 H5ObjectId objId(openGroup(group,
id, dataName));
475 DCAttribute::readAttribute(attrName, group.getHandle(), data);
477 DCAttribute::readAttribute(attrName, objId, data);
482 const char *dataName,
483 const char *attrName,
494 const char *dataName,
495 const char *attrName,
501 if (attrName == NULL || data == NULL)
502 throw DCException(getExceptionString(
"writeAttribute",
"a parameter was null"));
505 if (dataName && strlen(dataName) == 0)
506 throw DCException(getExceptionString(
"writeAttribute",
"empty dataset name"));
508 if (strlen(attrName) == 0)
509 throw DCException(getExceptionString(
"writeAttribute",
"empty attribute name"));
511 if (fileStatus == FST_CLOSED || fileStatus == FST_READING)
512 throw DCException(getExceptionString(
"writeAttribute",
"this access is not permitted"));
514 if (ndims < 1u || ndims > DSP_DIM_MAX)
515 throw DCException(getExceptionString(
"writeAttribute",
"maximum dimension `ndims` is invalid"));
520 std::string group_path, obj_name;
521 std::string dataNameInternal =
"";
523 dataNameInternal.assign(dataName);
524 DCDataSet::getFullDataPath(dataNameInternal, SDC_GROUP_DATA,
id, group_path, obj_name);
526 DCParallelGroup group;
535 std::string pathAndName(group_path +
"/" + obj_name);
536 if(!DCParallelGroup::exists(handles.get(
id), pathAndName))
537 group.create(handles.get(
id), pathAndName);
540 group.open(handles.get(
id), group_path);
541 hid_t obj_id = H5Oopen(group.getHandle(), obj_name.c_str(), H5P_DEFAULT);
544 throw DCException(getExceptionString(
"writeAttribute",
545 "object not found", obj_name.c_str()));
550 DCAttribute::writeAttribute(attrName, type.
getDataType(), obj_id, ndims, dims, data);
560 group.openCreate(handles.get(
id), group_path);
561 DCAttribute::writeAttribute(attrName, type.
getDataType(), group.getHandle(), ndims, dims, data);
582 if (fileStatus != FST_READING && fileStatus != FST_WRITING)
583 throw DCException(getExceptionString(
"read",
"this access is not permitted"));
586 readCompleteDataSet(handles.get(
id), id, name, dstBuffer, dstOffset,
597 this->
read(
id, localSize, globalOffset, name, localSize,
610 if (fileStatus != FST_READING && fileStatus != FST_WRITING)
611 throw DCException(getExceptionString(
"read",
"this access is not permitted"));
614 readDataSet(handles.get(
id), id, name, dstBuffer, dstOffset,
615 localSize, globalOffset, sizeRead, ndims, buf);
624 if (fileStatus != FST_READING && fileStatus != FST_WRITING)
625 throw DCException(getExceptionString(
"readMeta",
"this access is not permitted"));
628 return readDataSetMeta(handles.get(
id), id, name, dstBuffer, dstOffset,
633 const Selection select,
const char* name,
const void* buf)
637 gatherMPIWrites(ndims, select.count, globalSize, globalOffset);
639 write(
id, globalSize, globalOffset,
640 type, ndims, select, name, buf);
646 const Selection select,
const char* name,
const void* buf)
649 throw DCException(getExceptionString(
"write",
"parameter name is NULL"));
651 if (fileStatus == FST_CLOSED || fileStatus == FST_READING)
652 throw DCException(getExceptionString(
"write",
"this access is not permitted"));
654 if (ndims < 1 || ndims > DSP_DIM_MAX)
655 throw DCException(getExceptionString(
"write",
"maximum dimension is invalid"));
658 std::string group_path, dset_name;
659 DCDataSet::getFullDataPath(name, SDC_GROUP_DATA,
id, group_path, dset_name);
661 DCParallelGroup group;
662 group.openCreate(handles.get(
id), group_path);
665 writeDataSet(group.getHandle(), globalSize, globalOffset, type, ndims,
666 select, dset_name.c_str(), buf);
676 throw DCException(getExceptionString(
"reserve",
"a parameter was NULL"));
678 if (fileStatus == FST_CLOSED || fileStatus == FST_READING)
679 throw DCException(getExceptionString(
"write",
"this access is not permitted"));
681 if (ndims < 1 || ndims > DSP_DIM_MAX)
682 throw DCException(getExceptionString(
"write",
"maximum dimension is invalid"));
684 reserveInternal(
id, globalSize, ndims, type, name);
696 throw DCException(getExceptionString(
"reserve",
"a parameter was NULL"));
698 if (fileStatus == FST_CLOSED || fileStatus == FST_READING)
699 throw DCException(getExceptionString(
"write",
"this access is not permitted"));
701 if (ndims < 1 || ndims > DSP_DIM_MAX)
702 throw DCException(getExceptionString(
"write",
"maximum dimension is invalid"));
705 gatherMPIWrites(ndims, size, global_size, global_offset);
707 reserveInternal(
id, global_size, ndims, type, name);
710 globalSize->set(global_size);
713 globalOffset->set(global_offset);
724 throw DCException(getExceptionString(
"append",
"parameter name is NULL"));
726 if (fileStatus == FST_CLOSED || fileStatus == FST_READING)
727 throw DCException(getExceptionString(
"append",
"this access is not permitted"));
729 if (ndims < 1 || ndims > DSP_DIM_MAX)
730 throw DCException(getExceptionString(
"append",
"maximum dimension is invalid"));
733 std::string group_path, dset_name;
734 DCDataSet::getFullDataPath(name, SDC_GROUP_DATA,
id, group_path, dset_name);
736 DCParallelGroup group;
737 group.open(handles.get(
id), group_path);
740 DCParallelDataSet dataset(dset_name.c_str());
741 dataset.setWriteIndependent();
743 if (!dataset.open(group.getHandle()))
746 "Cannot open dataset (missing reserve?)", dset_name.c_str()));
748 dataset.write(
Selection(size), globalOffset, buf);
756 log_msg(1,
"removing group %d",
id);
758 if (fileStatus == FST_CLOSED || fileStatus == FST_READING)
759 throw DCException(getExceptionString(
"remove",
"this access is not permitted"));
761 std::stringstream group_id_name;
762 group_id_name << SDC_GROUP_DATA <<
"/" << id;
764 DCParallelGroup::remove(handles.get(
id), group_id_name.str());
773 log_msg(1,
"removing dataset %s from group %d", name,
id);
775 if (fileStatus == FST_CLOSED || fileStatus == FST_READING)
776 throw DCException(getExceptionString(
"remove",
"this access is not permitted"));
779 throw DCException(getExceptionString(
"remove",
"a parameter was NULL"));
781 std::string group_path, dset_name;
782 DCDataSet::getFullDataPath(name, SDC_GROUP_DATA,
id, group_path, dset_name);
784 DCParallelGroup group;
785 group.open(handles.get(
id), group_path);
787 if (H5Ldelete(group.getHandle(), dset_name.c_str(), H5P_LINK_ACCESS_DEFAULT) < 0)
789 throw DCException(getExceptionString(
"remove",
"failed to remove dataset", name));
799 if (srcName == NULL || dstName == NULL)
800 throw DCException(getExceptionString(
"createReference",
"a parameter was NULL"));
802 if (fileStatus == FST_CLOSED || fileStatus == FST_READING)
803 throw DCException(getExceptionString(
"createReference",
"this access is not permitted"));
806 throw DCException(getExceptionString(
"createReference",
807 "source and destination ID must be identical", NULL));
809 if (srcName == dstName)
810 throw DCException(getExceptionString(
"createReference",
811 "a reference must not be identical to the referenced data", srcName));
814 std::string src_group_path, src_dset_name, dst_dset_name;
815 DCDataSet::getFullDataPath(srcName, SDC_GROUP_DATA, srcID, src_group_path, src_dset_name);
816 DCDataSet::getFullDataPath(dstName, SDC_GROUP_DATA, dstID, src_group_path, dst_dset_name);
818 DCParallelGroup src_group;
819 src_group.open(handles.get(srcID), src_group_path);
822 DCParallelDataSet src_dataset(src_dset_name.c_str());
823 src_dataset.open(src_group.getHandle());
825 DCParallelDataSet dst_dataset(dst_dset_name.c_str());
828 dst_dataset.createReference(src_group.getHandle(), src_group.getHandle(), src_dataset);
838 void ParallelDataCollector::fileCreateCallback(H5Handle handle, uint32_t index,
void *userData)
841 Options *options = (Options*) userData;
844 DCParallelGroup group;
845 group.create(handle, PDC_GROUP_CUSTOM);
849 group.create(handle, SDC_GROUP_DATA);
852 writeHeader(handle, index, options->enableCompression, options->mpiTopology);
855 void ParallelDataCollector::fileOpenCallback(H5Handle , uint32_t index,
void *userData)
858 Options *options = (Options*) userData;
860 options->maxID = std::max(options->maxID, (int32_t) index);
863 void ParallelDataCollector::writeHeader(hid_t fHandle, uint32_t
id,
864 bool enableCompression,
Dimensions mpiTopology)
868 DCParallelGroup group;
869 group.create(fHandle, SDC_GROUP_HEADER);
872 bool compression = enableCompression;
873 std::stringstream splashVersion;
874 splashVersion << SPLASH_VERSION_MAJOR <<
"." 875 << SPLASH_VERSION_MINOR <<
"." 876 << SPLASH_VERSION_PATCH;
877 std::stringstream splashFormat;
878 splashFormat << SPLASH_FILE_FORMAT_MAJOR <<
"." 879 << SPLASH_FILE_FORMAT_MINOR;
881 ColTypeInt32 ctInt32;
888 DCAttribute::writeAttribute(SDC_ATTR_MAX_ID, ctInt32.getDataType(),
889 group.getHandle(), &index);
891 DCAttribute::writeAttribute(SDC_ATTR_COMPRESSION, ctBool.
getDataType(),
892 group.getHandle(), &compression);
894 DCAttribute::writeAttribute(SDC_ATTR_MPI_SIZE, dim_t.
getDataType(),
897 DCAttribute::writeAttribute(SDC_ATTR_VERSION, ctStringVersion.getDataType(),
898 group.getHandle(), splashVersion.str().c_str());
900 DCAttribute::writeAttribute(SDC_ATTR_FORMAT, ctStringFormat.getDataType(),
901 group.getHandle(), splashFormat.str().c_str());
904 void ParallelDataCollector::openCreate(
const char *filename,
908 this->fileStatus = FST_CREATING;
918 handles.open(
Dimensions(1, 1, 1), filename, fileAccProperties, H5F_ACC_TRUNC);
921 void ParallelDataCollector::openRead(
const char* filename,
FileCreationAttr& )
924 this->fileStatus = FST_READING;
928 handles.open(
Dimensions(1, 1, 1), filename, fileAccProperties, H5F_ACC_RDONLY);
931 void ParallelDataCollector::openWrite(
const char* filename,
FileCreationAttr& )
934 this->fileStatus = FST_WRITING;
941 handles.open(
Dimensions(1, 1, 1), filename, fileAccProperties, H5F_ACC_RDWR);
944 void ParallelDataCollector::readCompleteDataSet(H5Handle h5File,
955 log_msg(2,
"readCompleteDataSet");
957 if (h5File < 0 || name == NULL)
958 throw DCException(getExceptionString(
"readCompleteDataSet",
"invalid parameters"));
960 std::string group_path, dset_name;
961 DCDataSet::getFullDataPath(name, SDC_GROUP_DATA,
id, group_path, dset_name);
963 DCParallelGroup group;
964 group.open(h5File, group_path);
966 DCParallelDataSet dataset(dset_name.c_str());
967 dataset.open(group.getHandle());
968 const Dimensions src_size(dataset.getSize() - srcOffset);
970 dataset.read(dstBuffer, dstOffset, src_size, srcOffset, sizeRead, srcRank, dst);
974 void ParallelDataCollector::readDataSet(H5Handle h5File,
988 if (h5File < 0 || name == NULL)
989 throw DCException(getExceptionString(
"readDataSet",
"invalid parameters"));
991 std::string group_path, dset_name;
992 DCDataSet::getFullDataPath(name, SDC_GROUP_DATA,
id, group_path, dset_name);
994 DCParallelGroup group;
995 group.open(h5File, group_path);
997 DCParallelDataSet dataset(dset_name.c_str());
998 dataset.open(group.getHandle());
1000 dataset.read(dstBuffer, dstOffset, srcSize, srcOffset, sizeRead, srcRank, dst);
1004 CollectionType* ParallelDataCollector::readDataSetMeta(H5Handle h5File,
1014 log_msg(2,
"readDataSetMeta");
1016 std::string group_path, dset_name;
1017 DCDataSet::getFullDataPath(name, SDC_GROUP_DATA,
id, group_path, dset_name);
1020 group.open(h5File, group_path);
1022 DCDataSet dataset(dset_name.c_str());
1023 dataset.open(group.getHandle());
1027 std::vector<DataCollector::DCEntry> entries(entrySize);
1032 int32_t entry_id = -1;
1033 for(
size_t i = 0; i < entrySize; ++i)
1034 if(std::string(name) == entries[i].name)
1036 entry_id = int32_t(i);
1041 throw DCException(getExceptionString(
"readDataSetMeta",
"Entry not found by name", name));
1043 Dimensions src_size(dataset.getSize() - srcOffset);
1044 dataset.read(dstBuffer, dstOffset, src_size, srcOffset, sizeRead, srcDims, NULL);
1047 log_msg(3,
"Entry '%s' (%d) is of type: %s",
1048 entries[entry_id].name.c_str(),
1050 entries[entry_id].colType->toString().c_str());
1052 return entries[entry_id].colType;
1055 void ParallelDataCollector::writeDataSet(H5Handle group,
1066 DCParallelDataSet dataset(name);
1069 dataset.create(datatype, group, globalSize, ndims,
1070 this->options.enableCompression,
false);
1071 dataset.write(srcSelect, globalOffset, data);
1075 void ParallelDataCollector::gatherMPIWrites(
int ndims,
const Dimensions localSize,
1079 uint64_t write_sizes[options.mpiSize * DSP_DIM_MAX];
1080 uint64_t local_write_size[DSP_DIM_MAX] = {localSize[0], localSize[1], localSize[2]};
1082 globalSize.
set(1, 1, 1);
1083 globalOffset.
set(0, 0, 0);
1085 if (MPI_Allgather(local_write_size, DSP_DIM_MAX, MPI_UNSIGNED_LONG_LONG,
1086 write_sizes, DSP_DIM_MAX, MPI_UNSIGNED_LONG_LONG, options.mpiComm) != MPI_SUCCESS)
1087 throw DCException(getExceptionString(
"gatherMPIWrites",
1088 "MPI_Allgather failed", NULL));
1090 Dimensions tmp_mpi_topology(options.mpiTopology);
1094 tmp_mpi_topology.
set(options.mpiTopology.getScalarSize(), 1, 1);
1095 tmp_mpi_pos.
set(options.mpiRank, 0, 0);
1098 if ((ndims == 2) && (tmp_mpi_topology[2] > 1))
1100 throw DCException(getExceptionString(
"gatherMPIWrites",
1101 "cannot auto-detect global size/offset for 2D data when writing with 3D topology", NULL));
1104 for (
int i = 0; i < ndims; ++i)
1108 for (
size_t dim = 0; dim < tmp_mpi_topology[i]; ++dim)
1116 index = dim * tmp_mpi_topology[0];
1119 index = dim * tmp_mpi_topology[0] * tmp_mpi_topology[1];
1122 globalSize[i] += write_sizes[index * DSP_DIM_MAX + i];
1123 if (dim < tmp_mpi_pos[i])
1124 globalOffset[i] += write_sizes[index * DSP_DIM_MAX + i];
1129 size_t ParallelDataCollector::getNDims(H5Handle h5File,
1130 int32_t
id,
const char* name)
1132 if (h5File < 0 || name == NULL)
1133 throw DCException(getExceptionString(
"getNDims",
"invalid parameters"));
1135 std::string group_path, dset_name;
1136 DCDataSet::getFullDataPath(name, SDC_GROUP_DATA,
id, group_path, dset_name);
1138 DCParallelGroup group;
1139 group.open(h5File, group_path);
1143 DCParallelDataSet dataset(dset_name.c_str());
1144 dataset.open(group.getHandle());
1146 ndims = dataset.getNDims();
1153 void ParallelDataCollector::reserveInternal(int32_t
id,
1160 log_msg(2,
"reserveInternal");
1163 std::string group_path, dset_name;
1164 DCDataSet::getFullDataPath(name, SDC_GROUP_DATA,
id, group_path, dset_name);
1166 DCParallelGroup group;
1167 group.openCreate(handles.get(
id), group_path);
1169 DCParallelDataSet dataset(dset_name.c_str());
1171 dataset.create(type, group.getHandle(), globalSize, ndims,
1172 this->options.enableCompression,
true);
1180 throw DCException(getExceptionString(
"readGlobalAttribute",
1181 "feature currently not supported by Parallel HDF5"));
1187 throw DCException(getExceptionString(
"readGlobalAttribute",
1188 "feature currently not supported by Parallel HDF5"));
1195 throw DCException(getExceptionString(
"readGlobalAttribute",
1196 "feature currently not supported by Parallel HDF5"));
1200 size_t ,
const char* ,
const void* )
1203 throw DCException(getExceptionString(
"readGlobalAttribute",
1204 "feature currently not supported by Parallel HDF5"));
1208 size_t ,
size_t ,
size_t ,
const char* ,
1211 throw DCException(getExceptionString(
"readGlobalAttribute",
1212 "feature currently not supported by Parallel HDF5"));
1224 throw DCException(getExceptionString(
"readGlobalAttribute",
1225 "feature currently not supported by Parallel HDF5"));
EXTERN void setLogMpiRank(int rank)
void createReference(int32_t srcID, const char *srcName, int32_t dstID, const char *dstName)
void readAttribute(int32_t id, const char *dataName, const char *attrName, void *buf, Dimensions *mpiPosition=NULL)
void open(const char *filename, FileCreationAttr &attr)
CollectionType * readMeta(int32_t id, const char *name, const Dimensions dstBuffer, const Dimensions dstOffset, Dimensions &sizeRead)
void append(int32_t id, const Dimensions size, uint32_t rank, const Dimensions globalOffset, const char *name, const void *buf)
void set(hsize_t x, hsize_t y, hsize_t z)
AttributeInfo readGlobalAttributeInfo(int32_t id, const char *name, Dimensions *mpiPosition=NULL)
void readGlobalAttribute(int32_t id, const char *name, void *buf)
size_t getScalarSize() const
void writeGlobalAttribute(int32_t id, const CollectionType &type, const char *name, const void *buf)
AttributeInfo readAttributeInfo(int32_t id, const char *dataName, const char *attrName, Dimensions *mpiPosition=NULL)
void getEntriesForID(int32_t id, DCEntry *entries, size_t *count)
void getEntryIDs(int32_t *ids, size_t *count)
void write(int32_t id, const CollectionType &type, uint32_t rank, const Selection select, const char *name, const void *buf)
void read(int32_t id, const char *name, Dimensions &sizeRead, void *buf)
EXTERN void parseEnvVars(void)
const H5DataType & getDataType() const
EXTERN void log_msg(int level, const char *fmt,...)
ParallelDataCollector(MPI_Comm comm, MPI_Info info, const Dimensions topology, uint32_t maxFileHandles)
void reserve(int32_t id, const Dimensions globalSize, uint32_t rank, const CollectionType &type, const char *name)
void getMPISize(Dimensions &mpiSize)
void writeAttribute(int32_t id, const CollectionType &type, const char *dataName, const char *attrName, const void *buf)
virtual ~ParallelDataCollector()