Accelerating data ingest with h5pyd

Hi,

My use case involves datasets that are highly compressible; I am often dealing with a compression ratio of around 99%.
When uploading a .h5 file to hsds, the data always seems to be decompressed and uploaded uncompressed. Is there a way to reuse the existing compression of the .h5 file, or something in the realm of http compression? I saw there is an http compression option in the config.yml, but it doesn’t seem to have any effect on uploading data.

I saw that other people have had similar issues, and there is even a whole article on data upload/ingest. Maybe someone has some other tips. :slight_smile:

Also, a progress bar for hsload would be nice!

Some things I have tried:

I have already switched to ingesting the data from a virtual machine in the cloud because of the significantly faster connection. I don’t quite seem to be able to saturate it though: I achieve around 1 Gbit/s of upload speed from the VM to the Kubernetes cluster running the hsds server, about 1/4 of what I would expect the connection speed of the VM or the cluster to be. The CPU utilization for decompression and compression is barely measurable with the very cheap blosclz option I am using, and the storage account should also be far from reaching its IOPS limit.

In Azure I have already played with copying the hsds container from one storage account to another with az copy, which is by far the fastest way to migrate: not only because it works directly on the compressed data, but also because az copy can reach transfer speeds upwards of 10 Gbit/s, even on the small objects created by hsds.

Best Regards
Leonard

Hi Leonard!

Do you know which compression was used on the data in your HDF5 file?

Seems like the .h5 compression isn’t being used, which is a huge missed opportunity. Also, can we get a progress bar? My sanity would thank you.

Originally it was gzip, which should also be supported by hsds. Later on I switched to other compression options that are more efficient to decode, by setting the --ignore filters flag in hsload.

Do you know if hsds should just work with the existing compression of an h5 file? I was not able to get it to work so far. I assume the chunk size also needs to be set correctly in the .h5 file; otherwise hsds will rechunk the data so that it fits within the configured chunk size limits.
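
For reference, repacking a dataset with explicit chunks and blosclz compression before loading could look roughly like the sketch below. It is untested: the file/dataset names and the chunk shape are placeholders, and it simply uses hdf5plugin’s Blosc filter from h5py.

import h5py
import hdf5plugin  # registers the blosc filters with HDF5


def repack_dataset(src_path, dst_path, dset_name, chunks):
    # copy one dataset into a new file with explicit chunking and blosclz compression,
    # so the chunk layout already fits what the server expects before the upload
    with h5py.File(src_path, "r") as src, h5py.File(dst_path, "w") as dst:
        data = src[dset_name][...]
        dst.create_dataset(
            dset_name,
            data=data,
            chunks=chunks,
            **hdf5plugin.Blosc(cname="blosclz", clevel=5),
        )


# repack_dataset("example.h5", "example_blosclz.h5", "data", chunks=(1024, 1024))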

Hey Leo,

Using http compression for PUT and POST requests is not commonly done, but it’s supported by the http protocol and makes a lot of sense in your scenario. I’ll take a look at implementing it for h5pyd and hsds.

The last time I did some performance testing with http compression for GET requests, my results showed that it was a bit slower for data that wasn’t very compressible. That was using gzip compression, and as a result I made the http_compression option disabled by default. Anyway, I’ll try it with blosclz compression and see how it does.
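
For illustration, client-side request compression generally looks something like the sketch below: gzip the body and declare it with the Content-Encoding header. This is just a generic example with the requests library and a placeholder endpoint, not what h5pyd actually does today.

import gzip
import json

import requests


def put_compressed(url, payload):
    # gzip the request body and declare it via Content-Encoding;
    # the server has to be willing to decode compressed request bodies
    body = gzip.compress(json.dumps(payload).encode("utf-8"))
    headers = {"Content-Type": "application/json", "Content-Encoding": "gzip"}
    return requests.put(url, data=body, headers=headers)


# put_compressed("http://hsds.example.com/datasets/d-1234/value", {"value": [0] * 1000})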

In any case, hsload only sends one request at a time, so it’s difficult to utilize a large percentage of the bandwidth. Have you tried running multiple hscopy runs at the same time? That should help.
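
For example, something along these lines would kick off several loads in parallel (just a sketch: the file names and target domains are placeholders, and each entry gets its own hsload process):

import subprocess
from concurrent.futures import ThreadPoolExecutor

# placeholder (source file, target domain) pairs
jobs = [
    ("part1.h5", "/home/admin/part1.h5"),
    ("part2.h5", "/home/admin/part2.h5"),
    ("part3.h5", "/home/admin/part3.h5"),
]


def run_hsload(src, dst):
    # each hsload process issues its own requests, so several of them together
    # keep more of the link busy than a single sequential run
    return subprocess.run(["hsload", src, dst], check=True)


with ThreadPoolExecutor(max_workers=len(jobs)) as pool:
    list(pool.map(lambda job: run_hsload(*job), jobs))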

Also, have you looked into the hsload --link option? That avoids copying the dataset data altogether (but it only makes sense if your HDF5 files are in the same storage account as your HSDS container).

I’ll look into implementing the progress bar as well (unless anyone else would like to submit a PR!).

Re: using the existing compression of the h5 file…

If you use the --link option, hsds will just read the chunks directly from the h5 file. In this case, the min/max chunk sizes are ignored and hsds will just do the best it can with the given chunk layout (this is where the new “hyperchunking” feature comes into play, to make this more efficient when the h5 file uses lots of small chunks).

Thanks for the suggestions. I have read about the --link option; from what I have seen, random read performance will be reduced. I will give it a try and see what the performance difference is.

Hi, I wasn’t able to check out the --link option yet, but this comment got me a bit interested:

I wrote a small script that would write from multiple threads:

import h5py
import hdf5plugin
import h5pyd
import itertools
import concurrent.futures
from tqdm import tqdm

def get_axis_slice(offset, length, target):
    # slice of size `target` starting at `offset`, clipped to the axis length
    upper_bnd = offset + target
    if upper_bnd < length:
        return slice(offset, upper_bnd)
    else:
        return slice(offset, None)


def get_axis_slice_list(length, slice_len):
    # cover the whole axis with consecutive slices of length slice_len
    offset = 0
    axis_slices = []
    while offset < length:
        slc = get_axis_slice(offset, length, slice_len)
        axis_slices.append(slc)
        offset += slice_len
    return axis_slices



def get_chunk_slices(h5_dset):
    # build one selection tuple per chunk so every write is chunk-aligned
    chunks = h5_dset.chunks
    if chunks:
        axis_slices = []
        for i in range(len(chunks)):
            axis_slice = get_axis_slice_list(h5_dset.shape[i], chunks[i])
            axis_slices.append(axis_slice)
        all_slices = list(itertools.product(*axis_slices))
        return all_slices
    else:
        # contiguous dataset: upload it in one piece
        return [slice(0, None, None)]


def upload_slice(h5_dset, hsds_dset, slc):
    # read one selection from the local file and write it to the HSDS dataset
    hsds_dset[slc] = h5_dset[slc]
    # print("uploaded", h5_dset.name, slc)



def upload_dset(h5_dset, hsds_dset):
    slices = get_chunk_slices(h5_dset)
    # Limit the number of threads
    max_threads = 10  # Adjust this value based on your system

    # Create a ThreadPoolExecutor to manage threads
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        # Submit each upload_slice task to the executor
        futures = [executor.submit(upload_slice, h5_dset, hsds_dset, slc) for slc in slices]

        # Wait for all uploads to finish, showing progress as they complete
        with tqdm(total=len(futures), desc="Uploading slices", unit="slice") as pbar:
            for future in concurrent.futures.as_completed(futures):
                try:
                    future.result()  # re-raises any exception raised in the worker thread
                except Exception as e:
                    print(f"An error occurred: {e}")
                pbar.update(1)


def upload_file(input_path, hsds_path):
    # copy every top-level dataset from the local file into an HSDS domain
    with h5py.File(input_path, "r") as h5_file, h5pyd.File(hsds_path, "w") as hsds_file:
        for key in h5_file:
            h5_dset = h5_file[key]
            assert isinstance(h5_dset, h5py.Dataset)
            # keep the source chunk layout if there is one, otherwise let the server pick
            chunks = h5_dset.chunks if h5_dset.chunks else True
            hsds_dset = hsds_file.create_dataset(
                key,
                shape=h5_dset.shape,
                chunks=chunks,
                dtype=h5_dset.dtype,
                compression="blosclz",
            )
            upload_dset(h5_dset, hsds_dset)

if __name__ == "__main__":
    input_path = "example.h5"
    hsds_path = "/home/admin/example.h5"
    upload_file(input_path, hsds_path)

On a 2 node (2 CPU, 8 GB RAM) hsds cluster I would only get a 50% improvement over hsload (1.5 Gbit/s instead of 1 Gbit/s), but when scaling the cluster to 5 nodes I got over 3 Gbit/s compared to 1 Gbit/s! That is already pretty close to the connection speed of the virtual machine I use for ingesting.
It is necessary to note here that the dataset I am using is highly compressible, so the data throughput to the premium blob storage is comparatively low.
I have noticed that picking upload slices that match the chunk shape of the hsds dataset improves performance noticeably. That makes sense, because I assume a partial write to a chunk requires reading the existing data and rewriting it again. I assume hsload does that by default.

I would love to turn this into a proper pull request, but I wasn’t quite able to understand the h5pyd code well enough to know where the changes should go.

Hey Leo,

Good job on (almost) maxing out throughput! In most cases the limiting factor is server latency, so multithreading or multiprocessing can provide a big speedup.

There’s actually code in h5pyd that does something similar to your code snippet. See: h5pyd/examples/multi_mgr_benchmark.py at master · HDFGroup/h5pyd · GitHub. I’d be curious to hear if MultiManager works for you and whether the performance is similar.
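
Basic usage looks roughly like the sketch below (based on that example; the domain and dataset names are placeholders, and the exact MultiManager behaviour may differ between h5pyd versions):

import h5pyd
import numpy as np

f = h5pyd.File("/home/admin/example.h5", "a")
names = ("a", "b", "c")  # placeholder dataset names
dsets = [f.require_dataset(n, shape=(1000,), dtype="f8") for n in names]
mm = h5pyd.MultiManager(dsets)

# write to every dataset at once, one value array per dataset;
# the requests are issued concurrently rather than one at a time
mm[...] = [np.zeros(1000), np.ones(1000), np.arange(1000.0)]

# read everything back: returns one array per dataset
parts = mm[...]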

To determine whether scaling up HSDS will help, a good trick is to run docker stats. If you see the DN containers hitting 100% CPU, running more DNs is likely to help. If you see the SN container hitting 100%, add more SNs. The latter will require some adjustment of the HSDS endpoint for the clients, since each SN will have a different port.

BTW, I haven’t had a chance to investigate http compression yet. I’ve been working on a fairly major revamp of h5pyd to get better performance when working with many groups/datasets/attributes. Already getting >10x speedup for selected use cases!

Hi John,
I am already using the MultiManager for random reads from multiple datasets, which has given a significant speedup.
I assume there would also be a nice way to test it with large slices. Writing a wrapper that opens the same dataset multiple times in read mode, creates a MultiManager from those handles, and subdivides a large input slice into smaller slices (one per dataset in the MultiManager) might be a hacky way of parallelizing sequential reads. I might give that a try :smiley:
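
Something like the sketch below is what I have in mind. It is untested and assumes MultiManager accepts one selection per dataset (as in the benchmark example); the domain and dataset names are placeholders.

import h5pyd


def parallel_read(domain, dset_name, start, stop, n_handles=4):
    # open the same dataset several times so each handle can issue its own request,
    # then split one large range along the first axis into one sub-slice per handle
    files = [h5pyd.File(domain, "r") for _ in range(n_handles)]
    dsets = [f[dset_name] for f in files]
    step = (stop - start + n_handles - 1) // n_handles
    selections = [
        slice(start + i * step, min(start + (i + 1) * step, stop))
        for i in range(n_handles)
    ]
    mm = h5pyd.MultiManager(dsets)
    parts = mm[selections]  # one concurrent read per handle
    for f in files:
        f.close()
    return parts  # list of arrays; concatenate them if a single array is needed


# parts = parallel_read("/home/admin/example.h5", "data", 0, 1_000_000)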

With my upload script I was able to increase the ingest speed further by overcommitting the Kubernetes cluster nodes. It turned out that Kubernetes was only spawning one pod per node because of the memory requested for each container. Reducing the “requests: memory” parameter in the k8s_deployment_azure.yml file to 0.5 GB for the SNs and 1 GB for the DNs made it possible to spawn more pods per physical node (a VM with 2 vCPUs and 8 GB of RAM).

With these changes I was able to hit over 90% CPU utilization on all nodes at over 6 Gbit/s sustained ingest speed.

I noticed that a single DN was never able to fully utilize one CPU core. Having about 50% more DNs than CPU cores definitely improved performance.

Oh, I had assumed you were using Docker. With Kubernetes things are a bit different: each Kubernetes pod has (at least with the standard deployment.yaml) one SN and one DN container, so you have just one scaling knob, the number of pods. And since Kubernetes is doing the load balancing for you, you don’t need to mess around with changing the endpoint for different threads/processes as you would with Docker.

It’s odd that one DN never got to 100%; the load should be distributed evenly for larger workloads. I’m hoping to set up a Grafana dashboard someday to get better insight into where the bottlenecks are.
