Skip to content

Commit 4030cd2

Browse files
committed
Make Rust bindings compatiple with MXL 0.8.x
Signed-off-by: Pavel Cernohorsky <pavel.cernohorsky@appear.net>
1 parent 63601f8 commit 4030cd2

6 files changed

Lines changed: 25 additions & 19 deletions

File tree

rust/mxl/examples/flow-reader.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -83,27 +83,27 @@ fn read_samples(
8383
) -> Result<(), mxl::Error> {
8484
let flow_id = flow_info.common_flow_info().id().to_string();
8585
let sample_rate = flow_info.continuous_flow_info()?.sampleRate;
86-
let continous_flow_info = flow_info.continuous_flow_info()?;
86+
let common_flow_info = flow_info.common_flow_info();
8787
let batch_size = if let Some(batch_size) = batch_size {
88-
if continous_flow_info.commitBatchSize != 0
89-
&& batch_size != continous_flow_info.commitBatchSize as u64
88+
if common_flow_info.max_commit_batch_size_hint() != 0
89+
&& batch_size != common_flow_info.max_commit_batch_size_hint() as u64
9090
{
9191
warn!(
9292
"Writer batch size is set to {}, but sample batch size is provided, using the \
93-
latter.",
94-
continous_flow_info.commitBatchSize
93+
latter.",
94+
common_flow_info.max_commit_batch_size_hint()
9595
);
9696
}
9797
batch_size as usize
98-
} else if continous_flow_info.commitBatchSize == 0 {
98+
} else if common_flow_info.max_commit_batch_size_hint() == 0 {
9999
let batch_size = (sample_rate.numerator / (100 * sample_rate.denominator)) as usize;
100100
warn!(
101101
"Writer batch size not available, using fallback value of {}.",
102102
batch_size
103103
);
104104
batch_size
105105
} else {
106-
continous_flow_info.commitBatchSize as usize
106+
common_flow_info.max_commit_batch_size_hint() as usize
107107
};
108108
let mut read_head = reader.get_info()?.continuous_flow_info()?.headIndex;
109109
let mut read_head_valid_at = mxl_instance.get_time();

rust/mxl/examples/flow-writer.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,17 +86,19 @@ pub fn write_grains(
8686
}
8787

8888
let mut grain_writer_access = writer.open_grain(grain_index)?;
89+
let total_slices = grain_writer_access.total_slices();
8990
let payload = grain_writer_access.payload_mut();
9091
let payload_len = payload.len();
9192
for (i, byte) in payload.iter_mut().enumerate() {
9293
*byte = ((i as u64 + grain_index) % 256) as u8;
9394
}
94-
grain_writer_access.commit(payload_len as u32)?;
95+
grain_writer_access.commit(total_slices)?;
9596

9697
let timestamp = mxl_instance.index_to_timestamp(grain_index + 1, &grain_rate)?;
9798
let sleep_duration = mxl_instance.get_duration_until_index(grain_index + 1, &grain_rate)?;
9899
info!(
99-
"Finished writing {payload_len} bytes into grain {grain_index}, will sleep for {:?} until timestamp {timestamp}.",
100+
"Finished writing {payload_len} bytes ({total_slices} slices) into grain {grain_index}, will sleep \
101+
for {:?} until timestamp {timestamp}.",
100102
sleep_duration
101103
);
102104
grain_index += 1;

rust/mxl/src/flow.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,4 +75,8 @@ impl CommonFlowInfo<'_> {
7575
pub fn id(&self) -> Uuid {
7676
Uuid::from_bytes(self.0.id)
7777
}
78+
79+
pub fn max_commit_batch_size_hint(&self) -> u32 {
80+
self.0.maxCommitBatchSizeHint
81+
}
7882
}

rust/mxl/src/grain/reader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ impl GrainReader {
4949
&mut payload_ptr,
5050
))?;
5151
}
52-
if grain_info.commitedSize != grain_info.grainSize {
52+
if grain_info.validSlices != grain_info.totalSlices {
5353
// We don't need partial grains. Wait for the grain to be complete.
5454
continue;
5555
}

rust/mxl/src/grain/write_access.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,20 +51,20 @@ impl<'a> GrainWriteAccess<'a> {
5151
self.grain_info.grainSize
5252
}
5353

54-
pub fn committed_size(&self) -> u32 {
55-
self.grain_info.commitedSize
54+
pub fn total_slices(&self) -> u16 {
55+
self.grain_info.totalSlices
5656
}
5757

58-
pub fn commit(mut self, commited_size: u32) -> Result<()> {
58+
pub fn commit(mut self, valid_slices: u16) -> Result<()> {
5959
self.committed_or_canceled = true;
6060

61-
if commited_size > self.grain_info.grainSize {
61+
if valid_slices > self.grain_info.totalSlices {
6262
return Err(Error::Other(format!(
63-
"Commited size {} cannot exceed grain size {}.",
64-
commited_size, self.grain_info.grainSize
63+
"Valid slices {} cannot exceed total slices {}.",
64+
valid_slices, self.grain_info.totalSlices
6565
)));
6666
}
67-
self.grain_info.commitedSize = commited_size;
67+
self.grain_info.validSlices = valid_slices;
6868

6969
unsafe {
7070
Error::from_status(

rust/mxl/tests/basic_tests.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ fn basic_mxl_grain_writing_reading() {
7474
let rate = flow_info.discrete_flow_info().unwrap().grainRate;
7575
let current_index = mxl_instance.get_current_index(&rate);
7676
let grain_write_access = grain_writer.open_grain(current_index).unwrap();
77-
let grain_size = grain_write_access.max_size();
78-
grain_write_access.commit(grain_size).unwrap();
77+
let total_slices = grain_write_access.total_slices();
78+
grain_write_access.commit(total_slices).unwrap();
7979
let grain_data = grain_reader
8080
.get_complete_grain(current_index, Duration::from_secs(5))
8181
.unwrap();

0 commit comments

Comments
 (0)