Iterative recording of real-time data over long time periods


#1

Dear All,

I would like to record live time series data from sensors over long periods of time with Python. My attempt records a few lines of data, but it fails as soon as I try to record for longer. I think my approach is somehow wrong.

Please advise how to iteratively record live time series data for a few days at least.

import struct
import h5py
import pandas as pd
import numpy as np
import time
from pySerialTransfer import pySerialTransfer as txfer
import numpy.lib.recfunctions as rf

bufLen = 75 
counter = 10 #Here, for example, it fails at counter number 75
hdfFile = 'dtype.hdf5'
dataTypes = np.dtype([('ts', '<u8'), ('r', '<i2'), ('x', '<f4'), ('y', '<f4'), ('z', '<f4')])

if __name__ == '__main__':
    try:
        link = txfer.SerialTransfer('COM3', 460800)
        link.open()
        time.sleep(3)

        with h5py.File(hdfFile, 'w') as hf:
            ''' Create separate datasets for each variable '''
            for i in range(len(dataTypes)):
                hf.create_dataset(dataTypes.names[i], (0,), dtype=dataTypes[i], chunks=True, compression="gzip", maxshape=(None,))

            ''' Write data in HDF5 file '''
            # Test write counter
            for i in range(counter):
            #while(True):
                dataList = [[0 for x in range(4)] for x in range(bufLen)]
                # Collect data into buffer
                for e in range(bufLen):
                    # Append values from serial port
                    if link.available():
                        dataList[e][0] = struct.unpack('h', bytes(link.rxBuff[0:2]))[0]
                        dataList[e][1] = struct.unpack('f', bytes(link.rxBuff[2:6]))[0]
                        dataList[e][2] = struct.unpack('f', bytes(link.rxBuff[6:10]))[0]
                        dataList[e][3] = struct.unpack('f', bytes(link.rxBuff[10:14]))[0]
                        dataList[e].insert(0, int(time.time() * 10**7))
                    elif link.status < 0:
                        print('ERROR: {}'.format(link.status))
                # Create compound data frame
                structData = rf.unstructured_to_structured(np.array(dataList), dataTypes)
                df = pd.DataFrame(structData)
                # Write data frame columns to HDF5 file in separate datasets
                for colName in df.columns:
                    hf[colName].resize((hf[colName].shape[0] + len(df.index),))
                    hf[colName][-len(df.index):] = df.loc[:, colName]

    except KeyboardInterrupt:
        link.close()

P.S. You can ignore the part related to serial communication.


#2

How does it fail?

As someone who has written a fair amount of “roundabout” code, I recognize my handwriting! The unstructured_to_structured and DataFrame are completely superfluous and inefficient. You’re writing separate datasets (columns), right? Maybe start with writing one column and see what, if any, problems there might be?

G.
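
For reference, the "write one column first" suggestion could look roughly like the sketch below. It appends plain Python lists to a single resizable dataset, with no DataFrame and no unstructured_to_structured. The helper name append_batch, the file name, and the chunk size are made up for illustration, and the in-memory "core" driver is only used so that the sketch leaves nothing on disk.

```python
import h5py
import numpy as np


def append_batch(dset, values):
    """Grow a 1-D resizable dataset and write `values` at its end."""
    values = np.asarray(values, dtype=dset.dtype)
    old_len = dset.shape[0]
    dset.resize((old_len + len(values),))
    dset[old_len:] = values


# In-memory file (driver="core", backing_store=False): an assumption made for
# this sketch only. A real recorder would open a normal file on disk.
with h5py.File("sketch_one_column.h5", "w", driver="core", backing_store=False) as hf:
    x = hf.create_dataset("x", shape=(0,), maxshape=(None,), dtype="<f4", chunks=(1024,))
    append_batch(x, [0.5, 1.5, 2.5])   # first batch
    append_batch(x, [3.5])             # later batches may differ in length
    recorded = x[:].tolist()

print(recorded)
```

Once one column behaves over a long run, the same helper can be called for each of the other columns.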


#3

Here is the link to my recent C++ presentation. I tend to organise datasets for HFT irregular time series as compound datatype streams, one per day, with roughly 30 000 000 events per market. The 2 GB in the original pcap (packet capture) format is usually stored in compressed chunks. I then normalize it to RTS (regular time series) at different intervals.

Hope it helps
Steve
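
A compound datatype stream can also be written from Python. The sketch below is only a guess at how the idea translates to h5py and is not taken from the C++ presentation. It reuses the field layout from the first post; the dataset name "events", the helper make_batch, the chunk size, and the use of time.time_ns() for timestamps are assumptions.

```python
import time

import h5py
import numpy as np

# Field layout copied from the first post: timestamp plus four sensor values.
record_dtype = np.dtype(
    [("ts", "<u8"), ("r", "<i2"), ("x", "<f4"), ("y", "<f4"), ("z", "<f4")]
)


def make_batch(samples):
    """Pack (r, x, y, z) tuples into a structured array, stamping each row."""
    batch = np.zeros(len(samples), dtype=record_dtype)
    for i, (r, x, y, z) in enumerate(samples):
        batch[i] = (time.time_ns(), r, x, y, z)
    return batch


# In-memory file so the sketch writes nothing to disk (assumption for the demo).
with h5py.File("sketch_compound.h5", "w", driver="core", backing_store=False) as hf:
    events = hf.create_dataset(
        "events", shape=(0,), maxshape=(None,), dtype=record_dtype,
        chunks=(4096,), compression="gzip",
    )
    batch = make_batch([(1, 0.5, 1.5, 2.5), (2, 3.5, 4.5, 5.5)])
    start = events.shape[0]
    events.resize((start + len(batch),))
    events[start:] = batch          # one write per batch, all fields together
    n_rows = events.shape[0]
    r_column = events[:]["r"].tolist()

print(n_rows, r_column)
```

With this layout each sample stays together as one record, so a single resize and a single write per batch replace the four per-column writes.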


#4

:+1: :+1: for the marker and whiteboard. G.


#5

Yeah, I should include it also:

Traceback (most recent call last):
  File ".\test.py", line 41, in <module>
    structData = rf.unstructured_to_structured(np.array(dataList), dataTypes)
  File "<__array_function__ internals>", line 5, in unstructured_to_structured
  File "C:\Users\adam.insanoff\AppData\Local\Programs\Python\Python38\lib\site-packages\numpy\lib\recfunctions.py", line 1090, in unstructured_to_structured
    raise ValueError('The length of the last dimension of arr must '
ValueError: The length of the last dimension of arr must be equal to the number of fields in dtype

Thank you Mr. Heber. Yes, columns. I also thought that unstructured_to_structured would be inefficient. I am now working on simplifying it.
I think it is because of the timestamp: dataList[e].insert(0, int(time.time() * 10**7)). It would not work either. I eliminated it, and it became more stable. But now the problem is that after a certain period it starts writing zeros instead of values.

I will try to study a bit and come back later.
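
One plausible reading of both symptoms, sketched below with plain lists and no serial port: in the first post the inner loop moves on to the next row even when link.available() returns false. Rows that received a packet get a fifth element from the timestamp insert, while skipped rows keep their four placeholder zeros, so the rows no longer all match the five-field dtype. With the timestamp removed, the skipped rows are simply written out as zeros. The function names and the faked availability flags are assumptions made for the demonstration.

```python
def collect(availability):
    """Mimic the buffer loop from the first post.

    `availability` fakes successive results of link.available().
    """
    rows = [[0 for _ in range(4)] for _ in range(len(availability))]
    for e, packet_ready in enumerate(availability):
        if packet_ready:
            rows[e] = [1, 0.5, 1.5, 2.5]  # stand-in for unpacked r, x, y, z
            rows[e].insert(0, 123)         # stand-in for the timestamp
    return rows


def collect_fixed(availability, n_rows):
    """Only add a row when a packet actually arrived."""
    rows = []
    polls = iter(availability)
    while len(rows) < n_rows:
        if next(polls):
            rows.append([123, 1, 0.5, 1.5, 2.5])
    return rows


row_lengths = [len(row) for row in collect([True, False, True])]
fixed_lengths = [len(row) for row in collect_fixed([True, False, True], 2)]
print(row_lengths)    # [5, 4, 5]
print(fixed_lengths)  # [5, 5]
```

Waiting for a packet before adding a row keeps every row the same length and avoids placeholder zeros in the output.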


#6

Well, I made everything unnecessarily complicated.
Here is the crude but working code:

import struct
import h5py
from pySerialTransfer import pySerialTransfer as txfer
import time

buffLen = 1000 # Length of single batch
hdfFile = 'dtype.hdf5'
dataTypes = ['<i2', '<f4', '<f4', '<f4']
dataSets = ['r', 'x', 'y', 'z']

if __name__ == '__main__':
    try:
        # Define the serial connection
        link = txfer.SerialTransfer('COM3', 460800)
        # Connect to port and check
        if link.open():
            # Wait for the serial link to start
            time.sleep(3)
            # Create an HDF5 file
            with h5py.File(hdfFile, 'w') as hf:
                # Create HDF5 datasets for all sensors
                for i in range(len(dataSets)):
                    hf.create_dataset(dataSets[i], (0,), dtype=dataTypes[i], chunks=True, compression="gzip", maxshape=(None,))

                # Write batches of data until interrupted
                while True:
                    # Batch buffer
                    buffer_r = []
                    buffer_x = []
                    buffer_y = []
                    buffer_z = []
                    # Fill the buffers until one full batch has been received
                    while len(buffer_r) < buffLen:
                        if link.available():
                            buffer_r.append(struct.unpack('h', bytes(link.rxBuff[0:2]))[0])
                            buffer_x.append(struct.unpack('f', bytes(link.rxBuff[2:6]))[0])
                            buffer_y.append(struct.unpack('f', bytes(link.rxBuff[6:10]))[0])
                            buffer_z.append(struct.unpack('f', bytes(link.rxBuff[10:14]))[0])
                        elif link.status < 0:
                            print('ERROR: {}'.format(link.status))
                    # Append the buffered batch to the HDF5 datasets
                    hf["r"].resize((hf["r"].shape[0] + buffLen,))
                    hf["r"][-buffLen:] = buffer_r

                    hf["x"].resize((hf["x"].shape[0] + buffLen,))
                    hf["x"][-buffLen:] = buffer_x

                    hf["y"].resize((hf["y"].shape[0] + buffLen,))
                    hf["y"][-buffLen:] = buffer_y

                    hf["z"].resize((hf["z"].shape[0] + buffLen,))
                    hf["z"][-buffLen:] = buffer_z

    except KeyboardInterrupt:
        link.close()
    except:
        import traceback
        traceback.print_exc()

        link.close()


#7

Congratulations and thank you for posting the solution. Best, G.