I have an HDF5 file to which I'd like to write a few datasets in parallel. Since I'm a noob with mpi4py and barely understand how it works, I asked an AI for help, and together we sketched this minimal example Python script:
```python
from mpi4py import MPI
import h5py
import numpy as np
import time
import os


# Monkey-patch h5py.Dataset with the append method
def append_h5py_dataset(self, value):
    value = np.array(value)
    if value.shape != self.shape[1:]:
        raise ValueError(
            f"value has shape {value.shape} while every line's shape in the "
            f"data is {self.shape[1:]}."
        )
    self.resize(self.shape[0] + 1, axis=0)
    self[-1] = value


h5py.Dataset.append = append_h5py_dataset

# Initialize MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Abort if not run with mpirun
if size == 1:
    raise RuntimeError("This script must be run with mpirun or mpiexec.")

# List of integers between 10 and 20, size 6
integers = [12, 14, 16, 18, 11, 13]

# File path
file_path = "parallel_hdf5_example.h5"

# Remove the file if it exists (only rank 0 does this to avoid race conditions)
if rank == 0:
    if os.path.exists(file_path):
        os.remove(file_path)
comm.Barrier()  # Synchronize all processes after file removal


# Function to process an integer
def process_integer(i):
    rng = np.random.default_rng(i)
    try:
        with h5py.File(file_path, mode="a", driver="mpio", comm=comm) as h5file:
            # Create dataset if it doesn't exist
            if f"/{i}" not in h5file:
                # Create dataset with a single float
                h5file.create_dataset(f"/{i}", data=[rng.uniform(0, 1)], maxshape=(None,))
            # Append data in a loop
            dataset = h5file[f"/{i}"]
            for _ in range(i):  # Use `i` as the loop range
                time.sleep(0.1)  # Simulate intensive computation
                new_data = rng.uniform(0, 1)  # Single float
                print(f"integer {i} gained new data {new_data}")
                dataset.append(new_data)  # Append directly (no list wrapping)
            # Print completion message
            print(f"Process {rank} finished processing integer {i}")
    except Exception as e:
        print(f"Process {rank} encountered an error while processing integer {i}: {e}")


# Distribute integers across processes
for idx, i in enumerate(integers):
    if idx % size == rank:
        print(f"Process {rank} processing integer {i}")
        process_integer(i)

# Synchronize all processes
comm.Barrier()

if rank == 0:
    print("All processes have finished.")
```
When I run it, only the rank 0 process finishes processing its integer… If I add an `h5file.flush()` statement right after the `for _ in range(i)` loop, it hangs there and never prints the "finished processing" line, no matter the rank (see the snippet below for where exactly I put it)…
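To be concrete, this is roughly where the flush went (just the tail of `process_integer`, de-indented; `dataset`, `h5file`, `i`, and `rank` are as in the full script above):

```python
# ... inside process_integer, after the dataset has been created/opened ...
for _ in range(i):
    time.sleep(0.1)
    new_data = rng.uniform(0, 1)
    print(f"integer {i} gained new data {new_data}")
    dataset.append(new_data)
h5file.flush()  # <- with this line added, execution hangs right here
print(f"Process {rank} finished processing integer {i}")  # never printed
```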
Also, the `except Exception` handler never prints anything… Any idea why that is? Maybe it's due to the way I append to the datasets? Is it a known issue that resizing datasets in parallel with MPI can cause problems?
Installed versions:

- `h5py==3.12.1` (I know 3.13.0 was just released, but I haven't tested it yet)
- `mpi4py==4.0.1` (I know 4.0.3 was also recently released, but I haven't tested it either)