Hi, I wasn’t able to check out the link option yet, but this comment got me a bit interested:
I wrote a small script that writes to HSDS from multiple threads:
```python
import h5py
import hdf5plugin  # imported for its side effect: registers extra HDF5 compression filters
import h5pyd
import itertools
import concurrent.futures
from tqdm import tqdm

def get_axis_slice(offset, length, target):
    upper_bnd = offset + target
    if upper_bnd < length:
        return slice(offset, upper_bnd)
    else:
        return slice(offset, None)

def get_axis_slice_list(length, slice_len):
    offset = 0
    axis_slices = []
    while offset < length:
        slc = get_axis_slice(offset, length, slice_len)
        axis_slices.append(slc)
        offset += slice_len
    return axis_slices

def get_chunk_slices(h5_dset):
    chunks = h5_dset.chunks
    if chunks:
        # Build per-axis slice lists and take their cartesian product,
        # yielding one slice tuple per chunk
        axis_slices = []
        for i in range(len(chunks)):
            axis_slice = get_axis_slice_list(h5_dset.shape[i], chunks[i])
            axis_slices.append(axis_slice)
        all_slices = list(itertools.product(*axis_slices))
        return all_slices
    else:
        # Contiguous dataset: upload everything in one piece
        return [slice(0, None, None)]

def upload_slice(h5_dset, hsds_dset, slc):
    hsds_dset[slc] = h5_dset[slc]
    # print("uploaded", h5_dset.name, slc)

def upload_dset(h5_dset, hsds_dset):
    slices = get_chunk_slices(h5_dset)
    # Limit the number of threads
    max_threads = 10  # Adjust this value based on your system
    # Create a ThreadPoolExecutor to manage threads
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        # Submit each upload_slice task to the executor
        futures = [executor.submit(upload_slice, h5_dset, hsds_dset, slc) for slc in slices]
        # Wait for all tasks to complete (this also happens automatically when leaving 'with')
        with tqdm(total=len(futures), desc="Uploading slices", unit="slice") as pbar:
            for future in concurrent.futures.as_completed(futures):
                try:
                    # result() re-raises any exception that occurred during execution
                    future.result()
                except Exception as e:
                    print(f"An error occurred: {e}")
                pbar.update(1)

def upload_file(input_path, hsds_path):
    with h5py.File(input_path, "r") as h5_file, h5pyd.File(hsds_path, "w") as hsds_file:
        for key in h5_file:
            h5_dset = h5_file[key]
            assert isinstance(h5_dset, h5py.Dataset)
            # Reuse the source chunk layout if there is one, otherwise let HSDS pick
            chunks = h5_dset.chunks if h5_dset.chunks else True
            hsds_dset = hsds_file.create_dataset(
                key, shape=h5_dset.shape, chunks=chunks,
                dtype=h5_dset.dtype, compression="blosclz",
            )
            upload_dset(h5_dset, hsds_dset)

if __name__ == "__main__":
    input_path = "example.h5"
    hsds_path = "/home/admin/example.h5"
    upload_file(input_path, hsds_path)
```
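For what it's worth, here is how one could spot-check the result afterwards. This is a minimal sketch (verify_upload is not part of the script above): it reads each dataset back fully through h5pyd and compares it element-wise with the source, so it is only practical for data that fits in memory.

```python
import numpy as np
import h5py
import h5pyd

def verify_upload(input_path, hsds_path):
    # Hypothetical helper: read each dataset fully from both sides and compare
    with h5py.File(input_path, "r") as h5_file, h5pyd.File(hsds_path, "r") as hsds_file:
        for key in h5_file:
            if np.array_equal(h5_file[key][...], hsds_file[key][...]):
                print(key, "ok")
            else:
                print(key, "MISMATCH")

verify_upload("example.h5", "/home/admin/example.h5")
```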
On a 2-node (2 CPU, 8 GB RAM) HSDS cluster I would only get a 50% improvement over hsload (1.5 Gbit/s instead of 1 Gbit/s), but after scaling the cluster to 5 nodes I got over 3 Gbit/s compared to 1 Gbit/s, which is already pretty close to the connection speed of the virtual machine I use for ingesting.
It is necessary to note here that the dataset I am using is highly compressible, so the data throughput to the premium blob storage is comparatively low.
I have noticed that picking upload slices that match the chunk shape of the HSDS dataset improves performance noticeably. That makes sense, because I assume a partial write to a chunk requires reading the existing chunk data and writing it back out again. I assume hsload already does this by default.
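To make that concrete, here is a minimal sketch (a hypothetical upload_dset_aligned, not part of the script above) that builds the slice grid from the target dataset's chunk layout instead of the source file's, reusing get_chunk_slices and upload_slice and assuming hsds_dset.chunks comes back as a plain tuple like in h5py:

```python
# Hypothetical variant of upload_dset: derive the slice grid from the *target*
# (HSDS) dataset's chunks, so every write covers whole chunks and avoids the
# read-modify-write path on partially covered chunks.
def upload_dset_aligned(h5_dset, hsds_dset):
    # get_chunk_slices only touches .chunks and .shape, so it can be pointed
    # at the h5pyd dataset just as well (assuming .chunks is a plain tuple)
    slices = get_chunk_slices(hsds_dset)
    for slc in slices:
        upload_slice(h5_dset, hsds_dset, slc)
```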
I would love to turn this into a proper pull request, but I wasn't quite able to understand the h5pyd code well enough to know where the changes should go.