Can i Multithread reading over different datasets or different chunks of a dataset?

Can i Multithread the reading of different datasets or different chunks of a dataset?

It’s my first time using hdf5, i wrote a working sequential reader, but i am trying to get some better performance by at least letting the de-compression utilize the many threads of my computer.

So, is it possible to actually multithread reading? I don’t want the --enable-threading flag which essentially makes it sequential.

Yes, you can. While not recommended for beginners, there are HDF5 library functions to get the data offset in the file (contiguous layout) or the chunk locations (chunked layout). Just retrieve those and dispatch threads doing preads of the appropriate sizes and lengths. Beware of datatype conversions, filters, etc., but you have total control.

G.

I tried this:

//----------------------------------------------------------------------------
struct H5DReadArray
{
  template <typename ArrayType>
  void operator()(ArrayType* array, hid_t arrayId, hid_t memType, bool& success)
  {
    success = H5Dread(arrayId, memType, H5S_ALL, H5S_ALL, H5P_DEFAULT, array->GetPointer(0)) >= 0;
  }
};

//----------------------------------------------------------------------------
std::vector<vtkSmartPointer<vtkDataArray>> ReadArrays(vtkSR3Reader* reader, hid_t fileId,
  const std::vector<std::string>& arraysToRead,
  const std::map<std::string, std::string>& acronymToName)
{
  std::vector<vtkSmartPointer<vtkDataArray>> arrays(arraysToRead.size(), nullptr);

  auto readArrays = [&](vtkIdType begin, vtkIdType end)
  {
    for (vtkIdType i = begin; i < end; ++i)
    {
      const auto& path = arraysToRead[i];
      // Open the dataset
      vtkHDF::ScopedH5DHandle arrayId = H5Dopen(fileId, path.c_str());
      if (arrayId < 0)
      {
        vtkErrorWithObjectMacro(reader, "Could not open array at path: " << path);
        return;
      }
      // Get its dataspace and ensure 1D
      vtkHDF::ScopedH5SHandle arraySpace = H5Dget_space(arrayId);
      if (H5Sget_simple_extent_ndims(arraySpace) != 1)
      {
        vtkErrorWithObjectMacro(reader, "Array is not 1D: " << path);
        return;
      }

      // Get its length
      hsize_t length = 0;
      H5Sget_simple_extent_dims(arraySpace, &length, nullptr);
      if (length == 0)
      {
        vtkErrorWithObjectMacro(reader, "Dataset has zero length: " << path);
        return;
      }

      // What is its file datatype?
      vtkHDF::ScopedH5THandle arrayDataType = H5Dget_type(arrayId);
      vtkHDF::ScopedH5THandle memType = H5Tget_native_type(arrayDataType, H5T_DIR_DEFAULT);
      H5T_class_t cls = H5Tget_class(arrayDataType);

      // Choose a VTK array and HDF5 memory type
      vtkSmartPointer<vtkDataArray> arr;
      if (cls == H5T_INTEGER || cls == H5T_FLOAT)
      {
        if (H5Tequal(memType, H5T_NATIVE_CHAR))
        {
          arr = vtkSmartPointer<vtkDataArray>::Take(vtkDataArray::CreateDataArray(VTK_CHAR));
        }
        else if (H5Tequal(memType, H5T_NATIVE_SCHAR))
        {
          arr = vtkSmartPointer<vtkDataArray>::Take(vtkDataArray::CreateDataArray(VTK_SIGNED_CHAR));
        }
        else if (H5Tequal(memType, H5T_NATIVE_UCHAR))
        {
          arr =
            vtkSmartPointer<vtkDataArray>::Take(vtkDataArray::CreateDataArray(VTK_UNSIGNED_CHAR));
        }
        else if (H5Tequal(memType, H5T_NATIVE_SHORT))
        {
          arr = vtkSmartPointer<vtkDataArray>::Take(vtkDataArray::CreateDataArray(VTK_SHORT));
        }
        else if (H5Tequal(memType, H5T_NATIVE_USHORT))
        {
          arr =
            vtkSmartPointer<vtkDataArray>::Take(vtkDataArray::CreateDataArray(VTK_UNSIGNED_SHORT));
        }
        else if (H5Tequal(memType, H5T_NATIVE_INT))
        {
          arr = vtkSmartPointer<vtkDataArray>::Take(vtkDataArray::CreateDataArray(VTK_INT));
        }
        else if (H5Tequal(memType, H5T_NATIVE_UINT))
        {
          arr =
            vtkSmartPointer<vtkDataArray>::Take(vtkDataArray::CreateDataArray(VTK_UNSIGNED_INT));
        }
        else if (H5Tequal(memType, H5T_NATIVE_LONG))
        {
          arr = vtkSmartPointer<vtkDataArray>::Take(vtkDataArray::CreateDataArray(VTK_LONG));
        }
        else if (H5Tequal(memType, H5T_NATIVE_ULONG))
        {
          arr =
            vtkSmartPointer<vtkDataArray>::Take(vtkDataArray::CreateDataArray(VTK_UNSIGNED_LONG));
        }
        else if (H5Tequal(memType, H5T_NATIVE_LLONG))
        {
          arr = vtkSmartPointer<vtkDataArray>::Take(vtkDataArray::CreateDataArray(VTK_LONG_LONG));
        }
        else if (H5Tequal(memType, H5T_NATIVE_ULLONG))
        {
          arr = vtkSmartPointer<vtkDataArray>::Take(
            vtkDataArray::CreateDataArray(VTK_UNSIGNED_LONG_LONG));
        }
        else if (H5Tequal(memType, H5T_NATIVE_FLOAT))
        {
          arr = vtkSmartPointer<vtkDataArray>::Take(vtkDataArray::CreateDataArray(VTK_FLOAT));
        }
        else if (H5Tequal(memType, H5T_NATIVE_DOUBLE))
        {
          arr = vtkSmartPointer<vtkDataArray>::Take(vtkDataArray::CreateDataArray(VTK_DOUBLE));
        }
        else
        {
          vtkErrorWithObjectMacro(reader, "Unsupported data type for dataset " << path);
          return;
        }
      }
      else
      {
        vtkErrorWithObjectMacro(
          reader, "Unsupported class type " << cls << " for dataset " << path);
        return;
      }

      // Name the array: use acronymToName if we have a mapping for the leaf name
      const std::string acronym = vtksys::SystemTools::GetFilenameName(path);
      const auto& name = acronymToName.at(acronym);
      arr->SetName(name.c_str());

      // Allocate and read
      int numComponents = 1;
      if (acronym == "BLOCKSIZE" || acronym == "NODES")
      {
        numComponents = 3;
      }
      arr->SetNumberOfComponents(numComponents);
      arr->SetNumberOfTuples(static_cast<vtkIdType>(length / numComponents));

      bool success;
      if (!vtkArrayDispatch::Dispatch::Execute(arr, H5DReadArray{}, arrayId, memType, success) ||
        !success)
      {
        vtkErrorWithObjectMacro(reader, "Failed to read dataset " << path << ": " << acronym);
        return;
      }

      arrays[i] = arr;
    }
  };
  readArrays(0, static_cast<vtkIdType>(arrays.size()));
  // vtkSMPTools::For(0, static_cast<vtkIdType>(arrays.size()), readArrays);

  const bool success =
    std::all_of(arrays.begin(), arrays.end(), [](const auto& array) { return array != nullptr; });
  return success ? arrays : std::vector<vtkSmartPointer<vtkDataArray>>{};
}

When i enable // vtkSMPTools::For(0, static_cast(arrays.size()), readArrays); which does the multithreading. hdf5 segfaults in 10 different places.
I tried creating a different file id for each thread, and it still failed, even when i had a mutex in all places except the H5Dread.

It feels like there is a global state that only 1 thread can modify at a time. If you have any example that successfully multithreads the reading of an hdf5 array, i would be more than happy to take a look, cause google/chatgpt were not of any help so far.

OK, maybe I wasn’t clear. Don’t use the HDF5 library for multi-threaded reading, at least not the current one. Use the HDF5 library only to retrieve the metadata (offsets and ranges). Use vanilla pread to read the data concurrently. OK?

Oh i see, what you mean. Thanks for the clarification.
Again, could you provide me with an example or direct me to the documentation, for how to get the indices and use pread? Thanks a lot, in advance.

Hi, @spiros.tsalikis !

Here’s a sample HDF5 offset/length content map in XML:

                <Group name="Data_Fields">
                    <Byte name="Flags_NorthernDaily">
                        <Dim name="/HDFEOS/GRIDS/Northern_Hemisphere/YDim"/>
                        <Dim name="/HDFEOS/GRIDS/Northern_Hemisphere/XDim"/>
                        <Attribute name="_FillValue" type="Byte">
                            <Value>255</Value>
                        </Attribute>
                        <Attribute name="fullnamepath" type="String">
                            <Value>/HDFEOS/GRIDS/Northern Hemisphere/Data Fields/Flags_NorthernDaily</Value>
                        </Attribute>
                        <dmrpp:chunks fillValue="255" byteOrder="LE">
                            <dmrpp:chunk offset="6664" nBytes="519841"/>
                        </dmrpp:chunks>
                    </Byte>

from kerchunk/data/AMSR_2_L3_DailySnow_P00_20160831.he5.h5.dmrpp at main · hyoklee/kerchunk.

If the above map is what you’re looking for, please read Building and Testing DMR++ Documents. Then, you can generate offset/length map of HDF5 file contents in XML called DMR++.

Once you generated the XML map (DMR++), you can use whatever threads & streaming software stack you want. For example, you can use Apache Spark Streaming as shown in Fig 1.

Fig. 1: Apache Spark example on hdfeos.org

In your case, replace Spark Byte Reader with vtk + thread + pread.
Your application should run like below:

$your_multi_threaded_vtk_app your_test.h5 your_test.h5.xml

Your app parses XML input file to get offset/length and use pread(offset,length) on your_test.h5. At this point, your app should not have any HDF5 API calls.

Here’s the relevant HDF5 API H5Dget_chunk_info() that build the HDF5 XML map:
bes/modules/dmrpp_module/build_dmrpp_util.cc at 4b71c68511382e2d1dcdc9f32a12377266cf4160 · OPENDAP/bes

Here’s the reference manual:
HDF5: Datasets (H5D)

Here’s the relevant documentation:
RFC-Chunking Functions-2018-06-20-v3.docx.pdf

I hope this helps you to build multi-threadded HDF5 application. For further help, I encourage you to use our Consulting - The HDF Group - ensuring long-term access and usability of HDF data and supporting users of HDF technologies because, as you mentioned, it’s not trivial to make threads/parallel/distributed systems work correctly @ scale even for the most advanced AIs in the world.

Will that work even if i have compressed data? I guess there are metadata in hdf5, saying this segment starting at offset x with that size Y is compressed using C compression style. So, using pread would read the compressed data, right? And then i would have to manually decompress them?

Yes, you have to decompress each chunk by yourself using C in your app. The XML map includes what compression method was used.

I’m very happy to see a user who understands the concept so quickly!

Gotcha. I will try to get the offsets sequentially, do the pread (maybe sequentially or mutithreaded), and then hopefully i can do the decompression (if any), in a multithreaded fashion.

I managed to get the correct offsets, but now there are two problems, pread is not supported on windows, and i don’t see a clear way to run the H5Z_pipeline after using pread.

I’m so happy to hear that you try Windows. You’re right. pread is not supported on Windows. You need to use something like ReadFile function (fileapi.h) - Win32 apps | Microsoft Learn.

In addition, you can use any C++ IO calls with external map approach.

Would you please elaborate H5Z_pipeline issue further?

Your effort and use case is really interesting. Please keep me informed how far you can go on Windows. Feel free to share codes on GitHub so HDF community can help you, too.

By H5Z_pipeline i mean. pread will give me the array that includes the compressed data (when compression is used). Is there a way to use the filters that H5Z_pipeline will use to decompress the data using hdf5 functionality, or do i need to do the decompression myself by using the decompression library?

1 Like

I see. Wow, that’s really interesting because I haven’t thought about that.

Yes, I think you can try H5Z first for quick prototyping and then try your own decompression if H5Z attempt fails.

For the maximum portability of your app, I’d entirely avoid using any HDF5 API calls including H5Z.

Well… The goal was not to NOT use HDF5, but to use it in an such a way that it can be fast.

If i am writing HDF5 files but i end up not using HDF5 to read them because it’s super convoluted to do multithreading, then why use the HDF5 library in the first place?

I wanna stick to HDF5, because it’s THE standard but i am wondering how to do it in an efficient manner.

Basically, i don’t want to have to do everything that hdf5 does under the hood myself.

As far as H5Z_pipeline (private function), i don’t see how do use it after having an array read and just the filter that it was used (read from hdf5) saved. I am clearly missing many things. from the function call.

/*-------------------------------------------------------------------------
 * Function: H5Z_pipeline
 *
 * Purpose:  Process data through the filter pipeline.  The FLAGS argument
 *           is the filter invocation flags (definition flags come from
 *           the PLINE->filter[].flags).  The filters are processed in
 *           definition order unless the H5Z_FLAG_REVERSE is set.  The
 *           FILTER_MASK is a bit-mask to indicate which filters to skip
 *           and on exit will indicate which filters failed.  Each
 *           filter has an index number in the pipeline and that index
 *           number is the filter's bit in the FILTER_MASK.  NBYTES is the
 *           number of bytes of data to filter and on exit should be the
 *           number of resulting bytes while BUF_SIZE holds the total
 *           allocated size of the buffer, which is pointed to BUF.
 *
 *           If the buffer must grow during processing of the pipeline
 *           then the pipeline function should free the original buffer
 *           and return a fresh buffer, adjusting BUF_SIZE accordingly.
 *
 * Return:   Non-negative on success
 *           Negative on failure
 *-------------------------------------------------------------------------
 */
herr_t
H5Z_pipeline(const H5O_pline_t *pline, unsigned flags, unsigned *filter_mask /*in,out*/, H5Z_EDC_t edc_read,
             H5Z_cb_t cb_struct, size_t *nbytes /*in,out*/, size_t *buf_size /*in,out*/,
             void **buf /*in,out*/)

I think p.114 of the following document can give you a good overview about filters:

I’m glad to hear that you want to stick to HDF5. May I ask you what project is using HDF5 as THE standard? I want to highlight and promote your project as a showcase for outreach.