Skip to content

Commit 324cd8c

Browse files
committed
fix: update Appender trait to include flush_event_datetime method
1 parent a3770bc commit 324cd8c

1 file changed

Lines changed: 20 additions & 6 deletions

File tree

crates/runtime/src/logger.rs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use async_trait::async_trait;
22
use bytes::Bytes;
3+
use chrono::{DateTime, Utc};
34
use std::any::Any;
45
use std::collections::HashMap;
56
use std::io::IoSlice;
@@ -21,20 +22,24 @@ pub struct Logger {
2122
}
2223

2324
pub trait AppenderBuilder {
24-
fn build(&self, properties: HashMap<String, String>) -> Box<dyn AsyncWrite + Send + Sync>;
25+
fn build(&self, properties: HashMap<String, String>) -> Box<dyn Appender>;
26+
}
27+
28+
pub trait Appender: AsyncWrite + Send + Sync {
29+
fn flush_event_datetime(self: Pin<&mut Self>, time: DateTime<Utc>);
2530
}
2631

2732
/// Fans out writes sequentially to multiple [`AsyncWrite`] sinks.
2833
struct MultiWriter {
29-
writers: Vec<Box<dyn AsyncWrite + Send + Sync>>,
34+
writers: Vec<Box<dyn Appender>>,
3035
write_current: usize,
3136
write_n: usize,
3237
flush_current: usize,
3338
shutdown_current: usize,
3439
}
3540

3641
impl MultiWriter {
37-
fn new(writers: Vec<Box<dyn AsyncWrite + Send + Sync>>) -> Self {
42+
fn new(writers: Vec<Box<dyn Appender>>) -> Self {
3843
Self {
3944
writers,
4045
write_current: 0,
@@ -96,10 +101,11 @@ impl AsyncWrite for MultiWriter {
96101

97102
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
98103
let this = self.get_mut();
99-
104+
let time = Utc::now();
100105
while this.flush_current < this.writers.len() {
101106
let idx = this.flush_current;
102107
// SAFETY: writers are heap-allocated (Box) and won't move.
108+
unsafe { Pin::new_unchecked(&mut *this.writers[idx]) }.flush_event_datetime(time);
103109
match unsafe { Pin::new_unchecked(&mut *this.writers[idx]) }.poll_flush(cx) {
104110
Poll::Ready(_) => this.flush_current += 1,
105111
Poll::Pending => return Poll::Pending,
@@ -197,7 +203,7 @@ impl Extend<(String, String)> for Logger {
197203
struct NullAppender;
198204

199205
impl AppenderBuilder for NullAppender {
200-
fn build(&self, _fields: HashMap<String, String>) -> Box<dyn AsyncWrite + Send + Sync> {
206+
fn build(&self, _fields: HashMap<String, String>) -> Box<dyn Appender> {
201207
Box::new(NullAppender)
202208
}
203209
}
@@ -207,6 +213,10 @@ impl Pollable for NullAppender {
207213
async fn ready(&mut self) {}
208214
}
209215

216+
impl Appender for NullAppender {
217+
fn flush_event_datetime(self: Pin<&mut Self>, _time: DateTime<Utc>) {}
218+
}
219+
210220
impl AsyncWrite for NullAppender {
211221
fn poll_write(
212222
self: Pin<&mut Self>,
@@ -295,7 +305,7 @@ impl Default for Console {
295305
}
296306

297307
impl AppenderBuilder for Console {
298-
fn build(&self, _fields: HashMap<String, String>) -> Box<dyn AsyncWrite + Send + Sync> {
308+
fn build(&self, _fields: HashMap<String, String>) -> Box<dyn Appender> {
299309
Box::new(Console::default())
300310
}
301311
}
@@ -305,6 +315,10 @@ impl Pollable for Console {
305315
async fn ready(&mut self) {}
306316
}
307317

318+
impl Appender for Console {
319+
fn flush_event_datetime(self: Pin<&mut Self>, _time: DateTime<Utc>) {}
320+
}
321+
308322
impl AsyncWrite for Console {
309323
fn poll_write(
310324
mut self: Pin<&mut Self>,

0 commit comments

Comments
 (0)