libSplash
ParallelDataCollector.cpp
1 
23 #include <cassert>
24 #include <set>
25 #include <dirent.h>
26 #include <stdlib.h>
27 #include <cstring>
28 #include <sstream>
29 
30 #include "splash/version.hpp"
31 #include "splash/ParallelDataCollector.hpp"
32 #include "splash/AttributeInfo.hpp"
33 #include "splash/basetypes/basetypes.hpp"
34 #include "splash/core/DCParallelDataSet.hpp"
35 #include "splash/core/DCAttribute.hpp"
36 #include "splash/core/DCParallelGroup.hpp"
37 #include "splash/core/logging.hpp"
38 #include "splash/core/H5IdWrapper.hpp"
39 
40 namespace splash
41 {
42 
43  /*******************************************************************************
44  * PRIVATE FUNCTIONS
45  *******************************************************************************/
46 
47  void ParallelDataCollector::setFileAccessParams(hid_t& fileAccProperties)
48  {
49  fileAccProperties = H5Pcreate(H5P_FILE_ACCESS);
50  H5Pset_fapl_mpio(fileAccProperties, options.mpiComm, options.mpiInfo);
51 
52  int metaCacheElements = 0;
53  size_t rawCacheElements = 0;
54  size_t rawCacheSize = 0;
55  double policy = 0.0;
56 
57  // set new cache size
58  /*
59  * Note from http://www.hdfgroup.org/HDF5/doc/RM/RM_H5P.html#Property-SetCache:
60  * "Raw dataset chunk caching is not currently supported when using the MPI I/O
61  * and MPI POSIX file drivers in read/write mode [..]. When using one of these
62  * file drivers, all calls to H5Dread and H5Dwrite will access the disk directly,
63  * and H5Pset_cache will have no effect on performance."
64  */
65  H5Pget_cache(fileAccProperties, &metaCacheElements, &rawCacheElements, &rawCacheSize, &policy);
66  rawCacheSize = 256 * 1024 * 1024;
67  H5Pset_cache(fileAccProperties, metaCacheElements, rawCacheElements, rawCacheSize, policy);
68 
69  log_msg(3, "Raw Data Cache (File) = %llu KiB", (long long unsigned) (rawCacheSize / 1024));
70  }
71 
72  std::string ParallelDataCollector::getExceptionString(std::string func, std::string msg,
73  const char *info)
74  {
75  std::stringstream full_msg;
76  full_msg << "Exception for ParallelDataCollector::" << func <<
77  ": " << msg;
78 
79  if (info != NULL)
80  full_msg << " (" << info << ")";
81 
82  return full_msg.str();
83  }
84 
85  void ParallelDataCollector::indexToPos(int index, Dimensions mpiSize, Dimensions &mpiPos)
86  {
87  mpiPos[2] = index / (mpiSize[0] * mpiSize[1]);
88  mpiPos[1] = (index % (mpiSize[0] * mpiSize[1])) / mpiSize[0];
89  mpiPos[0] = index % mpiSize[0];
90  }
91 
92  void ParallelDataCollector::listFilesInDir(const std::string baseFilename, std::set<int32_t> &ids)
93  throw (DCException)
94  {
95  log_msg(2, "listing files for %s", baseFilename.c_str());
96 
97  /* Split baseFilename into path and name prefix.
98  * Always append '_' since PDC filenames are 'prefix_iteration.h5'.
99  * e.g. '/path/to/filename' -> dir_path='/path/to/' name='filename_'
100  */
101  std::string dir_path, name;
102  std::string::size_type pos = baseFilename.find_last_of('/');
103  if (pos == std::string::npos)
104  {
105  dir_path.assign(".");
106  name.assign(baseFilename);
107  } else
108  {
109  dir_path.assign(baseFilename.c_str(), baseFilename.c_str() + pos);
110  name.assign(baseFilename.c_str() + pos + 1);
111  }
112  name.append("_");
113 
114  dirent *dp = NULL;
115  DIR *dirp = NULL;
116 
117  dirp = opendir(dir_path.c_str());
118  if (!dirp)
119  {
120  throw DCException(getExceptionString("listFilesInDir",
121  "Failed to open directory", dir_path.c_str()));
122  }
123 
124  while ((dp = readdir(dirp)) != NULL)
125  {
126  // size matches and starts with name.c_str()
127  if (strstr(dp->d_name, name.c_str()) == dp->d_name)
128  {
129  std::string fname;
130  fname.assign(dp->d_name);
131  // end with correct file extension
132  // 3 is the suffix length including the dot
133  const size_t fileNameLength = fname.size() - 3;
134  if (fname.rfind(".h5") != fileNameLength)
135  continue;
136 
137  // extract id from filename (part between "/path/prefix_" and ".h5")
138  char* endPtr = NULL;
139  std::string idStr = fname.substr(fname.rfind("_") + 1,
140  fileNameLength - name.size());
141 
142  int32_t id = strtol(idStr.c_str(), &endPtr, 10);
143  if (endPtr && *endPtr == 0L) {
144  ids.insert(id);
145  log_msg(3, "found file %s with ID %d", fname.c_str(), id);
146  }
147  }
148  }
149  (void) closedir(dirp);
150  }
151 
152  /*******************************************************************************
153  * PUBLIC FUNCTIONS
154  *******************************************************************************/
155 
156  ParallelDataCollector::ParallelDataCollector(MPI_Comm comm, MPI_Info info,
157  const Dimensions topology, uint32_t maxFileHandles) :
158  handles(maxFileHandles, HandleMgr::FNS_ITERATIONS),
159  fileStatus(FST_CLOSED)
160  {
161  parseEnvVars();
162 
163  if (MPI_Comm_dup(comm, &(options.mpiComm)) != MPI_SUCCESS)
164  throw DCException(getExceptionString("ParallelDataCollector",
165  "failed to duplicate MPI communicator"));
166 
167  MPI_Comm_rank(options.mpiComm, &(options.mpiRank));
168  options.enableCompression = false;
169  options.mpiInfo = info;
170  options.mpiSize = topology.getScalarSize();
171  options.mpiTopology.set(topology);
172  options.maxID = -1;
173 
174  setLogMpiRank(options.mpiRank);
175 
176  if (H5open() < 0)
177  throw DCException(getExceptionString("ParallelDataCollector",
178  "failed to initialize/open HDF5 library"));
179 
180 #ifndef SPLASH_VERBOSE_HDF5
181  // surpress automatic output of HDF5 exception messages
182  if (H5Eset_auto2(H5E_DEFAULT, NULL, NULL) < 0)
183  throw DCException(getExceptionString("ParallelDataCollector",
184  "failed to disable error printing"));
185 #endif
186 
187  // set some default file access parameters
188  setFileAccessParams(fileAccProperties);
189 
190  handles.registerFileCreate(fileCreateCallback, &options);
191  handles.registerFileOpen(fileOpenCallback, &options);
192 
193  indexToPos(options.mpiRank, options.mpiTopology, options.mpiPos);
194  }
195 
197  {
198  close();
199  H5Pclose(fileAccProperties);
200  finalize();
201  }
202 
204  {
205  if (options.mpiComm != MPI_COMM_NULL)
206  {
207  log_msg(1, "finalizing data collector");
208  MPI_Comm_free(&options.mpiComm);
209  options.mpiComm = MPI_COMM_NULL;
210  }
211  }
212 
213  void ParallelDataCollector::open(const char* filename, FileCreationAttr &attr)
214  throw (DCException)
215  {
216  log_msg(1, "opening parallel data collector");
217 
218  if (filename == NULL)
219  throw DCException(getExceptionString("open", "filename must not be null"));
220 
221  if (fileStatus != FST_CLOSED)
222  throw DCException(getExceptionString("open", "this access is not permitted"));
223 
224  this->baseFilename.assign(filename);
225 
226  switch (attr.fileAccType)
227  {
228  case FAT_READ:
229  case FAT_READ_MERGED:
230  openRead(filename, attr);
231  break;
232  case FAT_WRITE:
233  openWrite(filename, attr);
234  break;
235  case FAT_CREATE:
236  openCreate(filename, attr);
237  break;
238  }
239  }
240 
242  {
243  if (fileStatus == FST_CLOSED)
244  return;
245 
246  log_msg(1, "closing parallel data collector");
247 
248  // close opened hdf5 file handles
249  handles.close();
250 
251  options.maxID = -1;
252 
253  fileStatus = FST_CLOSED;
254  }
255 
257  {
258  std::set<int32_t> ids;
259  listFilesInDir(this->baseFilename, ids);
260 
261  if (ids.size() > 0)
262  options.maxID = *(ids.rbegin());
263 
264  return options.maxID;
265  }
266 
268  {
269  mpiSize.set(options.mpiTopology);
270  }
271 
272  void ParallelDataCollector::getEntryIDs(int32_t *ids, size_t *count)
273  throw (DCException)
274  {
275  std::set<int32_t> file_ids;
276  listFilesInDir(this->baseFilename, file_ids);
277 
278  if (count != NULL)
279  *count = file_ids.size();
280 
281  if (ids != NULL)
282  {
283  size_t ctr = 0;
284  for (std::set<int32_t>::const_iterator iter = file_ids.begin();
285  iter != file_ids.end(); ++iter)
286  {
287  ids[ctr] = *iter;
288  ++ctr;
289  }
290  }
291  }
292 
294  size_t *count)
295  throw (DCException)
296  {
297  std::stringstream group_id_name;
298  group_id_name << SDC_GROUP_DATA << "/" << id;
299 
300  // open data group for id
301  DCParallelGroup group;
302  group.open(handles.get(id), group_id_name.str());
303 
304  DCParallelGroup::VisitObjCBType param;
305  param.count = 0;
306  param.entries = entries;
307 
308  DCParallelGroup::getEntriesInternal(group.getHandle(), group_id_name.str(), "", &param);
309 
310  if (count)
311  *count = param.count;
312  }
313 
315  int32_t id,
316  const char* name,
317  Dimensions* /*mpiPosition*/)
318  throw (DCException)
319  {
320  if (name == NULL)
321  throw DCException(getExceptionString("readGlobalAttributeInfo", "a parameter was null"));
322 
323  if (fileStatus == FST_CLOSED || fileStatus == FST_CREATING)
324  throw DCException(getExceptionString("readGlobalAttributeInfo", "this access is not permitted"));
325 
326  DCParallelGroup group_custom;
327  group_custom.open(handles.get(id), PDC_GROUP_CUSTOM);
328 
329  try
330  {
331  return DCAttribute::readAttributeInfo(name, group_custom.getHandle());
332  } catch (const DCException& e)
333  {
334  log_msg(0, "Exception: %s", e.what());
335  throw;
336  }
337  }
338 
340  const char* name,
341  void *data)
342  throw (DCException)
343  {
344  if (name == NULL || data == NULL)
345  throw DCException(getExceptionString("readGlobalAttribute", "a parameter was null"));
346 
347  if (fileStatus == FST_CLOSED || fileStatus == FST_CREATING)
348  throw DCException(getExceptionString("readGlobalAttribute", "this access is not permitted"));
349 
350  DCParallelGroup group_custom;
351  group_custom.open(handles.get(id), PDC_GROUP_CUSTOM);
352 
353  try
354  {
355  DCAttribute::readAttribute(name, group_custom.getHandle(), data);
356  } catch (const DCException& e)
357  {
358  log_msg(0, "Exception: %s", e.what());
359  throw DCException(getExceptionString("readGlobalAttribute", "failed to open attribute", name));
360  }
361  }
362 
364  const CollectionType& type,
365  const char *name,
366  const void* data)
367  throw (DCException)
368  {
369  const Dimensions dims(1, 1, 1);
370  writeGlobalAttribute(id, type, name, 1u, dims, data);
371  }
372 
374  const CollectionType& type,
375  const char *name,
376  uint32_t ndims,
377  const Dimensions dims,
378  const void* data)
379  throw (DCException)
380  {
381  if (name == NULL || data == NULL)
382  throw DCException(getExceptionString("writeGlobalAttribute", "a parameter was null"));
383 
384  if (fileStatus == FST_CLOSED || fileStatus == FST_READING)
385  throw DCException(getExceptionString("writeGlobalAttribute", "this access is not permitted"));
386 
387  if (ndims < 1u || ndims > DSP_DIM_MAX)
388  throw DCException(getExceptionString("writeGlobalAttribute", "maximum dimension `ndims` is invalid"));
389 
390  DCParallelGroup group_custom;
391  group_custom.open(handles.get(id), PDC_GROUP_CUSTOM);
392 
393  try
394  {
395  DCAttribute::writeAttribute(name, type.getDataType(), group_custom.getHandle(), ndims, dims, data);
396  } catch (const DCException& e)
397  {
398  log_msg(0, "Exception: %s", e.what());
399  throw DCException(getExceptionString("writeGlobalAttribute", "failed to write attribute", name));
400  }
401  }
402 
403  hid_t ParallelDataCollector::openGroup(DCGroup& group, int32_t id, const char* dataName) throw (DCException)
404  {
405  // mpiPosition is ignored
406  // dataName may be NULL, attribute is read from iteration group in that case
407  if (dataName && strlen(dataName) == 0)
408  throw DCException(getExceptionString("readAttribute", "empty dataset name"));
409 
410  if (fileStatus == FST_CLOSED)
411  throw DCException(getExceptionString("readAttribute", "this access is not permitted"));
412 
413  std::string group_path, obj_name;
414  std::string dataNameInternal = "";
415  if (dataName)
416  dataNameInternal.assign(dataName);
417  DCDataSet::getFullDataPath(dataNameInternal, SDC_GROUP_DATA, id, group_path, obj_name);
418 
419  group.open(handles.get(id), group_path);
420 
421  if (dataName)
422  {
423  // read attribute from the dataset or group
424  hid_t obj_id = H5Oopen(group.getHandle(), obj_name.c_str(), H5P_DEFAULT);
425  if (obj_id < 0)
426  {
427  throw DCException(getExceptionString("readAttribute",
428  "dataset not found", obj_name.c_str()));
429  }
430  return obj_id;
431  } else
432  return -1;
433  }
434 
436  const char *dataName,
437  const char *attrName,
438  Dimensions* /*mpiPosition*/)
439  throw (DCException)
440  {
441  // mpiPosition is ignored
442  if (attrName == NULL)
443  throw DCException(getExceptionString("readAttributeInfo", "a parameter was null"));
444 
445  if (strlen(attrName) == 0)
446  throw DCException(getExceptionString("readAttributeInfo", "empty attribute name"));
447 
448  DCParallelGroup group;
449  H5ObjectId objId(openGroup(group, id, dataName));
450  // If objId is not set, then read from iteration
451  if(!objId)
452  return DCAttribute::readAttributeInfo(attrName, group.getHandle());
453  else
454  return DCAttribute::readAttributeInfo(attrName, objId);
455  }
456 
458  const char *dataName,
459  const char *attrName,
460  void* data,
461  Dimensions* /*mpiPosition*/)
462  throw (DCException)
463  {
464  // mpiPosition is ignored
465  if (attrName == NULL || data == NULL)
466  throw DCException(getExceptionString("readAttribute", "a parameter was null"));
467 
468  if (strlen(attrName) == 0)
469  throw DCException(getExceptionString("readAttribute", "empty attribute name"));
470 
471  DCParallelGroup group;
472  H5ObjectId objId(openGroup(group, id, dataName));
473  // If objId is not set, then read from iteration
474  if(!objId)
475  DCAttribute::readAttribute(attrName, group.getHandle(), data);
476  else
477  DCAttribute::readAttribute(attrName, objId, data);
478  }
479 
481  const CollectionType& type,
482  const char *dataName,
483  const char *attrName,
484  const void* data)
485  throw (DCException)
486  {
487  const Dimensions dims(1, 1, 1);
488  writeAttribute( id, type, dataName, attrName,
489  1u, dims, data);
490  }
491 
493  const CollectionType& type,
494  const char *dataName,
495  const char *attrName,
496  uint32_t ndims,
497  const Dimensions dims,
498  const void* data)
499  throw (DCException)
500  {
501  if (attrName == NULL || data == NULL)
502  throw DCException(getExceptionString("writeAttribute", "a parameter was null"));
503 
504  // dataName may be NULL, attribute is attached to iteration group in that case
505  if (dataName && strlen(dataName) == 0)
506  throw DCException(getExceptionString("writeAttribute", "empty dataset name"));
507 
508  if (strlen(attrName) == 0)
509  throw DCException(getExceptionString("writeAttribute", "empty attribute name"));
510 
511  if (fileStatus == FST_CLOSED || fileStatus == FST_READING)
512  throw DCException(getExceptionString("writeAttribute", "this access is not permitted"));
513 
514  if (ndims < 1u || ndims > DSP_DIM_MAX)
515  throw DCException(getExceptionString("writeAttribute", "maximum dimension `ndims` is invalid"));
516 
517  /* group_path: absolute path to the last inode
518  * obj_name: last inode, can be a group or a dataset
519  */
520  std::string group_path, obj_name;
521  std::string dataNameInternal = "";
522  if (dataName)
523  dataNameInternal.assign(dataName);
524  DCDataSet::getFullDataPath(dataNameInternal, SDC_GROUP_DATA, id, group_path, obj_name);
525 
526  DCParallelGroup group;
527  if (dataName)
528  {
529  /* if the specified inode (obj_name) does not exist
530  * (as dataset or group), create all missing groups along group_path
531  * and even create an empty group for obj_name itself
532  *
533  * group_path + "/" + obj_name is the absolute path of dataName
534  */
535  std::string pathAndName(group_path + "/" + obj_name);
536  if(!DCParallelGroup::exists(handles.get(id), pathAndName))
537  group.create(handles.get(id), pathAndName);
538 
539  // attach attribute to the dataset or group
540  group.open(handles.get(id), group_path);
541  hid_t obj_id = H5Oopen(group.getHandle(), obj_name.c_str(), H5P_DEFAULT);
542  if (obj_id < 0)
543  {
544  throw DCException(getExceptionString("writeAttribute",
545  "object not found", obj_name.c_str()));
546  }
547 
548  try
549  {
550  DCAttribute::writeAttribute(attrName, type.getDataType(), obj_id, ndims, dims, data);
551  } catch (const DCException&)
552  {
553  H5Oclose(obj_id);
554  throw;
555  }
556  H5Oclose(obj_id);
557  } else
558  {
559  // attach attribute to the iteration
560  group.openCreate(handles.get(id), group_path);
561  DCAttribute::writeAttribute(attrName, type.getDataType(), group.getHandle(), ndims, dims, data);
562  }
563  }
564 
566  const char* name,
567  Dimensions &sizeRead,
568  void* buf)
569  throw (DCException)
570  {
571  this->read(id, name, Dimensions(0, 0, 0), Dimensions(0, 0, 0), sizeRead, buf);
572  }
573 
575  const char* name,
576  Dimensions dstBuffer,
577  Dimensions dstOffset,
578  Dimensions &sizeRead,
579  void* buf)
580  throw (DCException)
581  {
582  if (fileStatus != FST_READING && fileStatus != FST_WRITING)
583  throw DCException(getExceptionString("read", "this access is not permitted"));
584 
585  uint32_t ndims = 0;
586  readCompleteDataSet(handles.get(id), id, name, dstBuffer, dstOffset,
587  Dimensions(0, 0, 0), sizeRead, ndims, buf);
588  }
589 
591  const Dimensions localSize,
592  const Dimensions globalOffset,
593  const char* name,
594  Dimensions &sizeRead,
595  void* buf) throw (DCException)
596  {
597  this->read(id, localSize, globalOffset, name, localSize,
598  Dimensions(0, 0, 0), sizeRead, buf);
599  }
600 
602  const Dimensions localSize,
603  const Dimensions globalOffset,
604  const char* name,
605  const Dimensions dstBuffer,
606  const Dimensions dstOffset,
607  Dimensions &sizeRead,
608  void* buf) throw (DCException)
609  {
610  if (fileStatus != FST_READING && fileStatus != FST_WRITING)
611  throw DCException(getExceptionString("read", "this access is not permitted"));
612 
613  uint32_t ndims = 0;
614  readDataSet(handles.get(id), id, name, dstBuffer, dstOffset,
615  localSize, globalOffset, sizeRead, ndims, buf);
616  }
617 
619  const char* name,
620  const Dimensions dstBuffer,
621  const Dimensions dstOffset,
622  Dimensions& sizeRead) throw (DCException)
623  {
624  if (fileStatus != FST_READING && fileStatus != FST_WRITING)
625  throw DCException(getExceptionString("readMeta", "this access is not permitted"));
626 
627  uint32_t ndims = 0;
628  return readDataSetMeta(handles.get(id), id, name, dstBuffer, dstOffset,
629  Dimensions(0, 0, 0), sizeRead, ndims);
630  }
631 
632  void ParallelDataCollector::write(int32_t id, const CollectionType& type, uint32_t ndims,
633  const Selection select, const char* name, const void* buf)
634  throw (DCException)
635  {
636  Dimensions globalSize, globalOffset;
637  gatherMPIWrites(ndims, select.count, globalSize, globalOffset);
638 
639  write(id, globalSize, globalOffset,
640  type, ndims, select, name, buf);
641  }
642 
643  void ParallelDataCollector::write(int32_t id, const Dimensions globalSize,
644  const Dimensions globalOffset,
645  const CollectionType& type, uint32_t ndims,
646  const Selection select, const char* name, const void* buf)
647  {
648  if (name == NULL)
649  throw DCException(getExceptionString("write", "parameter name is NULL"));
650 
651  if (fileStatus == FST_CLOSED || fileStatus == FST_READING)
652  throw DCException(getExceptionString("write", "this access is not permitted"));
653 
654  if (ndims < 1 || ndims > DSP_DIM_MAX)
655  throw DCException(getExceptionString("write", "maximum dimension is invalid"));
656 
657  // create group for this id/iteration
658  std::string group_path, dset_name;
659  DCDataSet::getFullDataPath(name, SDC_GROUP_DATA, id, group_path, dset_name);
660 
661  DCParallelGroup group;
662  group.openCreate(handles.get(id), group_path);
663 
664  // write data to the group
665  writeDataSet(group.getHandle(), globalSize, globalOffset, type, ndims,
666  select, dset_name.c_str(), buf);
667  }
668 
670  const Dimensions globalSize,
671  uint32_t ndims,
672  const CollectionType& type,
673  const char* name) throw (DCException)
674  {
675  if (name == NULL)
676  throw DCException(getExceptionString("reserve", "a parameter was NULL"));
677 
678  if (fileStatus == FST_CLOSED || fileStatus == FST_READING)
679  throw DCException(getExceptionString("write", "this access is not permitted"));
680 
681  if (ndims < 1 || ndims > DSP_DIM_MAX)
682  throw DCException(getExceptionString("write", "maximum dimension is invalid"));
683 
684  reserveInternal(id, globalSize, ndims, type, name);
685  }
686 
688  const Dimensions size,
689  Dimensions *globalSize,
690  Dimensions *globalOffset,
691  uint32_t ndims,
692  const CollectionType& type,
693  const char* name) throw (DCException)
694  {
695  if (name == NULL)
696  throw DCException(getExceptionString("reserve", "a parameter was NULL"));
697 
698  if (fileStatus == FST_CLOSED || fileStatus == FST_READING)
699  throw DCException(getExceptionString("write", "this access is not permitted"));
700 
701  if (ndims < 1 || ndims > DSP_DIM_MAX)
702  throw DCException(getExceptionString("write", "maximum dimension is invalid"));
703 
704  Dimensions global_size, global_offset;
705  gatherMPIWrites(ndims, size, global_size, global_offset);
706 
707  reserveInternal(id, global_size, ndims, type, name);
708 
709  if (globalSize)
710  globalSize->set(global_size);
711 
712  if (globalOffset)
713  globalOffset->set(global_offset);
714  }
715 
717  const Dimensions size,
718  uint32_t ndims,
719  const Dimensions globalOffset,
720  const char *name,
721  const void *buf)
722  {
723  if (name == NULL)
724  throw DCException(getExceptionString("append", "parameter name is NULL"));
725 
726  if (fileStatus == FST_CLOSED || fileStatus == FST_READING)
727  throw DCException(getExceptionString("append", "this access is not permitted"));
728 
729  if (ndims < 1 || ndims > DSP_DIM_MAX)
730  throw DCException(getExceptionString("append", "maximum dimension is invalid"));
731 
732  // create group for this id/iteration
733  std::string group_path, dset_name;
734  DCDataSet::getFullDataPath(name, SDC_GROUP_DATA, id, group_path, dset_name);
735 
736  DCParallelGroup group;
737  group.open(handles.get(id), group_path);
738 
739  // write data to the dataset
740  DCParallelDataSet dataset(dset_name.c_str());
741  dataset.setWriteIndependent();
742 
743  if (!dataset.open(group.getHandle()))
744  {
745  throw DCException(getExceptionString("append",
746  "Cannot open dataset (missing reserve?)", dset_name.c_str()));
747  } else
748  dataset.write(Selection(size), globalOffset, buf);
749 
750  dataset.close();
751  }
752 
754  throw (DCException)
755  {
756  log_msg(1, "removing group %d", id);
757 
758  if (fileStatus == FST_CLOSED || fileStatus == FST_READING)
759  throw DCException(getExceptionString("remove", "this access is not permitted"));
760 
761  std::stringstream group_id_name;
762  group_id_name << SDC_GROUP_DATA << "/" << id;
763 
764  DCParallelGroup::remove(handles.get(id), group_id_name.str());
765 
766  // update maxID
767  getMaxID();
768  }
769 
770  void ParallelDataCollector::remove(int32_t id, const char* name)
771  throw (DCException)
772  {
773  log_msg(1, "removing dataset %s from group %d", name, id);
774 
775  if (fileStatus == FST_CLOSED || fileStatus == FST_READING)
776  throw DCException(getExceptionString("remove", "this access is not permitted"));
777 
778  if (name == NULL)
779  throw DCException(getExceptionString("remove", "a parameter was NULL"));
780 
781  std::string group_path, dset_name;
782  DCDataSet::getFullDataPath(name, SDC_GROUP_DATA, id, group_path, dset_name);
783 
784  DCParallelGroup group;
785  group.open(handles.get(id), group_path);
786 
787  if (H5Ldelete(group.getHandle(), dset_name.c_str(), H5P_LINK_ACCESS_DEFAULT) < 0)
788  {
789  throw DCException(getExceptionString("remove", "failed to remove dataset", name));
790  }
791  }
792 
794  const char *srcName,
795  int32_t dstID,
796  const char *dstName)
797  throw (DCException)
798  {
799  if (srcName == NULL || dstName == NULL)
800  throw DCException(getExceptionString("createReference", "a parameter was NULL"));
801 
802  if (fileStatus == FST_CLOSED || fileStatus == FST_READING)
803  throw DCException(getExceptionString("createReference", "this access is not permitted"));
804 
805  if (srcID != dstID)
806  throw DCException(getExceptionString("createReference",
807  "source and destination ID must be identical", NULL));
808 
809  if (srcName == dstName)
810  throw DCException(getExceptionString("createReference",
811  "a reference must not be identical to the referenced data", srcName));
812 
813  // open source group
814  std::string src_group_path, src_dset_name, dst_dset_name;
815  DCDataSet::getFullDataPath(srcName, SDC_GROUP_DATA, srcID, src_group_path, src_dset_name);
816  DCDataSet::getFullDataPath(dstName, SDC_GROUP_DATA, dstID, src_group_path, dst_dset_name);
817 
818  DCParallelGroup src_group;
819  src_group.open(handles.get(srcID), src_group_path);
820 
821  // open source dataset
822  DCParallelDataSet src_dataset(src_dset_name.c_str());
823  src_dataset.open(src_group.getHandle());
824 
825  DCParallelDataSet dst_dataset(dst_dset_name.c_str());
826  // create the actual reference as a new dataset
827  // identical src and dst groups
828  dst_dataset.createReference(src_group.getHandle(), src_group.getHandle(), src_dataset);
829 
830  dst_dataset.close();
831  src_dataset.close();
832  }
833 
834  /*******************************************************************************
835  * PROTECTED FUNCTIONS
836  *******************************************************************************/
837 
838  void ParallelDataCollector::fileCreateCallback(H5Handle handle, uint32_t index, void *userData)
839  throw (DCException)
840  {
841  Options *options = (Options*) userData;
842 
843  // the custom group holds user-specified attributes
844  DCParallelGroup group;
845  group.create(handle, PDC_GROUP_CUSTOM);
846  group.close();
847 
848  // the data group holds the actual simulation data
849  group.create(handle, SDC_GROUP_DATA);
850  group.close();
851 
852  writeHeader(handle, index, options->enableCompression, options->mpiTopology);
853  }
854 
855  void ParallelDataCollector::fileOpenCallback(H5Handle /*handle*/, uint32_t index, void *userData)
856  throw (DCException)
857  {
858  Options *options = (Options*) userData;
859 
860  options->maxID = std::max(options->maxID, (int32_t) index);
861  }
862 
863  void ParallelDataCollector::writeHeader(hid_t fHandle, uint32_t id,
864  bool enableCompression, Dimensions mpiTopology)
865  throw (DCException)
866  {
867  // create group for header information (position, grid size, ...)
868  DCParallelGroup group;
869  group.create(fHandle, SDC_GROUP_HEADER);
870 
871  int32_t index = id;
872  bool compression = enableCompression;
873  std::stringstream splashVersion;
874  splashVersion << SPLASH_VERSION_MAJOR << "."
875  << SPLASH_VERSION_MINOR << "."
876  << SPLASH_VERSION_PATCH;
877  std::stringstream splashFormat;
878  splashFormat << SPLASH_FILE_FORMAT_MAJOR << "."
879  << SPLASH_FILE_FORMAT_MINOR;
880 
881  ColTypeInt32 ctInt32;
882  ColTypeBool ctBool;
883  ColTypeDim dim_t;
884  ColTypeString ctStringVersion(splashVersion.str().length());
885  ColTypeString ctStringFormat(splashFormat.str().length());
886 
887  // create master specific header attributes
888  DCAttribute::writeAttribute(SDC_ATTR_MAX_ID, ctInt32.getDataType(),
889  group.getHandle(), &index);
890 
891  DCAttribute::writeAttribute(SDC_ATTR_COMPRESSION, ctBool.getDataType(),
892  group.getHandle(), &compression);
893 
894  DCAttribute::writeAttribute(SDC_ATTR_MPI_SIZE, dim_t.getDataType(),
895  group.getHandle(), mpiTopology.getPointer());
896 
897  DCAttribute::writeAttribute(SDC_ATTR_VERSION, ctStringVersion.getDataType(),
898  group.getHandle(), splashVersion.str().c_str());
899 
900  DCAttribute::writeAttribute(SDC_ATTR_FORMAT, ctStringFormat.getDataType(),
901  group.getHandle(), splashFormat.str().c_str());
902  }
903 
904  void ParallelDataCollector::openCreate(const char *filename,
905  FileCreationAttr& /*attr*/)
906  throw (DCException)
907  {
908  this->fileStatus = FST_CREATING;
909 
910  // filters are currently not supported by parallel HDF5
911  //this->options.enableCompression = attr.enableCompression;
912 
913  log_msg(1, "compression = 0");
914 
915  options.maxID = -1;
916 
917  // open file
918  handles.open(Dimensions(1, 1, 1), filename, fileAccProperties, H5F_ACC_TRUNC);
919  }
920 
921  void ParallelDataCollector::openRead(const char* filename, FileCreationAttr& /*attr*/)
922  throw (DCException)
923  {
924  this->fileStatus = FST_READING;
925 
926  getMaxID();
927 
928  handles.open(Dimensions(1, 1, 1), filename, fileAccProperties, H5F_ACC_RDONLY);
929  }
930 
931  void ParallelDataCollector::openWrite(const char* filename, FileCreationAttr& /*attr*/)
932  throw (DCException)
933  {
934  this->fileStatus = FST_WRITING;
935 
936  getMaxID();
937 
938  // filters are currently not supported by parallel HDF5
939  //this->options.enableCompression = attr.enableCompression;
940 
941  handles.open(Dimensions(1, 1, 1), filename, fileAccProperties, H5F_ACC_RDWR);
942  }
943 
944  void ParallelDataCollector::readCompleteDataSet(H5Handle h5File,
945  int32_t id,
946  const char* name,
947  const Dimensions dstBuffer,
948  const Dimensions dstOffset,
949  const Dimensions srcOffset,
950  Dimensions &sizeRead,
951  uint32_t& srcRank,
952  void* dst)
953  throw (DCException)
954  {
955  log_msg(2, "readCompleteDataSet");
956 
957  if (h5File < 0 || name == NULL)
958  throw DCException(getExceptionString("readCompleteDataSet", "invalid parameters"));
959 
960  std::string group_path, dset_name;
961  DCDataSet::getFullDataPath(name, SDC_GROUP_DATA, id, group_path, dset_name);
962 
963  DCParallelGroup group;
964  group.open(h5File, group_path);
965 
966  DCParallelDataSet dataset(dset_name.c_str());
967  dataset.open(group.getHandle());
968  const Dimensions src_size(dataset.getSize() - srcOffset);
969 
970  dataset.read(dstBuffer, dstOffset, src_size, srcOffset, sizeRead, srcRank, dst);
971  dataset.close();
972  }
973 
974  void ParallelDataCollector::readDataSet(H5Handle h5File,
975  int32_t id,
976  const char* name,
977  const Dimensions dstBuffer,
978  const Dimensions dstOffset,
979  const Dimensions srcSize,
980  const Dimensions srcOffset,
981  Dimensions &sizeRead,
982  uint32_t& srcRank,
983  void* dst)
984  throw (DCException)
985  {
986  log_msg(2, "readDataSet");
987 
988  if (h5File < 0 || name == NULL)
989  throw DCException(getExceptionString("readDataSet", "invalid parameters"));
990 
991  std::string group_path, dset_name;
992  DCDataSet::getFullDataPath(name, SDC_GROUP_DATA, id, group_path, dset_name);
993 
994  DCParallelGroup group;
995  group.open(h5File, group_path);
996 
997  DCParallelDataSet dataset(dset_name.c_str());
998  dataset.open(group.getHandle());
999 
1000  dataset.read(dstBuffer, dstOffset, srcSize, srcOffset, sizeRead, srcRank, dst);
1001  dataset.close();
1002  }
1003 
1004  CollectionType* ParallelDataCollector::readDataSetMeta(H5Handle h5File,
1005  int32_t id,
1006  const char* name,
1007  const Dimensions dstBuffer,
1008  const Dimensions dstOffset,
1009  const Dimensions srcOffset,
1010  Dimensions &sizeRead,
1011  uint32_t& srcDims)
1012  throw (DCException)
1013  {
1014  log_msg(2, "readDataSetMeta");
1015 
1016  std::string group_path, dset_name;
1017  DCDataSet::getFullDataPath(name, SDC_GROUP_DATA, id, group_path, dset_name);
1018 
1019  DCGroup group;
1020  group.open(h5File, group_path);
1021 
1022  DCDataSet dataset(dset_name.c_str());
1023  dataset.open(group.getHandle());
1024 
1025  size_t entrySize;
1026  getEntriesForID(id, NULL, &entrySize);
1027  std::vector<DataCollector::DCEntry> entries(entrySize);
1028 
1029  getEntriesForID(id, &(*entries.begin()), NULL);
1030 
1031  // find entry by name
1032  int32_t entry_id = -1;
1033  for(size_t i = 0; i < entrySize; ++i)
1034  if(std::string(name) == entries[i].name)
1035  {
1036  entry_id = int32_t(i);
1037  break;
1038  }
1039 
1040  if(entry_id < 0)
1041  throw DCException(getExceptionString("readDataSetMeta", "Entry not found by name", name));
1042 
1043  Dimensions src_size(dataset.getSize() - srcOffset);
1044  dataset.read(dstBuffer, dstOffset, src_size, srcOffset, sizeRead, srcDims, NULL);
1045  dataset.close();
1046 
1047  log_msg(3, "Entry '%s' (%d) is of type: %s",
1048  entries[entry_id].name.c_str(),
1049  entry_id,
1050  entries[entry_id].colType->toString().c_str());
1051 
1052  return entries[entry_id].colType;
1053  }
1054 
1055  void ParallelDataCollector::writeDataSet(H5Handle group,
1056  const Dimensions globalSize,
1057  const Dimensions globalOffset,
1058  const CollectionType& datatype,
1059  uint32_t ndims,
1060  const Selection srcSelect,
1061  const char* name,
1062  const void* data) throw (DCException)
1063  {
1064  log_msg(2, "writeDataSet");
1065 
1066  DCParallelDataSet dataset(name);
1067  // always create dataset but write data only if all dimensions > 0
1068  // not extensible
1069  dataset.create(datatype, group, globalSize, ndims,
1070  this->options.enableCompression, false);
1071  dataset.write(srcSelect, globalOffset, data);
1072  dataset.close();
1073  }
1074 
1075  void ParallelDataCollector::gatherMPIWrites(int ndims, const Dimensions localSize,
1076  Dimensions &globalSize, Dimensions &globalOffset)
1077  throw (DCException)
1078  {
1079  uint64_t write_sizes[options.mpiSize * DSP_DIM_MAX];
1080  uint64_t local_write_size[DSP_DIM_MAX] = {localSize[0], localSize[1], localSize[2]};
1081 
1082  globalSize.set(1, 1, 1);
1083  globalOffset.set(0, 0, 0);
1084 
1085  if (MPI_Allgather(local_write_size, DSP_DIM_MAX, MPI_UNSIGNED_LONG_LONG,
1086  write_sizes, DSP_DIM_MAX, MPI_UNSIGNED_LONG_LONG, options.mpiComm) != MPI_SUCCESS)
1087  throw DCException(getExceptionString("gatherMPIWrites",
1088  "MPI_Allgather failed", NULL));
1089 
1090  Dimensions tmp_mpi_topology(options.mpiTopology);
1091  Dimensions tmp_mpi_pos(options.mpiPos);
1092  if (ndims == 1)
1093  {
1094  tmp_mpi_topology.set(options.mpiTopology.getScalarSize(), 1, 1);
1095  tmp_mpi_pos.set(options.mpiRank, 0, 0);
1096  }
1097 
1098  if ((ndims == 2) && (tmp_mpi_topology[2] > 1))
1099  {
1100  throw DCException(getExceptionString("gatherMPIWrites",
1101  "cannot auto-detect global size/offset for 2D data when writing with 3D topology", NULL));
1102  }
1103 
1104  for (int i = 0; i < ndims; ++i)
1105  {
1106  globalSize[i] = 0;
1107  size_t index;
1108  for (size_t dim = 0; dim < tmp_mpi_topology[i]; ++dim)
1109  {
1110  switch (i)
1111  {
1112  case 0:
1113  index = dim;
1114  break;
1115  case 1:
1116  index = dim * tmp_mpi_topology[0];
1117  break;
1118  default:
1119  index = dim * tmp_mpi_topology[0] * tmp_mpi_topology[1];
1120  }
1121 
1122  globalSize[i] += write_sizes[index * DSP_DIM_MAX + i];
1123  if (dim < tmp_mpi_pos[i])
1124  globalOffset[i] += write_sizes[index * DSP_DIM_MAX + i];
1125  }
1126  }
1127  }
1128 
1129  size_t ParallelDataCollector::getNDims(H5Handle h5File,
1130  int32_t id, const char* name)
1131  {
1132  if (h5File < 0 || name == NULL)
1133  throw DCException(getExceptionString("getNDims", "invalid parameters"));
1134 
1135  std::string group_path, dset_name;
1136  DCDataSet::getFullDataPath(name, SDC_GROUP_DATA, id, group_path, dset_name);
1137 
1138  DCParallelGroup group;
1139  group.open(h5File, group_path);
1140 
1141  size_t ndims = 0;
1142 
1143  DCParallelDataSet dataset(dset_name.c_str());
1144  dataset.open(group.getHandle());
1145 
1146  ndims = dataset.getNDims();
1147 
1148  dataset.close();
1149 
1150  return ndims;
1151  }
1152 
1153  void ParallelDataCollector::reserveInternal(int32_t id,
1154  const Dimensions globalSize,
1155  uint32_t ndims,
1156  const CollectionType& type,
1157  const char* name)
1158  throw (DCException)
1159  {
1160  log_msg(2, "reserveInternal");
1161 
1162  // create group for this id/iteration
1163  std::string group_path, dset_name;
1164  DCDataSet::getFullDataPath(name, SDC_GROUP_DATA, id, group_path, dset_name);
1165 
1166  DCParallelGroup group;
1167  group.openCreate(handles.get(id), group_path);
1168 
1169  DCParallelDataSet dataset(dset_name.c_str());
1170  // create the empty extensible dataset
1171  dataset.create(type, group.getHandle(), globalSize, ndims,
1172  this->options.enableCompression, true);
1173  dataset.close();
1174  }
1175 
1176  /* UNIMPLEMENTED METHODS FROM DATACOLLECTOR. TODO: Unify interface to remove those */
1177  void ParallelDataCollector::readGlobalAttribute(const char*, void*, Dimensions*)
1178  throw (DCException)
1179  {
1180  throw DCException(getExceptionString("readGlobalAttribute",
1181  "feature currently not supported by Parallel HDF5"));
1182  }
1183 
1185  const char* /*name*/, const void* /*data*/) throw (DCException)
1186  {
1187  throw DCException(getExceptionString("readGlobalAttribute",
1188  "feature currently not supported by Parallel HDF5"));
1189  }
1190 
1192  const char* /*name*/, uint32_t /*ndims*/, const Dimensions /*dims*/,
1193  const void* /*data*/) throw (DCException)
1194  {
1195  throw DCException(getExceptionString("readGlobalAttribute",
1196  "feature currently not supported by Parallel HDF5"));
1197  }
1198 
1199  void ParallelDataCollector::append(int32_t /*id*/, const CollectionType& /*type*/,
1200  size_t /*count*/, const char* /*name*/, const void* /*data*/)
1201  throw (DCException)
1202  {
1203  throw DCException(getExceptionString("readGlobalAttribute",
1204  "feature currently not supported by Parallel HDF5"));
1205  }
1206 
1207  void ParallelDataCollector::append(int32_t /*id*/, const CollectionType& /*type*/,
1208  size_t /*count*/, size_t /*offset*/, size_t /*stride*/, const char* /*name*/,
1209  const void* /*data*/) throw (DCException)
1210  {
1211  throw DCException(getExceptionString("readGlobalAttribute",
1212  "feature currently not supported by Parallel HDF5"));
1213  }
1214 
1215  void ParallelDataCollector::createReference(int32_t /*srcID*/,
1216  const char* /*srcName*/,
1217  int32_t /*dstID*/,
1218  const char* /*dstName*/,
1219  Dimensions /*count*/,
1220  Dimensions /*offset*/,
1221  Dimensions /*stride*/)
1222  throw (DCException)
1223  {
1224  throw DCException(getExceptionString("readGlobalAttribute",
1225  "feature currently not supported by Parallel HDF5"));
1226  }
1227 
1228 }
EXTERN void setLogMpiRank(int rank)
Definition: logging.cpp:51
hsize_t * getPointer()
Definition: Dimensions.hpp:192
void createReference(int32_t srcID, const char *srcName, int32_t dstID, const char *dstName)
void readAttribute(int32_t id, const char *dataName, const char *attrName, void *buf, Dimensions *mpiPosition=NULL)
void open(const char *filename, FileCreationAttr &attr)
CollectionType * readMeta(int32_t id, const char *name, const Dimensions dstBuffer, const Dimensions dstOffset, Dimensions &sizeRead)
void append(int32_t id, const Dimensions size, uint32_t rank, const Dimensions globalOffset, const char *name, const void *buf)
void set(hsize_t x, hsize_t y, hsize_t z)
Definition: Dimensions.hpp:230
AttributeInfo readGlobalAttributeInfo(int32_t id, const char *name, Dimensions *mpiPosition=NULL)
void readGlobalAttribute(int32_t id, const char *name, void *buf)
size_t getScalarSize() const
Definition: Dimensions.hpp:219
void writeGlobalAttribute(int32_t id, const CollectionType &type, const char *name, const void *buf)
AttributeInfo readAttributeInfo(int32_t id, const char *dataName, const char *attrName, Dimensions *mpiPosition=NULL)
void getEntriesForID(int32_t id, DCEntry *entries, size_t *count)
void getEntryIDs(int32_t *ids, size_t *count)
void write(int32_t id, const CollectionType &type, uint32_t rank, const Selection select, const char *name, const void *buf)
void read(int32_t id, const char *name, Dimensions &sizeRead, void *buf)
EXTERN void parseEnvVars(void)
Definition: logging.cpp:41
const H5DataType & getDataType() const
EXTERN void log_msg(int level, const char *fmt,...)
Definition: logging.cpp:56
ParallelDataCollector(MPI_Comm comm, MPI_Info info, const Dimensions topology, uint32_t maxFileHandles)
void reserve(int32_t id, const Dimensions globalSize, uint32_t rank, const CollectionType &type, const char *name)
void getMPISize(Dimensions &mpiSize)
void writeAttribute(int32_t id, const CollectionType &type, const char *dataName, const char *attrName, const void *buf)