mstar.streaming.chunk_policy#
Classes
Determines when a StreamBuffer has enough items for the consumer node. |
|
|
Release non-overlapping chunks of fixed size. |
|
Chunk policy for streaming vocoders with left-context overlap. |
|
Fixed-size sliding window that advances by a stride. |
- class mstar.streaming.chunk_policy.ChunkPolicy[source]#
Bases:
ABCDetermines when a StreamBuffer has enough items for the consumer node.
- continue_after_producer_done()[source]#
Whether the buffer should keep producing (empty) chunks after the producer signals done and all buffered items have been consumed.
Default
False: partition-done is propagated to the conductor after the last item is flushed.Set to
Truefor connections where the consumer must keep running after the producer finishes (e.g., Thinker→Talker: the Talker continues generating codec tokens after the Thinker hits text EOS). In this case the buffer produces empty chunks (_collate([])→{"data": None}), and the consumer’s partition-done is determined by its own model logic, not by the StreamBuffer.- Return type:
- abstractmethod is_ready(buffer_len)[source]#
Return True if the buffer has enough items for a chunk.
- class mstar.streaming.chunk_policy.FixedChunkPolicy(chunk_size, continue_after_done=False)[source]#
Bases:
ChunkPolicyRelease non-overlapping chunks of fixed size.
Each pop_chunk returns exactly chunk_size items and advances by chunk_size. No overlap, no sliding window.
- Parameters:
- continue_after_producer_done()[source]#
Whether the buffer should keep producing (empty) chunks after the producer signals done and all buffered items have been consumed.
Default
False: partition-done is propagated to the conductor after the last item is flushed.Set to
Truefor connections where the consumer must keep running after the producer finishes (e.g., Thinker→Talker: the Talker continues generating codec tokens after the Thinker hits text EOS). In this case the buffer produces empty chunks (_collate([])→{"data": None}), and the consumer’s partition-done is determined by its own model logic, not by the StreamBuffer.- Return type:
- class mstar.streaming.chunk_policy.LeftContextChunkPolicy(chunk, left_context)[source]#
Bases:
ChunkPolicyChunk policy for streaming vocoders with left-context overlap.
Matches HuggingFace’s
Qwen3OmniMoeCode2Wav.chunked_decodepattern:Iter 0: codes[0 : chunk] → emit all (no context) Iter 1: codes[chunk-ctx : 2*chunk] → trim first ctx, emit rest Iter 2: codes[2*chunk-ctx : 3*chunk] → trim first ctx, emit rest
The first pop returns
chunkitems (no context). Subsequent pops returnchunk + left_contextitems, where the leadingleft_contextitems OVERLAP with the tail of the previous chunk. This overlap allows the causal ConvNet vocoder to “warm up” its internal state on frames it has already processed, ensuring a smooth transition at chunk boundaries.The key invariant: the first pop advances by
chunk - left_context(notchunk), so the lastleft_contextitems of the first chunk remain in the buffer as overlap for the second pop. All subsequent pops advance bychunk.
- class mstar.streaming.chunk_policy.SlidingWindowChunkPolicy(window, stride)[source]#
Bases:
ChunkPolicyFixed-size sliding window that advances by a stride.
Each pop_chunk returns window items and advances the consumed pointer by stride. Old items before the window are discarded.
Example (Orpheus SNAC): window=28 tokens (4 frames), stride=7 (1 frame).