Writing Custom Elements

This tutorial walks through writing custom time-series transforms and sinks. It assumes you've completed the first TS pipeline tutorial and understand buffers, frames, and pipeline connections.

A Simple Transform

The simplest transform takes one input and produces one output. Here's an element that squares every sample:

from dataclasses import dataclass

from sgn import validator
from sgnts.base import TSCollectFrame, TSFrame, TSTransform
from sgnts.decorators import transform


@dataclass
class Square(TSTransform):

    @validator.one_to_one
    def validate(self) -> None:
        pass

    @transform.one_to_one
    def process(self, input_frame: TSFrame, output_frame: TSCollectFrame) -> None:
        for buf in input_frame:
            if not buf.is_gap:
                buf = buf.copy(data=buf.data ** 2)
            output_frame.append(buf)

Key points:

  • @dataclass — all elements are dataclasses.
  • validate() — called at construction. The @validator.one_to_one decorator checks that the element has exactly one sink pad and one source pad.
  • process() — called each iteration with aligned input and output. The @transform.one_to_one decorator unwraps the single input and output from their dictionaries, so you work with a single TSFrame and TSCollectFrame directly.
  • buf.copy(data=...) — creates a new buffer with modified data but the same offset, sample rate, and shape.
  • output_frame.append(buf) — adds the buffer to the output collector. The collector is closed automatically after process() returns.
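
To make the unwrapping step concrete, here is a minimal pure-Python sketch of the idea behind a one-to-one decorator. It is not the sgnts implementation: `unwrap_one_to_one`, the pad-keyed dictionaries, and the `Doubler` class are hypothetical stand-ins, and plain lists stand in for frames.

```python
from functools import wraps


def unwrap_one_to_one(method):
    """Hypothetical stand-in for a one-to-one decorator (not the sgnts code).

    The wrapped method is called with dictionaries keyed by pad name; the
    decorator checks that each has exactly one entry and passes the values on.
    """

    @wraps(method)
    def wrapper(self, inputs: dict, outputs: dict) -> None:
        if len(inputs) != 1 or len(outputs) != 1:
            raise ValueError("expected exactly one input and one output")
        (input_frame,) = inputs.values()
        (output_frame,) = outputs.values()
        method(self, input_frame, output_frame)

    return wrapper


class Doubler:
    """Toy element: plain lists stand in for TSFrame and TSCollectFrame."""

    @unwrap_one_to_one
    def process(self, input_frame: list, output_frame: list) -> None:
        for sample in input_frame:
            output_frame.append(sample * 2)


collected: list = []
Doubler().process({"in": [1, 2, 3]}, {"out": collected})
print(collected)  # [2, 4, 6]
```

The body of `process()` never touches pad names, which is the convenience the real decorator provides for single-input, single-output elements.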

Use it in a pipeline:

from sgn import Pipeline
from sgnts.sources import FakeSeriesSource
from sgnts.sinks import TSFrameCollectSink

src = FakeSeriesSource(
    name="src",
    source_pad_names=["out"],
    signal_type="const",
    const=3.0,
    rate=2048,
    duration=1,
)
sq = Square(name="sq", sink_pad_names=["in"], source_pad_names=["out"])
snk = TSFrameCollectSink(name="snk", sink_pad_names=["out"])

Pipeline().connect(src, sq).connect(sq, snk).run()

frames = snk.out_frames()
for buf in frames["out"]:
    if not buf.is_gap:
        print(buf.data[:3])  # [9. 9. 9.]
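
To see how the gap-preserving loop in Square treats a stream that mixes data and gaps, without running a pipeline, the following sketch uses a toy buffer type. `ToyBuffer` and `square_buffers` are hypothetical names and not part of sgnts; representing a gap as `data=None` is an assumption of this sketch.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class ToyBuffer:
    """Hypothetical stand-in for a time-series buffer (not the sgnts class)."""

    offset: int
    data: Optional[list]  # None marks a gap in this sketch

    @property
    def is_gap(self) -> bool:
        return self.data is None

    def copy(self, data: list) -> "ToyBuffer":
        # New buffer with replaced data; the offset is carried over.
        return replace(self, data=data)


def square_buffers(buffers: list) -> list:
    """Square the samples of every non-gap buffer; pass gaps through."""
    out = []
    for buf in buffers:
        if not buf.is_gap:
            buf = buf.copy(data=[x ** 2 for x in buf.data])
        out.append(buf)
    return out


stream = [ToyBuffer(0, [3.0, 3.0]), ToyBuffer(2, None), ToyBuffer(4, [1.0, 2.0])]
result = square_buffers(stream)
print([b.data for b in result])  # [[9.0, 9.0], None, [1.0, 4.0]]
print([b.offset for b in result])  # [0, 2, 4]
```

Every input buffer produces exactly one output buffer, so the output stays contiguous even where the input has gaps.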

A Simple Sink

Sinks consume data without producing output. Here's a sink that prints summary statistics:

from dataclasses import dataclass

import numpy as np
from sgn import validator
from sgnts.base import TSFrame, TSSink
from sgnts.decorators import sink


@dataclass
class StatsSink(TSSink):

    @validator.single_pad
    def validate(self) -> None:
        pass

    @sink.single_pad
    def process(self, input_frame: TSFrame) -> None:
        if input_frame.EOS:
            self.mark_eos(self.sink_pads[0])
            return
        for buf in input_frame:
            if not buf.is_gap:
                print(f"offset={buf.offset} mean={np.mean(buf.data):.4f}")

The @sink.single_pad decorator unwraps the single input pad, so process() receives a TSFrame directly. Call self.mark_eos() when EOS arrives to signal the pipeline to stop.
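
The role of EOS marking can be illustrated with a toy run loop. This is a simplified model, not sgn's scheduler: `ToySink`, `run_until_eos`, and the `(payload, eos)` tuples are hypothetical, and the assumption that the loop ends as soon as the sink reports EOS is made purely for illustration.

```python
class ToySink:
    """Hypothetical sink that records whether it has seen end-of-stream."""

    def __init__(self) -> None:
        self.at_eos = False
        self.seen: list = []

    def mark_eos(self) -> None:
        self.at_eos = True

    def process(self, payload, eos: bool) -> None:
        if eos:
            # Mark EOS and skip the payload, mirroring StatsSink above.
            self.mark_eos()
            return
        self.seen.append(payload)


def run_until_eos(frames, sink: ToySink) -> int:
    """Feed frames to the sink until it reports EOS; return iterations run."""
    iterations = 0
    for payload, eos in frames:
        sink.process(payload, eos)
        iterations += 1
        if sink.at_eos:
            break
    return iterations


frames = [("a", False), ("b", False), (None, True), ("never delivered", False)]
sink = ToySink()
print(run_until_eos(frames, sink))  # 3
print(sink.seen)  # ['a', 'b']
```

If the sink never marked EOS, this toy loop would only end when the input ran out, which is why the EOS branch comes first in `process()`.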

Adding Parameters

Elements are dataclasses, so adding parameters is just adding fields:

@dataclass
class Scale(TSTransform):
    factor: float = 1.0
    offset: float = 0.0

    @validator.one_to_one
    def validate(self) -> None:
        pass

    @transform.one_to_one
    def process(self, input_frame: TSFrame, output_frame: TSCollectFrame) -> None:
        for buf in input_frame:
            if not buf.is_gap:
                buf = buf.copy(data=buf.data * self.factor + self.offset)
            output_frame.append(buf)

Pass the parameters as keyword arguments when constructing the element:

scale = Scale(name="scale", factor=2.0, offset=-1.0, sink_pad_names=["in"], source_pad_names=["out"])
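
As a quick check of how dataclass fields turn into constructor parameters, and of the arithmetic Scale applies, here is a plain-dataclass sketch. `ScaleParams` is a hypothetical stand-in that does not inherit from any sgnts class, and lists stand in for buffer data.

```python
from dataclasses import dataclass, fields


@dataclass
class ScaleParams:
    """Hypothetical stand-in holding the same two fields as Scale."""

    factor: float = 1.0
    offset: float = 0.0

    def apply(self, samples: list) -> list:
        # Same arithmetic as Scale.process: data * factor + offset.
        return [x * self.factor + self.offset for x in samples]


# Field defaults become keyword-argument defaults on the generated __init__.
identity = ScaleParams()
custom = ScaleParams(factor=2.0, offset=-1.0)

print([f.name for f in fields(ScaleParams)])  # ['factor', 'offset']
print(identity.apply([3.0, 4.0]))  # [3.0, 4.0]
print(custom.apply([3.0, 4.0]))  # [5.0, 7.0]
```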

Using configure()

Override configure() for one-time setup that depends on pad configuration. It runs after pads are created but before the pipeline starts:

import numpy as np  # other imports as in the Square example


@dataclass
class Normalizer(TSTransform):
    """Subtract the running mean of all samples seen so far."""

    def configure(self) -> None:
        # Pad references are available here
        self._running_sum = {}
        self._count = {}
        for pad in self.sink_pads:
            self._running_sum[pad] = 0.0
            self._count[pad] = 0

    @validator.one_to_one
    def validate(self) -> None:
        pass

    @transform.one_to_one
    def process(self, input_frame: TSFrame, output_frame: TSCollectFrame) -> None:
        pad = self.sink_pads[0]  # the single sink pad of this element
        for buf in input_frame:
            if not buf.is_gap:
                # Update the running statistics, then subtract the new mean.
                self._running_sum[pad] += np.sum(buf.data)
                self._count[pad] += buf.data.size
                mean = self._running_sum[pad] / self._count[pad]
                buf = buf.copy(data=buf.data - mean)
            output_frame.append(buf)
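
Because the sum and count persist across calls, the value subtracted from a buffer depends on every sample seen before it. A minimal pure-Python sketch of that bookkeeping follows; `RunningMean` is a hypothetical helper, not part of sgnts, and lists stand in for buffer data.

```python
class RunningMean:
    """Hypothetical helper mirroring Normalizer's sum/count bookkeeping."""

    def __init__(self) -> None:
        self.total = 0.0
        self.count = 0

    def center(self, samples: list) -> list:
        # Update the running statistics first, then subtract the new mean.
        self.total += sum(samples)
        self.count += len(samples)
        mean = self.total / self.count
        return [x - mean for x in samples]


rm = RunningMean()
print(rm.center([2.0, 4.0]))  # mean so far is 3.0 -> [-1.0, 1.0]
print(rm.center([9.0, 9.0]))  # mean so far is 6.0 -> [3.0, 3.0]
```

The second buffer is not centered on zero: its own mean is 9.0, but the running mean over all four samples is 6.0. Early buffers in a stream are therefore centered with an estimate based on few samples.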

Next Steps

For more advanced patterns — many-to-one transforms, custom internal() overrides, adapter configuration for streaming, and make_ts_element() — see the Element Development guide.