
[speechlm2] Add streaming inference pipeline for NemotronVoiceChat#15571

Open
erastorgueva-nv wants to merge 35 commits into NVIDIA-NeMo:main from
erastorgueva-nv:duplex-realtime-inference-rebase

Conversation

@erastorgueva-nv (Collaborator)

Important

The Update branch button must only be pressed on very rare occasions.
An outdated branch never blocks the merge of a PR.
Please reach out to the automation team before pressing that button.

What does this PR do?

Add a streaming (real-time, chunk-by-chunk) inference pipeline for NemotronVoiceChat,
following the same architecture as the NeMo ASR Inference Pipelines.

Collection: speechlm2

Changelog

  • Add StreamingS2SPipeline with generate_step() API for both batch file processing and server integration
  • Add NemotronVoicechatInferenceWrapper with infer_one_step() implementing perception → LLM → TTS → codec decode
  • Add perception cache with optional CUDA graph support for cache-aware streaming encoding
  • Add S2SPipelineBuilder factory and Hydra config (s2s_streaming.yaml) for easy setup
  • Add state management: slot-based S2SContextManager for decode state lifecycle, S2SStreamingState for output accumulation
  • Add s2s_streaming_infer.py entry script for batch inference on files/manifests
  • Extend DuplexSTTModel: KV cache support for Nemotron hybrid Mamba/Attention (with monkey-patches for upstream HF bugs), save_pretrained with tokenizer export, function head, ASR logit boosts, cache_position forwarding
  • Speed up model loading: meta device initialization, skip codec silence token computation when codec has random weights
  • Fix sampling: nan/inf check before top-p filtering, vectorized repetition penalty
  • Fix byte-level BPE decoding and BOS/EOS preservation in text output
  • Refactor tests: shared conftest.py fixtures, offline-vs-streaming parity test, no-crash config sweep
  • Add docs: streaming_inference.rst with architecture, config reference, and server integration guide

Modifications to more general code - FYI @kevinhu-nv @Edresson

  • Extend NemotronVoiceChat: from_pretrained supports loading from HF-format checkpoint with llm_artifacts/
  • Extend EarTTSModel: vectorized depth-sum, precomputed RVQ schedule, optional torch.compile, subword cache
  • Patches: the _patch_nemotron_cache_bugs and _patch_nemotron_block_forward methods in DuplexSTTModel patch bugs in the HF Nemotron model code so that KV caching works. The patches work in my testing, though I wonder if we can switch to more up-to-date upstream code that doesn't need them.

Usage

Batch inference from the command line:
python examples/speechlm2/nemo_inference_pipelines/s2s_streaming_infer.py \
    audio_file=/path/to/audio.wav \
    s2s.model_path=/path/to/checkpoint \
    s2s.speaker_name="<name>" \
    s2s.engine_type="native" \
    streaming.chunk_size_in_secs=0.08 \
    streaming.buffer_size_in_secs=1.68

Or from Python:

from nemo.collections.speechlm2.inference import S2SPipelineBuilder

pipeline = S2SPipelineBuilder.build_pipeline(cfg)
output = pipeline.run(audio_filepaths, options=options)
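
For server or live integration, the same pipeline exposes a frame-by-frame session API (shown in the streaming_inference.rst excerpts quoted in the review below). A minimal sketch, assuming a streamer iterable of fixed-size audio chunks; close_session() is a hypothetical teardown call, not confirmed by this PR:

pipeline.open_session()
for frames in streamer:
    step = pipeline.generate_step(frames)  # one perception -> LLM -> TTS step
pipeline.close_session()  # hypothetical; the actual teardown API may differ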

GitHub Actions CI

The Jenkins CI system has been replaced by GitHub Actions self-hosted runners.

The GitHub Actions CI will run automatically when the "Run CICD" label is added to the PR.
To re-run CI, remove and add the label again.
To run CI on an untrusted fork, a NeMo user with write access must first click "Approve and run".

Before your PR is "Ready for review"

Pre checks:

  • Make sure you read and followed Contributor guidelines
  • Did you write any new necessary tests?
  • Did you add or update any necessary documentation?
  • Does the PR affect components that are optional to install? (Ex: Numba, Pynini, Apex etc)
    • Reviewer: Does the PR have correct import guards for all optional libraries?

PR Type:

  • New Feature
  • Bugfix
  • Documentation

If you haven't finished some of the above items you can still open a "Draft" PR.

Who can review?

Anyone in the community is free to review the PR once the checks have passed.
The Contributor guidelines list specific people who can review PRs in various areas.

Additional Information

  • Related to # (issue)

# Cache results for future lookups
if not self.training and self.use_tts_subword_cache:
    valid_embeds = subword_embeds[subword_mask].detach()
    for idx, sid in enumerate(valid_ids):

Check failure: Code scanning / CodeQL

Potentially uninitialized local variable (Error)

Local variable 'valid_ids' may be used before it is initialized.
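
A hedged sketch of one possible fix, assuming valid_ids is derived from the same mask as valid_embeds (the producer line and the cache-write line are assumptions; the surrounding code is not shown here):

# Make sure `valid_ids` is bound on every path before the loop runs.
valid_ids: list = []
if subword_mask.any():
    valid_ids = subword_ids[subword_mask].tolist()  # hypothetical producer

if not self.training and self.use_tts_subword_cache:
    valid_embeds = subword_embeds[subword_mask].detach()
    for idx, sid in enumerate(valid_ids):
        self._subword_cache[sid] = valid_embeds[idx]  # hypothetical cache write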
"ignore_eos": True,
"guidance_scale": guidance_scale,
}
self.sampling_params = SamplingParams(**default_sampling)

Check warning

Code scanning / CodeQL

Overwriting attribute in super-class or sub-class Warning

Assignment overwrites attribute sampling_params, which was previously defined in superclass
LLMStreamingEngine
.
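
If the override is intentional, one hedged way to quiet the warning is to hand the value to the superclass instead of rebinding the attribute; the super().__init__ signature below is an assumption about LLMStreamingEngine:

# Sketch: let the superclass own the attribute; avoids shadowing.
default_sampling = {
    "ignore_eos": True,
    "guidance_scale": guidance_scale,
}
super().__init__(sampling_params=SamplingParams(**default_sampling))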
    tts_model_cfg = cfg['model']['speech_generation']['model']
    tts_model_cfg['pretrained_model'] = None
    tts_model_cfg['pretrained_codec_model'] = None
except (KeyError, TypeError):

Check notice: Code scanning / CodeQL

Empty except (Note)

'except' clause does nothing but pass and there is no explanatory comment.
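
This note, and the three similar ones below, can be addressed the same way: keep the broad handler but say why it is safe to swallow the error. A sketch for this instance (the reason given in the comment is an assumption about why the keys may be absent):

try:
    tts_model_cfg = cfg['model']['speech_generation']['model']
    tts_model_cfg['pretrained_model'] = None
    tts_model_cfg['pretrained_codec_model'] = None
except (KeyError, TypeError):
    # Config may not contain a speech_generation section (e.g. a
    # checkpoint without TTS); nothing to clear in that case.
    pass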
"""Cleanup on deletion."""
try:
self.shutdown()
except Exception:

Check notice

Code scanning / CodeQL

Empty except Note

'except' clause does nothing but pass and there is no explanatory comment.
# Try to abort cleanly first
try:
    await self.engine.abort_generation(request_id)
except Exception:

Check notice: Code scanning / CodeQL

Empty except (Note)

'except' clause does nothing but pass and there is no explanatory comment.
    tid = tokenizer.convert_tokens_to_ids(token)
    if isinstance(tid, int):
        special_ids.add(tid)
except Exception:

Check notice: Code scanning / CodeQL

Empty except (Note)

'except' clause does nothing but pass and there is no explanatory comment.
"""

import copy
import time

Check notice: Code scanning / CodeQL

Unused import (Note)

Import of 'time' is not used.
import os
import json
import torch
import asyncio

Check notice: Code scanning / CodeQL

Unused import (Note)

Import of 'asyncio' is not used.
@pzelasko (Collaborator) left a comment:

Partial review up to streaming_s2s_pipeline.py at line 511 (note to self where to pick up later)

from nemo.collections.speechlm2.inference import S2SPipelineBuilder

pipeline = S2SPipelineBuilder.build_pipeline(cfg)
output = pipeline.run(audio_filepaths, options=options)
Collaborator:

Does this assume a single-turn evaluation? Or can the audio file have multiple turns, with the agent expected to handle that correctly? Let's clarify this in the docs.

Collaborator (Author):

Not sure what you mean - it's full-duplex, so it just generates one frame of output for every frame of audio input. The audio input can contain a single turn, multiple turns, whatever.

Or if you're asking about "evaluation" - the code doesn't support detailed evaluation. We just generate text & audio for the full audio file (with an option to add silence padding at the end, so the agent can finish speaking). The one bit of evaluation we have is WER.

.. code-block:: bash

    python examples/speechlm2/nemo_inference_pipelines/s2s_streaming_infer.py \
        audio_file=/path/to/audio \
Collaborator:

Both examples here showcase audio_file. We need to mention how to perform live streaming inference (using mic or other streaming audio input connector) if it is supported by this API; or that it is not supported.
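
If live input is supported, a hedged sketch of what a mic-driven loop could look like, using the third-party sounddevice package (not part of this PR; the chunk size and the input format generate_step() accepts are assumptions):

import sounddevice as sd

CHUNK_SECS, SAMPLE_RATE = 0.08, 16000  # assumed to match the pipeline config

pipeline.open_session()
with sd.InputStream(samplerate=SAMPLE_RATE, channels=1, dtype="float32") as mic:
    while True:
        # read() returns (frames, overflow_flag); frames is a float32 ndarray
        frames, _ = mic.read(int(CHUNK_SECS * SAMPLE_RATE))
        step = pipeline.generate_step(frames.squeeze(-1))  # mono 1-D input assumed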

from nemo.collections.speechlm2.inference import S2SPipelineBuilder

pipeline = S2SPipelineBuilder.build_pipeline(cfg)
output = pipeline.run(audio_filepaths, options=options)
Collaborator:

Is there another entry-point with a streaming input connector (mic)? We should mention.

.. code-block:: python

    pipeline.open_session()
    for frames in streamer:
Collaborator:

Can we show how streamer is constructed? You'd normally refer the user to ASR pipelines documentation but it doesn't exist yet in main IIRC, so we need to describe at least basic concepts / APIs.
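
For illustration, a minimal file-based streamer could be sketched like this (names, chunking, and the mono assumption are illustrative, not the pipeline's actual streamer API):

import numpy as np
import soundfile as sf

def file_streamer(path, chunk_secs=0.08):
    """Yield fixed-size float32 chunks from a mono wav file, zero-padding the tail."""
    audio, sr = sf.read(path, dtype="float32")
    chunk = int(chunk_secs * sr)
    for start in range(0, len(audio), chunk):
        frames = audio[start:start + chunk]
        if len(frames) < chunk:
            frames = np.pad(frames, (0, chunk - len(frames)))
        yield frames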


pipeline.open_session()
for frames in streamer:
    pipeline.generate_step(frames)
Collaborator:

does this emit some intermediate results? Can we show that?
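
If generate_step() returns a per-frame step result (the changelog mentions a StepResult-like type), intermediate output could be surfaced roughly like this; the attribute names are assumptions:

pipeline.open_session()
for frames in streamer:
    step = pipeline.generate_step(frames)
    # `text` is an assumed field on the step result
    if getattr(step, "text", None):
        print(step.text, end="", flush=True)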

asr_predicted_text_strs = self._tokens_to_strings(asr_predicted_tokens)

logging.info(f'frame {frame_idx}: USER asr: {asr_predicted_text_strs}')
logging.info(f'frame {frame_idx}: AGENT txt: {predicted_text_strs}')
Collaborator:

move to logging.debug, this will spam the logs a lot
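
Applying the suggestion to the quoted lines:

logging.debug(f'frame {frame_idx}: USER asr: {asr_predicted_text_strs}')
logging.debug(f'frame {frame_idx}: AGENT txt: {predicted_text_strs}')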

# infer_one_step sub-stages
# ------------------------------------------------------------------

def _build_input_embedding(
Collaborator:

It looks like this method should live in the DuplexSTT class? It's exposing the inner workings of input construction to a high-level inference API.

If we build a DuplexSTTv2 which does it completely differently, we don't want to rewrite this wrapper - we should just call stt_model.build_input_embedding()
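
A sketch of the proposed delegation (method and argument names are illustrative):

class DuplexSTTModel:
    def build_input_embedding(self, frames, state):
        """Owns input construction, so each model version can differ freely."""
        ...

# In the inference wrapper, the high-level API just delegates:
emb = self.stt_model.build_input_embedding(frames, state)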


return emb

def _run_llm_step(
Collaborator:

This method should be split in two, with each half living in the native / vLLM LLM engine class.
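
A sketch of the proposed split (class and method names are illustrative):

class NativeLLMEngine:
    def run_llm_step(self, emb, kv_cache):
        ...  # native HF forward with KV cache handling

class VLLMEngine:
    def run_llm_step(self, emb, kv_cache):
        ...  # vLLM-backed step

# The wrapper then dispatches without knowing which engine it holds:
logits, kv_cache = self.llm_engine.run_llm_step(emb, kv_cache)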



@dataclass
class PerceptionCUDAGraphState:
Collaborator:

Should this (partially) live in ASR collection? Could we re-use your work here to accelerate streaming models like nemotron-speech-asr?

state.static_cache_channel_len_in = cache_last_channel_len.clone()

logging.info(f" Warming up encoder for CUDA graph capture...")
for _ in range(3):
Collaborator:

what's magical about 3?
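
A hedged guess: a few eager-mode passes are a common pre-capture warm-up so cuDNN autotuning and lazy CUDA allocations settle before the graph is recorded. Naming the constant would answer the question in code (the explanation in the comment is an assumption, not confirmed by this PR):

NUM_WARMUP_ITERS = 3  # "a few" warm-up passes before CUDA graph capture
logging.info("Warming up encoder for CUDA graph capture...")
for _ in range(NUM_WARMUP_ITERS):
    ...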
