
Your First TS Pipeline

This tutorial builds a time-series pipeline step by step. It assumes you've read SGN's first pipeline tutorial and understand elements, pads, frames, and connect().

SeriesBuffer

A SeriesBuffer is the atomic unit of time-series data: a chunk of uniformly sampled data with a precise time reference.

import numpy as np
from sgnts.base import SeriesBuffer

buf = SeriesBuffer(offset=0, sample_rate=2048, data=np.zeros(2048))
print(buf.offset)       # 0
print(buf.end_offset)   # 16384
print(buf.sample_rate)  # 2048
print(buf.shape)        # (2048,)

The offset is not a timestamp in seconds. It is an integer sample count at the maximum sample rate (16384 Hz by default), so one second of data spans 16384 offset units at any sample rate. Keeping time as an integer avoids floating-point rounding errors. See TimeSeries Concepts for the full rationale.
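
The offset arithmetic can be sketched in plain Python. The helper names below are hypothetical and are not part of sgnts; the sketch also assumes that every sample rate divides the maximum rate evenly (as powers of two up to 16384 do).

```python
# Illustrative helpers only; these names are hypothetical, not sgnts API.
MAX_RATE = 16384  # offset units per second (the default maximum sample rate)


def samples_to_offset(num_samples: int, sample_rate: int) -> int:
    """Convert a sample count at sample_rate into integer offset units."""
    if MAX_RATE % sample_rate != 0:
        # Assumption: only rates that divide MAX_RATE evenly are supported here.
        raise ValueError("sample_rate must divide MAX_RATE evenly")
    return num_samples * (MAX_RATE // sample_rate)


def offset_to_seconds(offset: int) -> float:
    """Convert offset units to seconds (for display only)."""
    return offset / MAX_RATE


# One second of data spans the same offset range at either rate.
print(samples_to_offset(2048, 2048))  # 16384
print(samples_to_offset(4096, 4096))  # 16384
print(offset_to_seconds(16384))       # 1.0
```

Because the conversion is integer multiplication, buffer boundaries computed this way never drift, which is the property the offset system relies on.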

A gap buffer has data=None but still carries shape and timing metadata:

from sgnts.base import SeriesBuffer

gap = SeriesBuffer(offset=0, sample_rate=2048, shape=(2048,), data=None)
print(gap.is_gap)  # True

TSFrame

A TSFrame wraps one or more SeriesBuffer objects into a single frame. The buffers must be contiguous: each buffer starts at the offset where the previous one ends, leaving no holes in the offset range:

import numpy as np
from sgnts.base import SeriesBuffer, TSFrame

buf1 = SeriesBuffer(offset=0, sample_rate=2048, data=np.ones(2048))
buf2 = SeriesBuffer(offset=16384, sample_rate=2048, data=np.ones(2048))
frame = TSFrame(buffers=[buf1, buf2])

print(frame.offset)      # 0
print(frame.end_offset)  # 32768
print(len(frame))        # 2 buffers

Iterate over the buffers in a frame:

for buf in frame:
    print(buf.offset, buf.shape)
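
The contiguity rule can be illustrated without sgnts. The `Span` class and `is_contiguous` function below are hypothetical stand-ins for a buffer's timing metadata, not library classes. The sketch assumes that a gap buffer still occupies its offset range, so a frame that contains a gap buffer can still be contiguous.

```python
from dataclasses import dataclass

MAX_RATE = 16384  # offset units per second


@dataclass
class Span:
    """Hypothetical stand-in for a buffer's timing metadata (not an sgnts class)."""

    offset: int
    sample_rate: int
    num_samples: int
    is_gap: bool = False

    @property
    def end_offset(self) -> int:
        # Each sample covers MAX_RATE // sample_rate offset units.
        return self.offset + self.num_samples * (MAX_RATE // self.sample_rate)


def is_contiguous(spans: list[Span]) -> bool:
    """True if every span starts exactly where the previous one ends."""
    return all(prev.end_offset == nxt.offset for prev, nxt in zip(spans, spans[1:]))


data = Span(offset=0, sample_rate=2048, num_samples=2048)
gap = Span(offset=16384, sample_rate=2048, num_samples=2048, is_gap=True)
late = Span(offset=40000, sample_rate=2048, num_samples=2048)

print(is_contiguous([data, gap]))   # True: the gap still covers its offset range
print(is_contiguous([data, late]))  # False: offsets 16384..40000 are missing
```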

A Source and Sink

FakeSeriesSource generates synthetic time-series data. TSFrameCollectSink collects output frames in memory for inspection.

from sgn import Pipeline
from sgnts.sources import FakeSeriesSource
from sgnts.sinks import TSFrameCollectSink

src = FakeSeriesSource(
    name="src",
    source_pad_names=["out"],
    signal_type="white",
    rate=2048,
    duration=2,
)
snk = TSFrameCollectSink(name="snk", sink_pad_names=["out"])

Pipeline().connect(src, snk).run()

# Inspect collected data
frames = snk.out_frames()
for pad_name, frame in frames.items():
    print(f"{pad_name}: {frame.offset} to {frame.end_offset}")

FakeSeriesSource supports several signal types: "white" (noise), "sin", "impulse", and "const". The duration parameter sets how many seconds of data to produce before the source sends end-of-stream (EOS).

Adding a Transform

Insert an Amplify transform to scale the data:

from sgn import Pipeline
from sgnts.sources import FakeSeriesSource
from sgnts.transforms import Amplify
from sgnts.sinks import TSFrameCollectSink

src = FakeSeriesSource(
    name="src",
    source_pad_names=["out"],
    signal_type="const",
    const=2.0,
    rate=2048,
    duration=1,
)
amp = Amplify(name="amp", factor=5.0, sink_pad_names=["in"], source_pad_names=["out"])
snk = TSFrameCollectSink(name="snk", sink_pad_names=["out"])

Pipeline().connect(src, amp).connect(amp, snk).run()

frames = snk.out_frames()
for buf in frames["out"]:
    if not buf.is_gap:
        print(buf.data[:5])  # [10. 10. 10. 10. 10.]

The constant value 2.0 multiplied by the factor 5.0 produces 10.0 in every sample.
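
As a rough mental model, the scaling step can be written as a plain function. This is a sketch and not the Amplify implementation; the function name is hypothetical, and it assumes a gap (data of None) is passed through unchanged rather than scaled.

```python
from typing import Optional, Sequence


def amplify(data: Optional[Sequence[float]], factor: float) -> Optional[list]:
    """Scale every sample by factor; pass a gap (None) through unchanged."""
    if data is None:
        # Assumption: a gap has no samples to scale, so it stays a gap.
        return None
    return [factor * x for x in data]


print(amplify([2.0] * 5, 5.0))  # [10.0, 10.0, 10.0, 10.0, 10.0]
print(amplify(None, 5.0))       # None
```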

Multiple Channels

Sources can produce data on multiple pads. Use the signals dictionary to configure each channel independently:

from sgn import Pipeline
from sgnts.sources import FakeSeriesSource
from sgnts.sinks import TSFrameCollectSink

src = FakeSeriesSource(
    name="src",
    source_pad_names=["H1", "L1"],
    signals={
        "H1": {"signal_type": "sin", "rate": 2048},
        "L1": {"signal_type": "white", "rate": 4096},
    },
    duration=1,
)
snk = TSFrameCollectSink(name="snk", sink_pad_names=["H1", "L1"])

Pipeline().connect(src, snk).run()

frames = snk.out_frames()
for name, frame in frames.items():
    print(f"{name}: rate={frame.sample_rate}, samples per buffer={frame[0].shape}")

Each channel can have a different sample rate. The offset system ensures that one second of data always spans the same offset range (16384 units) regardless of sample rate, making cross-channel alignment possible.
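
To make the alignment concrete, the sketch below locates the same instant in two channels with different sample rates. The function name is hypothetical and not part of sgnts; it assumes both rates divide 16384 evenly and that each buffer starts at a known offset.

```python
MAX_RATE = 16384  # offset units per second


def offset_to_sample_index(offset: int, buffer_offset: int, sample_rate: int) -> int:
    """Index, within a buffer starting at buffer_offset, of the sample at offset."""
    stride = MAX_RATE // sample_rate  # offset units covered by one sample
    delta = offset - buffer_offset
    if delta < 0 or delta % stride != 0:
        raise ValueError("offset does not fall on a sample of this channel")
    return delta // stride


half_second = 8192  # 0.5 s expressed in offset units

# The same offset maps to a different sample index in each channel.
print(offset_to_sample_index(half_second, 0, 2048))  # 1024
print(offset_to_sample_index(half_second, 0, 4096))  # 2048
```

Since both channels share one integer time axis, comparing offsets is enough to decide whether two buffers overlap, with no floating-point comparison involved.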

Next Steps