Element Development

This guide covers the full range of patterns for writing custom SGN-TS elements. For a gentler introduction, see the Writing Custom Elements tutorial.

Element Lifecycle

Every element follows this lifecycle during construction:

  1. __post_init__() — dataclass initialization. Call super().__post_init__().
  2. configure() — one-time setup (store pad references, set adapter config).
  3. validate() — check pad counts and configuration.

During execution, each iteration calls:

  1. pull() — receives frames on sink pads (handled by TimeSeriesMixin).
  2. internal() — aligns frames, then calls process().
  3. new() — returns output frames on source pads (handled automatically).

You typically only implement configure(), validate(), and process().
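The construction and execution order above can be illustrated with a toy class. This is only a sketch: ToyElement and Doubler are hypothetical stand-ins, and having __post_init__() call configure() and validate() directly is an assumption made for illustration, not a description of the SGN-TS base classes.

```python
from dataclasses import dataclass, field


@dataclass
class ToyElement:
    """Hypothetical stand-in for an element base class (not SGN-TS code)."""

    calls: list = field(default_factory=list)

    def __post_init__(self) -> None:
        # Assumption: construction-time hooks run once, in this order.
        self.configure()
        self.validate()

    def configure(self) -> None:
        self.calls.append("configure")

    def validate(self) -> None:
        self.calls.append("validate")

    def internal(self) -> None:
        # The real framework would align frames here before calling process().
        self.process()

    def process(self) -> None:
        self.calls.append("process")


@dataclass
class Doubler(ToyElement):
    factor: int = 2

    def __post_init__(self) -> None:
        # Subclasses that extend __post_init__ chain to the parent first.
        super().__post_init__()
        self.calls.append("post_init done")


elem = Doubler()
elem.internal()  # one execution iteration
```

After this runs, elem.calls records the hooks in the order they fired, which makes it easy to see why forgetting super().__post_init__() would skip configure() and validate() in this toy model.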

The process() Method

Transform Signature

For transforms, the full process() method receives two dictionaries, one of input frames and one of output collectors, each keyed by pad:

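from dataclasses import dataclass

# Assumption: SinkPad and SourcePad are importable from sgn.base.
from sgn.base import SinkPad, SourcePad
from sgnts.base import TSCollectFrame, TSFrame, TSTransform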

@dataclass
class MyTransform(TSTransform):
    def process(
        self,
        input_frames: dict[SinkPad, TSFrame],
        output_frames: dict[SourcePad, TSCollectFrame],
    ) -> None:
        for pad, frame in input_frames.items():
            ...

Sink Signature

Sinks receive only inputs (no output frames):

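from dataclasses import dataclass

# Assumption: SinkPad is importable from sgn.base.
from sgn.base import SinkPad
from sgnts.base import TSFrame, TSSink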

@dataclass
class MySink(TSSink):
    def process(self, input_frames: dict[SinkPad, TSFrame]) -> None:
        for pad, frame in input_frames.items():
            if frame.EOS:
                self.mark_eos(pad)

Output Frame Construction

Transforms don't create output frames directly. Instead, each output pad receives a TSCollectFrame — a validated collector that builds a TSFrame incrementally. You append buffers to it, and on close it validates that the buffers are contiguous and span the expected offset range before committing them to the underlying frame.

In process(), the collector is closed automatically after your method returns:

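def process(self, input_frames, output_frames):
    pad, frame = next(iter(input_frames.items()))
    out_pad, collector = next(iter(output_frames.items()))

    for buf in frame:
        if buf.is_gap:
            # Gap buffers carry no data; pass them through unchanged
            collector.append(buf)
            continue
        # transform() stands in for your own per-buffer computation
        collector.append(buf.copy(data=transform(buf.data)))
    # collector.close() is called automatically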

Key rules:

  • append(buf) — adds a buffer to the collector. Each buffer must be contiguous with the previous one and fall within the frame's offset range.
  • close() — validates the full span and commits buffers to the parent frame. Called automatically in process(), so you rarely need to call it yourself.
  • Gap buffers — if a portion of the input is a gap, append the gap buffer unchanged. The collector accepts gap buffers just like data buffers.
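To make the append/close rules concrete, here is a minimal sketch of a collector that enforces contiguity and full-span coverage. ToyBuffer and ToyCollector are hypothetical stand-ins written for this illustration; they do not reproduce TSCollectFrame's actual implementation or error types.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class ToyBuffer:
    offset: int          # start position
    noffset: int         # length
    data: object = None  # None marks a gap

    @property
    def end_offset(self) -> int:
        return self.offset + self.noffset


@dataclass
class ToyCollector:
    offset: int
    noffset: int
    buffers: list = field(default_factory=list)

    def append(self, buf: ToyBuffer) -> None:
        # Each buffer must start exactly where the previous one ended.
        expected = self.buffers[-1].end_offset if self.buffers else self.offset
        if buf.offset != expected:
            raise ValueError(f"buffer starts at {buf.offset}, expected {expected}")
        # ...and must not run past the end of the frame's range.
        if buf.end_offset > self.offset + self.noffset:
            raise ValueError("buffer extends past the frame's offset range")
        self.buffers.append(buf)

    def close(self) -> list:
        # Contiguity is already guaranteed, so only total coverage is checked.
        covered = sum(b.noffset for b in self.buffers)
        if covered != self.noffset:
            raise ValueError(f"covered {covered} of {self.noffset} offsets")
        return list(self.buffers)


collector = ToyCollector(offset=0, noffset=16)
collector.append(ToyBuffer(0, 8, data=[1.0] * 8))
collector.append(ToyBuffer(8, 8))  # gap buffer, accepted like a data buffer
committed = collector.close()
```

Checking contiguity on every append() and coverage only on close() mirrors the split described in the rules above: mistakes surface at the offending buffer, while an incomplete span is caught before anything is committed.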

Outside of process(), you can use TSCollectFrame via the TSFrame.fill() context manager:

from sgnts.base import TSFrame

frame = TSFrame(offset=0, noffset=16384)
with frame.fill() as collector:
    collector.append(buf1)
    collector.append(buf2)
# frame now contains buf1 and buf2

This pattern is useful in tests or when constructing frames manually. The context manager calls close() on exit and rolls back if an exception occurs.
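The commit-on-exit, rollback-on-exception behaviour can be sketched with a plain context manager. ToyFrame below is a hypothetical stand-in, and staging into a temporary list is one possible way to get this behaviour; it is not taken from the TSFrame.fill() source.

```python
from contextlib import contextmanager


class ToyFrame:
    """Hypothetical stand-in frame holding a list of committed buffers."""

    def __init__(self) -> None:
        self.buffers: list = []

    @contextmanager
    def fill(self):
        staged: list = []
        yield staged
        # Reached only if the with-body finished without raising,
        # so a failure leaves self.buffers untouched.
        self.buffers = staged


frame = ToyFrame()
with frame.fill() as staged:
    staged.append("buf1")
    staged.append("buf2")

failed = ToyFrame()
try:
    with failed.fill() as staged:
        staged.append("buf1")
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass
```

Here frame ends up holding both buffers, while failed stays empty because the exception propagated before the commit line ran.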

Decorators

Decorators simplify process() signatures for common pad configurations.

@transform.one_to_one

For transforms with exactly one sink pad and one source pad. Unwraps the dictionaries so you work with a single frame and collector:

from sgnts.decorators import transform

@dataclass
class MyTransform(TSTransform):
    @transform.one_to_one
    def process(self, input_frame: TSFrame, output_frame: TSCollectFrame) -> None:
        for buf in input_frame:
            output_frame.append(buf)
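For intuition, a decorator of this kind can be written in a few lines. The one_to_one function below is a toy re-implementation under the assumption that the decorator simply unwraps single-entry dictionaries; it is not the code from sgnts.decorators, and PassThrough is a hypothetical class that uses lists in place of frames.

```python
import functools


def one_to_one(method):
    """Toy decorator: unwrap single-entry dicts before calling `method`."""

    @functools.wraps(method)
    def wrapper(self, input_frames: dict, output_frames: dict):
        if len(input_frames) != 1 or len(output_frames) != 1:
            raise ValueError("one_to_one requires exactly one input and one output")
        # Tuple unpacking extracts the only value from each dictionary.
        (input_frame,) = input_frames.values()
        (output_frame,) = output_frames.values()
        return method(self, input_frame, output_frame)

    return wrapper


class PassThrough:
    @one_to_one
    def process(self, input_frame: list, output_frame: list) -> None:
        output_frame.extend(input_frame)


out: list = []
# The caller still passes dictionaries; the decorator does the unwrapping.
PassThrough().process({"in": [1, 2, 3]}, {"out": out})
```

The caller-facing signature stays dictionary-based, so the framework can invoke every element the same way regardless of which decorator its process() uses.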

@transform.many_to_one

For transforms with multiple sink pads but one source pad. Keeps the input dictionary but unwraps the single output:

@dataclass
class Combiner(TSTransform):
    @transform.many_to_one
    def process(
        self,
        input_frames: dict[SinkPad, TSFrame],
        output_frame: TSCollectFrame,
    ) -> None:
        # Combine inputs into single output
        ...

@sink.single_pad

For sinks with exactly one sink pad:

from sgnts.decorators import sink

@dataclass
class MySink(TSSink):
    @sink.single_pad
    def process(self, input_frame: TSFrame) -> None:
        if input_frame.EOS:
            self.mark_eos(self.sink_pads[0])

Validation

Use @validator decorators on the validate() method. These are from the sgn.validator module and check pad counts at construction time:

from sgn import validator

@dataclass
class MyTransform(TSTransform):
    @validator.one_to_one
    def validate(self) -> None:
        pass  # Decorator checks pad counts

Available validators:

Decorator                                          Constraint
@validator.one_to_one                              1 sink, 1 source
@validator.one_to_many                             1 sink, any sources
@validator.many_to_one                             Any sinks, 1 source
@validator.single_pad                              1 sink (for sinks)
@validator.pad_names_match                         Sink and source names match
@validator.num_pads(sink_pads=N, source_pads=M)    Exact counts

Add custom assertions in the validate() body:

@dataclass(kw_only=True)
class Gate(TSTransform):
    control: str

    @validator.num_pads(sink_pads=2, source_pads=1)
    def validate(self) -> None:
        assert self.control in self.sink_pad_names, (
            f"Control pad '{self.control}' not in sink pads"
        )
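The way a pad-count decorator and a custom validate() body combine can be sketched as follows. The num_pads function and ToyGate class here are hypothetical stand-ins that assume the decorator runs its own checks before the decorated body; the real sgn.validator module may be structured differently.

```python
import functools


def num_pads(sink_pads: int, source_pads: int):
    """Toy validator factory: check pad counts, then run the wrapped body."""

    def decorator(validate):
        @functools.wraps(validate)
        def wrapper(self):
            assert len(self.sink_pad_names) == sink_pads, (
                f"expected {sink_pads} sink pads, got {len(self.sink_pad_names)}"
            )
            assert len(self.source_pad_names) == source_pads, (
                f"expected {source_pads} source pads, got {len(self.source_pad_names)}"
            )
            # Pad counts are fine; now run the element's own assertions.
            return validate(self)

        return wrapper

    return decorator


class ToyGate:
    def __init__(self, sink_pad_names, source_pad_names, control):
        self.sink_pad_names = sink_pad_names
        self.source_pad_names = source_pad_names
        self.control = control
        self.validate()  # fail fast at construction time

    @num_pads(sink_pads=2, source_pads=1)
    def validate(self) -> None:
        assert self.control in self.sink_pad_names, (
            f"Control pad '{self.control}' not in sink pads"
        )


gate = ToyGate(["data", "control"], ["out"], control="control")
```

Constructing ToyGate with a single sink pad, or with a control name that is not among the sink pads, raises AssertionError before the object can be used.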

Configuring the Audio Adapter

Override configure() to set up AdapterConfig for streaming transforms that need overlap, stride, or latency compensation. Use the builder methods alignment() and on_startup():

from sgnts.base import Offset, TSTransform

@dataclass(kw_only=True)
class MyFilter(TSTransform):
    sample_rate: int = 2048
    kernel_size: int = 128

    def configure(self) -> None:
        self.adapter_config.alignment(
            overlap=(Offset.fromsamples(self.kernel_size - 1, self.sample_rate), 0),
        )

With stride and latency compensation:

def configure(self) -> None:
    self.adapter_config.alignment(
        overlap=(Offset.fromsamples(self.kernel_size - 1, self.sample_rate), 0),
        stride=Offset.fromsamples(1024, self.sample_rate),
        # Assumes the element also defines a `latency` field, in samples.
        shift=-Offset.fromsamples(self.latency, self.sample_rate),
    )
    self.adapter_config.on_startup(pad_zeros=True)
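The choice of kernel_size - 1 samples of overlap can be checked with a small pure-Python experiment, assuming the element applies an FIR-style kernel. The functions fir_valid and fir_chunked are hypothetical helpers written for this sketch; they do not use or model the adapter itself, only the arithmetic behind the overlap.

```python
def fir_valid(x: list, kernel: list) -> list:
    """Direct FIR filter, keeping only outputs with full kernel support."""
    k = len(kernel)
    return [
        sum(kernel[j] * x[i + k - 1 - j] for j in range(k))
        for i in range(len(x) - k + 1)
    ]


def fir_chunked(x: list, kernel: list, stride: int) -> list:
    """Filter `x` in chunks of `stride`, carrying k - 1 samples of history."""
    overlap = len(kernel) - 1
    history: list = []
    out: list = []
    for start in range(0, len(x), stride):
        chunk = history + x[start:start + stride]
        out.extend(fir_valid(chunk, kernel))
        # Keep the last k - 1 samples so the next chunk has full support.
        history = chunk[-overlap:] if overlap else []
    return out


signal = [float(n) for n in range(20)]
kernel = [0.5, 0.25, 0.25]

whole = fir_valid(signal, kernel)
streamed = fir_chunked(signal, kernel, stride=4)
```

With exactly kernel_size - 1 samples carried between chunks, the streamed result matches filtering the whole signal at once; carrying fewer would drop outputs at every chunk boundary.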

Overriding internal() Directly

For complex transforms where the process() abstraction doesn't fit, override internal() instead. Call super().internal() first to trigger alignment, then access frames via self.next_input() / self.next_output():

@dataclass
class CustomTransform(TSTransform):
    def internal(self) -> None:
        super().internal()

        _, input_frame = self.next_input()
        _, output_collector = self.next_output()

        for buf in input_frame:
            output_collector.append(buf)

        output_collector.close()  # Must close manually

When overriding internal(), you must call .close() on collectors yourself — the automatic close only applies to process().

Wrapping SGN Elements with make_ts_element()

make_ts_element() adds time-series alignment to an existing SGN element. Use it when you want an SGN element (e.g., CollectSink) to work in a TS pipeline with frame alignment:

from dataclasses import dataclass
from sgn import CollectSink
from sgnts.base.base import make_ts_element

@dataclass
class MyTSSink(make_ts_element(CollectSink)):
    def __post_init__(self):
        self.extract_data = False
        super().__post_init__()

The factory creates a class that:

  • Inherits from both TimeSeriesMixin and the SGN element
  • Queues incoming frames for alignment via pull()
  • Passes aligned frames to the original element's logic in internal()

This is how TSFrameCollectSink is implemented in SGN-TS.
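The class-factory pattern described above can be sketched with a mixin and type(). Everything here is a hypothetical stand-in: ToyAlignMixin, ToyCollectSink, and make_toy_ts_element are invented for illustration, and sorting queued frames is only a placeholder for real time-series alignment.

```python
class ToyCollectSink:
    """Stand-in for a plain element that records whatever it is handed."""

    def __init__(self) -> None:
        self.collected: list = []

    def consume(self, frame) -> None:
        self.collected.append(frame)


class ToyAlignMixin:
    """Stand-in mixin: queue frames in pull(), release them in internal()."""

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)  # cooperate with the wrapped class
        self._pending: list = []

    def pull(self, frame) -> None:
        self._pending.append(frame)

    def internal(self) -> None:
        # Placeholder "alignment": hand frames over in sorted order.
        for frame in sorted(self._pending):
            self.consume(frame)
        self._pending.clear()


def make_toy_ts_element(base: type) -> type:
    """Build a new class inheriting from the mixin first, then `base`."""
    return type(f"TS{base.__name__}", (ToyAlignMixin, base), {})


TSToySink = make_toy_ts_element(ToyCollectSink)
sink = TSToySink()
sink.pull(3)
sink.pull(1)
sink.internal()
```

Listing the mixin first in the bases puts its pull() and internal() ahead of the wrapped class in the method resolution order, while the wrapped class's own logic (consume() here) stays untouched.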

Writing a Resource Source

TSResourceSource is the base class for sources that acquire data from an external resource (files, network streams, hardware devices). It runs data generation in a separate worker thread and feeds buffers into the pipeline via a queue.

Subclass it and override worker_process():

from dataclasses import dataclass
from sgnts.base import TSResourceSource, SeriesBuffer, Offset

@dataclass(kw_only=True)
class MyDataSource(TSResourceSource):
    device: str = "/dev/data0"

    def worker_process(self, context) -> None:
        """Runs in a worker thread. Send (pad, buffer) pairs to the output queue."""
        pad = self.srcs[self.source_pad_names[0]]

        while not context.should_stop():
            # Acquire data from your external resource.
            # read_from_device() is a placeholder for your own acquisition code.
            timestamp, data = read_from_device(self.device)

            buf = SeriesBuffer(
                offset=Offset.fromsec(timestamp),
                sample_rate=2048,
                data=data,
            )
            context.output_queue.put((pad, buf))

Key points:

  • All parameters needed by the worker must be instance attributes (they are extracted automatically before the worker starts).
  • Use context.should_stop() to check for shutdown.
  • Use context.output_queue.put((pad, buf)) to send data to the pipeline.
  • Exceptions in the worker are caught by the framework and cause graceful termination.
  • Set start, end, or duration to control the source's time range.
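The worker-thread-plus-queue arrangement can be tried out with the standard library alone. ToyContext below is a hypothetical stand-in for the context object, with should_stop() backed by a threading.Event as an assumption; the pad is represented by a plain string and buffers by integers.

```python
import queue
import threading
from dataclasses import dataclass, field


@dataclass
class ToyContext:
    """Hypothetical stand-in for the context passed to worker_process()."""

    output_queue: queue.Queue = field(default_factory=queue.Queue)
    stop_event: threading.Event = field(default_factory=threading.Event)

    def should_stop(self) -> bool:
        return self.stop_event.is_set()


def worker_process(context: ToyContext, n_buffers: int) -> None:
    """Produce (pad, buffer) pairs until stopped or out of data."""
    produced = 0
    while not context.should_stop() and produced < n_buffers:
        context.output_queue.put(("H1", produced))
        produced += 1


context = ToyContext()
thread = threading.Thread(target=worker_process, args=(context, 3))
thread.start()
thread.join(timeout=5)
context.stop_event.set()  # request shutdown; a no-op here since the worker finished

# The pipeline side drains the queue in the order the worker produced it.
received = [context.output_queue.get_nowait() for _ in range(3)]
```

Checking should_stop() at the top of every loop iteration is what lets a long-running worker exit promptly when shutdown is requested.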

Tips

Always handle gaps. Check buf.is_gap before accessing buf.data — gap buffers have data=None:

for buf in input_frame:
    if buf.is_gap:
        output_frame.append(buf)  # Pass gap through
        continue
    output_frame.append(buf.copy(data=transform(buf.data)))

Use buf.copy() instead of creating new buffers. It preserves offset, sample rate, and shape metadata while letting you replace specific fields.
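Both tips can be exercised with a small stand-in buffer type. ToyBuffer is hypothetical and implements copy() with dataclasses.replace as an assumption about the general idea; it is not the SeriesBuffer implementation.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class ToyBuffer:
    offset: int
    sample_rate: int
    data: Optional[list] = None  # None marks a gap

    @property
    def is_gap(self) -> bool:
        return self.data is None

    def copy(self, **changes) -> "ToyBuffer":
        # Keep every field except the ones explicitly overridden.
        return replace(self, **changes)


def scale(buffers: list, factor: float) -> list:
    out = []
    for buf in buffers:
        if buf.is_gap:
            out.append(buf)  # pass gap through untouched
            continue
        out.append(buf.copy(data=[factor * x for x in buf.data]))
    return out


buffers = [
    ToyBuffer(offset=0, sample_rate=2048, data=[1.0, 2.0]),
    ToyBuffer(offset=2, sample_rate=2048),  # gap
]
scaled = scale(buffers, 10.0)
```

The scaled data buffer keeps its original offset and sample rate, and the gap comes out as the same object that went in.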

Use configure() for pad-dependent setup. Pads are not available during __init__ — use configure() to store pad references or compute derived values from pad configuration:

def configure(self) -> None:
    self.sink_pad = self.sink_pads[0]
    self.source_pad = self.source_pads[0]

Mark EOS in sinks. Every sink must call self.mark_eos(pad) when it receives an EOS frame, or the pipeline will never stop.