From cd51cadd4a39b151a7deffd8b91ae82d4f4db993 Mon Sep 17 00:00:00 2001 From: Simao Fonseca Date: Fri, 6 Mar 2026 17:00:44 +0000 Subject: [PATCH] mxl-sink: Write video and audio correctly with network streams Signed-off-by: Simao Fonseca --- rust/Cargo.lock | 81 ++++++- rust/gst-mxl-rs/Cargo.toml | 1 + rust/gst-mxl-rs/src/mxlsink/imp.rs | 55 +++-- rust/gst-mxl-rs/src/mxlsink/render_audio.rs | 250 ++++++++++++++------ rust/gst-mxl-rs/src/mxlsink/render_video.rs | 142 +++++++++-- rust/gst-mxl-rs/src/mxlsink/state.rs | 145 +++++++++--- 6 files changed, 506 insertions(+), 168 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index fd81dd048..8cfefa413 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -191,6 +191,62 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + [[package]] name = "either" version = "1.15.0" @@ -356,6 +412,7 @@ dependencies = [ name = "gst-mxl-rs" version = "0.1.0" dependencies = [ + "crossbeam", "glib", "gst-plugin-version-helper", "gstreamer", @@ -407,9 +464,9 @@ dependencies = [ [[package]] name = "gstreamer-audio" -version = "0.24.4" +version = "0.24.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92829dbca7c59ed4bf0c9154dd8c0cf3185d6bf9dad821b058b801d9671fa763" +checksum = "76c058cce8d32bfb6dd578a3d6d1d874b855a638b738d8bb34cd7aedd65aeddd" dependencies = [ "cfg-if", "glib", @@ -422,9 +479,9 @@ dependencies = [ [[package]] name = "gstreamer-audio-sys" -version = "0.24.4" +version = "0.24.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6acd80847b78122c45983597f74a29071d63273c1eded14be5f7381301711475" +checksum = "807e476f555c4e7409d8c8fe4fd10fa9989b8e8f2898762d9551e1adfde98a2d" dependencies = [ "glib-sys", "gobject-sys", @@ -436,9 +493,9 @@ dependencies = [ [[package]] name = "gstreamer-base" -version = "0.24.4" +version = "0.24.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dd15c7e37d306573766834a5cbdd8ee711265f217b060f40a9a8eda45298488" +checksum = "9375f9a12120a8ee17b765c816c9b23861ce258def77b0ee40a05acb00c74972" dependencies = [ "atomic_refcell", "cfg-if", @@ -450,9 +507,9 @@ dependencies = [ [[package]] name = "gstreamer-base-sys" -version = "0.24.4" +version = "0.24.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27a2eda2c61e13c11883bf19b290d07ea6b53d04fd8bfeb7af64b6006c6c9ee6" +checksum = "1b844f3559b6ab0379b4b771261643783ae4e0ffa71d5f5f46e33b7acf66b752" dependencies = [ "glib-sys", "gobject-sys", @@ -602,9 +659,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.7.6" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" +checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" [[package]] name = "minimal-lexical" @@ -904,9 +961,9 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] name = "syn" -version = "2.0.111" +version = "2.0.117" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "390cc9a294ab71bdb1aa2e99d13be9c753cd2d7bd6560c77118597410c4d2e87" +checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99" dependencies = [ "proc-macro2", "quote", diff --git a/rust/gst-mxl-rs/Cargo.toml b/rust/gst-mxl-rs/Cargo.toml index 94090ed7e..9c4677c64 100644 --- a/rust/gst-mxl-rs/Cargo.toml +++ b/rust/gst-mxl-rs/Cargo.toml @@ -14,6 +14,7 @@ license.workspace = true ignored = ["regex"] [dependencies] +crossbeam = "0.8.4" glib = "0.21.5" gstreamer = "0.24.4" gstreamer-audio = "0.24.4" diff --git a/rust/gst-mxl-rs/src/mxlsink/imp.rs b/rust/gst-mxl-rs/src/mxlsink/imp.rs index b696b7b10..c871c531d 100644 --- a/rust/gst-mxl-rs/src/mxlsink/imp.rs +++ b/rust/gst-mxl-rs/src/mxlsink/imp.rs @@ -252,11 +252,20 @@ impl BaseSinkImpl for MxlSink { ) })?; let instance = init_mxl_instance(&settings)?; + let pipeline = self + .obj() + .parent() + .and_then(|p| p.downcast::().ok()) + .ok_or(gst::error_msg!( + gst::CoreError::Failed, + ["Failed to get pipeline"] + ))?; context.state = Some(State { instance, flow: None, video: None, audio: None, + pipeline, }); Ok(()) @@ -276,29 +285,33 @@ impl BaseSinkImpl for MxlSink { ))?; if state.audio.is_some() { - state - .audio - .take() - .ok_or(gst::error_msg!( - gst::CoreError::Failed, - ["Failed to get audio state"] - ))? - .writer - .destroy() - .map_err(|_| gst::error_msg!(gst::CoreError::Failed, ["Failed to destroy flow"]))? - }; + let audio = state.audio.take().ok_or(gst::error_msg!( + gst::CoreError::Failed, + ["Failed to get audio state"] + ))?; + let (lock, cvar) = &*audio.sleep_flag; + + let mut flag = lock.lock().map_err(|_| { + gst::error_msg!(gst::CoreError::Failed, ["Failed to get audio sleep lock"]) + })?; + *flag = true; + cvar.notify_all(); + drop(audio.tx); + } if state.video.is_some() { - state - .video - .take() - .ok_or(gst::error_msg!( - gst::CoreError::Failed, - ["Failed to get video state"] - ))? - .writer - .destroy() - .map_err(|_| gst::error_msg!(gst::CoreError::Failed, ["Failed to destroy flow"]))? + let video = state.video.take().ok_or(gst::error_msg!( + gst::CoreError::Failed, + ["Failed to get video state"] + ))?; + let (lock, cvar) = &*video.sleep_flag; + + let mut flag = lock.lock().map_err(|_| { + gst::error_msg!(gst::CoreError::Failed, ["Failed to get video sleep lock"]) + })?; + *flag = true; + cvar.notify_all(); + drop(video.tx); } gst::info!(CAT, imp = self, "Stopped"); diff --git a/rust/gst-mxl-rs/src/mxlsink/render_audio.rs b/rust/gst-mxl-rs/src/mxlsink/render_audio.rs index 8573c7fe9..d38928390 100644 --- a/rust/gst-mxl-rs/src/mxlsink/render_audio.rs +++ b/rust/gst-mxl-rs/src/mxlsink/render_audio.rs @@ -1,119 +1,130 @@ // SPDX-FileCopyrightText: 2025 2025 Contributors to the Media eXchange Layer project. // SPDX-License-Identifier: Apache-2.0 -use crate::mxlsink; +use std::time::Duration; -use gstreamer::{self as gst}; +use crate::mxlsink::{ + self, + state::{AudioCommand, AudioEngine, InitialTime}, +}; + +use gstreamer::{ + self as gst, + prelude::{ClockExt, ElementExt}, +}; +use mxl::Rational; use tracing::trace; +pub struct WriteSampleData { + chunk: Vec, + num_channels: usize, + bytes_per_sample: usize, + chunk_samples: usize, + index: u64, +} + +pub struct WriteGrainData { + pub buf: Vec, + pub index: u64, +} + +const LATENCY_CUSHION: u64 = 5_000_000; + pub(crate) fn audio( state: &mut mxlsink::state::State, buffer: &gst::Buffer, ) -> Result { let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; let src = map.as_slice(); + let flow_info = state + .flow + .as_ref() + .ok_or(gst::FlowError::Error)? + .continuous() + .map_err(|_| gst::FlowError::Error)?; + let buffer_length = flow_info.bufferLength as u64; + let max_chunk = (buffer_length / 2) as usize; + let clock = state.pipeline.clock().ok_or(gst::FlowError::Error)?; + let gst_now = clock.time(); let audio_state = state.audio.as_mut().ok_or(gst::FlowError::Error)?; - let bytes_per_sample = (audio_state.flow_def.bit_depth / 8) as usize; - trace!( - "received buffer size: {}, channel count: {}, bit-depth: {}, bytes-per-sample: {}", - src.len(), - audio_state.flow_def.channel_count, - audio_state.bit_depth, - bytes_per_sample - ); - let samples_per_buffer = src.len() / (audio_state.flow_def.channel_count as usize * bytes_per_sample); - audio_state.batch_size = samples_per_buffer; + let gst_pts = buffer.pts().ok_or(gst::FlowError::Error)?; + trace!("GST BUFFER PTS: {:#?}", gst_pts); + let mxl_now = state.instance.get_time(); - let flow = state.flow.as_ref().ok_or(gst::FlowError::Error)?; - let flow_info = flow.continuous().map_err(|_| gst::FlowError::Error)?; - let sample_rate = flow - .common() - .sample_rate() - .map_err(|_| gst::FlowError::Error)?; - let buffer_length = flow_info.bufferLength as u64; + let initial = audio_state.initial_time.get_or_insert(InitialTime { + mxl_pts_offset: mxl_now - gst_now.nseconds(), + }); - let mut write_index = match audio_state.next_write_index { - Some(idx) => idx, - None => { - let current_index = state.instance.get_current_index(&sample_rate); - audio_state.next_write_index = Some(current_index); - current_index - } + let initial_pts_offset = initial.mxl_pts_offset; + let sample_rate = Rational { + numerator: audio_state.flow_def.sample_rate.numerator as i64, + denominator: audio_state.flow_def.sample_rate.denominator as i64, }; - trace!( - "Writing audio batch starting at index {}, sample_rate {}/{}", - write_index, sample_rate.numerator, sample_rate.denominator - ); - - let max_chunk = (buffer_length / 2) as usize; let num_channels = audio_state.flow_def.channel_count as usize; - let samples_total = samples_per_buffer; - let mut remaining = samples_total; + let mut remaining = samples_per_buffer; let mut src_offset_samples = 0; + let mut base_pts = gst_pts.nseconds() + initial_pts_offset; while remaining > 0 { - let (chunk_samples, chunk_bytes, mut access, samples_per_channel) = - compute_samples_per_channel( - audio_state, - bytes_per_sample, - write_index, - max_chunk, - num_channels, - remaining, - )?; - - let src_chunk = compute_chunk( - src, + let mxl_pts = base_pts; + let chunk_samples = remaining.min(max_chunk); + let chunk_bytes = chunk_samples * num_channels * bytes_per_sample; + + let chunk = compute_chunk( + &map, bytes_per_sample, num_channels, src_offset_samples, chunk_bytes, - ); + ) + .to_vec(); + let chunk_duration_ns = + (chunk_samples as u128 * sample_rate.denominator as u128 * 1_000_000_000u128) + / sample_rate.numerator as u128; - write_samples_per_channel( - bytes_per_sample, - num_channels, - &mut access, - samples_per_channel, - src_chunk, - )?; - - access.commit().map_err(|_| gst::FlowError::Error)?; + base_pts += chunk_duration_ns as u64; trace!( - "Committed chunk: {} samples at index {} ({} bytes)", - chunk_samples, write_index, chunk_bytes + "CHUNK WITH SAMPLES {:#?} WITH MXL PTS: {:#?}", + samples_per_buffer, mxl_pts ); - write_index = write_index.wrapping_add(chunk_samples as u64); + let latency_ns = samples_to_ns(audio_state.latency, &sample_rate); + let mut pts = mxl_pts + latency_ns; + let mxl_now = state.instance.get_time(); + if pts < mxl_now { + let diff_ns = mxl_now - pts; + let diff_samples = ns_to_samples(diff_ns, &sample_rate); + audio_state.latency += diff_samples; + trace!("AUDIO Latency increased by {:#?} samples", diff_samples); + let latency_ns = samples_to_ns(audio_state.latency, &sample_rate); + pts = mxl_pts + latency_ns + LATENCY_CUSHION; + } + let mxl_index = state + .instance + .timestamp_to_index(pts, &sample_rate) + .map_err(|_| gst::FlowError::Error)?; + + let data = WriteSampleData { + chunk, + bytes_per_sample, + num_channels, + chunk_samples, + index: mxl_index, + }; + audio_state + .tx + .send(AudioCommand::Write { data }) + .map_err(|_| gst::FlowError::Error)?; src_offset_samples += chunk_samples; remaining -= chunk_samples; } - audio_state.next_write_index = Some(write_index); Ok(gst::FlowSuccess::Ok) } -fn compute_samples_per_channel( - audio_state: &mut mxlsink::state::AudioState, - bytes_per_sample: usize, - write_index: u64, - max_chunk: usize, - num_channels: usize, - remaining: usize, -) -> Result<(usize, usize, mxl::SamplesWriteAccess<'_>, usize), gst::FlowError> { - let chunk_samples = remaining.min(max_chunk); - let chunk_bytes = chunk_samples * num_channels * bytes_per_sample; - let access = audio_state - .writer - .open_samples(write_index, chunk_samples) - .map_err(|_| gst::FlowError::Error)?; - let samples_per_channel = chunk_samples; - Ok((chunk_samples, chunk_bytes, access, samples_per_channel)) -} - fn write_samples_per_channel( bytes_per_sample: usize, num_channels: usize, @@ -181,3 +192,86 @@ fn compute_chunk( &src[src_offset_samples * num_channels * bytes_per_sample ..src_offset_samples * num_channels * bytes_per_sample + chunk_bytes] } + +pub fn await_audio_buffer( + engine: &mut AudioEngine, + rx: crossbeam::channel::Receiver, +) -> Result<(), gst::FlowError> { + while let Ok(cmd) = rx.recv() { + match cmd { + AudioCommand::Write { data } => { + if let Err(e) = write_buffer(engine, data) { + trace!("Audio engine error: {:?}", e); + } + } + } + } + trace!("DELETING AUDIO FLOW"); + engine + .writer + .take() + .ok_or(gst::FlowError::Error)? + .destroy() + .map_err(|_| gst::FlowError::Error)?; + Ok(()) +} + +fn write_buffer(engine: &mut AudioEngine, data: WriteSampleData) -> Result<(), gst::FlowError> { + let writer = engine.writer.as_mut().ok_or(gst::FlowError::Error)?; + let sample_rate = engine.sample_rate; + let mxl_index = data.index; + + let pts = engine + .instance + .index_to_timestamp(mxl_index, &sample_rate) + .map_err(|_| gst::FlowError::Error)?; + let mxl_now = engine.instance.get_time(); + if pts > mxl_now { + trace!("Audio sleeping: {:#?}", Duration::from_nanos(pts - mxl_now)); + let (lock, cvar) = &*engine.sleep_flag; + + let shutdown_guard = lock.lock().map_err(|_| gst::FlowError::Error)?; + + let mut shutdown_guard = shutdown_guard; + + while !*shutdown_guard && pts > mxl_now { + let remaining = Duration::from_nanos(pts - mxl_now); + + let (guard, timeout_result) = cvar + .wait_timeout(shutdown_guard, remaining) + .map_err(|_| gst::FlowError::Error)?; + + shutdown_guard = guard; + + if timeout_result.timed_out() { + break; + } + } + + if *shutdown_guard { + return Ok(()); + } + } + + let mut access = writer + .open_samples(mxl_index, data.chunk_samples) + .map_err(|_| gst::FlowError::Error)?; + write_samples_per_channel( + data.bytes_per_sample, + data.num_channels, + &mut access, + data.chunk_samples, + data.chunk.as_slice(), + )?; + access.commit().map_err(|_| gst::FlowError::Error)?; + Ok(()) +} + +fn samples_to_ns(samples: u64, rate: &Rational) -> u64 { + ((samples as u128 * 1_000_000_000u128 * rate.denominator as u128) / rate.numerator as u128) + as u64 +} + +fn ns_to_samples(ns: u64, rate: &Rational) -> u64 { + ((ns as u128 * rate.numerator as u128) / (rate.denominator as u128 * 1_000_000_000u128)) as u64 +} diff --git a/rust/gst-mxl-rs/src/mxlsink/render_video.rs b/rust/gst-mxl-rs/src/mxlsink/render_video.rs index a1d5e7cfb..584649e0b 100644 --- a/rust/gst-mxl-rs/src/mxlsink/render_video.rs +++ b/rust/gst-mxl-rs/src/mxlsink/render_video.rs @@ -1,41 +1,63 @@ // SPDX-FileCopyrightText: 2025 2025 Contributors to the Media eXchange Layer project. // SPDX-License-Identifier: Apache-2.0 -use crate::mxlsink; +use std::time::Duration; -use gstreamer as gst; +use crate::mxlsink::{ + self, + render_audio::WriteGrainData, + state::{InitialTime, VideoCommand, VideoEngine}, +}; + +use gstreamer::{ + self as gst, + prelude::{ClockExt, ElementExt}, +}; +use mxl::Rational; use tracing::trace; +const LATENCY_CUSHION: u64 = 5_000_000; + pub(crate) fn video( state: &mut mxlsink::state::State, buffer: &gst::Buffer, ) -> Result { + let video_state = state.video.as_mut().ok_or(gst::FlowError::Error)?; let flow = state.flow.as_ref().ok_or(gst::FlowError::Error)?; let grain_rate = flow .common() .grain_rate() .map_err(|_| gst::FlowError::Error)?; - let current_index = state.instance.get_current_index(&grain_rate); - - let video_state = state.video.as_mut().ok_or(gst::FlowError::Error)?; + let clock = state.pipeline.clock().ok_or(gst::FlowError::Error)?; + let gst_now = clock.time(); + let mxl_now = state.instance.get_time(); + let initial = video_state.initial_time.get_or_insert(InitialTime { + mxl_pts_offset: mxl_now - gst_now.nseconds(), + }); - let buffer_size = video_state.grain_count as u64; - let safe_window = buffer_size / 2; + let initial_pts_offset = initial.mxl_pts_offset; - let next_index = video_state.grain_index; - - let diff = next_index as i128 - current_index as i128; - - if diff >= safe_window as i128 { - return Ok(gst::FlowSuccess::Ok); + let gst_pts = buffer.pts().ok_or(gst::FlowError::Error)?; + let mxl_pts = gst_pts.nseconds() + initial_pts_offset; + let mut pts = mxl_pts + grains_to_ns(video_state.latency, &grain_rate); + if pts < mxl_now { + let diff_ns = mxl_now - pts; + let diff_grains = ns_to_grains(diff_ns, &grain_rate); + video_state.latency += diff_grains; + let latency_ns = grains_to_ns(video_state.latency, &grain_rate); + pts = mxl_pts + latency_ns + LATENCY_CUSHION; } - - commit_buffer(buffer, video_state, next_index)?; + trace!("VIDEO gst PTS: {:#?}", gst_pts); + trace!("VIDEO mapped PTS: {:#?}", pts); + let mxl_index = state + .instance + .timestamp_to_index(pts, &grain_rate) + .map_err(|_| gst::FlowError::Error)?; + trace!("VIDEO mapped mxl_index from pts: {:#?}", mxl_index); + commit_buffer(buffer, video_state, mxl_index)?; video_state.grain_index = video_state.grain_index.wrapping_add(1); - trace!("Committed grain {}", next_index); - Ok(gst::FlowSuccess::Ok) } @@ -45,17 +67,93 @@ fn commit_buffer( index: u64, ) -> Result<(), gst::FlowError> { let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; - let data = map.as_slice(); - let mut access = video_state + let buf = map.to_vec(); + let data = WriteGrainData { buf, index }; + video_state + .tx + .send(VideoCommand::Write { data }) + .map_err(|_| gst::FlowError::Error)?; + Ok(()) +} + +pub fn await_video_buffer( + engine: &mut VideoEngine, + rx: crossbeam::channel::Receiver, +) -> Result<(), gst::FlowError> { + while let Ok(cmd) = rx.recv() { + match cmd { + VideoCommand::Write { data } => { + if let Err(e) = write_buffer(engine, data) { + trace!("Video engine error: {:?}", e); + } + } + } + } + trace!("DELETING VIDEO FLOW"); + engine .writer - .open_grain(index) + .take() + .ok_or(gst::FlowError::Error)? + .destroy() + .map_err(|_| gst::FlowError::Error)?; + Ok(()) +} + +fn write_buffer(engine: &mut VideoEngine, data: WriteGrainData) -> Result<(), gst::FlowError> { + trace!("VIDEO THREAD IS WRITING A BUFFER"); + let mut access = engine + .writer + .as_ref() + .ok_or(gst::FlowError::Error)? + .open_grain(data.index) .map_err(|_| gst::FlowError::Error)?; let payload = access.payload_mut(); - let copy_len = std::cmp::min(payload.len(), data.len()); - payload[..copy_len].copy_from_slice(&data[..copy_len]); + let copy_len = std::cmp::min(payload.len(), data.buf.len()); + payload[..copy_len].copy_from_slice(&data.buf[..copy_len]); let total_slices = access.total_slices(); + let mxl_now = engine.instance.get_time(); + let pts = engine + .instance + .index_to_timestamp(data.index, &engine.grain_rate) + .map_err(|_| gst::FlowError::Error)?; + + if pts > mxl_now { + trace!("Video sleeping: {:#?}", Duration::from_nanos(pts - mxl_now)); + let (lock, cvar) = &*engine.sleep_flag; + + let shutdown_guard = lock.lock().map_err(|_| gst::FlowError::Error)?; + + let mut shutdown_guard = shutdown_guard; + + while !*shutdown_guard && pts > mxl_now { + let remaining = Duration::from_nanos(pts - mxl_now); + + let (guard, timeout_result) = cvar + .wait_timeout(shutdown_guard, remaining) + .map_err(|_| gst::FlowError::Error)?; + + shutdown_guard = guard; + + if timeout_result.timed_out() { + break; + } + } + + if *shutdown_guard { + return Ok(()); + } + } access .commit(total_slices) .map_err(|_| gst::FlowError::Error)?; Ok(()) } + +fn grains_to_ns(grains: u64, rate: &Rational) -> u64 { + ((grains as u128 * 1_000_000_000u128 * rate.denominator as u128) / rate.numerator as u128) + as u64 +} + +fn ns_to_grains(ns: u64, rate: &Rational) -> u64 { + ((ns as u128 * rate.numerator as u128) / (rate.denominator as u128 * 1_000_000_000u128)) as u64 +} diff --git a/rust/gst-mxl-rs/src/mxlsink/state.rs b/rust/gst-mxl-rs/src/mxlsink/state.rs index 58cdcc98b..eb0557d9b 100644 --- a/rust/gst-mxl-rs/src/mxlsink/state.rs +++ b/rust/gst-mxl-rs/src/mxlsink/state.rs @@ -1,15 +1,26 @@ // SPDX-FileCopyrightText: 2025 2025 Contributors to the Media eXchange Layer project. // SPDX-License-Identifier: Apache-2.0 -use std::{collections::HashMap, process, str::FromStr}; +use std::{ + collections::HashMap, + process, + str::FromStr, + sync::{Arc, Condvar, Mutex}, + thread, +}; -use crate::mxlsink::imp::CAT; +use crate::mxlsink::{ + imp::CAT, + render_audio::{WriteGrainData, WriteSampleData, await_audio_buffer}, + render_video::await_video_buffer, +}; +use crossbeam::channel::bounded; use gst::StructureRef; use gst_audio::AudioInfo; use gstreamer as gst; use gstreamer_audio as gst_audio; use mxl::{ - FlowConfigInfo, GrainWriter, MxlInstance, SamplesWriter, + FlowConfigInfo, GrainWriter, MxlInstance, Rational, SamplesWriter, flowdef::{ Component, FlowDef, FlowDefAudio, FlowDefDetails, FlowDefVideo, InterlaceMode, Rate, }, @@ -20,6 +31,7 @@ use uuid::Uuid; pub(crate) const DEFAULT_FLOW_ID: &str = ""; pub(crate) const DEFAULT_DOMAIN: &str = ""; +const MAX_CHANNEL_SIZE: usize = 50; #[derive(Debug, Clone)] pub(crate) struct Settings { @@ -41,20 +53,32 @@ pub(crate) struct State { pub flow: Option, pub video: Option, pub audio: Option, + pub pipeline: gst::Pipeline, } pub(crate) struct VideoState { - pub writer: GrainWriter, + pub tx: crossbeam::channel::Sender, pub grain_index: u64, - pub grain_count: u32, + pub initial_time: Option, + pub latency: u64, + pub sleep_flag: Arc<(Mutex, Condvar)>, } +#[derive(Debug, Clone)] pub(crate) struct AudioState { - pub writer: SamplesWriter, - pub bit_depth: u8, - pub batch_size: usize, + pub tx: crossbeam::channel::Sender, pub flow_def: FlowDefAudio, - pub next_write_index: Option, + pub initial_time: Option, + pub latency: u64, + pub sleep_flag: Arc<(Mutex, Condvar)>, +} + +pub enum AudioCommand { + Write { data: WriteSampleData }, +} + +pub enum VideoCommand { + Write { data: WriteGrainData }, } #[derive(Default)] @@ -62,6 +86,25 @@ pub(crate) struct Context { pub state: Option, } +pub struct AudioEngine { + pub writer: Option, + pub instance: MxlInstance, + pub sample_rate: Rational, + pub sleep_flag: Arc<(Mutex, Condvar)>, +} + +pub struct VideoEngine { + pub writer: Option, + pub instance: MxlInstance, + pub grain_rate: Rational, + pub sleep_flag: Arc<(Mutex, Condvar)>, +} + +#[derive(Debug, Clone)] +pub struct InitialTime { + pub mxl_pts_offset: u64, +} + pub(crate) fn init_state_with_audio( state: &mut State, info: AudioInfo, @@ -99,9 +142,8 @@ pub(crate) fn init_state_with_audio( details: FlowDefDetails::Audio(flow_def_details.clone()), }; - let instance = &state.instance; - - let (flow_writer, flow, is_created) = instance + let (flow_writer, flow, is_created) = state + .instance .create_flow_writer( serde_json::to_string(&flow_def) .map_err(|e| gst::loggable_error!(CAT, "Failed to convert: {}", e))? @@ -109,27 +151,46 @@ pub(crate) fn init_state_with_audio( None, ) .map_err(|e| gst::loggable_error!(CAT, "Failed to create flow writer: {}", e))?; - - let writer = flow_writer - .to_samples_writer() - .map_err(|e| gst::loggable_error!(CAT, "Failed to create grain writer: {}", e))?; - + let writer = Some( + flow_writer + .to_samples_writer() + .map_err(|e| gst::loggable_error!(CAT, "Failed to create grain writer: {}", e))?, + ); + let (tx, rx) = bounded::(MAX_CHANNEL_SIZE); + let instance = state.instance.clone(); if !is_created { return Err(gst::loggable_error!( CAT, "The writer could not be created, the UUID belongs to a flow with another active writer" )); } - - state.audio = Some(AudioState { - writer, - bit_depth, - batch_size: flow.common().max_commit_batch_size_hint() as usize, + let sleep_flag_init = Arc::new((Mutex::new(false), Condvar::new())); + let sleep_flag = sleep_flag_init.clone(); + let audio_state = AudioState { + tx, flow_def: flow_def_details, - next_write_index: None, - }); + initial_time: None, + latency: 0, + sleep_flag, + }; + state.audio = Some(audio_state.clone()); state.flow = Some(flow); + let sample_rate = Rational { + numerator: rate as i64, + denominator: 1, + }; + + let sleep_flag = sleep_flag_init.clone(); + thread::spawn(move || { + let mut engine = AudioEngine { + writer, + instance, + sample_rate, + sleep_flag, + }; + await_audio_buffer(&mut engine, rx) + }); trace!( "Made it to the end of set_caps with format {}, channel_count {}, sample_rate {}, bit_depth {}", format, channels, rate, bit_depth @@ -229,22 +290,36 @@ pub(crate) fn init_state_with_video( "The writer could not be created, the UUID belongs to a flow with another active writer" )); } - let writer = flow_writer - .to_grain_writer() - .map_err(|e| gst::loggable_error!(CAT, "Failed to create grain writer: {}", e))?; - let grain_count = flow - .discrete() - .map_err(|e| gst::loggable_error!(CAT, "Failed to get grain count: {}", e))? - .grainCount; - let rate = flow + let writer = Some( + flow_writer + .to_grain_writer() + .map_err(|e| gst::loggable_error!(CAT, "Failed to create grain writer: {}", e))?, + ); + let grain_rate = flow .common() .grain_rate() .map_err(|e| gst::loggable_error!(CAT, "Failed to get grain rate: {}", e))?; - let index = instance.get_current_index(&rate); + let index = instance.get_current_index(&grain_rate); + let instance = instance.clone(); + let (tx, rx) = bounded::(MAX_CHANNEL_SIZE); + let sleep_flag_init = Arc::new((Mutex::new(false), Condvar::new())); + let sleep_flag = sleep_flag_init.clone(); + thread::spawn(move || { + let mut engine = VideoEngine { + writer, + instance, + grain_rate, + sleep_flag, + }; + await_video_buffer(&mut engine, rx) + }); + let sleep_flag = sleep_flag_init.clone(); state.video = Some(VideoState { - writer, grain_index: index, - grain_count, + initial_time: None, + latency: 0, + tx, + sleep_flag, }); state.flow = Some(flow);