Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/scope/server/frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ def __init__(
self._pending_node_params: list[tuple[str, dict[str, Any]]] = []
# The processor whose output we read in graph mode (legacy get() path)
self._sink_processor: PipelineProcessor | None = None
# First graph sink node. Its WebRTC output is the canonical stream used
# for generic perform-mode output_sinks when perform mode runs as a
# generated linear graph.
self._primary_sink_node_id: str | None = None

# Source manager (sources, source queues, hardware input)
self._source_manager = SourceManager()
Expand Down Expand Up @@ -543,7 +547,10 @@ def get_packet_from_sink(self, sink_node_id: str) -> VideoPacket | None:
return None
packet = self.sink_manager.get_packet_from_sink(sink_node_id)
if packet is not None:
self._frames_out += 1
if sink_node_id == self._primary_sink_node_id:
self._on_frame_output(packet)
else:
self._frames_out += 1
return packet

def get_from_sink(self, sink_node_id: str) -> torch.Tensor | None:
Expand Down Expand Up @@ -1007,6 +1014,9 @@ def _setup_graph(self, graph):
)

self._sink_processor = graph_run.sink_processor
self._primary_sink_node_id = (
graph_run.output_node_ids[0] if graph_run.output_node_ids else None
)
self.pipeline_processors = graph_run.processors
self.pipeline_ids = graph_run.pipeline_ids

Expand Down
57 changes: 57 additions & 0 deletions tests/test_frame_processor_sinks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import numpy as np
import torch

from scope.server.frame_processor import FrameProcessor
from scope.server.media_packets import VideoPacket


class _FakeSinkManager:
def __init__(self, packets: dict[str, VideoPacket]):
self._packets = dict(packets)
self.fanned_frames: list[np.ndarray] = []

@property
def has_generic_sinks(self) -> bool:
return True

def get_packet_from_sink(self, sink_node_id: str) -> VideoPacket | None:
return self._packets.pop(sink_node_id, None)

def fan_out_frame(self, frame_np: np.ndarray) -> None:
self.fanned_frames.append(frame_np.copy())


def _make_frame_processor(primary_sink_node_id: str) -> FrameProcessor:
processor = object.__new__(FrameProcessor)
processor.running = True
processor._primary_sink_node_id = primary_sink_node_id
processor._frames_out = 0
processor._playback_ready_emitted = True
return processor


def test_primary_graph_sink_feeds_generic_output_sinks():
frame = torch.full((2, 3, 3), 127, dtype=torch.uint8)
sink_manager = _FakeSinkManager({"output": VideoPacket(tensor=frame)})
processor = _make_frame_processor("output")
processor.sink_manager = sink_manager

packet = processor.get_packet_from_sink("output")

assert packet is not None
assert processor._frames_out == 1
assert len(sink_manager.fanned_frames) == 1
np.testing.assert_array_equal(sink_manager.fanned_frames[0], frame.numpy())


def test_secondary_graph_sink_does_not_duplicate_generic_output_sinks():
frame = torch.full((2, 3, 3), 255, dtype=torch.uint8)
sink_manager = _FakeSinkManager({"secondary": VideoPacket(tensor=frame)})
processor = _make_frame_processor("output")
processor.sink_manager = sink_manager

packet = processor.get_packet_from_sink("secondary")

assert packet is not None
assert processor._frames_out == 1
assert sink_manager.fanned_frames == []