Skip to content

sgnts.sinks.collect

TSFrameCollectSink dataclass

Bases: make_ts_element(CollectSink)


              flowchart TD
              sgnts.sinks.collect.TSFrameCollectSink[TSFrameCollectSink]

                              sgnts.base.base.make_ts_element --> sgnts.sinks.collect.TSFrameCollectSink
                


              click sgnts.sinks.collect.TSFrameCollectSink href "" "sgnts.sinks.collect.TSFrameCollectSink"
            

Sink that collects input SeriesBuffers

An sgn.CollectSink with an additional method, out_frames, that returns a dictionary keyed by sink pad name, where each value is a single TSFrame containing all buffers collected on that sink pad during pipeline operation. Zero-duration (heartbeat) buffers are filtered out of the returned frames.

Source code in src/sgnts/sinks/collect.py
@dataclass
class TSFrameCollectSink(make_ts_element(CollectSink)):  # type: ignore[misc]
    """Collecting sink whose gathered input can be read back as TSFrames.

    This is sgn.CollectSink plus one extra method, `out_frames`, which
    returns a dictionary keyed by sink pad name. Each value is a single
    TSFrame assembled from the buffers collected on that pad while the
    pipeline ran.

    """

    def __post_init__(self):
        # Both flags are forced off before the parent initializer runs.
        # NOTE(review): presumably this makes the parent keep whole frames
        # (rather than extracted data) and keep empty ones; confirm against
        # sgn.CollectSink.
        self.extract_data = self.skip_empty = False
        super().__post_init__()

    def out_frames(self) -> dict[str, TSFrame]:
        """Return one TSFrame per sink pad holding its collected buffers.

        Returns:
            A dictionary keyed like `self.collects`; each value is a TSFrame
            built from every buffer with a positive duration found in the
            frames stored under that key, in collection order.
        """
        frames_by_pad = {}
        for name, collected in self.collects.items():
            # Heartbeat buffers have zero duration and carry no data, so
            # only buffers with a positive duration are kept.
            kept = [
                buf
                for frame in collected
                for buf in frame.buffers
                if buf.duration > 0
            ]
            # NOTE(review): `kept` is empty when a pad saw only heartbeat
            # buffers; confirm TSFrame accepts an empty buffer list.
            frames_by_pad[name] = TSFrame(buffers=kept)
        return frames_by_pad

out_frames()

The collected frames.

Source code in src/sgnts/sinks/collect.py
def out_frames(self) -> dict[str, TSFrame]:
    """Return one TSFrame per sink pad holding its collected buffers.

    Returns:
        A dictionary keyed like `self.collects`; each value is a TSFrame
        built from every buffer with a positive duration found in the
        frames stored under that key, in collection order.
    """
    frames_by_pad = {}
    for name, collected in self.collects.items():
        # Heartbeat buffers have zero duration and carry no data, so only
        # buffers with a positive duration are kept.
        kept = [
            buf
            for frame in collected
            for buf in frame.buffers
            if buf.duration > 0
        ]
        # NOTE(review): `kept` is empty when a pad saw only heartbeat
        # buffers; confirm TSFrame accepts an empty buffer list.
        frames_by_pad[name] = TSFrame(buffers=kept)
    return frames_by_pad