Skip to content

sgnts.sinks.null

NullSeriesSink dataclass

Bases: TSSink


              flowchart TD
              sgnts.sinks.null.NullSeriesSink[NullSeriesSink]
              sgnts.base.base.TSSink[TSSink]
              sgnts.base.base.TimeSeriesMixin[TimeSeriesMixin]

                              sgnts.base.base.TSSink --> sgnts.sinks.null.NullSeriesSink
                                sgnts.base.base.TimeSeriesMixin --> sgnts.base.base.TSSink
                



              click sgnts.sinks.null.NullSeriesSink href "" "sgnts.sinks.null.NullSeriesSink"
              click sgnts.base.base.TSSink href "" "sgnts.base.base.TSSink"
              click sgnts.base.base.TimeSeriesMixin href "" "sgnts.base.base.TimeSeriesMixin"
            

A series sink that does precisely nothing.

Parameters:

Name Type Description Default
verbose bool

bool, print frames as they pass through the internal pad

False
Source code in src/sgnts/sinks/null.py
@dataclass
class NullSeriesSink(TSSink):
    """A series sink that consumes frames and discards their contents.

    The only bookkeeping performed is marking end-of-stream on a pad once
    an EOS frame arrives on it; optionally each frame is echoed to stdout.

    Args:
        verbose:
            bool, when True print every incoming frame along with a
            latency figure

    """

    verbose: bool = False

    def process(self, input_frames: dict[SinkPad, TSFrame]) -> None:
        """Mark EOS where needed and, if verbose, print each frame.

        Args:
            input_frames:
                mapping from each sink pad to the frame received on it
        """
        for pad, ts_frame in input_frames.items():
            if frame_is_last := ts_frame.EOS:
                self.mark_eos(pad)

            # Quiet mode: nothing else to do for this pad.
            if not self.verbose:
                continue

            print(f"{pad.name}:")
            print(f"  {ts_frame}")

            # Current GPS time is sampled first, then compared against the
            # frame offset advanced by Offset.SAMPLE_STRIDE_AT_MAX_RATE and
            # converted with Offset.tosec.
            # NOTE(review): presumably that sum is the time of the frame's
            # first sample plus one max-rate stride -- confirm against Offset.
            now = gpsnow()
            frame_time = Offset.tosec(
                ts_frame.offset + Offset.SAMPLE_STRIDE_AT_MAX_RATE
            )
            latency = now - frame_time
            print(f"  latency: {latency} s")

process(input_frames)

Mark end-of-stream on each pad whose frame has EOS set, and print frames if verbose.

Source code in src/sgnts/sinks/null.py
def process(self, input_frames: dict[SinkPad, TSFrame]) -> None:
    """Mark EOS where needed and, if verbose, print each frame.

    Args:
        input_frames:
            mapping from each sink pad to the frame received on it
    """
    for pad, ts_frame in input_frames.items():
        if ts_frame.EOS:
            self.mark_eos(pad)

        # Quiet mode: nothing else to do for this pad.
        if not self.verbose:
            continue

        print(f"{pad.name}:")
        print(f"  {ts_frame}")

        # Current GPS time is sampled first, then compared against the
        # frame offset advanced by Offset.SAMPLE_STRIDE_AT_MAX_RATE and
        # converted with Offset.tosec.
        # NOTE(review): presumably that sum is the time of the frame's
        # first sample plus one max-rate stride -- confirm against Offset.
        now = gpsnow()
        frame_time = Offset.tosec(
            ts_frame.offset + Offset.SAMPLE_STRIDE_AT_MAX_RATE
        )
        latency = now - frame_time
        print(f"  latency: {latency} s")