Writing Custom Elements
This tutorial walks through writing custom time-series transforms and sinks. It assumes you've completed the first TS pipeline tutorial and understand buffers, frames, and pipeline connections.
A Simple Transform
The simplest transform takes one input and produces one output. Here's an element that squares every sample:
from dataclasses import dataclass

from sgn import validator
from sgnts.base import TSCollectFrame, TSFrame, TSTransform
from sgnts.decorators import transform


@dataclass
class Square(TSTransform):
    @validator.one_to_one
    def validate(self) -> None:
        pass

    @transform.one_to_one
    def process(self, input_frame: TSFrame, output_frame: TSCollectFrame) -> None:
        for buf in input_frame:
            if not buf.is_gap:
                buf = buf.copy(data=buf.data ** 2)
            output_frame.append(buf)
Key points:
- @dataclass: all elements are dataclasses.
- validate(): called at construction. The @validator.one_to_one decorator checks that the element has exactly one sink pad and one source pad.
- process(): called each iteration with aligned input and output. The @transform.one_to_one decorator unwraps the single input and output from their dictionaries, so you work with a single TSFrame and TSCollectFrame directly.
- buf.copy(data=...): creates a new buffer with modified data but the same offset, sample rate, and shape.
- output_frame.append(buf): adds the buffer to the output collector. The collector is closed automatically after process() returns.
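To make the unwrapping concrete, here is a minimal pure-Python sketch of how a decorator in the spirit of @transform.one_to_one could strip single-entry dictionaries before calling process(). It is an illustration only, not the sgnts implementation: unwrap_one_to_one, ToyTransform, and the dictionary layout (pad name mapped to frame) are assumptions made for this example.

```python
import functools


def unwrap_one_to_one(method):
    """Hypothetical stand-in for a one-to-one decorator (not the sgnts code)."""

    @functools.wraps(method)
    def wrapper(self, inputs, outputs):
        # Assumed layout: each dict maps a single pad name to its frame.
        if len(inputs) != 1 or len(outputs) != 1:
            raise ValueError("expected exactly one input and one output")
        (input_frame,) = inputs.values()
        (output_frame,) = outputs.values()
        return method(self, input_frame, output_frame)

    return wrapper


class ToyTransform:
    """Toy element whose frames are plain lists of samples."""

    @unwrap_one_to_one
    def process(self, input_frame, output_frame):
        # The body sees the frames directly, never the dictionaries.
        output_frame.extend(x ** 2 for x in input_frame)


out = []
ToyTransform().process({"in": [1.0, 2.0, 3.0]}, {"out": out})
print(out)  # [1.0, 4.0, 9.0]
```

The benefit of this pattern is that the element body stays free of pad bookkeeping, while the pad-count check fails early if the element is wired with more than one input or output.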
Use it in a pipeline:
from sgn import Pipeline
from sgnts.sources import FakeSeriesSource
from sgnts.sinks import TSFrameCollectSink

src = FakeSeriesSource(
    name="src",
    source_pad_names=["out"],
    signal_type="const",
    const=3.0,
    rate=2048,
    duration=1,
)
sq = Square(name="sq", sink_pad_names=["in"], source_pad_names=["out"])
snk = TSFrameCollectSink(name="snk", sink_pad_names=["out"])

Pipeline().connect(src, sq).connect(sq, snk).run()

frames = snk.out_frames()
for buf in frames["out"]:
    if not buf.is_gap:
        print(buf.data[:3])  # [9. 9. 9.]
A Simple Sink
Sinks consume data without producing output. Here's a sink that prints summary statistics:
from dataclasses import dataclass

import numpy as np
from sgn import validator
from sgnts.base import TSFrame, TSSink
from sgnts.decorators import sink


@dataclass
class StatsSink(TSSink):
    @validator.single_pad
    def validate(self) -> None:
        pass

    @sink.single_pad
    def process(self, input_frame: TSFrame) -> None:
        if input_frame.EOS:
            self.mark_eos(self.sink_pads[0])
            return
        for buf in input_frame:
            if not buf.is_gap:
                print(f"offset={buf.offset} mean={np.mean(buf.data):.4f}")
The @sink.single_pad decorator unwraps the single input pad, so process()
receives a TSFrame directly. Call self.mark_eos() when EOS arrives to
signal the pipeline to stop.
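The EOS handshake can be pictured with a toy model. The sketch below uses stand-in classes only: ToyFrame, ToySink, and run_until_eos are hypothetical names invented here, and the loop is a simplification rather than a description of how sgn schedules elements. It mirrors the pattern above: the sink processes data frames, and on the EOS frame it marks EOS and returns, which lets the driving loop stop.

```python
from dataclasses import dataclass, field


@dataclass
class ToyFrame:
    """Stand-in for a frame: a few samples plus an end-of-stream flag."""
    data: list
    EOS: bool = False


@dataclass
class ToySink:
    """Stand-in sink that records means and remembers when EOS was seen."""
    means: list = field(default_factory=list)
    at_eos: bool = False

    def mark_eos(self):
        self.at_eos = True

    def process(self, frame):
        if frame.EOS:
            self.mark_eos()
            return
        self.means.append(sum(frame.data) / len(frame.data))


def run_until_eos(frames, sink):
    """Feed frames to the sink until it reports EOS; return frames consumed."""
    consumed = 0
    for frame in frames:
        sink.process(frame)
        consumed += 1
        if sink.at_eos:
            break
    return consumed


sink = ToySink()
stream = [
    ToyFrame([1.0, 3.0]),
    ToyFrame([2.0, 4.0]),
    ToyFrame([], EOS=True),
    ToyFrame([9.0]),  # never reached: the loop stops at EOS
]
n = run_until_eos(stream, sink)
print(n, sink.means)  # 3 [2.0, 3.0]
```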
Adding Parameters
Elements are dataclasses, so adding parameters is just adding fields:
@dataclass
class Scale(TSTransform):
    factor: float = 1.0
    offset: float = 0.0

    @validator.one_to_one
    def validate(self) -> None:
        pass

    @transform.one_to_one
    def process(self, input_frame: TSFrame, output_frame: TSCollectFrame) -> None:
        for buf in input_frame:
            if not buf.is_gap:
                buf = buf.copy(data=buf.data * self.factor + self.offset)
            output_frame.append(buf)


scale = Scale(name="scale", factor=2.0, offset=-1.0, sink_pad_names=["in"], source_pad_names=["out"])
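Because parameters are ordinary dataclass fields, the usual dataclass inheritance rule applies: once a base class declares a field with a default, every field a subclass adds needs a default as well, otherwise class creation fails with a TypeError. The sketch below demonstrates this with a hypothetical ToyElement base class. It is a stand-in, not an sgn class, and whether the real element base classes declare defaulted fields is an assumption here.

```python
from dataclasses import dataclass, field


@dataclass
class ToyElement:
    """Hypothetical base with defaulted fields, standing in for an element base."""
    name: str = ""
    sink_pad_names: list = field(default_factory=list)


@dataclass
class ToyScale(ToyElement):
    # Defaults keep the subclass valid and make the parameters optional keywords.
    factor: float = 1.0
    offset: float = 0.0


scale = ToyScale(name="scale", factor=2.0, offset=-1.0)
print(scale.factor, scale.offset)  # 2.0 -1.0

# A field without a default after defaulted base fields is rejected
# when the class is created, before any instance exists.
error = None
try:
    @dataclass
    class BadScale(ToyElement):
        factor: float
except TypeError as exc:
    error = str(exc)
    print("TypeError:", error)
```

Giving every parameter a sensible default, as Scale does, sidesteps the problem and lets callers override only what they need.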
Using configure()
Override configure() for one-time setup that depends on pad configuration.
It runs after pads are created but before the pipeline starts:
import numpy as np


@dataclass
class Normalizer(TSTransform):
    """Normalize each channel independently."""

    def configure(self) -> None:
        # Pad references are available here
        self._running_sum = {}
        self._count = {}
        for pad in self.sink_pads:
            self._running_sum[pad] = 0.0
            self._count[pad] = 0

    @validator.one_to_one
    def validate(self) -> None:
        pass

    @transform.one_to_one
    def process(self, input_frame: TSFrame, output_frame: TSCollectFrame) -> None:
        for buf in input_frame:
            if not buf.is_gap:
                self._running_sum[self.sink_pads[0]] += np.sum(buf.data)
                self._count[self.sink_pads[0]] += buf.data.size
                mean = self._running_sum[self.sink_pads[0]] / self._count[self.sink_pads[0]]
                buf = buf.copy(data=buf.data - mean)
            output_frame.append(buf)
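The running-mean arithmetic is easier to follow with concrete numbers. The sketch below replays the same update on plain Python lists instead of buffers; the lists, the None gap marker, and the function name subtract_running_mean are stand-ins chosen for this example. Note that the mean includes the current buffer, and that each buffer is shifted by the mean of all samples seen so far, not by its own mean.

```python
def subtract_running_mean(buffers):
    """Subtract the cumulative mean of all samples seen so far from each buffer.

    A buffer of None plays the role of a gap and passes through untouched.
    """
    running_sum = 0.0
    count = 0
    out = []
    for data in buffers:
        if data is not None:
            running_sum += sum(data)
            count += len(data)
            mean = running_sum / count
            data = [x - mean for x in data]
        out.append(data)
    return out


# First buffer: mean of (2, 4) is 3. Third buffer: mean of (2, 4, 9, 9) is 6.
result = subtract_running_mean([[2.0, 4.0], None, [9.0, 9.0]])
print(result)  # [[-1.0, 1.0], None, [3.0, 3.0]]
```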
Next Steps
For more advanced patterns — many-to-one transforms, custom internal()
overrides, adapter configuration for streaming, and make_ts_element() — see
the Element Development guide.