So I have an HDF5 file to which I'd like to write a few datasets in parallel. Since I'm a noob with mpi4py and barely understand how it works, I asked an AI for help, and together we sketched this minimal example Python script:
from mpi4py import MPI
import h5py
import numpy as np
import time
import os
# Monkey-patch h5py.Dataset with the append method
def append_h5py_dataset(self, value):
    value = np.array(value)
    if value.shape != self.shape[1:]:
        raise ValueError(
            f"value has shape {value.shape} while every line's shape in the "
            f"data is {self.shape[1:]}."
        )
    self.resize(self.shape[0] + 1, axis=0)
    self[-1] = value
h5py.Dataset.append = append_h5py_dataset
# Initialize MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
# Abort if not run with mpirun
if size == 1:
    raise RuntimeError("This script must be run with mpirun or mpiexec.")
# List of integers between 10 and 20, size 6
integers = [12, 14, 16, 18, 11, 13]
# File path
file_path = "parallel_hdf5_example.h5"
# Remove the file if it exists (only rank 0 does this to avoid race conditions)
if rank == 0:
    if os.path.exists(file_path):
        os.remove(file_path)
comm.Barrier() # Synchronize all processes after file removal
# Function to process an integer
def process_integer(i):
    rng = np.random.default_rng(i)
    try:
        with h5py.File(file_path, mode="a", driver='mpio', comm=comm) as h5file:
            # Create dataset if it doesn't exist
            if f"/{i}" not in h5file:
                # Create dataset with a single float
                h5file.create_dataset(f"/{i}", data=[rng.uniform(0, 1)], maxshape=(None,))
            # Append data in a loop
            dataset = h5file[f"/{i}"]
            for _ in range(i):  # Use `i` as the loop range
                time.sleep(0.1)  # Simulate intensive computation
                new_data = rng.uniform(0, 1)  # Single float
                print(f"integer {i} gained new data {new_data}")
                dataset.append(new_data)  # Append directly (no list wrapping)
            # Print completion message
            print(f"Process {rank} finished processing integer {i}")
    except Exception as e:
        print(f"Process {rank} encountered an error while processing integer {i}: {e}")
# Distribute integers across processes
for idx, i in enumerate(integers):
    if idx % size == rank:
        print(f"Process {rank} processing integer {i}")
        process_integer(i)
# Synchronize all processes
comm.Barrier()
if rank == 0:
    print("All processes have finished.")
When running it, only the rank 0 process finishes processing its integer… If I add an h5file.flush() statement right after the for _ in range(i) loop, it hangs there and never prints the "finished processing" line, no matter the rank… Also, the except Exception handler never prints anything…
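Concretely, the flush variant inside process_integer looks like this:

            for _ in range(i):
                ...  # same loop body as above
            h5file.flush()  # <- the added line; every rank hangs here
            # Print completion message
            print(f"Process {rank} finished processing integer {i}")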
Any idea why that is? Could it be due to the way I append to the datasets? Is it a known issue that resizing datasets in parallel with MPI causes problems?
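In case it helps to reproduce this, here is a stripped-down sketch that isolates just the pattern I suspect (the file name resize_repro.h5 and the per-rank loop counts are arbitrary placeholders): every rank opens the file collectively, but each rank then resizes the shared dataset a different number of times.

# resize_repro.py -- run with e.g.: mpirun -n 2 python resize_repro.py
from mpi4py import MPI
import h5py
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

with h5py.File("resize_repro.h5", "w", driver="mpio", comm=comm) as f:
    # All ranks create the dataset together
    dset = f.create_dataset("x", data=np.zeros(1), maxshape=(None,))
    # Each rank resizes a different number of times, mirroring the
    # per-integer loop lengths in the script above
    for _ in range(rank + 1):
        dset.resize(dset.shape[0] + 1, axis=0)
        dset[-1] = float(rank)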
Installed version info:
h5py==3.12.1 (I know 3.13.0 was just released, but I haven't tested it yet).
mpi4py==4.0.1 (I know 4.0.3 was also recently released, but I haven't tested it either).
