Streaming Data with HDF5

Hi HDF5 users and developers,

I have the following requirements:

  1. I have a few record types; each type is written to its own dataset.
  2. The datasets have a compound type; the length is unlimited and the rank/dimension is 1.
  3. The compound type is not known at compile time. An XML file is parsed and the compound type is derived from it.
    (This is solved by my own example program. The data of each record is aligned in the buffer the way the compiler would do it.)
  4. The length of the dataset/array is not known; the dataset must be extended dynamically, i.e. streaming.

H5cpp.org has a solution with h5::append, but only with a fixed compound type/schema - so we know a solution exists.
Unfortunately, the template-based code is hard to understand.

I only want to append records at the end of the dataset, like std::vector::push_back(),
and the HDF5 mechanisms should do the rest…

Any proposals are welcome
Thanks
Rudolf

Please provide the MWE up to the point where you call h5::append<T>(...), with a pseudo call in all caps, H5::APPEND(ds, data), where ds is of type hid_t.

I will also need the example XML
steve
author of H5CPP/h5append

Hi @rudolf_vinzenz_johan,

Another way to implement streaming data in C++ with HDF5 is through HDFql. In example #3, you can see a streaming solution using HDFql. That said, and as suggested by @steven, if you could share an example of an XML file that would be great, as we could better assist you.

Thanks!

Hello,
I have written a program showing how I envision the solution:
(as a non-native English speaker, it is simpler for me to speak C++ :slight_smile:)
--------------><----------- Main example program --------------><-----------
// ExtensibleCompoundEnums.cpp
//
#include <iostream>
#include <cstddef>
#include "H5Cpp.h"
#include "OutputColumns.h"

void append(H5::DataSet &dataset, H5::DataType& thetype, const void *data);

int main()
{
/* Output to explore the size of the types */
std::cout << "Compound with Enums" << std::endl;
std::cout << "sizeof(int)=" << sizeof(int) << " Native=" << H5::PredType::NATIVE_INT.getSize() << std::endl;
std::cout << "sizeof(double)=" << sizeof(double) << " Native=" << H5::PredType::NATIVE_DOUBLE.getSize() << std::endl;

try
{
	size_t length = 10;
	/*
	 * Turn off the auto-printing when failure occurs so that we can
	 * handle the errors appropriately
	 */
	 //H5::Exception::dontPrint();

	hsize_t dim[1] = { 0 }; // At the begin the size is 0, it will be expanded
	hsize_t maxdim[1] = { H5S_UNLIMITED };
	H5::DataSpace space(1, dim, maxdim);
	H5::H5File file("CE.h5", H5F_ACC_TRUNC);

	/* Here are the enums and the columns defined.
	 * In the real program, this information is read from an external schema, maybe in XML.
	 */
	//? How can the EnumType be named?
	H5::EnumType et_onoff(H5::PredType::NATIVE_INT);
	int eval;
	eval = 0;		et_onoff.insert("Off", &eval);
	eval = 1;       et_onoff.insert("On", &eval);
	std::clog << "EnumType OnOff Nmembers=" << et_onoff.getNmembers() << std::endl;

	H5::EnumType et_color(H5::PredType::NATIVE_INT);
	eval = 0; et_color.insert("white", &eval);
	eval = 1; et_color.insert("red", &eval);
	eval = 2; et_color.insert("blue", &eval);
	eval = 3; et_color.insert("yellow", &eval);
	std::clog << "EnumType Color Nmembers=" << et_color.getNmembers() << std::endl;

	OutputColumns *output = new OutputColumns();
	output->addColumn("a", H5::PredType::NATIVE_INT);
	output->addColumn("b", et_onoff);
	output->addColumn("c", H5::PredType::NATIVE_DOUBLE);
	output->addColumn("d", et_color);

	output->dump();
	H5::CompType thetype(output->getSize());
	output->initCompound(thetype);
	output->allocBuffer();

	/* Modify dataset creation properties, enable chunking */
	H5::DSetCreatPropList cparms;
	hsize_t chunk_dims[1] = { 1 };
	cparms.setChunk(1, chunk_dims);
	cparms.setFillValue(thetype, output->getBuffer());
	H5::DataSet dataset = file.createDataSet("ArrayOfMyType", thetype, space, cparms);

	/* Here the data is filled.
	 * In the real system, the data may come from the network in binary form and is interpreted according to the external schema in XML.
	 * Here we ensure that the schema above is fulfilled.
	 * In the real system, megabytes of data are streamed.
	 * The enums are received as integer values.
	 */
	for (size_t i = 0; i < length; i++)
	{
		output->beginRow();
		output->writeint(i);
		output->writeint(i % 2); // enum onoff
		output->writedouble(i / 2.0);
		output->writeint(i % 4); // enum color
		output->endRow();

		append(dataset, thetype, output->getBuffer());
	}
	delete output;
}
// catch failure caused by the H5File operations
catch (H5::FileIException &error) {
	std::clog << error.getCDetailMsg() << std::endl;
	error.printErrorStack();
	return -1;
}

// catch failure caused by the DataSet operations
catch (H5::DataSetIException &error) {
	std::clog << error.getCDetailMsg() << std::endl;
	error.printErrorStack();
	return -1;
}

// catch failure caused by the DataSpace operations
catch (H5::DataSpaceIException &error) {
	std::clog << error.getCDetailMsg() << std::endl;
	error.printErrorStack();
	return -1;
}

// catch failure caused by the DataType operations
catch (H5::DataTypeIException &error) {
	std::clog << error.getCDetailMsg() << std::endl;
	error.printErrorStack();
	return -1;
}

return 0;

}

void append(H5::DataSet &dataset, H5::DataType& thetype, const void *data)
{
hsize_t size[1];
hsize_t offset[1];
hsize_t dims[1];

H5::DataSpace fspaceold = dataset.getSpace();
fspaceold.getSimpleExtentDims(dims); // query the current size

hsize_t dims2[1] = { 1 }; // dimensions to add
size[0] = dims[0] + dims2[0];
dataset.extend(size);

H5::DataSpace fspacenew = dataset.getSpace(); // Dataset after resize
offset[0] = dims[0];
fspacenew.selectHyperslab(H5S_SELECT_SET, dims2, offset);
H5::DataSpace mspace2(1, dims2);
// Write the Data to the hyperslab
dataset.write(data, thetype, mspace2, fspacenew);

}
-------------><---------- OutputColumns.h ------------><-----------------------
#pragma once
#include <string>
#include <vector>
#include <sys/types.h>
#include "H5Cpp.h"

class OutputColumns
{
public:
OutputColumns():currentoffset_(0),buffer_(0),
currow_(0) {}
~OutputColumns();
void addColumn(const std::string& name, const H5::DataType& dt);
size_t getSize() { return currentoffset_; }
void initCompound(H5::CompType& comptype);
void dump();
void allocBuffer();
void *getBuffer();

void beginRow();
void endRow();
void writeint(int i);
void writedouble(double d);

private:
struct Entry
{
std::string name;
H5::DataType t;
off_t offset;

	Entry();
	Entry(const std::string& pname, const H5::DataType& dt, off_t poffset);
	Entry(const Entry& other);
};
std::vector<Entry> entries_;
off_t currentoffset_;
char *buffer_;
char *currow_;
std::vector<Entry>::iterator curfield_;

};
----------------><--------------- OutputColumns.cpp ----------------><-------------------------
#include "OutputColumns.h"
#include <iostream>
#include <cstring>
#include <cstdlib>

/**
 * alignment: offset will be aligned
 * a = 2, 4, 8
 * off: offset
 */
void align(size_t a, off_t &off)
{
	off = ((off + (a - 1)) / a) * a;
}

OutputColumns::~OutputColumns()
{
if (buffer_ != nullptr)
{
//free(buffer_); heap broken ???
buffer_ = nullptr;
}
}

void OutputColumns::addColumn(const std::string & name, const H5::DataType & dt)
{
align(dt.getSize(), currentoffset_);
Entry e(name, dt, currentoffset_);
entries_.push_back(e);
currentoffset_ += dt.getSize();
}

void OutputColumns::initCompound(H5::CompType & comptype)
{
for (std::vector<Entry>::iterator i = entries_.begin(); i != entries_.end(); ++i)
{
comptype.insertMember(i->name, i->offset, i->t);
}
}

void OutputColumns::dump()
{
for (std::vector<Entry>::iterator i = entries_.begin(); i != entries_.end(); ++i)
{
std::cout << i->name << "\toffset=" << i->offset /*<< "\ttype=" << i->t.getObjName()*/ << std::endl;
}
}

void OutputColumns::allocBuffer()
{
buffer_ = (char *)malloc(getSize() + 16);
memset(buffer_, '\0', getSize());
}

void *OutputColumns::getBuffer()
{
return buffer_;
}

void OutputColumns::beginRow()
{
if (currow_ == 0)
{
currow_ = buffer_;
}
else
{
currow_ += getSize();
}
curfield_ = entries_.begin();
}

void OutputColumns::endRow()
{

}

void OutputColumns::writeint(int i)
{
//missing: check if current field is an int
char *curpos = currow_ + curfield_->offset;
*(int *)(curpos) = i;
curfield_++;
}

void OutputColumns::writedouble(double d)
{
//missing: check if current field is a double
char *curpos = currow_ + curfield_->offset;
*(double *)(curpos) = d;
curfield_++;
}

OutputColumns::Entry::Entry()
{
name = "";
offset = 0;
}

OutputColumns::Entry::Entry(const std::string& pname, const H5::DataType& dt, off_t poffset):name(pname),offset(poffset)
{
t.copy(dt);
}

OutputColumns::Entry::Entry(const Entry & other)
{
name = other.name;
t.copy(other.t);
offset = other.offset;
}
--------------><------------------><--------------------------------
Remarks:

  1. The append function uses hyperslabs. I derived it from example_ds.cpp, stripped it down to one dimension, and changed it to a compound type. Then I adapted it to this example with dynamic/generic columns.
    The hyperslab approach with resizing the dataset works, but I'm afraid that it is too slow for streaming a lot of (sensor) data.
    As I looked at https://hdf5ql.com, I saw that they also work with hyperslabs.
    As I investigated h5cpp.org and h5::append, I saw that it uses the function H5Dwrite_chunk, which seems more performant.

  2. The above program compiles and links, but it crashes when destructing the DataSet
    (OS: Windows, Visual Studio 2017).
    I will try it under Linux and investigate with valgrind to find the error.

  3. A predecessor program, which explores the dynamic layout with a fixed number of records,
    worked correctly; h5dump dumps the data and resolves the enums.

  4. I only need simple types like NATIVE_INT and NATIVE_DOUBLE and enums;
    the data is not structured.

  5. The details of the schema in XML are not important here; important is the fact that the layout of an HDF5 compound can be generic.

  6. The above program will be refactored: all the DataSet handling belongs in OutputColumns. OutputColumns will become abstract; many output formats are thinkable (text CSV, …).

Questions:

  1. How can I make streaming in HDF5 performant?
  2. Why does the DataSet crash in the destructor, in close()? I will ask valgrind and report the answer soon.

Thanks for your answer
Rudolf

Thanks for posting this @rudolf_vinzenz_johan!

Just a small remark on one of the remarks on your post… :slight_smile:

HDFql can indeed write (and read, for that matter) data chunks directly into a dataset (using the HDF5 C function H5Dwrite_chunk for that purpose). This bypasses several internal processing steps of the HDF5 library itself (e.g. data conversion, filter pipeline), which can lead to much faster writes. Example:

INSERT DIRECTLY INTO my_dataset(5) VALUES(10)

This code snippet inserts (i.e. writes) value 10 directly into chunk #5 of dataset my_dataset.

Here is a link to the minimum working example. The purpose of an MWE is to help others focus on the problem, as opposed to asking them to write the driver routines.
Providing an MWE not only increases the chance of being helped, but also helps you understand the problem itself.

This is how I would go about it:

#include <filesystem>
#include <h5cpp/all>

struct compound_t{};        // dummy type, used only to trick the preprocessor 
thread_local hid_t my_type; // the evil global variable

namespace h5::impl::detail { // somewhat internal, this is clumsy i know... 
    template<class T> using any_t = hid_t<T,H5Tclose,true,true,hdf5::any>; // hook up property lists
	template <> struct hid_t<compound_t, H5Tclose, true,true, hdf5::type> : public any_t<compound_t> {
		hid_t() : parent(my_type) {} // we have to hook up the global variable
	};
}

int main() {
    h5::fd_t fd = h5::create("example.h5", H5F_ACC_TRUNC);

    // this part can be rinse-and-repeat 
    my_type = H5Tcreate(H5T_COMPOUND, 24); // gets closed with RAII!!!
    H5Tinsert(my_type, "01",  0, H5T_NATIVE_UINT32);
    H5Tinsert(my_type, "02",  4, H5T_NATIVE_UINT32);
    H5Tinsert(my_type, "03",  8, H5T_NATIVE_INT64);
    H5Tinsert(my_type, "0N", 16, H5T_NATIVE_DOUBLE);
    H5Tpack(my_type);

    h5::pt_t pt = h5::create<compound_t>(fd, "stream of dynamic types", h5::max_dims{H5S_UNLIMITED},
                                  h5::chunk{4} | h5::gzip{9} ); 
   // end rinse-and-repeat, just don't forget to copy the `pt_t` descriptor, you will need it for IO
    char payload[24]={0}; // this is the packed 24 bytes you are to persist, probably you have to malloc this
    uint32_t* a = (uint32_t*)payload; // coloring the first field later at this position
    for(int i=0; i<10; i++){
        *a = i; // actual coloring, so we can keep track of correctness 
        h5::append(pt, payload); // same properties as before, nothing has changed
    }
}   

And here is the brief output:

HDF5 "example.h5" {
GROUP "/" {
   DATASET "stream of dynamic types" {
      DATATYPE  H5T_COMPOUND {
         H5T_STD_U32LE "01";
         H5T_STD_U32LE "02";
         H5T_STD_I64LE "03";
         H5T_IEEE_F64LE "0N";
      }
      DATASPACE  SIMPLE { ( 10 ) / ( H5S_UNLIMITED ) }
      DATA {...}
   }
}
}

NOTE: there is a bug in h5::append<pointer_type>() call, in case anyone is interested to reproduce the result, I will upload both the project and the fix onto my github page.
best: steve

The cause of the crash was a buffer overwrite. OutputColumns::beginRow() has to be changed to:
void OutputColumns::beginRow()
{
currow_ = buffer_;
curfield_ = entries_.begin();
}
The original code was a relic of an experiment with a variable layout and a fixed length of the dataset.

With this change the program works.
