I would like to record live time series data from sensors over long periods of time with Python. I have tried, and was able to record a few lines of data, but as soon as I try to record for longer, it fails. I think my approach is somehow wrong.
Please advise how to iteratively record live time series data for at least a few days.
import struct
import h5py
import pandas as pd
import numpy as np
import time
from pySerialTransfer import pySerialTransfer as txfer
import numpy.lib.recfunctions as rf

bufLen = 75
counter = 10  # Here, for example, it fails at counter number 75
hdfFile = 'dtype.hdf5'
dataTypes = np.dtype([('ts', '<u8'), ('r', '<i2'), ('x', '<f4'), ('y', '<f4'), ('z', '<f4')])

if __name__ == '__main__':
    try:
        link = txfer.SerialTransfer('COM3', 460800)
        link.open()
        time.sleep(3)
        with h5py.File(hdfFile, 'w') as hf:
            ''' Create separate datasets for each variable '''
            for i in range(len(dataTypes)):
                hf.create_dataset(dataTypes.names[i], (0,), dtype=dataTypes[i], chunks=True, compression="gzip", maxshape=(None,))
            ''' Write data in HDF5 file '''
            # Test write counter
            for i in range(counter):
            #while(True):
                dataList = [[0 for x in range(4)] for x in range(bufLen)]
                # Collect data into buffer
                for e in range(bufLen):
                    # Append values from serial port
                    if link.available():
                        dataList[e][0] = struct.unpack('h', bytes(link.rxBuff[0:2]))[0]
                        dataList[e][1] = struct.unpack('f', bytes(link.rxBuff[2:6]))[0]
                        dataList[e][2] = struct.unpack('f', bytes(link.rxBuff[6:10]))[0]
                        dataList[e][3] = struct.unpack('f', bytes(link.rxBuff[10:14]))[0]
                        dataList[e].insert(0, int(time.time() * 10**7))
                    elif link.status < 0:
                        print('ERROR: {}'.format(link.status))
                # Create compound data frame
                structData = rf.unstructured_to_structured(np.array(dataList), dataTypes)
                df = pd.DataFrame(structData)
                # Write data frame columns to HDF5 file in separate datasets
                for colName in df.columns:
                    hf[colName].resize((hf[colName].shape[0] + len(df.index),))
                    hf[colName][-len(df.index):] = df.loc[:, colName]
    except KeyboardInterrupt:
        link.close()
P.S. You can ignore the part related to serial communication.
As someone who has written a fair amount of “roundabout” code, I recognize my handwriting! The unstructured_to_structured and DataFrame are completely superfluous and inefficient. You’re writing separate datasets (columns), right? Maybe start with writing one column and see what, if any, problems there might be?
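A minimal sketch of what I have in mind (the file name, batch size, and random stand-in data are placeholders, not your serial readings):

import numpy as np
import h5py

batch = 1000  # placeholder batch size

with h5py.File('one_column.hdf5', 'w') as hf:
    # A single resizable, chunked, compressed dataset for one sensor channel
    dset = hf.create_dataset('x', (0,), dtype='<f4', chunks=(batch,),
                             compression='gzip', maxshape=(None,))
    for _ in range(10):  # stand-in for the acquisition loop
        values = np.random.random(batch).astype('<f4')  # stand-in for serial reads
        dset.resize((dset.shape[0] + batch,))
        dset[-batch:] = values

If that works at your data rate, the multi-column version is just more of the same.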
Here is the link to my recent C++ presentation. I tend to organise datasets for HFT irregular time series as compound-datatype streams, one per day, with roughly 30 000 000 events per market. The 2 GB of data in the original Pcap, or packet capture, format is usually stored in compressed chunks. Then I normalize it to an RTS, or regular time series, with different intervals.
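Roughly, in h5py terms (the field names and file name below are only illustrative, not an actual schema), one day's stream would be a compound, chunked, compressed dataset:

import numpy as np
import h5py

# Illustrative event record: a timestamp plus payload fields
event_t = np.dtype([('ts', '<u8'), ('price', '<f8'), ('qty', '<i4')])

with h5py.File('2024-01-02.hdf5', 'w') as hf:
    # One compound-datatype stream per day, stored in compressed chunks
    stream = hf.create_dataset('events', (0,), dtype=event_t, chunks=True,
                               compression='gzip', maxshape=(None,))
    # Stand-in for a batch of decoded packet-capture records
    batch = np.zeros(3, dtype=event_t)
    stream.resize((stream.shape[0] + len(batch),))
    stream[-len(batch):] = batch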
Traceback (most recent call last):
  File ".\test.py", line 41, in <module>
    structData = rf.unstructured_to_structured(np.array(dataList), dataTypes)
  File "<__array_function__ internals>", line 5, in unstructured_to_structured
  File "C:\Users\adam.insanoff\AppData\Local\Programs\Python\Python38\lib\site-packages\numpy\lib\recfunctions.py", line 1090, in unstructured_to_structured
    raise ValueError('The length of the last dimension of arr must '
ValueError: The length of the last dimension of arr must be equal to the number of fields in dtype
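For reference, the same error can be reproduced with a plain array whose last dimension does not match the five fields of the dtype:

import numpy as np
import numpy.lib.recfunctions as rf

dt = np.dtype([('ts', '<u8'), ('r', '<i2'), ('x', '<f4'), ('y', '<f4'), ('z', '<f4')])
rf.unstructured_to_structured(np.zeros((3, 4)), dt)  # 4 columns vs 5 fields -> ValueError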
Thank you, Mr. Heber. Yes, columns. I also thought that unstructured_to_structured would be inefficient. I am now working on simplifying it.
I think it is because of the timestamp: dataList[e].insert(0, int(time.time() * 10**7)). Since the insert only happens when link.available() is true, rows where no data arrived keep four elements while the others get five, so the array no longer matches the five fields of dataTypes. It would not work either way: I eliminated it, and the script became more stable. But now the problem is that after a certain period it starts writing zeros instead of values.
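If I do keep the timestamp, I suppose the rows should be preallocated with five slots and assigned in place rather than inserted. Something like this untested fragment, reusing the names from my first script:

# Preallocate five columns per row so every row matches the five
# fields of dataTypes ('ts', 'r', 'x', 'y', 'z'), even when no data arrives
dataList = [[0 for x in range(5)] for x in range(bufLen)]
for e in range(bufLen):
    if link.available():
        dataList[e][0] = int(time.time() * 10**7)  # assign instead of insert
        dataList[e][1] = struct.unpack('h', bytes(link.rxBuff[0:2]))[0]
        dataList[e][2] = struct.unpack('f', bytes(link.rxBuff[2:6]))[0]
        dataList[e][3] = struct.unpack('f', bytes(link.rxBuff[6:10]))[0]
        dataList[e][4] = struct.unpack('f', bytes(link.rxBuff[10:14]))[0]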
Well, I made everything unnecessarily complicated.
Here is the crude but working code:
import struct
import h5py
import numpy as np
from pySerialTransfer import pySerialTransfer as txfer
import time

buffLen = 1000  # Length of a single batch
hdfFile = 'dtype.hdf5'
dataTypes = ['<i2', '<f4', '<f4', '<f4']
dataSets = ['r', 'x', 'y', 'z']

if __name__ == '__main__':
    try:
        # Define the serial connection
        link = txfer.SerialTransfer('COM3', 460800)
        # Connect to the port and check
        if link.open():
            # Wait for the serial connection to start
            time.sleep(3)
            # Create an HDF5 file
            with h5py.File(hdfFile, 'w') as hf:
                # Create HDF5 datasets for all sensors
                for i in range(len(dataSets)):
                    hf.create_dataset(dataSets[i], (0,), dtype=dataTypes[i], chunks=True, compression="gzip", maxshape=(None,))
                # Write batches of data until interrupted
                while True:
                    # Batch buffers
                    buffer_r = []
                    buffer_x = []
                    buffer_y = []
                    buffer_z = []
                    # Fill the buffers with values from the serial port
                    while len(buffer_r) < buffLen:
                        if link.available():
                            buffer_r.append(struct.unpack('h', bytes(link.rxBuff[0:2]))[0])
                            buffer_x.append(struct.unpack('f', bytes(link.rxBuff[2:6]))[0])
                            buffer_y.append(struct.unpack('f', bytes(link.rxBuff[6:10]))[0])
                            buffer_z.append(struct.unpack('f', bytes(link.rxBuff[10:14]))[0])
                        elif link.status < 0:
                            print('ERROR: {}'.format(link.status))
                    # Append the buffers to the HDF5 datasets
                    hf["r"].resize((hf["r"].shape[0] + buffLen,))
                    hf["r"][-buffLen:] = buffer_r
                    hf["x"].resize((hf["x"].shape[0] + buffLen,))
                    hf["x"][-buffLen:] = buffer_x
                    hf["y"].resize((hf["y"].shape[0] + buffLen,))
                    hf["y"][-buffLen:] = buffer_y
                    hf["z"].resize((hf["z"].shape[0] + buffLen,))
                    hf["z"][-buffLen:] = buffer_z
    except KeyboardInterrupt:
        link.close()
    except:
        import traceback
        traceback.print_exc()
        link.close()
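One more thing I am considering for the multi-day runs (not in the code above): h5py can push buffered data to disk after each batch, so an unexpected crash loses at most the current batch:

# At the end of each batch, after the four resize/write blocks:
hf.flush()  # assumed addition; flushes buffered writes to disk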