Help Needed For Reading HDF5 files from AWS S3



We are trying to read h5/hdf5 files stored in S3 using the sample connector and programs provided at https://www.hdfgroup.org/downloads/hdf5-enterprise-support/hdf5-connector-for-apache-spark/hdf-connector-for-apache-spark/ but we are getting the following exception:

ch.systemsx.cisd.hdf5.exceptions.HDF5FileNotFoundException: Path does not exit. (/hd5_files/test.h5)
    at ch.systemsx.cisd.hdf5.HDF5BaseReader.openFile(HDF5BaseReader.java:221)
    at ch.systemsx.cisd.hdf5.HDF5BaseReader.<init>(HDF5BaseReader.java:177)
    at ch.systemsx.cisd.hdf5.HDF5BaseReader.<init>(HDF5BaseReader.java:155)
    at ch.systemsx.cisd.hdf5.HDF5ReaderConfigurator.reader(HDF5ReaderConfigurator.java:81)
    at ch.systemsx.cisd.hdf5.HDF5FactoryProvider$HDF5Factory.openForReading(HDF5FactoryProvider.java:55)
    at org.hdfgroup.spark.hdf5.reader.HDF5Reader.<init>(HDF5Reader.scala:26)
    at org.hdfgroup.spark.hdf5.ScanExecutor.openReader(ScanExecutor.scala:69)
    at org.hdfgroup.spark.hdf5.HDF5Relation$$anonfun$datasets$5.apply(HDF5Relation.scala:139)
    at org.hdfgroup.spark.hdf5.HDF5Relation$$anonfun$datasets$5.apply(HDF5Relation.scala:137)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.Map$Map2.foreach(Map.scala:137)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    at org.hdfgroup.spark.hdf5.HDF5Relation.datasets$lzycompute(HDF5Relation.scala:137)
    at org.hdfgroup.spark.hdf5.HDF5Relation.datasets(HDF5Relation.scala:96)
    at org.hdfgroup.spark.hdf5.HDF5Relation.hdf5Schema$lzycompute(HDF5Relation.scala:153)
    at org.hdfgroup.spark.hdf5.HDF5Relation.hdf5Schema(HDF5Relation.scala:150)
    at org.hdfgroup.spark.hdf5.HDF5Relation.schema(HDF5Relation.scala:159)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:403)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Below is the Python script we executed on an EMR cluster running Spark 2.4.4 with Scala 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_242). Without changing fs.defaultFS or the other parameters, setting 'path' to the S3 folder throws:

IllegalArgumentException: u'Wrong FS: s3a://mthirani/hd5_files, expected: hdfs://ip-172-31-20-58.us-east-2.compute.internal:8020

We then set the access key (fs.s3n.access.key), the secret key (fs.s3n.secret.key), and fs.defaultFS to each of the following values, but none of them worked:

  • s3a://mthirani
  • s3://mthirani
  • s3://
  • s3:/
  • s3:///
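
One thing we are unsure about: the fs.s3n.* keys configure the older s3n filesystem, and to our understanding they are ignored by s3a, which has its own credential properties. A config-only sketch of what we believe the s3a-specific settings would look like (sc is the existing SparkContext; the credential values are placeholders):

```python
# Config fragment (sketch): s3a-specific Hadoop properties. The
# fs.s3n.access.key / fs.s3n.secret.key pair used in the script below
# targets the s3n filesystem, not s3a.
hconf = sc._jsc.hadoopConfiguration()
hconf.set("fs.s3a.access.key", "XXXXXX")   # placeholder
hconf.set("fs.s3a.secret.key", "XXXXXX")   # placeholder
hconf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
```

On EMR, the instance profile may already provide credentials, in which case the two key properties might be unnecessary.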

Meanwhile, the same program did successfully read the files from local storage on the EMR cluster, but only after we changed fs.defaultFS to "file:/".
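
The "Wrong FS" message looks consistent with how we understand Hadoop path qualification: a path with no scheme inherits the scheme of fs.defaultFS, while a fully qualified path whose scheme differs from the filesystem's is rejected. A rough pure-Python illustration of that rule (our approximation for discussion, not Hadoop's actual code):

```python
from urllib.parse import urlparse

def check_path(path, fs_uri):
    """Rough sketch of Hadoop Path qualification: a scheme-less path
    inherits fs.defaultFS; a mismatched scheme raises "Wrong FS"."""
    scheme = urlparse(path).scheme
    if not scheme:
        return fs_uri.rstrip("/") + path  # e.g. '/hd5_files' -> hdfs path
    if scheme != urlparse(fs_uri).scheme:
        raise ValueError("Wrong FS: %s, expected: %s" % (path, fs_uri))
    return path

# A relative path resolves against whatever fs.defaultFS points at:
print(check_path("/hd5_files", "s3a://mthirani"))
```

This would explain why an s3a URI in 'path' fails while fs.defaultFS still points at HDFS, and why switching fs.defaultFS to "file:/" made local reads work.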

Python Script for reading from S3:
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext

def main():
    print("Started...")

    sconf = SparkConf().setAppName("SparkToSF_SparkConfAppName")
    sc = SparkContext(appName="SparkToSF_SparkContextAppName", conf=sconf)
    sc._jsc.hadoopConfiguration().set("fs.defaultFS", "s3a://mthirani/")
    print(sc._jsc.hadoopConfiguration().get("fs.defaultFS"))
    sc._jsc.hadoopConfiguration().set("fs.s3n.access.key", "XXXXXX")
    sc._jsc.hadoopConfiguration().set("fs.s3n.secret.key", "XXXXXX")
    # sc._jsc.hadoopConfiguration().set("fs.s3n.endpoint", "s3a://mthirani/")
    # sc._jsc.hadoopConfiguration().set("fs.s3n.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    spark = SQLContext(sc)

    # options
    # HDF5_SOURCE_NAME = 'gov.llnl.spark.hdf'
    HDF5_SOURCE_NAME = 'org.hdfgroup.spark.hdf5'
    opts = {
        'path': '/hd5_files',
        'dataset': 'sparky://files',
        'extension': 'hdf5, h5'
    }
    
    df = spark.read.format(HDF5_SOURCE_NAME).options(**opts).load()
    df.show()

    opts2 = {
        'path': '/hd5_files',
        'dataset': '/multi',
        'extension': 'hdf5, h5'
    }

    df2 = spark.read.format(HDF5_SOURCE_NAME).options(**opts2).load()
    df2.show()

    print("Ended")

if __name__ == "__main__":
    main()
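
One variant we have not yet tried (sketch only, untested): leaving fs.defaultFS alone and passing a fully qualified s3a URI in 'path'. Whether the connector hands such a URI to the s3a filesystem, rather than to its local HDF5 reader, is exactly what we are unsure about:

```python
# Config fragment (sketch, untested): fully qualified s3a URI in 'path',
# bucket/prefix taken from the error message above.
opts3 = {
    'path': 's3a://mthirani/hd5_files',
    'dataset': 'sparky://files',
    'extension': 'hdf5, h5'
}
# would be used as: spark.read.format(HDF5_SOURCE_NAME).options(**opts3).load()
```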