H5i_dec_ref hangs


#1

Hi all,

I have a problem related to the other thread here, “H5DOpen collective, driver MPIO”, but this one is an obstacle further down the road.

I am able to write to datasets (contiguous ones) independently via the write_direct method but get stuck when trying to close the file. The high level method does the following:

if self.mpi_rank == 0:
    if truncate_file:
        self.truncate_h5_file()
    if not self.f:
        self.open_h5_file_serial()
    self.ingest_metadata(image_path, spectra_path)
    self.close_h5_file()
self.comm.Barrier()
self.open_h5_file_parallel()
if self.mpi_rank == 0:
    self.distribute_work(self.image_path_list)
else:
    self.write_image_data()
self.comm.Barrier()
if self.mpi_rank == 0:
    self.distribute_work(self.spectra_path_list)
else:
    self.write_spectra_data()
self.close_h5_file()

but at the final self.close_h5_file() all processes hang (each consuming 100% CPU, the typical MPI busy wait). Specifically, it happens in the call h5i.dec_ref(id_) in files.py, line 453.

def close(self):
    """ Close the file.  All open objects become invalid """
    with phil:
        # Check that the file is still open, otherwise skip
        if self.id.valid:
            # We have to explicitly murder all open objects related to the file

            # Close file-resident objects first, then the files.
            # Otherwise we get errors in MPI mode.
            id_list = h5f.get_obj_ids(self.id, ~h5f.OBJ_FILE)
            file_list = h5f.get_obj_ids(self.id, h5f.OBJ_FILE)

            id_list = [x for x in id_list if h5i.get_file_id(x).id == self.id.id]
            file_list = [x for x in file_list if h5i.get_file_id(x).id == self.id.id]

            for id_ in id_list:
                while id_.valid:
                    h5i.dec_ref(id_)

            for id_ in file_list:
                while id_.valid:
                    h5i.dec_ref(id_)

            self.id.close()
            _objects.nonlocal_close()

The strange thing is that every process does call this method, runs the “while id_.valid” loop the same number of times (3x), and hangs on the third call to h5i.dec_ref(id_). Presumably each process is waiting for the others to collectively release the last reference to the file.

I have verified that the following program does not hang for any number of processes:

f = h5py.File(H5PATH, 'r+', driver='mpio', comm=MPI.COMM_WORLD)
f.close()

And flushing all datasets or files before closing does not help either.

Thank you very much for your help!

Cheers,

Jiri


#2

Come now, is there really no brave enough gentleman that would venture into this? 🙂

I have spent maybe one working week trying to investigate this and even though I have learned a lot, I am close to becoming mad because of that cursed knowledge…

I have created a copy-paste example that reproduces the same issue, with comments:

import h5py
import numpy as np
from mpi4py import MPI

H5PATH = "test_parallel.h5"

class ParallelWriter:

    def __init__(self, h5_path):
        self.comm = MPI.COMM_WORLD
        self.mpi_size = self.comm.Get_size()
        self.mpi_rank = self.comm.Get_rank()
        self.h5_path = h5_path
        self.f = None

    def ingest_data(self, truncate_file=None):
        if self.mpi_rank == 0:                  # if I'm the master, write all metadata and create datasets
            if truncate_file:
                self.truncate_h5_file()
            self.open_h5_file_serial()
            self.ingest_metadata()
            self.close_h5_file()                # close the file opened in serial mode
        self.open_h5_file_parallel()            # all, including master, let's open file in mpio mode 
        if self.mpi_rank == 0:
            self.distribute_work(self.mpi_size) # master distributes the work and reads information from the file
        else:
            self.write_image_data()             # slaves only write the data, in parallel.
        self.close_h5_file()                    # closing the mpio opened file hangs.

    def ingest_metadata(self):
        for i in range(1, self.mpi_size + 1):
            self.f.require_dataset("dataset_%d" % i, (1000, 1000), np.float64)  # np.float was removed in NumPy 1.24

    def open_h5_file_serial(self):
        self.f = h5py.File(self.h5_path, 'r+')

    def open_h5_file_parallel(self):
        if self.mpi_rank == 0:
            self.f = h5py.File(self.h5_path, 'r', driver='mpio', comm=self.comm)
        else:
            self.f = h5py.File(self.h5_path, 'r+', driver='mpio', comm=self.comm)

    def truncate_h5_file(self):
        self.f = h5py.File(self.h5_path, 'w')
        self.f.close()

    def distribute_work(self, mpi_size):
        for dest in range(1, mpi_size):
            print("Sending work to dest %02d " % dest)
            self.comm.send(obj="test", dest=dest)

    def write_image_data(self):
        status = MPI.Status()
        message = self.receive_work(status)
        img_data = np.zeros((1000, 1000))

        self.f["dataset_%d" % self.mpi_rank].write_direct(img_data)
        print("wrote data to ds: %d" % self.mpi_rank)

    def receive_work(self, status):
        message = self.comm.recv(source=0, tag=MPI.ANY_TAG, status=status)
        print("Received message from master: %s" % message)
        return message

    def close_h5_file(self):
        self.f.close()


writer = ParallelWriter(H5PATH)
writer.ingest_data(truncate_file=True)

#3

Why [open the file with 'r' on rank 0 but 'r+' on the other ranks]? Isn’t that asking for trouble? G.


#4

Ah, yep, that was already desperation mode, trying different things. If you just use r+ for everybody, it hangs in the same way though…

I’ve tried to debug it with gdb, and the reason I tried that is that rank 0 always gets stuck in a different flush subroutine than the workers. Rank 0 gets stuck in this wait:

#0  0x00007f32a3849093 in ?? ()
   from /usr/lib/x86_64-linux-gnu/libopen-pal.so.20
#1  0x00007f32a37ed9a9 in opal_progress ()
   from /usr/lib/x86_64-linux-gnu/libopen-pal.so.20
#2  0x00007f32a37f2c75 in sync_wait_mt ()
   from /usr/lib/x86_64-linux-gnu/libopen-pal.so.20
#3  0x00007f32a3d46dbc in ompi_request_default_wait ()
   from /usr/lib/x86_64-linux-gnu/libmpi.so.20
#4  0x00007f32a3d9df7a in ompi_coll_base_barrier_intra_two_procs ()
   from /usr/lib/x86_64-linux-gnu/libmpi.so.20
#5  0x00007f326b64d123 in mca_io_ompio_file_set_size ()
   from /usr/lib/x86_64-linux-gnu/openmpi/lib/openmpi/mca_io_ompio.so
#6  0x00007f32a3d88cb6 in PMPI_File_set_size ()
   from /usr/lib/x86_64-linux-gnu/libmpi.so.20
#7  0x00007f328605b9aa in H5FD__mpio_truncate (_file=_file@entry=0x2fdeb00, 
    dxpl_id=<optimized out>, closing=closing@entry=true) at H5FDmpio.c:1658
#8  0x00007f3285d2bf8b in H5FD_truncate (file=0x2fdeb00, 
    closing=closing@entry=true) at H5FD.c:1612
#9  0x00007f3285cffd19 in H5F__flush_phase2 (f=f@entry=0x2f7f610, 
    closing=closing@entry=true) at H5Fint.c:1934
#10 0x00007f3285d02b3a in H5F__dest (f=f@entry=0x2f7f610, 
    flush=flush@entry=true) at H5Fint.c:1205
#11 0x00007f3285d039d5 in H5F_try_close (f=f@entry=0x2f7f610, 
    was_closed=was_closed@entry=0x0) at H5Fint.c:2229
#12 0x00007f3285d03ecf in H5F__close (f=f@entry=0x2f7f610) at H5Fint.c:2056
#13 0x00007f328601df58 in H5VL__native_file_close (file=0x2f7f610, 
    dxpl_id=<optimized out>, req=<optimized out>) at H5VLnative_file.c:883
#14 0x00007f3285ff2de9 in H5VL__file_close (obj=0x2f7f610, cls=0x2d94230, 
    dxpl_id=dxpl_id@entry=792633534417207304, req=req@entry=0x0)
    at H5VLcallback.c:3944
#15 0x00007f328600250a in H5VL_file_close (vol_obj=vol_obj@entry=0x2d40100, 
    dxpl_id=792633534417207304, req=req@entry=0x0) at H5VLcallback.c:3976
#16 0x00007f3285ced4e2 in H5F__close_cb (file_vol_obj=0x2d40100) at H5F.c:242
#17 0x00007f3285dcbc66 in H5I_dec_ref (id=id@entry=72057594037927938)
    at H5I.c:1376
#18 0x00007f3285dcbd5e in H5I_dec_app_ref (id=id@entry=72057594037927938)
    at H5I.c:1421
#19 0x00007f3285dcbf58 in H5Idec_ref (id=id@entry=72057594037927938)
    at H5I.c:1317
#20 0x00007f326e958def in __pyx_f_4h5py_4defs_H5Idec_ref (
    __pyx_v_obj_id=72057594037927938)
    at /home/caucau/SDSSCube/ext_lib/h5py/h5py/h5py/defs.c:17581

All other ranks get stuck in this function:

#0  0x00007f7651216093 in ?? ()
   from /usr/lib/x86_64-linux-gnu/libopen-pal.so.20
#1  0x00007f76511ba9a9 in opal_progress ()
   from /usr/lib/x86_64-linux-gnu/libopen-pal.so.20
#2  0x00007f76511bfc75 in sync_wait_mt ()
   from /usr/lib/x86_64-linux-gnu/libopen-pal.so.20
#3  0x00007f7651713dbc in ompi_request_default_wait ()
   from /usr/lib/x86_64-linux-gnu/libmpi.so.20
#4  0x00007f765176af7a in ompi_coll_base_barrier_intra_two_procs ()
   from /usr/lib/x86_64-linux-gnu/libmpi.so.20
#5  0x00007f76517269ca in PMPI_Barrier ()
   from /usr/lib/x86_64-linux-gnu/libmpi.so.20
#6  0x00007f76278cfcd4 in H5AC__rsp__dist_md_write__flush (f=f@entry=0x234c6e0)
    at H5ACmpio.c:1700
#7  0x00007f76278d1b80 in H5AC__run_sync_point (f=f@entry=0x234c6e0, 
    sync_point_op=sync_point_op@entry=1) at H5ACmpio.c:2162
#8  0x00007f76278d1c74 in H5AC__flush_entries (f=f@entry=0x234c6e0)
    at H5ACmpio.c:2305
#9  0x00007f76274757dd in H5AC_flush (f=f@entry=0x234c6e0) at H5AC.c:681
#10 0x00007f7627588ce7 in H5F__flush_phase2 (f=f@entry=0x234c6e0, 
    closing=closing@entry=true) at H5Fint.c:1918
#11 0x00007f762758bb3a in H5F__dest (f=f@entry=0x234c6e0, 
    flush=flush@entry=true) at H5Fint.c:1205
#12 0x00007f762758c9d5 in H5F_try_close (f=f@entry=0x234c6e0, 
    was_closed=was_closed@entry=0x0) at H5Fint.c:2229
#13 0x00007f762758cecf in H5F__close (f=f@entry=0x234c6e0) at H5Fint.c:2056
#14 0x00007f76278a6f58 in H5VL__native_file_close (file=0x234c6e0, 
    dxpl_id=<optimized out>, req=<optimized out>) at H5VLnative_file.c:883
#15 0x00007f762787bde9 in H5VL__file_close (obj=0x234c6e0, cls=0x21058e0, 
    dxpl_id=dxpl_id@entry=792633534417207304, req=req@entry=0x0)
    at H5VLcallback.c:3944
#16 0x00007f762788b50a in H5VL_file_close (vol_obj=vol_obj@entry=0x23d1cf0, 
    dxpl_id=792633534417207304, req=req@entry=0x0) at H5VLcallback.c:3976
#17 0x00007f76275764e2 in H5F__close_cb (file_vol_obj=0x23d1cf0) at H5F.c:242
#18 0x00007f7627654c66 in H5I_dec_ref (id=id@entry=72057594037927936)
    at H5I.c:1376
#19 0x00007f7627654d5e in H5I_dec_app_ref (id=id@entry=72057594037927936)
    at H5I.c:1421
#20 0x00007f7627654f58 in H5Idec_ref (id=id@entry=72057594037927936)
    at H5I.c:1317
#21 0x00007f761c2e3def in __pyx_f_4h5py_4defs_H5Idec_ref (
    __pyx_v_obj_id=72057594037927936)

You can see that rank 0 gets stuck truncating the file (inside MPI_File_set_size) for some reason, while the other ranks are waiting for each other at an MPI_Barrier while flushing metadata cache entries.

P.S.: These stack traces were already taken with all ranks opening the file in r+ mode.

I’m starting to wonder whether the parallel writing feature of h5py is worth the trouble; maybe I’ll go back to the Single Writer Multiple Reader model. But that would certainly put a limit on my scalability, and that would be a pity. Thank you for your help!

BR,

Jiri


#5

Hi, after a while I’m getting back into this challenge. Any chance we could resurrect this thread? Basically, I’m trying to write into an HDF5 file in parallel mode when the datasets were created in serial mode.

We could try to approach this from a different angle - if you could give me a working example of how to do that in h5py, I can alter my code without having to dig into why the particular piece of code mentioned above is not working.

Thanks for the help!

Jiri


#6

Can we have a C example that shows the desired behavior? (Maybe you have one already…) I’d be happy to help writing one if there’s a clear description of what you are trying to do. G.


#7

Hi, thanks for the reply. I have been experimenting with C, but to avoid any mismatch caused by my rustiness in C, let’s try pseudo-code instead. What I need is fairly simple:

Let’s run this for 10 processes.

  1. If rank 0 -> Open H5 file in serial mode.
  2. If rank 0 -> create 10 datasets.
  3. If rank 0 -> Close H5 file in serial mode.
  4. Barrier
  5. Open the file in parallel mode.
  6. Each rank writes to its own dataset (rank = index of the dataset).
  7. Close the file in parallel mode.

The problem I encounter is that step 7 gets stuck in a deadlock when freeing resources.

Let me know if this pseudocode is not clear - I will try to produce a working C example.

BR,

Jiri


#8

What’s the layout of the datasets under 2? Does the serial kickstarter know the ranks and extents of the datasets? Or are we resizing datasets as part of 6? If that were the case, we could still do independent H5Dwrites, but the sizing and shaping would have to be collective so that all processes have the same view of what’s going on in the file.

Alternatively, you can save yourself the trouble of parallel mode and have 10 ranks write 10 files and have the kickstarter write a stub file with 10 external links or 10 groups where you mount the other files.
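A minimal sketch of the external-link layout described above, assuming plain (serial) h5py. The helper names, the `part_%04d.h5` naming scheme, and the `/data` dataset path are hypothetical choices for illustration, not something from this thread.

```python
def part_filename(rank):
    """Hypothetical naming scheme for the per-rank files."""
    return "part_%04d.h5" % rank


def write_part(rank, values):
    """Each worker writes its own file serially; no MPI-IO is involved."""
    import h5py  # imported lazily so part_filename() works without h5py

    with h5py.File(part_filename(rank), "w") as f:
        f.create_dataset("data", data=values)


def write_stub(stub_path, n_parts):
    """The kickstarter writes one stub file with a link to every part."""
    import h5py

    with h5py.File(stub_path, "w") as f:
        for rank in range(n_parts):
            # Assigning an ExternalLink stores only the target file name
            # and path; the target is resolved when the link is accessed.
            f["dataset_%d" % rank] = h5py.ExternalLink(part_filename(rank), "/data")
```

Each rank would call `write_part(rank, ...)` on its own, and the kickstarter would call `write_stub("stub.h5", n_parts)` once; reading `stub.h5["dataset_3"]` then follows the link into `part_0003.h5`.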

G.


#9

Hi,

  1. The datasets are just in the root and they are already created in the correct sizes
  2. The kickstarter actually distributes the work to the others as part of 6, so it knows the ranks.
  3. No resizing is done by the parallel workers afterwards as part of 6 - mentioned in #1.
  4. I cannot save myself the trouble by using external links, because group and region references are essential for my functionality, and unfortunately they don’t work with external links…

The reasoning behind this is:

  1. Steps 1-3 go through the FITS image+spectrum headers (approx. 1 million files at this point), where they find all the information needed to create the file structure, along with correctly sized datasets. This just involves reading the headers from disk + writing to HDF5.
  2. Steps 5-7 go through these files again and do some preprocessing before writing them to the prepared datasets. This involves significant processing time (approx. 2 seconds per file) apart from the reads+writes, and therefore needs to run in parallel.

I could do just a single writer multiple reader case where only rank 0 writes in step 6 and everybody sends the preprocessed data to it, but the data to be written is quite big, and eventually I would hit a limit where vast amounts of data are being sent between the workers and rank 0.


#10

OK, here’s a conversation re-starter. It’s not exactly your scenario. I glossed over a few ugly truths.

#include "hdf5.h"
#include <mpi.h>

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char** argv)
{
  MPI_Comm comm = MPI_COMM_WORLD;
  int comm_rank, comm_size;
  hid_t file;

  MPI_Init(&argc, &argv);
  MPI_Comm_size(comm, &comm_size);
  MPI_Comm_rank(comm, &comm_rank);

  { // create the file in parallel
    hid_t fapl;
    char fname[] = "foo.h5";
    assert((fapl = H5Pcreate(H5P_FILE_ACCESS)) != H5I_INVALID_HID);
    assert(H5Pset_fapl_mpio(fapl, comm, MPI_INFO_NULL) >= 0);
    assert((file = H5Fcreate(fname, H5F_ACC_TRUNC, H5P_DEFAULT, fapl)) !=
           H5I_INVALID_HID);
    assert(H5Pclose(fapl) >= 0);
  }

  // all processes have to call H5Dcreate() to create a dataset, even if the
  // dataset will be accessed by only one MPI rank.
  for (int i = 0; i < comm_size; ++i)
    {
      char name[255];
      hid_t fspace, dset;
      assert((fspace = H5Screate_simple(1, (hsize_t[]){10 + i}, NULL)) !=
             H5I_INVALID_HID);
      assert(sprintf(name, "%d", i) > 0);
      assert((dset = H5Dcreate(file, name, H5T_STD_I32LE, fspace, H5P_DEFAULT,
                               H5P_DEFAULT, H5P_DEFAULT)) != H5I_INVALID_HID);
      assert(H5Dclose(dset) >= 0);
      assert(H5Sclose(fspace) >= 0);
    }

  { // each rank opens and writes its "own" dataset
    hid_t dset;
    char name[255];
    int* data = (int*)malloc((10+comm_rank)*sizeof(int));
    for (int i = 0; i < 10+comm_rank; ++i)
      data[i] = comm_rank;

    assert(sprintf(name, "%d", comm_rank) > 0);
    assert((dset = H5Dopen(file, name, H5P_DEFAULT)) >= 0);

    assert(H5Dwrite(dset, H5T_NATIVE_INT, H5S_ALL, H5S_ALL,
                    H5P_DEFAULT, data) >= 0);

    assert(H5Dclose(dset) >= 0);
    free(data);
  }

  assert(H5Fclose(file) >= 0);

  MPI_Finalize();

  return EXIT_SUCCESS;
}

(Running w/ 4 ranks) The h5dump output should look like this:

HDF5 "foo.h5" {
GROUP "/" {
   DATASET "0" {
      DATATYPE  H5T_STD_I32LE
      DATASPACE  SIMPLE { ( 10 ) / ( 10 ) }
      DATA {
      (0): 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
      }
   }
   DATASET "1" {
      DATATYPE  H5T_STD_I32LE
      DATASPACE  SIMPLE { ( 11 ) / ( 11 ) }
      DATA {
      (0): 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
      }
   }
   DATASET "2" {
      DATATYPE  H5T_STD_I32LE
      DATASPACE  SIMPLE { ( 12 ) / ( 12 ) }
      DATA {
      (0): 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2
      }
   }
   DATASET "3" {
      DATATYPE  H5T_STD_I32LE
      DATASPACE  SIMPLE { ( 13 ) / ( 13 ) }
      DATA {
      (0): 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3
      }
   }
}
}
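For reference, a rough h5py counterpart of the C program above. This is only a sketch, assuming an h5py build linked against parallel HDF5 plus mpi4py; the helper names (`dataset_length`, `rank_values`, `main`) are made up for illustration. Unlike the C version, it keeps the handles returned by the collective `create_dataset` calls instead of reopening each dataset per rank.

```python
def dataset_length(rank):
    """Length of the dataset owned by `rank` (10 + rank, as in the C code)."""
    return 10 + rank


def rank_values(rank):
    """Values a rank writes: its own rank number, repeated."""
    return [rank] * dataset_length(rank)


def main(path="foo.h5"):
    # Imported here so the helpers above can be used without an MPI build.
    import h5py
    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    # Collective: every rank opens the file with the mpio driver.
    with h5py.File(path, "w", driver="mpio", comm=comm) as f:
        # Collective: every rank creates every dataset, in the same order
        # and with the same arguments, even those it will never write to.
        dsets = [
            f.create_dataset(str(i), (dataset_length(i),), dtype="<i4")
            for i in range(size)
        ]
        # Independent: each rank writes only its "own" dataset.
        dsets[rank][...] = rank_values(rank)
    # Leaving the with-block closes the file on every rank.
```

To try it, add a call to `main()` at the end of the script and launch it with something like `mpiexec -n 4 python script.py`.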

#11

OK I see, so this would mean that every process needs to read all FITS file headers when creating the datasets which would be rather inefficient, but if there is no other way let’s try - maybe the slowdown won’t be that big.

On second thought, there are 2 options. Based on your knowledge of how these parallel writes work, which option would be faster?

  1. One process reads the FITS file headers from disk and parses them -> MPI_Sends the parsed information to all other processes to do the parallel dataset creations.
  2. All processes read and parse the FITS file headers and write the datasets. This does not require any other MPI communication than the collective dataset creation.

Thanks for the recommendation.


#12

I think the difference between 1. and 2. will come down to the number of ranks and the latency to obtain/size of those FITS headers. For small numbers of ranks (~100), I wouldn’t expect to see a big difference. For large numbers of ranks, 1. might have an advantage, because you could block/batch the header parsing and partition/broadcast, and overlap that with the dataset creation and writes. G.
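A minimal sketch of option 1 with batching, assuming mpi4py and a file already opened with the mpio driver on every rank. `parse_headers` is a hypothetical callable that turns a batch of FITS paths into `(name, shape, dtype)` tuples, and the batch size is arbitrary. The sketch does not attempt to overlap parsing with dataset creation or writes.

```python
def make_batches(items, batch_size):
    """Split a list into consecutive batches of at most batch_size items."""
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


def create_datasets_from_root(comm, f, paths, parse_headers, batch_size=1000):
    """Rank 0 parses the headers; every rank creates the datasets.

    comm  : an mpi4py communicator, e.g. MPI.COMM_WORLD
    f     : an h5py.File opened with driver='mpio' on all ranks
    paths : list of FITS paths (only used on rank 0; may be None elsewhere)
    """
    rank = comm.Get_rank()
    batches = make_batches(paths, batch_size) if rank == 0 else None

    # All ranks must run the same number of iterations, so share the count.
    n_batches = comm.bcast(len(batches) if rank == 0 else None, root=0)

    for b in range(n_batches):
        # Only rank 0 touches the FITS files on disk.
        specs = parse_headers(batches[b]) if rank == 0 else None
        # Lowercase bcast pickles the Python list and sends it to all ranks.
        specs = comm.bcast(specs, root=0)
        # Collective: every rank issues identical create_dataset calls.
        for name, shape, dtype in specs:
            f.create_dataset(name, shape, dtype=dtype)
```

Broadcasting one batch at a time keeps each message small, at the cost of one broadcast per batch.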