Element Development¶
This guide covers the full range of patterns for writing custom SGN-TS elements. For a gentler introduction, see the Writing Custom Elements tutorial.
Element Lifecycle¶
Every element follows this lifecycle during construction:
1. __post_init__() — dataclass initialization. Calls super().__post_init__().
2. configure() — one-time setup (store pad references, set adapter config).
3. validate() — check pad counts and configuration.
During execution, each iteration calls:
1. pull() — receives frames on sink pads (handled by TimeSeriesMixin).
2. internal() — aligns frames, then calls process().
3. new() — returns output frames on source pads (handled automatically).
You typically only implement configure(), validate(), and process().
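To make the ordering concrete, here is a minimal pure-Python sketch of the call sequence (not the real SGN-TS base classes, which do much more; all names here are illustrative):

```python
# Hypothetical sketch of the element lifecycle call order.
# Construction runs configure() then validate() once; each pipeline
# iteration then runs pull() -> internal() -> new().
class SketchElement:
    def __init__(self):
        self.calls = []
        self.configure()
        self.validate()

    def configure(self):
        self.calls.append("configure")

    def validate(self):
        self.calls.append("validate")

    def iterate(self):
        self.pull()
        self.internal()
        self.new()

    def pull(self):
        self.calls.append("pull")

    def internal(self):
        self.calls.append("internal")
        self.process()  # internal() is what invokes process()

    def process(self):
        self.calls.append("process")

    def new(self):
        self.calls.append("new")


elem = SketchElement()
elem.iterate()
# elem.calls records the full sequence in order
```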
The process() Method¶
Transform Signature¶
The full process() signature for transforms receives dictionaries of input
and output frames keyed by pad:
```python
from dataclasses import dataclass

from sgnts.base import TSCollectFrame, TSFrame, TSTransform


@dataclass
class MyTransform(TSTransform):
    def process(
        self,
        input_frames: dict[SinkPad, TSFrame],
        output_frames: dict[SourcePad, TSCollectFrame],
    ) -> None:
        for pad, frame in input_frames.items():
            ...
```
Sink Signature¶
Sinks receive only inputs (no output frames):
```python
from dataclasses import dataclass

from sgnts.base import TSFrame, TSSink


@dataclass
class MySink(TSSink):
    def process(self, input_frames: dict[SinkPad, TSFrame]) -> None:
        for pad, frame in input_frames.items():
            if frame.EOS:
                self.mark_eos(pad)
```
Output Frame Construction¶
Transforms don't create output frames directly. Instead, each output pad
receives a TSCollectFrame — a validated collector that builds a TSFrame
incrementally. You append buffers to it, and on close it validates that the
buffers are contiguous and span the expected offset range before committing
them to the underlying frame.
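The contiguity and span checks can be illustrated with a small pure-Python sketch. This is not the real TSCollectFrame; the Buf and Collector classes below are hypothetical stand-ins that show only the validation idea:

```python
# Hypothetical sketch of a collecting frame's validation logic.
from dataclasses import dataclass, field


@dataclass
class Buf:
    offset: int   # start offset of this buffer
    noffset: int  # length in offset units


@dataclass
class Collector:
    offset: int   # expected start of the full span
    noffset: int  # expected length of the full span
    bufs: list = field(default_factory=list)

    def append(self, buf: Buf) -> None:
        # Each buffer must begin exactly where the previous one ended.
        if self.bufs:
            expected = self.bufs[-1].offset + self.bufs[-1].noffset
        else:
            expected = self.offset
        if buf.offset != expected:
            raise ValueError(f"buffer at {buf.offset} not contiguous (expected {expected})")
        self.bufs.append(buf)

    def close(self) -> None:
        # On close, the accumulated buffers must cover the whole span.
        end = self.bufs[-1].offset + self.bufs[-1].noffset if self.bufs else self.offset
        if end != self.offset + self.noffset:
            raise ValueError(f"span ends at {end}, expected {self.offset + self.noffset}")


c = Collector(offset=0, noffset=16)
c.append(Buf(0, 8))
c.append(Buf(8, 8))
c.close()  # passes: buffers are contiguous and span [0, 16)
```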
In process(), the collector is closed automatically after your method
returns:
```python
def process(self, input_frames, output_frames):
    pad, frame = next(iter(input_frames.items()))
    out_pad, collector = next(iter(output_frames.items()))
    for buf in frame:
        collector.append(buf.copy(data=transform(buf.data)))
    # collector.close() is called automatically
```
Key rules:
- append(buf) — adds a buffer to the collector. Each buffer must be contiguous with the previous one and fall within the frame's offset range.
- close() — validates the full span and commits buffers to the parent frame. Called automatically in process(), so you rarely need to call it yourself.
- Gap buffers — if a portion of the input is a gap, append the gap buffer unchanged. The collector accepts gap buffers just like data buffers.
Outside of process(), you can use TSCollectFrame via the TSFrame.fill()
context manager:
```python
from sgnts.base import TSFrame

frame = TSFrame(offset=0, noffset=16384)
with frame.fill() as collector:
    collector.append(buf1)
    collector.append(buf2)
# frame now contains buf1 and buf2
```
This pattern is useful in tests or when constructing frames manually. The
context manager calls close() on exit and rolls back if an exception
occurs.
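The commit-on-exit, discard-on-exception behavior is a standard context-manager pattern. A pure-Python sketch (the Frame class and its staging list are hypothetical, not the real TSFrame):

```python
# Hypothetical sketch of a fill()-style context manager that commits
# staged buffers on clean exit and leaves the frame untouched on error.
from contextlib import contextmanager


class Frame:
    def __init__(self):
        self.buffers = []

    @contextmanager
    def fill(self):
        staged = []
        yield staged
        # Only reached on clean exit; an exception raised in the body
        # propagates through the yield and skips the commit below.
        self.buffers.extend(staged)


frame = Frame()
with frame.fill() as collector:
    collector.append("buf1")
    collector.append("buf2")
# frame.buffers now holds both staged buffers

broken = Frame()
try:
    with broken.fill() as collector:
        collector.append("buf1")
        raise RuntimeError("boom")
except RuntimeError:
    pass
# broken.buffers is still empty: the staged buffer was rolled back
```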
Decorators¶
Decorators simplify process() signatures for common pad configurations.
@transform.one_to_one¶
For transforms with exactly one sink pad and one source pad. Unwraps the dictionaries so you work with a single frame and collector:
```python
from dataclasses import dataclass

from sgnts.decorators import transform


@dataclass
class MyTransform(TSTransform):
    @transform.one_to_one
    def process(self, input_frame: TSFrame, output_frame: TSCollectFrame) -> None:
        for buf in input_frame:
            output_frame.append(buf)
```
@transform.many_to_one¶
For transforms with multiple sink pads but one source pad. Keeps the input dictionary but unwraps the single output:
```python
@dataclass
class Combiner(TSTransform):
    @transform.many_to_one
    def process(
        self,
        input_frames: dict[SinkPad, TSFrame],
        output_frame: TSCollectFrame,
    ) -> None:
        # Combine inputs into a single output
        ...
```
@sink.single_pad¶
For sinks with exactly one sink pad:
```python
from dataclasses import dataclass

from sgnts.decorators import sink


@dataclass
class MySink(TSSink):
    @sink.single_pad
    def process(self, input_frame: TSFrame) -> None:
        if input_frame.EOS:
            self.mark_eos(self.sink_pads[0])
```
Validation¶
Use @validator decorators on the validate() method. These are from the
sgn.validator module and check pad counts at construction time:
```python
from dataclasses import dataclass

from sgn import validator


@dataclass
class MyTransform(TSTransform):
    @validator.one_to_one
    def validate(self) -> None:
        pass  # Decorator checks pad counts
Available validators:
| Decorator | Constraint |
|---|---|
| @validator.one_to_one | 1 sink, 1 source |
| @validator.one_to_many | 1 sink, any sources |
| @validator.many_to_one | Any sinks, 1 source |
| @validator.single_pad | 1 sink (for sinks) |
| @validator.pad_names_match | Sink and source names match |
| @validator.num_pads(sink_pads=N, source_pads=M) | Exact counts |
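As a rough mental model, a pad-count validator can be sketched as an ordinary decorator that asserts before running the body. This is a pure-Python illustration, not the real sgn.validator implementation; the Elem class is a hypothetical stand-in:

```python
# Hypothetical sketch of a num_pads-style validator decorator.
import functools


def num_pads(sink_pads, source_pads):
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(self):
            # Check pad counts first, then run any custom assertions
            # in the decorated validate() body.
            assert len(self.sink_pads) == sink_pads, (
                f"expected {sink_pads} sink pads, got {len(self.sink_pads)}"
            )
            assert len(self.source_pads) == source_pads, (
                f"expected {source_pads} source pads, got {len(self.source_pads)}"
            )
            return fn(self)
        return wrapper
    return decorator


class Elem:
    def __init__(self, sink_pads, source_pads):
        self.sink_pads = sink_pads
        self.source_pads = source_pads

    @num_pads(sink_pads=2, source_pads=1)
    def validate(self):
        pass


Elem(["a", "b"], ["out"]).validate()  # passes the count check
```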
Add custom assertions in the validate() body:
```python
@dataclass(kw_only=True)
class Gate(TSTransform):
    control: str

    @validator.num_pads(sink_pads=2, source_pads=1)
    def validate(self) -> None:
        assert self.control in self.sink_pad_names, (
            f"Control pad '{self.control}' not in sink pads"
        )
```
Configuring the Audio Adapter¶
Override configure() to set up AdapterConfig for streaming transforms that
need overlap, stride, or latency compensation. Use the builder methods
alignment() and on_startup():
```python
from dataclasses import dataclass

from sgnts.base import Offset, TSTransform


@dataclass(kw_only=True)
class MyFilter(TSTransform):
    sample_rate: int = 2048
    kernel_size: int = 128

    def configure(self) -> None:
        self.adapter_config.alignment(
            overlap=(Offset.fromsamples(self.kernel_size - 1, self.sample_rate), 0),
        )
```
With stride and latency compensation:
```python
def configure(self) -> None:
    self.adapter_config.alignment(
        overlap=(Offset.fromsamples(self.kernel_size - 1, self.sample_rate), 0),
        stride=Offset.fromsamples(1024, self.sample_rate),
        shift=-Offset.fromsamples(self.latency, self.sample_rate),
    )
    self.adapter_config.on_startup(pad_zeros=True)
```
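The overlap/stride arithmetic can be pictured without any SGN-TS types. The sketch below works in plain sample counts (an illustration of the windowing geometry only, not the adapter's actual implementation):

```python
# Illustrative arithmetic: with overlap = kernel_size - 1 samples and a
# fixed stride, each processing window reuses the tail of the previous one.
def windows(n_samples, kernel_size, stride):
    """Yield (start, end) sample ranges for overlapped, strided processing."""
    overlap = kernel_size - 1
    start = 0
    while start + overlap + stride <= n_samples:
        yield (start, start + overlap + stride)
        start += stride  # advance by stride; consecutive windows share `overlap` samples


wins = list(windows(n_samples=4096, kernel_size=128, stride=1024))
# each window is 127 + 1024 = 1151 samples long,
# and consecutive windows overlap by 127 samples
```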
Overriding internal() Directly¶
For complex transforms where the process() abstraction doesn't fit, override
internal() instead. Call super().internal() first to trigger alignment,
then access frames via self.next_input() / self.next_output():
```python
@dataclass
class CustomTransform(TSTransform):
    def internal(self) -> None:
        super().internal()
        _, input_frame = self.next_input()
        _, output_collector = self.next_output()
        for buf in input_frame:
            output_collector.append(buf)
        output_collector.close()  # Must close manually
```
When overriding internal(), you must call .close() on collectors yourself —
the automatic close only applies to process().
Wrapping SGN Elements with make_ts_element()¶
make_ts_element() adds time-series alignment to an existing SGN element. Use
it when you want an SGN element (e.g., CollectSink) to work in a TS pipeline
with frame alignment:
```python
from dataclasses import dataclass

from sgn import CollectSink
from sgnts.base.base import make_ts_element


@dataclass
class MyTSSink(make_ts_element(CollectSink)):
    def __post_init__(self):
        self.extract_data = False
        super().__post_init__()
```
The factory creates a class that:
- Inherits from both TimeSeriesMixin and the SGN element
- Queues incoming frames for alignment via pull()
- Passes aligned frames to the original element's logic in internal()
This is how TSFrameCollectSink is implemented in SGN-TS.
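The general shape of such a factory is dynamic class creation with the mixin placed first in the MRO, so its methods shadow the wrapped element's. A pure-Python sketch (these classes are hypothetical stand-ins, not the real make_ts_element()):

```python
# Hypothetical sketch of a make_ts_element()-style class factory.
class TimeSeriesMixinSketch:
    def pull(self):
        # The mixin intercepts pull() to queue frames for alignment.
        self.queued = True


class PlainSink:
    def pull(self):
        self.queued = False


def make_ts_sketch(element_cls):
    # type() builds the combined class dynamically, mixin first in the MRO,
    # so TimeSeriesMixinSketch.pull shadows element_cls.pull.
    return type(f"TS{element_cls.__name__}", (TimeSeriesMixinSketch, element_cls), {})


TSPlainSink = make_ts_sketch(PlainSink)
s = TSPlainSink()
s.pull()
# the mixin's pull() ran, not PlainSink's
```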
Writing a Resource Source¶
TSResourceSource is the base class for sources that acquire data from an
external resource (files, network streams, hardware devices). It runs data
generation in a separate worker thread and feeds buffers into the pipeline
via a queue.
Subclass it and override worker_process():
```python
from dataclasses import dataclass

from sgnts.base import Offset, SeriesBuffer, TSResourceSource


@dataclass(kw_only=True)
class MyDataSource(TSResourceSource):
    device: str = "/dev/data0"

    def worker_process(self, context) -> None:
        """Runs in a worker thread. Send (pad, buffer) pairs to the output queue."""
        pad = self.srcs[self.source_pad_names[0]]
        while not context.should_stop():
            # Acquire data from your external resource
            timestamp, data = read_from_device(self.device)
            buf = SeriesBuffer(
                offset=Offset.fromsec(timestamp),
                sample_rate=2048,
                data=data,
            )
            context.output_queue.put((pad, buf))
```
Key points:
- All parameters needed by the worker must be instance attributes (they are extracted automatically before the worker starts).
- Use context.should_stop() to check for shutdown.
- Use context.output_queue.put((pad, buf)) to send data to the pipeline.
- Exceptions in the worker are caught by the framework and cause graceful termination.
- Set start, end, or duration to control the source's time range.
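The underlying worker-thread-plus-queue pattern is standard library territory. Here is a self-contained sketch using threading and queue; the Context class and its methods are hypothetical stand-ins for whatever TSResourceSource actually passes to worker_process():

```python
# Pure-Python sketch of the worker-thread-plus-queue pattern:
# a worker produces (pad, buffer) pairs until told to stop.
import queue
import threading


class Context:
    def __init__(self):
        self.output_queue = queue.Queue()
        self._stop = threading.Event()

    def should_stop(self):
        return self._stop.is_set()

    def stop(self):
        self._stop.set()


def worker_process(context, n_items=3):
    count = 0
    while not context.should_stop() and count < n_items:
        # Stand-in for reading from an external resource.
        context.output_queue.put(("pad0", f"buf{count}"))
        count += 1


ctx = Context()
t = threading.Thread(target=worker_process, args=(ctx,))
t.start()
t.join()
ctx.stop()
items = [ctx.output_queue.get_nowait() for _ in range(3)]
# items holds the three (pad, buffer) pairs in production order
```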
Tips¶
Always handle gaps. Check buf.is_gap before accessing buf.data —
gap buffers have data=None:
```python
for buf in input_frame:
    if buf.is_gap:
        output_frame.append(buf)  # Pass gap through
        continue
    output_frame.append(buf.copy(data=transform(buf.data)))
```
Use buf.copy() instead of creating new buffers. It preserves offset,
sample rate, and shape metadata while letting you replace specific fields.
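For dataclass-based buffers, this copy-with-overrides behavior matches what dataclasses.replace() provides. A sketch with a hypothetical Buf class (not the real SeriesBuffer API):

```python
# Hypothetical sketch of a copy()-with-overrides pattern built on
# dataclasses.replace(): metadata is preserved, named fields swapped.
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Buf:
    offset: int
    sample_rate: int
    data: tuple

    def copy(self, **overrides):
        # Keep all fields as-is except those explicitly overridden.
        return replace(self, **overrides)


buf = Buf(offset=0, sample_rate=2048, data=(1, 2, 3))
out = buf.copy(data=(2, 4, 6))
# out keeps offset and sample_rate from buf, with new data
```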
Use configure() for pad-dependent setup. Pads are not available during
__init__ — use configure() to store pad references or compute derived
values from pad configuration:
```python
def configure(self) -> None:
    self.sink_pad = self.sink_pads[0]
    self.source_pad = self.source_pads[0]
```
Mark EOS in sinks. Every sink must call self.mark_eos(pad) when it
receives an EOS frame, or the pipeline will never stop.