Skip to content

Drift Package With Time Series

Here you can see how to compress time series data with WaveletBuffer and build a Drift Package with meta information in Python.

python/examples/payload_with_timeseries_example.py
from pathlib import Path
import numpy as np

from drift_protocol.common import DriftPackage, DataPayload, StatusCode
from drift_protocol.meta import MetaInfo, TimeSeriesInfo
from wavelet_buffer import denoise, WaveletType, WaveletBuffer

from google.protobuf.timestamp_pb2 import Timestamp
from google.protobuf.any_pb2 import Any

IMG_PATH = Path(__file__).parent / "pandas.jpg"

if __name__ == "__main__":
    pb_time = Timestamp()
    pb_time.GetCurrentTime()

    SIGNAL = np.array(
        [0.1, 0.2, 0.5, 0.1, 0.2, 0.1, 0.6, 0.1, 0.1, 0.2], dtype=np.float32
    )

    # Create a package and serialize it
    original = DriftPackage()
    original.id = pb_time.ToMilliseconds()
    original.source_timestamp.CopyFrom(pb_time)
    original.publish_timestamp.CopyFrom(pb_time)
    original.status = StatusCode.GOOD

    # Fill meta data
    info = TimeSeriesInfo()
    info.start_timestamp.FromMilliseconds(pb_time.ToMilliseconds() - 1_000_000)
    info.stop_timestamp.CopyFrom(pb_time)
    info.size = len(SIGNAL)
    info.first = SIGNAL[0]
    info.last = SIGNAL[-1]
    info.min = np.min(SIGNAL)
    info.max = np.max(SIGNAL)
    info.mean = np.mean(SIGNAL)

    original.meta.type = MetaInfo.TIME_SERIES
    original.meta.time_series_info.CopyFrom(info)

    # Decompose and compress signal
    buffer = WaveletBuffer(
        signal_shape=[len(SIGNAL)],
        signal_number=1,
        decomposition_steps=2,
        wavelet_type=WaveletType.DB1,
    )
    buffer.decompose(SIGNAL, denoise.Threshold(0, 0.1))

    # Prepare payload
    payload = DataPayload()
    payload.data = buffer.serialize(compression_level=16)

    msg = Any()
    msg.Pack(payload)
    original.data.append(msg)

    # Serialize package to message
    message = original.SerializeToString()

    # Parse the package
    new_pacakge = DriftPackage()
    new_pacakge.ParseFromString(message)
    print(
        f"Package ID={new_pacakge.id} type={MetaInfo.DataType.Name(new_pacakge.meta.type)}"
    )

    payload = DataPayload()
    new_pacakge.data[0].Unpack(payload)

    buffer = WaveletBuffer.parse(payload.data)
    print(f"WaveletBuffer {buffer.compose()}")