mstar.streaming.stream_buffer

Classes

StreamBuffer(request_id, edge_name, ...[, ...])

Per-request, per-edge buffer on the CONSUMER worker.

StreamChunk(data, chunk_index[, ...])

A chunk of data popped from a StreamBuffer.

class mstar.streaming.stream_buffer.StreamBuffer(request_id, edge_name, from_partition, policy, _waiting_graph_edges=<factory>, _buffer=<factory>, _tensor_ids_in_order=<factory>, _id_to_tensor=<factory>, _consumed=0, _chunks_popped=0, producer_done=False)

Bases: object

Per-request, per-edge buffer on the CONSUMER worker.

Tensors arrive one-by-one via normal RDMA routing. The buffer accumulates them and applies a ChunkPolicy to decide when the consuming node has enough data to proceed.

For sliding-window policies the buffer keeps old items so that pop_chunk can return the full window while only advancing by stride.
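The window-and-stride behaviour described above can be illustrated with plain lists. This is a minimal sketch of the general sliding-window technique, not the mstar implementation: the helper name `sliding_windows` and its arguments are hypothetical, and it assumes all items are already available rather than arriving one at a time.

```python
def sliding_windows(items, window_size, stride):
    """Yield (start_offset, window) pairs over a fully known list of items.

    Hypothetical helper for illustration only; it is not part of mstar.
    """
    start = 0
    # A window is emitted only when window_size items are available.
    while start + window_size <= len(items):
        yield start, items[start:start + window_size]
        # Advance by stride, so consecutive windows overlap
        # by (window_size - stride) items.
        start += stride


chunks = list(sliding_windows(list(range(8)), window_size=4, stride=2))
for start_offset, window in chunks:
    print(start_offset, window)
```

With a window of 4 and a stride of 2, each window shares its first two items with the previous one, which is why the buffer has to retain items it has already returned.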

Parameters:

request_id (str)
edge_name (str)
from_partition (str)
policy (ChunkPolicy)
producer_done (bool)

edge_name: str
from_partition: str
has_chunk_ready()
Return type:

bool

policy: ChunkPolicy
pop_chunk()

Pop the next chunk. Call this only when has_chunk_ready() returns True.

For sliding-window policies, this returns window_size items, advances the read position by stride items, and discards items that have fallen out of the window. The returned chunk's start_offset is the global position of its first item.

Return type:

StreamChunk
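The pop semantics above can be sketched with a toy buffer. Everything here is an assumption made for illustration: `ToySlidingBuffer` and `ToyChunk` are hypothetical stand-ins, the real buffer delegates readiness to a ChunkPolicy whose interface is not shown on this page, and the sketch assumes `stride <= window_size` and ignores producer completion.

```python
from dataclasses import dataclass, field


@dataclass
class ToyChunk:
    # Hypothetical stand-in for StreamChunk, holding a plain list.
    data: list
    chunk_index: int
    start_offset: int = 0


@dataclass
class ToySlidingBuffer:
    # Hypothetical stand-in for StreamBuffer with a fixed window policy.
    window_size: int
    stride: int
    _items: list = field(default_factory=list)
    _consumed: int = 0        # global index of the first item still held
    _chunks_popped: int = 0

    def put(self, item):
        self._items.append(item)

    def has_chunk_ready(self):
        return len(self._items) >= self.window_size

    def pop_chunk(self):
        if not self.has_chunk_ready():
            raise RuntimeError("pop_chunk called with no chunk ready")
        chunk = ToyChunk(
            data=self._items[:self.window_size],
            chunk_index=self._chunks_popped,
            start_offset=self._consumed,
        )
        # Advance by stride: drop the items that fall out of the next window,
        # keep the overlap so the next pop can return a full window.
        del self._items[:self.stride]
        self._consumed += self.stride
        self._chunks_popped += 1
        return chunk


buf = ToySlidingBuffer(window_size=4, stride=2)
popped = []
for i in range(8):
    buf.put(i)                      # items arrive one by one
    while buf.has_chunk_ready():    # guard every pop, as the docstring requires
        popped.append(buf.pop_chunk())
```

In this sketch the three popped chunks start at global offsets 0, 2 and 4, and the last two items stay in the buffer waiting for more data.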

pop_waiting_edge()
Return type:

GraphEdge | None

pre_read_register(tensor_id)
Parameters:

tensor_id (str)

producer_done: bool = False
put(tensor_id, item)

Called when a tensor arrives via normal RDMA routing.

Parameters:

tensor_id
item

Return type:

None

request_id: str
signal_done()

Called when the producer signals that no more items will arrive.

Return type:

None

store_uningested_edge(edge)
Parameters:

edge (GraphEdge)

class mstar.streaming.stream_buffer.StreamChunk(data, chunk_index, start_offset=0, is_final=False)

Bases: object

A chunk of data popped from a StreamBuffer.

Parameters:

data (dict[str, Tensor | None])
chunk_index (int)
start_offset (int)
is_final (bool)

chunk_index: int
data: dict[str, Tensor | None]
is_final: bool = False
start_offset: int = 0
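The fields above suggest a simple record type. The following is a minimal sketch of how a consumer might walk a sequence of such chunks, assuming `is_final` marks the last chunk of a stream (how and when the real buffer sets it is not described on this page). `StreamChunkSketch`, `consume`, and the `"hidden"` key are hypothetical names, and `Any` stands in for the tensor type.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional


@dataclass
class StreamChunkSketch:
    # Mirrors the documented fields and defaults; not the mstar class itself.
    data: Dict[str, Optional[Any]]
    chunk_index: int
    start_offset: int = 0
    is_final: bool = False


def consume(chunks: Iterable[StreamChunkSketch]) -> List[int]:
    """Process chunks in order and stop after the one flagged is_final."""
    seen = []
    for chunk in chunks:
        seen.append(chunk.chunk_index)
        if chunk.is_final:
            break
    return seen


chunks = [
    StreamChunkSketch(data={"hidden": None}, chunk_index=0),
    StreamChunkSketch(data={"hidden": None}, chunk_index=1, start_offset=2),
    StreamChunkSketch(data={"hidden": None}, chunk_index=2, start_offset=4,
                      is_final=True),
    # Never reached: the consumer stops at the final chunk.
    StreamChunkSketch(data={"hidden": None}, chunk_index=3, start_offset=6),
]
seen = consume(chunks)
```

Because `start_offset` and `is_final` have defaults, only `data` and `chunk_index` need to be supplied when constructing a chunk.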