Hi,
I’m trying to use a HDF5 file for exchanging numeric as well as string data between a writer and reader process.
I’m using h5py and the SWMR mode for that as that seems to support exactly this use case.
While reading numeric values concurrently while the wirter procerss fills the file works, reading string fails with an error:
OSError(“Can’t synchronously read data (address of object past end of allocation)”)
I adapted the multiprocessing SWMR example from the h5py documentation to see if that works at least.
But that fails with a different but very similiar error:
OSError: Can’t synchronously read data (len not positive after adjustment for EOA)
Note: Im using Windows 11
Has anyone a idea what goes wrong and how to fix it?
Used h5py version info:
h5py 3.12.1
HDF5 1.14.2
Python 3.12.2 (tags/v3.12.2:6abddd9, Feb 6 2024, 21:26:36) [MSC v.1937 64 bit (AMD64)]
sys.platform win32
sys.maxsize 9223372036854775807
numpy 1.26.4
cython (built with) 3.0.11
numpy (built against) 2.1.1
HDF5 (built against) 1.14.2
Adapted SWMR multiprocessing code:
import sys
import time
import h5py
import numpy as np
import logging
from multiprocessing import Process, Event
class SwmrReader(Process):
def __init__(self, event, fname, dsetname, timeout = 2.0):
super().__init__()
self._event = event
self._fname = fname
self._dsetname = dsetname
self._timeout = timeout
def run(self):
self.log = logging.getLogger('reader')
print("Reader: Waiting for initial event")
assert self._event.wait( self._timeout )
self._event.clear()
print(f"Reader: Opening file {self._fname}")
f = h5py.File(self._fname, 'r', libver='latest', swmr=True, locking=False)
assert f.swmr_mode
dset = f[self._dsetname]
text_dset = f['text_data']
try:
# monitor and read loop
while self._event.wait( self._timeout ):
self._event.clear()
print("Reader: Refreshing dataset")
dset.refresh()
text_dset.refresh()
shape = dset.shape
print("Reader: Read dset shape: %s"%str(shape))
print(f"Reader: Text dataset shape: {text_dset.shape}")
for i in range(text_dset.shape[0]):
print(text_dset[i])
finally:
f.close()
class SwmrWriter(Process):
def __init__(self, event, fname, dsetname):
super().__init__()
self._event = event
self._fname = fname
self._dsetname = dsetname
def run(self):
self.log = logging.getLogger('writer')
self.log.info("Creating file %s", self._fname)
f = h5py.File(self._fname, 'w', libver='latest')
try:
arr = np.array([1,2,3,4])
dset = f.create_dataset(self._dsetname, chunks=(2,), maxshape=(None,), data=arr)
text_dset = f.create_dataset('text_data', (0,), maxshape=(None,), dtype=h5py.string_dtype())
assert not f.swmr_mode
print("Writer: SWMR mode")
f.swmr_mode = True
assert f.swmr_mode
print("Writer: Sending initial event")
self._event.set()
# Write loop
for i in range(5):
time.sleep(1)
new_shape = ((i+1) * len(arr), )
print("Writer: Resizing dset shape: %s"%str(new_shape))
dset.resize( new_shape )
print("Writer: Writing data")
dset[i*len(arr):] = arr
text_dset.resize((text_dset.shape[0] + 1,))
text_dset[-1] = f"Sample text {i}"
#dset.write_direct( arr, np.s_[:], np.s_[i*len(arr):] )
print("Writer: Flushing data")
dset.flush()
text_dset.flush()
print("Writer: Sending event")
self._event.set()
finally:
f.close()
if __name__ == "__main__":
logging.basicConfig(format='%(levelname)10s %(asctime)s %(name)10s %(message)s',level=logging.INFO)
fname = 'swmrmp.h5'
dsetname = 'data'
if len(sys.argv) > 1:
fname = sys.argv[1]
if len(sys.argv) > 2:
dsetname = sys.argv[2]
event = Event()
reader = SwmrReader(event, fname, dsetname)
writer = SwmrWriter(event, fname, dsetname)
logging.info("Starting reader")
reader.start()
logging.info("Starting writer")
writer.start()
logging.info("Waiting for writer to finish")
writer.join()
logging.info("Waiting for reader to finish")
reader.join()