libSplash
SerialDataCollector.cpp
1 
25 #include <cstring>
26 #include <time.h>
27 #include <stdlib.h>
28 #include <cassert>
29 #include <sys/stat.h>
30 
31 #include "splash/SerialDataCollector.hpp"
32 #include "splash/AttributeInfo.hpp"
33 #include "splash/core/DCAttribute.hpp"
34 #include "splash/core/DCDataSet.hpp"
35 #include "splash/core/DCGroup.hpp"
36 #include "splash/core/SDCHelper.hpp"
37 #include "splash/core/logging.hpp"
38 #include "splash/core/H5IdWrapper.hpp"
39 #include "splash/basetypes/basetypes.hpp"
40 
41 namespace splash
42 {
43 
44  /*******************************************************************************
45  * PRIVATE FUNCTIONS
46  *******************************************************************************/
47 
48  void SerialDataCollector::setFileAccessParams(hid_t& fileAccProperties)
49  {
50  fileAccProperties = H5P_FILE_ACCESS_DEFAULT;
51 
52  int metaCacheElements = 0;
53  size_t rawCacheElements = 0;
54  size_t rawCacheSize = 0;
55  double policy = 0.0;
56 
57  // set new cache size
58  H5Pget_cache(fileAccProperties, &metaCacheElements, &rawCacheElements, &rawCacheSize, &policy);
59  rawCacheSize = 256 * 1024 * 1024;
60  H5Pset_cache(fileAccProperties, metaCacheElements, rawCacheElements, rawCacheSize, policy);
61 
62  log_msg(3, "Raw Data Cache (File) = %llu KiB", (long long unsigned) (rawCacheSize / 1024));
63  }
64 
65  bool SerialDataCollector::fileExists(std::string filename)
66  {
67  struct stat fileInfo;
68  return (stat(filename.c_str(), &fileInfo) == 0);
69  }
70 
71  std::string SerialDataCollector::getFullFilename(const Dimensions mpiPos, std::string baseFilename,
72  bool isFullNameAllowed) const throw (DCException)
73  {
74  // Check for existing extension
75  if (baseFilename.find(".h5") == baseFilename.length() - 3)
76  {
77  if (isFullNameAllowed)
78  return baseFilename;
79  else
80  throw DCException("Full filename is not allowed!");
81  }
82 
83  std::stringstream serial_filename;
84  serial_filename << baseFilename << "_" << mpiPos[0] << "_" << mpiPos[1] <<
85  "_" << mpiPos[2] << ".h5";
86 
87  return serial_filename.str();
88  }
89 
90  std::string SerialDataCollector::getExceptionString(std::string func, std::string msg,
91  const char *info)
92  {
93  std::stringstream full_msg;
94  full_msg << "Exception for SerialDataCollector::" << func <<
95  ": " << msg;
96 
97  if (info != NULL)
98  full_msg << " (" << info << ")";
99 
100  return full_msg.str();
101  }
102 
103  /*******************************************************************************
104  * PUBLIC FUNCTIONS
105  *******************************************************************************/
106 
107  SerialDataCollector::SerialDataCollector(uint32_t maxFileHandles) :
108  handles(maxFileHandles, HandleMgr::FNS_FULLNAME),
109  fileStatus(FST_CLOSED),
110  maxID(-1),
111  mpiTopology(1, 1, 1)
112  {
113 #ifdef COL_TYPE_CPP
114  throw DCException("Check your defines !");
115 #endif
116 
117  parseEnvVars();
118 
119  if (H5open() < 0)
120  throw DCException(getExceptionString("SerialDataCollector",
121  "failed to initialize/open HDF5 library"));
122 
123 #ifndef SPLASH_VERBOSE_HDF5
124  // Suppress automatic output of HDF5 exception messages
125  if (H5Eset_auto2(H5E_DEFAULT, NULL, NULL) < 0)
126  throw DCException(getExceptionString("SerialDataCollector",
127  "failed to disable error printing"));
128 #endif
129 
130  // set some default file access parameters
131  setFileAccessParams(fileAccProperties);
132  }
133 
135  {
136  close();
137  }
138 
139  void SerialDataCollector::open(const char* filename, FileCreationAttr &attr)
140  throw (DCException)
141  {
142  log_msg(1, "opening serial data collector");
143 
144  if (filename == NULL)
145  throw DCException(getExceptionString("open", "filename must not be null"));
146 
147  if (fileStatus != FST_CLOSED)
148  throw DCException(getExceptionString("open", "this access is not permitted"));
149 
150  switch (attr.fileAccType)
151  {
152  case FAT_READ:
153  openRead(filename, attr);
154  break;
155  case FAT_WRITE:
156  openWrite(filename, attr);
157  break;
158  case FAT_CREATE:
159  openCreate(filename, attr);
160  break;
161  case FAT_READ_MERGED:
162  openMerge(filename);
163  break;
164  }
165  }
166 
168  {
169  if (fileStatus == FST_CLOSED)
170  return;
171 
172  log_msg(1, "closing serial data collector");
173 
174  if ((fileStatus == FST_CREATING || fileStatus == FST_WRITING) &&
175  maxID >= 0)
176  {
177  // write number of iterations
178  try
179  {
180  DCGroup group;
181  group.open(handles.get(0), SDC_GROUP_HEADER);
182  ColTypeInt32 ctInt32;
183  DCAttribute::writeAttribute(SDC_ATTR_MAX_ID, ctInt32.getDataType(),
184  group.getHandle(), &maxID);
185  } catch (const DCException& e)
186  {
187  log_msg(0, "Exception: %s", e.what());
188  log_msg(1, "continuing...");
189  }
190  }
191 
192  maxID = -1;
193  mpiTopology.set(1, 1, 1);
194 
195  // close opened hdf5 file handles
196  handles.close();
197 
198  fileStatus = FST_CLOSED;
199  }
200 
201  void SerialDataCollector::openCustomGroup(DCGroup& group,
202  Dimensions *mpiPosition) throw (DCException)
203  {
204  // Note: Factored out from readGlobalAttribute
205 
206  if (fileStatus == FST_CLOSED || fileStatus == FST_CREATING)
207  throw DCException(getExceptionString("openCustomGroup", "this access is not permitted"));
208 
209  std::stringstream group_custom_name;
210  if (mpiPosition == NULL || fileStatus == FST_MERGING)
211  group_custom_name << SDC_GROUP_CUSTOM;
212  else
213  group_custom_name << SDC_GROUP_CUSTOM << "_" <<
214  (*mpiPosition)[0] << "_" << (*mpiPosition)[1] << "_" << (*mpiPosition)[2];
215  const std::string custom_string = group_custom_name.str();
216 
217 
218  uint32_t mpi_rank = 0;
219  if (fileStatus == FST_MERGING)
220  {
221  Dimensions mpi_pos(0, 0, 0);
222  if (mpiPosition != NULL)
223  mpi_pos = *mpiPosition;
224 
225  mpi_rank = mpi_pos[2] * mpiTopology[0] * mpiTopology[1] + mpi_pos[1] * mpiTopology[0] + mpi_pos[0];
226  }
227 
228  group.open(handles.get(mpi_rank), custom_string.c_str());
229  }
230 
231  hid_t SerialDataCollector::openGroup(DCGroup& group, int32_t id, const char* dataName,
232  Dimensions *mpiPosition) throw (DCException)
233  {
234  // Note: Factored out from readAttribute
235 
236  // dataName may be NULL, attribute is read to iteration group in that case
237  if (dataName && strlen(dataName) == 0)
238  throw DCException(getExceptionString("openGroup", "empty dataset name"));
239 
240  if (fileStatus == FST_CLOSED || fileStatus == FST_CREATING)
241  throw DCException(getExceptionString("openGroup", "this access is not permitted"));
242 
243  std::string group_path, obj_name;
244  std::string dataNameInternal = "";
245  if (dataName)
246  dataNameInternal.assign(dataName);
247  DCDataSet::getFullDataPath(dataNameInternal, SDC_GROUP_DATA, id, group_path, obj_name);
248 
249  Dimensions mpi_pos(0, 0, 0);
250  if ((fileStatus == FST_MERGING) && (mpiPosition != NULL))
251  {
252  mpi_pos.set(*mpiPosition);
253  }
254 
255  group.open(handles.get(mpi_pos), group_path);
256 
257  if (dataName)
258  {
259  // read from the dataset or group
260  if (H5Lexists(group.getHandle(), obj_name.c_str(), H5P_LINK_ACCESS_DEFAULT))
261  return H5Oopen(group.getHandle(), obj_name.c_str(), H5P_DEFAULT);
262  else
263  {
264  throw DCException(getExceptionString("openGroup",
265  "dataset not found", obj_name.c_str()));
266  }
267  } else
268  return -1;
269  }
270 
272  int32_t /*id*/,
273  const char* name,
274  Dimensions *mpiPosition)
275  throw (DCException)
276  {
277  // mpiPosition is allowed to be NULL here
278  if (name == NULL)
279  throw DCException(getExceptionString("readGlobalAttributeInfo", "a parameter was null"));
280 
281  DCGroup group_custom;
282  openCustomGroup(group_custom, mpiPosition);
283 
284  return DCAttribute::readAttributeInfo(name, group_custom.getHandle());
285  }
286 
288  const char* name,
289  void* data,
290  Dimensions *mpiPosition)
291  throw (DCException)
292  {
293  // mpiPosition is allowed to be NULL here
294  if (name == NULL || data == NULL)
295  throw DCException(getExceptionString("readGlobalAttribute", "a parameter was null"));
296 
297  DCGroup group_custom;
298  openCustomGroup(group_custom, mpiPosition);
299 
300  try
301  {
302  DCAttribute::readAttribute(name, group_custom.getHandle(), data);
303  } catch (const DCException&)
304  {
305  throw DCException(getExceptionString("readGlobalAttribute", "failed to open attribute", name));
306  }
307  }
308 
310  const char* name,
311  const void* data)
312  throw (DCException)
313  {
314  const Dimensions dims(1, 1, 1);
315  writeGlobalAttribute(type, name, 1u, dims, data);
316  }
317 
319  const char* name,
320  uint32_t ndims,
321  const Dimensions dims,
322  const void* data)
323  throw (DCException)
324  {
325  if (name == NULL || data == NULL)
326  throw DCException(getExceptionString("writeGlobalAttribute", "a parameter was null"));
327 
328  if (fileStatus == FST_CLOSED || fileStatus == FST_READING || fileStatus == FST_MERGING)
329  throw DCException(getExceptionString("writeGlobalAttribute", "this access is not permitted"));
330 
331  if (ndims < 1u || ndims > DSP_DIM_MAX)
332  throw DCException(getExceptionString("writeGlobalAttribute", "maximum dimension `ndims` is invalid"));
333 
334  DCGroup group_custom;
335  group_custom.open(handles.get(0), SDC_GROUP_CUSTOM);
336 
337  try
338  {
339  DCAttribute::writeAttribute(name, type.getDataType(), group_custom.getHandle(), ndims, dims, data);
340  } catch (const DCException& e)
341  {
342  std::cerr << e.what() << std::endl;
343  throw DCException(getExceptionString("writeGlobalAttribute", "failed to write attribute", name));
344  }
345  }
346 
348  const char *dataName,
349  const char *attrName,
350  Dimensions *mpiPosition)
351  throw (DCException)
352  {
353  // mpiPosition is allowed to be NULL here
354  if (attrName == NULL)
355  throw DCException(getExceptionString("readAttributeInfo", "a parameter was null"));
356 
357  if (strlen(attrName) == 0)
358  throw DCException(getExceptionString("readAttributeInfo", "empty attribute name"));
359 
360  DCGroup group;
361  H5ObjectId objId(openGroup(group, id, dataName, mpiPosition));
362 
363  // When no object is returned read attribute from the iteration group
364  if (!objId)
365  return DCAttribute::readAttributeInfo(attrName, group.getHandle());
366  else
367  return DCAttribute::readAttributeInfo(attrName, objId);
368  }
369 
371  const char *dataName,
372  const char *attrName,
373  void* data,
374  Dimensions *mpiPosition)
375  throw (DCException)
376  {
377  // mpiPosition is allowed to be NULL here
378  if (attrName == NULL || data == NULL)
379  throw DCException(getExceptionString("readAttribute", "a parameter was null"));
380 
381  if (strlen(attrName) == 0)
382  throw DCException(getExceptionString("readAttribute", "empty attribute name"));
383 
384  DCGroup group;
385  H5ObjectId objId(openGroup(group, id, dataName, mpiPosition));
386 
387  // When no object is returned read attribute from the iteration group
388  if (objId)
389  DCAttribute::readAttribute(attrName, objId, data);
390  else
391  DCAttribute::readAttribute(attrName, group.getHandle(), data);
392  }
393 
395  const CollectionType& type,
396  const char *dataName,
397  const char *attrName,
398  const void* data)
399  throw (DCException)
400  {
401  const Dimensions dims(1, 1, 1);
402  SerialDataCollector::writeAttribute( id, type, dataName, attrName,
403  1u, dims, data);
404  }
405 
407  const CollectionType& type,
408  const char *dataName,
409  const char *attrName,
410  uint32_t ndims,
411  const Dimensions dims,
412  const void* data)
413  throw (DCException)
414  {
415  if (attrName == NULL || data == NULL)
416  throw DCException(getExceptionString("writeAttribute", "a parameter was null"));
417 
418  // dataName may be NULL, attribute is attached to iteration group in that case
419  if (dataName && strlen(dataName) == 0)
420  throw DCException(getExceptionString("writeAttribute", "empty dataset name"));
421 
422  if (strlen(attrName) == 0)
423  throw DCException(getExceptionString("writeAttribute", "empty attribute name"));
424 
425  if (fileStatus == FST_CLOSED || fileStatus == FST_READING || fileStatus == FST_MERGING)
426  throw DCException(getExceptionString("writeAttribute", "this access is not permitted"));
427 
428  if (ndims < 1u || ndims > DSP_DIM_MAX)
429  throw DCException(getExceptionString("writeAttribute", "maximum dimension `ndims` is invalid"));
430 
431  /* group_path: absolute path to the last inode
432  * obj_name: last inode, can be a group or a dataset
433  */
434  std::string group_path, obj_name;
435  std::string dataNameInternal = "";
436  if (dataName)
437  dataNameInternal.assign(dataName);
438  DCDataSet::getFullDataPath(dataNameInternal, SDC_GROUP_DATA, id, group_path, obj_name);
439 
440  DCGroup group;
441  if (dataName)
442  {
443  /* if the specified inode (obj_name) does not exist
444  * (as dataset or group), create all missing groups along group_path
445  * and even create an empty group for obj_name itself
446  *
447  * group_path + "/" + obj_name is the absolute path of dataName
448  */
449  std::string pathAndName(group_path + "/" + obj_name);
450  if(!DCGroup::exists(handles.get(id), pathAndName))
451  group.create(handles.get(id), pathAndName);
452 
453  // attach attribute to the dataset or group
454  group.open(handles.get(0), group_path);
455 
456  hid_t obj_id = H5Oopen(group.getHandle(), obj_name.c_str(), H5P_DEFAULT);
457  if (obj_id < 0)
458  {
459  throw DCException(getExceptionString("writeAttribute",
460  "object not found", obj_name.c_str()));
461  }
462 
463  try
464  {
465  DCAttribute::writeAttribute(attrName, type.getDataType(), obj_id, ndims, dims, data);
466  } catch (const DCException&)
467  {
468  H5Oclose(obj_id);
469  throw;
470  }
471  H5Oclose(obj_id);
472  } else
473  {
474  // attach attribute to the iteration group
475  group.openCreate(handles.get(0), group_path);
476  DCAttribute::writeAttribute(attrName, type.getDataType(), group.getHandle(), ndims, dims, data);
477  }
478  }
479 
480  void SerialDataCollector::read(int32_t id,
481  const char* name,
482  Dimensions &sizeRead,
483  void* data)
484  throw (DCException)
485  {
486  this->read(id, name, Dimensions(0, 0, 0), Dimensions(0, 0, 0), sizeRead, data);
487  }
488 
489  void SerialDataCollector::read(int32_t id,
490  const char* name,
491  const Dimensions dstBuffer,
492  const Dimensions dstOffset,
493  Dimensions &sizeRead,
494  void* data)
495  throw (DCException)
496  {
497  if (fileStatus != FST_READING && fileStatus != FST_WRITING && fileStatus != FST_MERGING)
498  throw DCException(getExceptionString("read", "this access is not permitted"));
499 
500  uint32_t ndims = 0;
501  readCompleteDataSet(handles.get(0), id, name, dstBuffer, dstOffset,
502  Dimensions(0, 0, 0), sizeRead, ndims, data);
503  }
504 
506  const char* name,
507  const Dimensions dstBuffer,
508  const Dimensions dstOffset,
509  Dimensions &sizeRead)
510  throw (DCException)
511  {
512  if (fileStatus != FST_READING && fileStatus != FST_WRITING && fileStatus != FST_MERGING)
513  throw DCException(getExceptionString("readMeta", "this access is not permitted"));
514 
515  uint32_t ndims = 0;
516  return readDataSetMeta(handles.get(id), id, name, dstBuffer, dstOffset,
517  Dimensions(0, 0, 0), sizeRead, ndims);
518  }
519 
520  void SerialDataCollector::write(int32_t id, const CollectionType& type, uint32_t ndims,
521  const Selection select, const char* name, const void* data)
522  throw (DCException)
523  {
524  if (name == NULL)
525  throw DCException(getExceptionString("write", "parameter name is NULL"));
526 
527  if (fileStatus == FST_CLOSED || fileStatus == FST_READING || fileStatus == FST_MERGING)
528  throw DCException(getExceptionString("write", "this access is not permitted"));
529 
530  if (ndims < 1 || ndims > DSP_DIM_MAX)
531  throw DCException(getExceptionString("write", "maximum dimension is invalid"));
532 
533  if (id > this->maxID)
534  this->maxID = id;
535 
536  std::string group_path, dset_name;
537  DCDataSet::getFullDataPath(name, SDC_GROUP_DATA, id, group_path, dset_name);
538 
539  DCGroup group;
540  group.openCreate(handles.get(0), group_path);
541 
542  // write data to the group
543  try
544  {
545  writeDataSet(group.getHandle(), type, ndims, select, dset_name.c_str(), data);
546  } catch (const DCException&)
547  {
548  throw;
549  }
550  }
551 
552  void SerialDataCollector::append(int32_t id, const CollectionType& type,
553  size_t count, const char* name, const void* data)
554  throw (DCException)
555  {
556  append(id, type, count, 0, 1, name, data);
557  }
558 
559  void SerialDataCollector::append(int32_t id, const CollectionType& type,
560  size_t count, size_t offset, size_t stride, const char* name, const void* data)
561  throw (DCException)
562  {
563  if (name == NULL)
564  throw DCException(getExceptionString("append", "parameter name is NULL"));
565 
566  if (fileStatus == FST_CLOSED || fileStatus == FST_READING || fileStatus == FST_MERGING)
567  throw DCException(getExceptionString("append", "this access is not permitted"));
568 
569  if (id > this->maxID)
570  this->maxID = id;
571 
572  std::string group_path, dset_name;
573  DCDataSet::getFullDataPath(name, SDC_GROUP_DATA, id, group_path, dset_name);
574 
575  DCGroup group;
576  group.openCreate(handles.get(0), group_path);
577 
578  // write data to the group
579  try
580  {
581  appendDataSet(group.getHandle(), type, count, offset,
582  stride, dset_name.c_str(), data);
583  } catch (const DCException&)
584  {
585  throw;
586  }
587  }
588 
590  throw (DCException)
591  {
592  log_msg(1, "removing group %d", id);
593 
594  if (fileStatus == FST_CLOSED || fileStatus == FST_READING || fileStatus == FST_MERGING)
595  throw DCException(getExceptionString("remove", "this access is not permitted"));
596 
597  std::stringstream group_id_name;
598  group_id_name << SDC_GROUP_DATA << "/" << id;
599 
600  DCGroup::remove(handles.get(0), group_id_name.str());
601 
602  // update maxID to new highest group
603  maxID = 0;
604  size_t num_groups = 0;
605  getEntryIDs(NULL, &num_groups);
606 
607  int32_t *groups = new int32_t[num_groups];
608  getEntryIDs(groups, NULL);
609 
610  for (size_t i = 0; i < num_groups; ++i)
611  if (groups[i] > maxID)
612  maxID = groups[i];
613 
614  delete[] groups;
615  groups = NULL;
616  }
617 
618  void SerialDataCollector::remove(int32_t id, const char* name)
619  throw (DCException)
620  {
621  log_msg(1, "removing dataset %s from group %d", name, id);
622 
623  if (fileStatus == FST_CLOSED || fileStatus == FST_READING || fileStatus == FST_MERGING)
624  throw DCException(getExceptionString("remove", "this access is not permitted"));
625 
626  if (name == NULL)
627  throw DCException(getExceptionString("remove", "parameter name is NULL"));
628 
629  std::string group_path, dset_name;
630  DCDataSet::getFullDataPath(name, SDC_GROUP_DATA, id, group_path, dset_name);
631 
632  DCGroup group;
633  group.open(handles.get(0), group_path);
634 
635  if (H5Ldelete(group.getHandle(), dset_name.c_str(), H5P_LINK_ACCESS_DEFAULT) < 0)
636  {
637  throw DCException(getExceptionString("remove",
638  "failed to remove dataset", dset_name.c_str()));
639  }
640  }
641 
643  const char *srcName,
644  int32_t dstID,
645  const char *dstName)
646  throw (DCException)
647  {
648  if (srcName == NULL || dstName == NULL)
649  throw DCException(getExceptionString("createReference", "a parameter was NULL"));
650 
651  if (fileStatus == FST_CLOSED || fileStatus == FST_READING || fileStatus == FST_MERGING)
652  throw DCException(getExceptionString("createReference", "this access is not permitted"));
653 
654  if (srcID == dstID && srcName == dstName)
655  throw DCException(getExceptionString("createReference",
656  "a reference must not be identical to the referenced data", srcName));
657 
658  // open source group
659  std::string src_group_path, src_dset_name;
660  DCDataSet::getFullDataPath(srcName, SDC_GROUP_DATA, srcID, src_group_path, src_dset_name);
661 
662  DCGroup src_group, dst_group;
663  src_group.open(handles.get(0), src_group_path);
664 
665  // open destination group
666  std::string dst_group_path, dst_dset_name;
667  DCDataSet::getFullDataPath(dstName, SDC_GROUP_DATA, dstID, dst_group_path, dst_dset_name);
668 
669  // if destination group does not exist, it is created
670  dst_group.openCreate(handles.get(0), dst_group_path);
671 
672  // open source dataset
673  try
674  {
675  DCDataSet src_dataset(src_dset_name.c_str());
676  src_dataset.open(src_group.getHandle());
677 
678  DCDataSet dst_dataset(dst_dset_name.c_str());
679  // create the actual reference as a new dataset
680  dst_dataset.createReference(dst_group.getHandle(),
681  src_group.getHandle(), src_dataset);
682 
683  dst_dataset.close();
684  src_dataset.close();
685 
686  } catch (const DCException& e)
687  {
688  throw e;
689  }
690  }
691 
693  const char *srcName,
694  int32_t dstID,
695  const char *dstName,
696  Dimensions count,
697  Dimensions offset,
698  Dimensions stride)
699  throw (DCException)
700  {
701  if (srcName == NULL || dstName == NULL)
702  throw DCException(getExceptionString("createReference", "a parameter was NULL"));
703 
704  if (fileStatus == FST_CLOSED || fileStatus == FST_READING || fileStatus == FST_MERGING)
705  throw DCException(getExceptionString("createReference", "this access is not permitted"));
706 
707  if (srcID == dstID && srcName == dstName)
708  throw DCException(getExceptionString("createReference",
709  "a reference must not be identical to the referenced data", srcName));
710 
711  // open source group
712  std::string src_group_path, src_dset_name;
713  DCDataSet::getFullDataPath(srcName, SDC_GROUP_DATA, srcID, src_group_path, src_dset_name);
714 
715  DCGroup src_group, dst_group;
716  src_group.open(handles.get(0), src_group_path);
717 
718  // open destination group
719  std::string dst_group_path, dst_dset_name;
720  DCDataSet::getFullDataPath(dstName, SDC_GROUP_DATA, dstID, dst_group_path, dst_dset_name);
721 
722  // if destination group does not exist, it is created
723  dst_group.openCreate(handles.get(0), dst_group_path);
724 
725  // open source dataset
726  try
727  {
728  DCDataSet src_dataset(src_dset_name.c_str());
729  src_dataset.open(src_group.getHandle());
730 
731  DCDataSet dst_dataset(dst_dset_name.c_str());
732  // create the actual reference as a new dataset
733  dst_dataset.createReference(dst_group.getHandle(), src_group.getHandle(),
734  src_dataset, count, offset, stride);
735 
736  dst_dataset.close();
737  src_dataset.close();
738 
739  } catch (const DCException& e)
740  {
741  throw e;
742  }
743  }
744 
746  {
747  return maxID;
748  }
749 
751  {
752  mpiSize.set(this->mpiTopology);
753  }
754 
755  void SerialDataCollector::getEntryIDs(int32_t* ids, size_t* count)
756  throw (DCException)
757  {
758  DCGroup group;
759  group.open(handles.get(0), SDC_GROUP_DATA);
760 
761  hsize_t data_entries = 0;
762  if (H5Gget_num_objs(group.getHandle(), &data_entries) < 0)
763  {
764  throw DCException(getExceptionString("getEntryIDs",
765  "Failed to get entries in data group", SDC_GROUP_DATA));
766  }
767 
768  if (count != NULL)
769  *count = data_entries;
770 
771  if (ids != NULL)
772  {
773  for (size_t i = 0; i < data_entries; i++)
774  {
775  char *group_id_name = NULL;
776  ssize_t group_id_name_len = H5Gget_objname_by_idx(group.getHandle(), i, NULL, 0);
777  if (group_id_name_len < 0)
778  {
779  throw DCException(getExceptionString("getEntryIDs",
780  "Failed to get object name in group", group_id_name));
781  }
782 
783  group_id_name = new char[group_id_name_len + 1];
784  H5Gget_objname_by_idx(group.getHandle(), i, group_id_name, group_id_name_len + 1);
785  ids[i] = atoi(group_id_name);
786  delete[] group_id_name;
787  }
788  }
789  }
790 
791  void SerialDataCollector::getEntriesForID(int32_t id, DCEntry *entries, size_t *count)
792  throw (DCException)
793  {
794  std::stringstream group_id_name;
795  group_id_name << SDC_GROUP_DATA << "/" << id;
796 
797  // open data group for id
798  DCGroup group;
799  group.open(handles.get(0), group_id_name.str());
800 
801  DCGroup::VisitObjCBType param;
802  param.count = 0;
803  param.entries = entries;
804 
805  DCGroup::getEntriesInternal(group.getHandle(), group_id_name.str(), "", &param);
806 
807  if (count)
808  *count = param.count;
809  }
810 
811  /*******************************************************************************
812  * PROTECTED FUNCTIONS
813  *******************************************************************************/
814 
815  void SerialDataCollector::openCreate(const char *filename,
816  FileCreationAttr& attr)
817  throw (DCException)
818  {
819  this->fileStatus = FST_CREATING;
820 
821  std::string full_filename = getFullFilename(attr.mpiPosition, filename,
822  attr.mpiSize.getScalarSize() == 1);
823 
824  this->enableCompression = attr.enableCompression;
825 
826  log_msg(1, "compression = %d", attr.enableCompression);
827 
828  // open file
829  handles.open(full_filename, fileAccProperties, H5F_ACC_TRUNC);
830 
831  this->maxID = 0;
832  this->mpiTopology.set(attr.mpiSize);
833 
834  // write datatypes and header information to the file
835  SDCHelper::writeHeader(handles.get(0), attr.mpiPosition, &(this->maxID),
836  &(this->enableCompression), &(this->mpiTopology), false);
837 
838  // the custom group hold user-specified attributes
839  DCGroup group;
840  group.create(handles.get(0), SDC_GROUP_CUSTOM);
841  group.close();
842 
843  // the data group holds the actual simulation data
844  group.create(handles.get(0), SDC_GROUP_DATA);
845  group.close();
846  }
847 
848  void SerialDataCollector::openWrite(const char* filename, FileCreationAttr& attr)
849  throw (DCException)
850  {
851  fileStatus = FST_WRITING;
852 
853  std::string full_filename = getFullFilename(attr.mpiPosition, filename,
854  attr.mpiSize.getScalarSize() == 1);
855 
856  this->enableCompression = attr.enableCompression;
857 
858  if (fileExists(full_filename))
859  {
860  // read reference data from target file
861  SDCHelper::getReferenceData(full_filename.c_str(), &(this->maxID), &(this->mpiTopology));
862 
863  handles.open(full_filename, fileAccProperties, H5F_ACC_RDWR);
864  } else
865  {
866  openCreate(filename, attr);
867  }
868  }
869 
870  void SerialDataCollector::openMerge(const char* filename)
871  throw (DCException)
872  {
873  this->fileStatus = FST_MERGING;
874 
875  // open reference file to get mpi information
876  std::string full_filename = getFullFilename(Dimensions(0, 0, 0), filename, true);
877 
878  if (!fileExists(full_filename))
879  {
880  this->fileStatus = FST_CLOSED;
881  throw DCException(getExceptionString("openMerge", "File not found.", full_filename.c_str()));
882  }
883 
884  // read reference data from target file
885  SDCHelper::getReferenceData(full_filename.c_str(), &(this->maxID), &(this->mpiTopology));
886 
887  // no compression for in-memory datasets
888  this->enableCompression = false;
889 
890  handles.setFileNameScheme(HandleMgr::FNS_MPI);
891  handles.open(mpiTopology, filename, fileAccProperties, H5F_ACC_RDONLY);
892  }
893 
894  void SerialDataCollector::openRead(const char* filename, FileCreationAttr& attr)
895  throw (DCException)
896  {
897  this->fileStatus = FST_READING;
898 
899  std::string full_filename = getFullFilename(attr.mpiPosition, filename, true);
900 
901  if (!fileExists(full_filename))
902  {
903  this->fileStatus = FST_CLOSED;
904  throw DCException(getExceptionString("openRead", "File not found", full_filename.c_str()));
905  }
906 
907  // read reference data from target file
908  SDCHelper::getReferenceData(full_filename.c_str(), &(this->maxID), &(this->mpiTopology));
909 
910  handles.open(full_filename, fileAccProperties, H5F_ACC_RDONLY);
911  }
912 
913  void SerialDataCollector::writeDataSet(hid_t group,
914  const CollectionType& datatype,
915  uint32_t ndims,
916  const Selection select,
917  const char* name,
918  const void* data) throw (DCException)
919  {
920  log_msg(2, "writeDataSet");
921 
922  DCDataSet dataset(name);
923  // always create dataset but write data only if all dimensions > 0 and data available
924  // not extensible
925  dataset.create(datatype, group, select.count, ndims,
926  this->enableCompression, false);
927  if (data && (select.count.getScalarSize() > 0))
928  dataset.write(select, Dimensions(0, 0, 0), data);
929  dataset.close();
930  }
931 
932  void SerialDataCollector::appendDataSet(hid_t group, const CollectionType& datatype,
933  size_t count, size_t offset, size_t stride, const char* name, const void* data)
934  throw (DCException)
935  {
936  log_msg(2, "appendDataSet");
937 
938  DCDataSet dataset(name);
939 
940  if (!dataset.open(group))
941  {
942  Dimensions data_size(count, 1, 1);
943  // create dataset extensible
944  dataset.create(datatype, group, data_size, 1, this->enableCompression, true);
945 
946  if (count > 0)
947  dataset.write(Selection(data_size,
948  Dimensions(offset + count * stride, 1, 1),
949  Dimensions(offset, 0, 0),
950  Dimensions(stride, 1, 1)),
951  Dimensions(0, 0, 0),
952  data);
953  } else
954  if (count > 0)
955  dataset.append(count, offset, stride, data);
956 
957  dataset.close();
958  }
959 
960  size_t SerialDataCollector::getNDims(H5Handle h5File,
961  int32_t id,
962  const char* name)
963  {
964  if (h5File < 0 || name == NULL)
965  throw DCException(getExceptionString("getNDims", "invalid parameters"));
966 
967  std::string group_path, dset_name;
968  DCDataSet::getFullDataPath(name, SDC_GROUP_DATA, id, group_path, dset_name);
969 
970  DCGroup group;
971  group.open(h5File, group_path);
972 
973  size_t ndims = 0;
974 
975  try
976  {
977  DCDataSet dataset(dset_name.c_str());
978  dataset.open(group.getHandle());
979 
980  ndims = dataset.getNDims();
981 
982  dataset.close();
983  } catch (const DCException& e)
984  {
985  throw e;
986  }
987 
988  return ndims;
989  }
990 
991  void SerialDataCollector::readCompleteDataSet(H5Handle h5File,
992  int32_t id,
993  const char* name,
994  const Dimensions dstBuffer,
995  const Dimensions dstOffset,
996  const Dimensions srcOffset,
997  Dimensions &sizeRead,
998  uint32_t& srcDims,
999  void* dst)
1000  throw (DCException)
1001  {
1002  log_msg(2, "readCompleteDataSet");
1003 
1004  std::string group_path, dset_name;
1005  DCDataSet::getFullDataPath(name, SDC_GROUP_DATA, id, group_path, dset_name);
1006 
1007  DCGroup group;
1008  group.open(h5File, group_path);
1009 
1010  DCDataSet dataset(dset_name.c_str());
1011  dataset.open(group.getHandle());
1012  Dimensions src_size(dataset.getSize() - srcOffset);
1013  dataset.read(dstBuffer, dstOffset, src_size, srcOffset, sizeRead, srcDims, dst);
1014  dataset.close();
1015  }
1016 
1017  void SerialDataCollector::readDataSet(H5Handle h5File,
1018  int32_t id,
1019  const char* name,
1020  const Dimensions dstBuffer,
1021  const Dimensions dstOffset,
1022  const Dimensions srcSize,
1023  const Dimensions srcOffset,
1024  Dimensions &sizeRead,
1025  uint32_t& srcDims,
1026  void* dst)
1027  throw (DCException)
1028  {
1029  log_msg(2, "readDataSet");
1030 
1031  std::string group_path, dset_name;
1032  DCDataSet::getFullDataPath(name, SDC_GROUP_DATA, id, group_path, dset_name);
1033 
1034  DCGroup group;
1035  group.open(h5File, group_path);
1036 
1037  DCDataSet dataset(dset_name.c_str());
1038  dataset.open(group.getHandle());
1039  dataset.read(dstBuffer, dstOffset, srcSize, srcOffset, sizeRead, srcDims, dst);
1040  dataset.close();
1041  }
1042 
1043  CollectionType* SerialDataCollector::readDataSetMeta(H5Handle h5File,
1044  int32_t id,
1045  const char* name,
1046  const Dimensions dstBuffer,
1047  const Dimensions dstOffset,
1048  const Dimensions srcOffset,
1049  Dimensions &sizeRead,
1050  uint32_t& srcDims)
1051  throw (DCException)
1052  {
1053  log_msg(2, "readDataSetMeta");
1054 
1055  std::string group_path, dset_name;
1056  DCDataSet::getFullDataPath(name, SDC_GROUP_DATA, id, group_path, dset_name);
1057 
1058  DCGroup group;
1059  group.open(h5File, group_path);
1060 
1061  DCDataSet dataset(dset_name.c_str());
1062  dataset.open(group.getHandle());
1063 
1064  size_t entrySize;
1065  getEntriesForID(id, NULL, &entrySize);
1066  std::vector<DataCollector::DCEntry> entries(entrySize);
1067 
1068  getEntriesForID(id, &(*entries.begin()), NULL);
1069 
1070  // find entry by name
1071  int32_t entry_id = -1;
1072  for(size_t i = 0; i < entrySize; ++i)
1073  if(std::string(name) == entries[i].name)
1074  {
1075  entry_id = int32_t(i);
1076  break;
1077  }
1078 
1079  if(entry_id < 0)
1080  throw DCException(getExceptionString("readDataSetMeta", "Entry not found by name"));
1081 
1082  Dimensions src_size(dataset.getSize() - srcOffset);
1083  dataset.read(dstBuffer, dstOffset, src_size, srcOffset, sizeRead, srcDims, NULL);
1084  dataset.close();
1085 
1086  log_msg(3, "Entry '%s' (%d) is of type: %s",
1087  entries[entry_id].name.c_str(),
1088  entry_id,
1089  entries[entry_id].colType->toString().c_str());
1090 
1091  return entries[entry_id].colType;
1092  }
1093 
1094  void SerialDataCollector::readSizeInternal(H5Handle h5File,
1095  int32_t id,
1096  const char* name,
1097  Dimensions &sizeRead)
1098  throw (DCException)
1099  {
1100  log_msg(2, "readSizeInternal");
1101 
1102  std::string group_path, dset_name;
1103  DCDataSet::getFullDataPath(name, SDC_GROUP_DATA, id, group_path, dset_name);
1104 
1105  DCGroup group;
1106  group.open(h5File, group_path);
1107 
1108  try
1109  {
1110  DCDataSet dataset(dset_name.c_str());
1111  dataset.open(group.getHandle());
1112  sizeRead.set(dataset.getSize());
1113  dataset.close();
1114  } catch (const DCException& e)
1115  {
1116  throw e;
1117  }
1118  }
1119 
1120  hid_t SerialDataCollector::openDatasetHandle(int32_t id,
1121  const char *dsetName,
1122  Dimensions *mpiPosition)
1123  throw (DCException)
1124  {
1125  std::string group_path, dset_name;
1126  DCDataSet::getFullDataPath(dsetName, SDC_GROUP_DATA, id, group_path, dset_name);
1127 
1128  Dimensions mpi_pos(0, 0, 0);
1129  if ((fileStatus == FST_MERGING) && (mpiPosition != NULL))
1130  {
1131  mpi_pos.set(*mpiPosition);
1132  }
1133 
1134  DCGroup group;
1135  group.open(handles.get(mpi_pos), group_path);
1136 
1137  hid_t dataset_handle = -1;
1138  if (H5Lexists(group.getHandle(), dset_name.c_str(), H5P_LINK_ACCESS_DEFAULT))
1139  {
1140  dataset_handle = H5Dopen(group.getHandle(), dset_name.c_str(), H5P_DEFAULT);
1141  } else
1142  {
1143  throw DCException(getExceptionString("openDatasetInternal",
1144  "dataset not found", dset_name.c_str()));
1145  }
1146 
1147  return dataset_handle;
1148  }
1149 
1150  void SerialDataCollector::closeDatasetHandle(hid_t handle)
1151  throw (DCException)
1152  {
1153  H5Dclose(handle);
1154  }
1155 
} // namespace splash
CollectionType * readMeta(int32_t id, const char *name, const Dimensions dstBuffer, const Dimensions dstOffset, Dimensions &sizeRead)
void getEntriesForID(int32_t id, DCEntry *entries, size_t *count)
void append(int32_t id, const CollectionType &type, size_t count, const char *name, const void *data)
void getEntryIDs(int32_t *ids, size_t *count)
void open(const char *filename, FileCreationAttr &attr)
void writeGlobalAttribute(const CollectionType &type, const char *name, const void *data)
void set(hsize_t x, hsize_t y, hsize_t z)
Definition: Dimensions.hpp:230
void getMPISize(Dimensions &mpiSize)
void readAttribute(int32_t id, const char *dataName, const char *attrName, void *data, Dimensions *mpiPosition=NULL)
size_t getScalarSize() const
Definition: Dimensions.hpp:219
void write(int32_t id, const CollectionType &type, uint32_t ndims, const Selection select, const char *name, const void *data)
void readGlobalAttribute(const char *name, void *data, Dimensions *mpiPosition=NULL)
void writeAttribute(int32_t id, const CollectionType &type, const char *dataName, const char *attrName, const void *data)
EXTERN void parseEnvVars(void)
Definition: logging.cpp:41
EXTERN void log_msg(int level, const char *fmt,...)
Definition: logging.cpp:56
SerialDataCollector(uint32_t maxFileHandles)
void read(int32_t id, const char *name, Dimensions &sizeRead, void *data)
void createReference(int32_t srcID, const char *srcName, int32_t dstID, const char *dstName)
AttributeInfo readGlobalAttributeInfo(int32_t id, const char *name, Dimensions *mpiPosition=NULL)
AttributeInfo readAttributeInfo(int32_t id, const char *dataName, const char *attrName, Dimensions *mpiPosition=NULL)