Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
355 changes: 352 additions & 3 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ members = [
"examples/basic_room",
"examples/basic_text_stream",
"examples/encrypted_text_stream",
"examples/agent_audio_latency",
"examples/local_audio",
"examples/local_video",
"examples/mobile",
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ match event {
![](https://github.com/livekit/rust-sdks/blob/main/examples/images/simple-room-demo.gif)

- [basic room](https://github.com/livekit/rust-sdks/tree/main/examples/basic_room): simple example connecting to a room.
- [agent_audio_latency](https://github.com/livekit/rust-sdks/tree/main/examples/agent_audio_latency): audio-only mic/speaker example for talking to an agent and estimating response latency.
- [wgpu_room](https://github.com/livekit/rust-sdks/tree/main/examples/wgpu_room): complete example app with video rendering using wgpu and egui.
- [mobile](https://github.com/livekit/rust-sdks/tree/main/examples/mobile): mobile app targeting iOS and Android
- [play_from_disk](https://github.com/livekit/rust-sdks/tree/main/examples/play_from_disk): publish audio from a wav file
Expand Down
17 changes: 17 additions & 0 deletions examples/agent_audio_latency/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "agent_audio_latency"
version = "0.1.0"
edition.workspace = true
publish = false

[dependencies]
anyhow = { workspace = true }
clap = { workspace = true, features = ["derive"] }
cpal = "0.15"
env_logger = { workspace = true }
futures-util = { workspace = true }
livekit = { workspace = true, features = ["rustls-tls-native-roots"] }
livekit-api = { workspace = true, features = ["rustls-tls-native-roots"] }
log = { workspace = true }
rand = "0.8"
tokio = { workspace = true, features = ["full"] }
55 changes: 55 additions & 0 deletions examples/agent_audio_latency/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Agent Audio Latency Example

This example connects to a LiveKit room, publishes microphone audio with `cpal`, plays remote audio back to the default speaker, and optionally injects a short probe tone to estimate agent response latency.

It is audio-only. No video tracks are created.

## What the latency metric means

When `--benchmark` is enabled, the app waits for remote audio to be quiet, injects a short tone burst into the outgoing audio, and measures how long it takes before remote audio becomes active again.

That metric works well for:

- echo / loopback agents
- agents that immediately answer with speech or audio

It is not a codec-level mouth-to-ear measurement. It is an application-level "probe sent to first remote audio response" estimate.

## Usage

With a pre-minted participant token:

```bash
cargo run -p agent_audio_latency -- \
--url "$LIVEKIT_URL" \
--token "$LIVEKIT_TOKEN"
```

Or mint a token locally:

```bash
cargo run -p agent_audio_latency -- \
--url "$LIVEKIT_URL" \
--api-key "$LIVEKIT_API_KEY" \
--api-secret "$LIVEKIT_API_SECRET" \
--room-name my-room \
--identity rust-agent-client
```

Enable the latency benchmark:

```bash
cargo run -p agent_audio_latency -- \
--url "$LIVEKIT_URL" \
--token "$LIVEKIT_TOKEN" \
--benchmark
```

If you only want to listen to a specific agent participant:

```bash
cargo run -p agent_audio_latency -- \
--url "$LIVEKIT_URL" \
--token "$LIVEKIT_TOKEN" \
--agent-identity my-agent
```
152 changes: 152 additions & 0 deletions examples/agent_audio_latency/src/audio_capture.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
use crate::audio_processing::SharedAudioProcessing;
use crate::latency::TurnLatencyBench;
use anyhow::{anyhow, Result};
use cpal::traits::{DeviceTrait, StreamTrait};
use cpal::{Device, SampleFormat, SizedSample, Stream, StreamConfig};
use log::{error, info, warn};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
};
use tokio::sync::mpsc;

/// Handle for a microphone capture stream created by `AudioCapture::new`.
pub struct AudioCapture {
// Stored at construction and never read in this file.
// NOTE(review): presumably held only so the cpal stream stays alive for as long as
// this handle exists — confirm against cpal's `Stream` drop semantics.
_stream: Stream,
// Flag shared with the input callback: the callback returns immediately while this
// is `false`. `stop()` clears it.
is_running: Arc<AtomicBool>,
}

impl AudioCapture {
    /// Builds an input stream on `device`, starts it, and returns a handle owning it.
    ///
    /// Every input callback extracts channel `channel_index` out of the interleaved
    /// `num_input_channels`-channel buffer, converts it to `i16`, optionally runs it
    /// through `apm` and `benchmark`, and forwards the result on `audio_tx`.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// - `num_input_channels` is zero (de-interleaving would otherwise panic inside
    ///   the audio callback, because `step_by(0)` panics),
    /// - `channel_index` is not smaller than `num_input_channels` (the callback would
    ///   otherwise silently pick samples from the wrong position),
    /// - `sample_format` is not one of `F32`, `I16`, `U16`,
    /// - building or starting the stream fails.
    pub fn new(
        device: Device,
        config: StreamConfig,
        sample_format: SampleFormat,
        audio_tx: mpsc::UnboundedSender<Vec<i16>>,
        channel_index: u32,
        num_input_channels: u32,
        apm: Option<Arc<SharedAudioProcessing>>,
        benchmark: Option<Arc<Mutex<TurnLatencyBench>>>,
    ) -> Result<Self> {
        info!("creating audio capture stream");

        // Reject layouts the callback cannot de-interleave before any stream exists.
        if num_input_channels == 0 {
            return Err(anyhow!("num_input_channels must be greater than zero"));
        }
        if channel_index >= num_input_channels {
            return Err(anyhow!(
                "channel_index {channel_index} is out of range for {num_input_channels} input channel(s)"
            ));
        }

        let is_running = Arc::new(AtomicBool::new(true));

        // The device name is only used for logging; failing to read it must not
        // prevent capture from starting.
        let device_name = device.name().unwrap_or_else(|_| String::from("<unknown>"));
        info!(
            "selected audio input device: {} with config: {:?}, channel: {}, num_input_channels: {}",
            device_name, config, channel_index, num_input_channels
        );

        // Only one arm runs, so the owned arguments can be moved into it directly;
        // `is_running` is cloned because the handle keeps its own reference.
        let stream = match sample_format {
            SampleFormat::F32 => Self::create_input_stream::<f32>(
                device,
                config,
                audio_tx,
                is_running.clone(),
                channel_index,
                num_input_channels,
                apm,
                benchmark,
            )?,
            SampleFormat::I16 => Self::create_input_stream::<i16>(
                device,
                config,
                audio_tx,
                is_running.clone(),
                channel_index,
                num_input_channels,
                apm,
                benchmark,
            )?,
            SampleFormat::U16 => Self::create_input_stream::<u16>(
                device,
                config,
                audio_tx,
                is_running.clone(),
                channel_index,
                num_input_channels,
                apm,
                benchmark,
            )?,
            other => return Err(anyhow!("unsupported input sample format: {other:?}")),
        };

        info!("stream.play()");
        stream.play()?;
        info!("audio capture stream started");

        Ok(Self { _stream: stream, is_running })
    }

    /// Builds (but does not start) the input stream for sample type `T`.
    ///
    /// The caller must have verified that `num_input_channels > 0`; `new` does so.
    fn create_input_stream<T>(
        device: Device,
        config: StreamConfig,
        audio_tx: mpsc::UnboundedSender<Vec<i16>>,
        is_running: Arc<AtomicBool>,
        channel_index: u32,
        num_input_channels: u32,
        apm: Option<Arc<SharedAudioProcessing>>,
        benchmark: Option<Arc<Mutex<TurnLatencyBench>>>,
    ) -> Result<Stream>
    where
        T: SizedSample + Send + 'static,
    {
        // Loop-invariant values are computed once, outside the callback.
        let first_sample = channel_index as usize;
        let frame_stride = num_input_channels as usize;
        let sample_rate = config.sample_rate.0;

        // cpal runs the microphone callback on the platform's real-time audio thread.
        // Keep this callback short and non-blocking: push samples into a channel and let
        // the dedicated uplink runtime handle framing and SDK calls.
        let stream = device.build_input_stream(
            &config,
            move |data: &[T], _: &cpal::InputCallbackInfo| {
                if !is_running.load(Ordering::Relaxed) {
                    return;
                }

                // Pick the selected channel out of the interleaved frames.
                let mut converted: Vec<i16> = data
                    .iter()
                    .skip(first_sample)
                    .step_by(frame_stride)
                    .map(|&sample| convert_sample_to_i16(sample))
                    .collect();

                if let Some(apm) = &apm {
                    // APM forward processing belongs on the capture thread so the mic signal
                    // is cleaned up before it enters the app's async/network pipeline.
                    apm.process_capture(&mut converted);
                }

                if let Some(benchmark) = &benchmark {
                    // Detect turn-end directly on the capture callback thread. For this
                    // benchmark, the audio callback timing is more meaningful than a later
                    // async task wakeup in the networking pipeline.
                    benchmark.lock().unwrap().observe_user_audio(&converted, sample_rate);
                }

                if let Err(err) = audio_tx.send(converted) {
                    // An unbounded sender only fails once the receiver is gone, so no
                    // later frame can be delivered either. Warn once and idle the
                    // callback instead of logging on every audio buffer.
                    warn!("failed to forward captured audio: {err}");
                    is_running.store(false, Ordering::Relaxed);
                }
            },
            move |err| {
                error!("audio input stream error: {err}");
            },
            None,
        )?;

        Ok(stream)
    }

    /// Makes the input callback ignore all further audio by clearing the running flag.
    pub fn stop(&self) {
        self.is_running.store(false, Ordering::Relaxed);
    }
}

impl Drop for AudioCapture {
    /// Clears the running flag so the input callback stops forwarding audio once the
    /// handle goes away.
    fn drop(&mut self) {
        // Same store that `stop()` performs, written inline.
        self.is_running.store(false, Ordering::Relaxed);
    }
}

/// Converts one input sample to signed 16-bit PCM.
///
/// Dispatch is on the concrete sample type, not on its size: `i16` and `u16` are both
/// two bytes wide, so a size-based check cannot tell them apart and would reinterpret
/// unsigned samples as signed ones.
///
/// - `f32`: clamped to `[-1.0, 1.0]` and scaled by `i16::MAX`.
/// - `i16`: returned unchanged.
/// - `u16`: re-centred around zero by subtracting the `0x8000` midpoint, so
///   `0 -> i16::MIN`, `0x8000 -> 0` and `u16::MAX -> i16::MAX`.
/// - any other type: `0` (silence).
///
/// The `'static` bound is required for the type inspection via `Any`.
fn convert_sample_to_i16<T: SizedSample + 'static>(sample: T) -> i16 {
    let any_sample: &dyn std::any::Any = &sample;

    if let Some(&value) = any_sample.downcast_ref::<f32>() {
        (value.clamp(-1.0, 1.0) * i16::MAX as f32) as i16
    } else if let Some(&value) = any_sample.downcast_ref::<i16>() {
        value
    } else if let Some(&value) = any_sample.downcast_ref::<u16>() {
        // The difference is always within -32768..=32767, so the cast is lossless.
        (i32::from(value) - 0x8000) as i16
    } else {
        0
    }
}
42 changes: 42 additions & 0 deletions examples/agent_audio_latency/src/audio_mixer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Bounded FIFO of playback samples with a fixed gain applied on the way in.
///
/// Clones share the same underlying queue, so one clone can feed samples while
/// another drains them.
#[derive(Clone)]
pub struct AudioMixer {
    // Queue of already-scaled samples, shared between clones.
    buffer: Arc<Mutex<VecDeque<i16>>>,
    // Gain applied to every incoming sample; always within [0.0, 1.0].
    volume: f32,
    // Upper bound on the number of queued samples.
    max_buffer_size: usize,
}

impl AudioMixer {
    /// Creates a mixer that holds at most `sample_rate * channels` samples
    /// (one second of interleaved audio, assuming `sample_rate` is in Hz).
    ///
    /// `volume` is clamped to `[0.0, 1.0]`.
    pub fn new(sample_rate: u32, channels: u32, volume: f32) -> Self {
        let capacity = sample_rate as usize * channels as usize;
        let queue = VecDeque::with_capacity(capacity);

        Self {
            buffer: Arc::new(Mutex::new(queue)),
            volume: volume.clamp(0.0, 1.0),
            max_buffer_size: capacity,
        }
    }

    /// Scales `data` by the mixer volume and appends it to the queue, discarding the
    /// oldest samples so that no more than the maximum buffer size remain.
    pub fn add_audio_data(&self, data: &[i16]) {
        let gain = self.volume;
        let mut queue = self.buffer.lock().unwrap();

        queue.extend(data.iter().map(|&raw| (raw as f32 * gain) as i16));

        // Trim from the front until the queue fits the limit again.
        let overflow = queue.len().saturating_sub(self.max_buffer_size);
        drop(queue.drain(..overflow));
    }

    /// Removes and returns exactly `requested_samples` samples from the front of the
    /// queue, padding with zeros once the queue runs dry.
    pub fn get_samples(&self, requested_samples: usize) -> Vec<i16> {
        let mut queue = self.buffer.lock().unwrap();

        (0..requested_samples)
            .map(|_| queue.pop_front().unwrap_or(0))
            .collect()
    }
}
Loading
Loading