Skip to content

Drift Package With Trigger

Here you can see how to build a Drift Package with a trigger message for a time interval in Python.

python/examples/trigger_message_example.py
from google.protobuf.any_pb2 import Any
from google.protobuf.timestamp_pb2 import Timestamp

from drift_protocol.common import DriftPackage, StatusCode
from drift_protocol.trigger_service import IntervalTriggerMessage

if __name__ == "__main__":
    # Take a single "now" timestamp; it is reused for the package ID, both
    # package timestamps, and the end of the trigger interval.
    pb_time = Timestamp()
    pb_time.GetCurrentTime()
    now_ms = pb_time.ToMilliseconds()

    # Create a package and fill in its header fields
    original = DriftPackage()
    original.id = now_ms
    original.source_timestamp.CopyFrom(pb_time)
    original.publish_timestamp.CopyFrom(pb_time)
    original.status = StatusCode.GOOD

    # Trigger message for a 1 second interval ending at `pb_time`.
    # The arithmetic is done in milliseconds, so one second is 1_000
    # (subtracting 1_000_000 here would give a 1000 second interval).
    interval_ms = 1_000
    trigger = IntervalTriggerMessage()
    trigger.start_timestamp.FromMilliseconds(now_ms - interval_ms)
    trigger.stop_timestamp.CopyFrom(pb_time)

    # Payloads are stored in the package as google.protobuf.Any entries
    msg = Any()
    msg.Pack(trigger)
    original.data.append(msg)

    # Serialize package to message
    message = original.SerializeToString()

    # Parse the package back from the serialized bytes
    new_package = DriftPackage()
    new_package.ParseFromString(message)
    print(f"Package ID={new_package.id}")

    # Unpack() returns False when the Any payload holds a different message
    # type; fail loudly instead of printing a default (epoch) interval.
    received_trigger = IntervalTriggerMessage()
    if not new_package.data[0].Unpack(received_trigger):
        raise TypeError("Package payload is not an IntervalTriggerMessage")

    start = received_trigger.start_timestamp.ToDatetime()
    stop = received_trigger.stop_timestamp.ToDatetime()
    print(f"Trigger interval from {start} to {stop}")