Align Transform¶
The Align
transform ensures that frames from multiple sources are properly aligned in time.
Overview¶
Align
is a special transform that:
- Maps frames directly from its sink pads to the corresponding source pads
- Ensures that time-aligned frames from multiple sources stay in sync
- Passes through frames without modifying their content
Basic Usage¶
# Basic usage of Align (not tested by mkdocs)
"""
from sgnts.transforms import Align
from sgnts.sources import FakeSeriesSource
# Create two sources with the same rate
source1 = FakeSeriesSource(
rate=2048,
signal_type="sine",
fsin=10.0,
name="sine_source"
)
source2 = FakeSeriesSource(
rate=2048,
signal_type="white",
name="noise_source"
)
# Create an align transform with matching source and sink pad names
align = Align(
source_pad_names=["channel1", "channel2"],
sink_pad_names=["channel1", "channel2"],
name="aligner"
)
# Create and get pads
align_sink_pad1 = align.sink_pad_dict["aligner:snk:channel1"]
align_sink_pad2 = align.sink_pad_dict["aligner:snk:channel2"]
align_source_pad1 = align.source_pad_dict["aligner:src:channel1"]
align_source_pad2 = align.source_pad_dict["aligner:src:channel2"]
# Connect sources to align transform
source1.add_dest(align, align_sink_pad1)
source2.add_dest(align, align_sink_pad2)
# Connect align transform to downstream elements (not shown)
# Process data
for _ in range(5):
source1.process()
source2.process()
align.process()
"""
How Align Works¶
The Align
transform creates a direct mapping between its sink pads and source pads:
# Understanding Align (not tested by mkdocs)
"""
from sgnts.transforms import Align
# Create an align transform
align = Align(
source_pad_names=["signal1", "signal2"],
sink_pad_names=["signal1", "signal2"],
name="align_transform"
)
# During initialization, a pad map is created:
# self.pad_map = {
# source_pad1: sink_pad1,
# source_pad2: sink_pad2,
# }
# When new() is called for a source pad, it retrieves the frame
# from the corresponding sink pad and returns it unchanged
"""
Synchronizing Multiple Data Streams¶
A common use case for Align
is to synchronize multiple data streams before combining them:
# Synchronizing data streams (not tested by mkdocs)
"""
from sgnts.transforms import Align, Adder
from sgnts.sources import FakeSeriesSource
# Create two sources that might produce data at slightly different times
source1 = FakeSeriesSource(
rate=2048,
signal_type="sine",
fsin=10.0,
real_time=True,
name="sine_source"
)
source2 = FakeSeriesSource(
rate=2048,
signal_type="sine",
fsin=5.0,
real_time=True,
name="another_sine_source"
)
# Create an align transform
align = Align(
source_pad_names=["signal1", "signal2"],
sink_pad_names=["signal1", "signal2"],
name="aligner"
)
# Create an adder to combine the aligned signals
adder = Adder(name="signal_adder")
# Connect sources to aligner
# (Pad creation code omitted for brevity)
source1.add_dest(align, align.sink_pad_dict["aligner:snk:signal1"])
source2.add_dest(align, align.sink_pad_dict["aligner:snk:signal2"])
# Connect aligned outputs to adder
align.add_dest(adder, from_pad=align.source_pad_dict["aligner:src:signal1"])
align.add_dest(adder, from_pad=align.source_pad_dict["aligner:src:signal2"])
# Process data
for _ in range(10):
source1.process()
source2.process()
align.process()
adder.process()
"""
Using Align in Complex Pipelines¶
The Align
transform is particularly useful in complex pipelines where multiple branches need to be synchronized:
# Complex pipeline example (not tested by mkdocs)
"""
from sgnts.transforms import Align, AmplifyTransform
from sgnts.sources import FakeSeriesSource
from sgnts.sinks import DumpSeriesSink
# Create two sources
source1 = FakeSeriesSource(rate=2048, signal_type="sine", fsin=10.0)
source2 = FakeSeriesSource(rate=2048, signal_type="white")
# Create transforms for each branch
amplify1 = AmplifyTransform(factor=2.0, name="amplify1")
amplify2 = AmplifyTransform(factor=0.5, name="amplify2")
# Create an align transform
align = Align(
source_pad_names=["branch1", "branch2"],
sink_pad_names=["branch1", "branch2"],
name="aligner"
)
# Create sinks
sink1 = DumpSeriesSink(fname="aligned_branch1.txt", name="sink1")
sink2 = DumpSeriesSink(fname="aligned_branch2.txt", name="sink2")
# Connect the pipeline
source1.add_dest(amplify1)
source2.add_dest(amplify2)
amplify1.add_dest(align, align.sink_pad_dict["aligner:snk:branch1"])
amplify2.add_dest(align, align.sink_pad_dict["aligner:snk:branch2"])
align.add_dest(sink1, from_pad=align.source_pad_dict["aligner:src:branch1"])
align.add_dest(sink2, from_pad=align.source_pad_dict["aligner:src:branch2"])
# Process data
for _ in range(10):
source1.process()
source2.process()
amplify1.process()
amplify2.process()
align.process()
sink1.process()
sink2.process()
"""
Best Practices¶
When using Align
:
-
Match pad names - ensure that source_pad_names and sink_pad_names contain the same set of names
-
Keep track of pads - use the pad dictionaries to access the pads by name
-
Process in order - ensure that all upstream elements are processed before the align transform
-
Consider buffer sizes - all sources feeding into an align transform should produce compatible buffer sizes
-
Understand the pass-through nature - Align doesn't modify data, it just ensures synchronization