Skip to content

Commit bf942e5

Browse files
mdrssvshimunn
authored andcommitted
feat(rumqttd/broker): implemented filter for publish packets
BREAKING CHANGE: `Router::new` now takes an list of `PublishFilterRef` as additional parameter
1 parent 2377e4e commit bf942e5

File tree

5 files changed

+161
-10
lines changed

5 files changed

+161
-10
lines changed

rumqttd/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use tracing_subscriber::{
2121
pub use link::alerts;
2222
pub use link::local;
2323
pub use link::meters;
24-
pub use router::{Alert, Forward, IncomingMeter, Meter, Notification, OutgoingMeter};
24+
pub use router::{Alert, Forward, IncomingMeter, Meter, Notification, OutgoingMeter, PublishFilter, PublishFilterRef};
2525
use segments::Storage;
2626
pub use server::Broker;
2727

rumqttd/src/router/filter.rs

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
use std::{fmt::Debug, ops::Deref, sync::Arc};
2+
3+
use crate::protocol::{Publish, PublishProperties};
4+
5+
/// Filter for [`Publish`] packets
6+
pub trait PublishFilter {
7+
/// Determines weather an [`Publish`] packet should be processed
8+
/// Arguments:
9+
/// * `packet`: to be published, may be modified if necessary
10+
/// * `properties`: received along with the packet, may be `None` for older MQTT versions
11+
/// Returns: [`bool`] indicating if the packet should be processed
12+
fn filter(&self, packet: &mut Publish, properties: Option<&mut PublishProperties>) -> bool;
13+
}
14+
15+
/// Container for either an owned [`PublishFilter`] or an `'static` reference
16+
#[derive(Clone)]
17+
pub enum PublishFilterRef {
18+
Owned(Arc<dyn PublishFilter + Send + Sync>),
19+
Static(&'static (dyn PublishFilter + Send + Sync)),
20+
}
21+
22+
impl Debug for PublishFilterRef {
23+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24+
match self {
25+
Self::Owned(_arg0) => f.debug_tuple("Owned").finish(),
26+
Self::Static(_arg0) => f.debug_tuple("Static").finish(),
27+
}
28+
}
29+
}
30+
31+
impl Deref for PublishFilterRef {
32+
type Target = dyn PublishFilter;
33+
34+
fn deref(&self) -> &Self::Target {
35+
match self {
36+
Self::Static(filter) => *filter,
37+
Self::Owned(filter) => &**filter,
38+
}
39+
}
40+
}
41+
42+
/// Implements [`PublishFilter`] for any ordinary function
43+
impl<F> PublishFilter for F
44+
where
45+
F: Fn(&mut Publish, Option<&mut PublishProperties>) -> bool + Send + Sync,
46+
{
47+
fn filter(&self, packet: &mut Publish, properties: Option<&mut PublishProperties>) -> bool {
48+
self(packet, properties)
49+
}
50+
}
51+
52+
/// Implements the conversion
53+
/// ```rust
54+
/// # use rumqttd::{protocol::{Publish, PublishProperties}, PublishFilterRef};
55+
/// fn filter_static(packet: &mut Publish, properties: Option<&mut PublishProperties>) -> bool {
56+
/// todo!()
57+
/// }
58+
///
59+
/// let filter = PublishFilterRef::from(&filter_static);
60+
/// # assert!(matches!(filter, PublishFilterRef::Static(_)));
61+
/// ```
62+
impl<F> From<&'static F> for PublishFilterRef
63+
where
64+
F: Fn(&mut Publish, Option<&mut PublishProperties>) -> bool + Send + Sync,
65+
{
66+
fn from(value: &'static F) -> Self {
67+
Self::Static(value)
68+
}
69+
}
70+
71+
/// Implements the conversion
72+
/// ```rust
73+
/// # use std::boxed::Box;
74+
/// # use rumqttd::{protocol::{Publish, PublishProperties}, PublishFilter, PublishFilterRef};
75+
/// #[derive(Clone)]
76+
/// struct MyFilter {}
77+
///
78+
/// impl PublishFilter for MyFilter {
79+
/// fn filter(&self, packet: &mut Publish, properties: Option<&mut PublishProperties>) -> bool {
80+
/// todo!()
81+
/// }
82+
/// }
83+
/// let boxed: Box<MyFilter> = Box::new(MyFilter {});
84+
///
85+
/// let filter = PublishFilterRef::from(boxed);
86+
/// # assert!(matches!(filter, PublishFilterRef::Owned(_)));
87+
/// ```
88+
impl<T> From<Arc<T>> for PublishFilterRef
89+
where
90+
T: PublishFilter + 'static + Send + Sync,
91+
{
92+
fn from(value: Arc<T>) -> Self {
93+
Self::Owned(value)
94+
}
95+
}
96+
97+
impl<T> From<Box<T>> for PublishFilterRef
98+
where
99+
T: PublishFilter + 'static + Send + Sync,
100+
{
101+
fn from(value: Box<T>) -> Self {
102+
Self::Owned(Arc::<T>::from(value))
103+
}
104+
}
105+
106+
#[cfg(test)]
107+
mod tests {
108+
use super::*;
109+
110+
fn filter_static(_packet: &mut Publish, _properties: Option<&mut PublishProperties>) -> bool {
111+
true
112+
}
113+
struct Prejudiced(bool);
114+
115+
impl PublishFilter for Prejudiced {
116+
fn filter(&self, _packet: &mut Publish,_propertiess: Option<&mut PublishProperties>) -> bool {
117+
self.0
118+
}
119+
}
120+
#[test]
121+
fn static_filter() {
122+
fn is_send<T: Send>(_: &T) {}
123+
fn takes_static_filter(filter: impl Into<PublishFilterRef>) {
124+
assert!(matches!(filter.into(), PublishFilterRef::Static(_)));
125+
}
126+
fn takes_owned_filter(filter: impl Into<PublishFilterRef>) {
127+
assert!(matches!(filter.into(), PublishFilterRef::Owned(_)));
128+
}
129+
takes_static_filter(&filter_static);
130+
let boxed: PublishFilterRef = Box::new(Prejudiced(false)).into();
131+
is_send(&boxed);
132+
takes_owned_filter(boxed);
133+
}
134+
}

rumqttd/src/router/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ mod routing;
2424
mod scheduler;
2525
pub(crate) mod shared_subs;
2626
mod waiters;
27+
mod filter;
2728

29+
pub use filter::{PublishFilter, PublishFilterRef};
2830
pub use alertlog::Alert;
2931
pub use connection::Connection;
3032
pub use routing::Router;

rumqttd/src/router/routing.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ pub struct Router {
7070
connections: Slab<Connection>,
7171
/// Connection map from device id to connection id
7272
connection_map: HashMap<String, ConnectionId>,
73+
/// Filters to be applied to an [`Publish`] packets payload
74+
publish_filters: Vec<PublishFilterRef>,
7375
/// Subscription map to interested connection ids
7476
subscription_map: HashMap<Filter, HashSet<ConnectionId>>,
7577
/// Incoming data grouped by connection
@@ -105,7 +107,7 @@ pub struct Router {
105107
}
106108

107109
impl Router {
108-
pub fn new(router_id: RouterId, config: RouterConfig) -> Router {
110+
pub fn new(router_id: RouterId, publish_filters: Vec<PublishFilterRef>, config: RouterConfig) -> Router {
109111
let (router_tx, router_rx) = bounded(1000);
110112

111113
let meters = Slab::with_capacity(10);
@@ -129,6 +131,7 @@ impl Router {
129131
alerts,
130132
connections,
131133
connection_map: Default::default(),
134+
publish_filters,
132135
subscription_map: Default::default(),
133136
ibufs,
134137
obufs,
@@ -557,13 +560,18 @@ impl Router {
557560

558561
for packet in packets.drain(0..) {
559562
match packet {
560-
Packet::Publish(publish, properties) => {
563+
Packet::Publish(mut publish, mut properties) => {
564+
println!("publish: {publish:?} payload: {:?}", publish.payload.to_vec());
561565
let span = tracing::error_span!("publish", topic = ?publish.topic, pkid = publish.pkid);
562566
let _guard = span.enter();
563567

564568
let qos = publish.qos;
565569
let pkid = publish.pkid;
566-
570+
571+
// Decide weather to keep or discard this packet
572+
// Packet will be discard if *at least one* filter returns *false*
573+
let keep = self.publish_filters.iter().fold(true,|keep,f| keep && f.filter(&mut publish, properties.as_mut())) ;
574+
567575
// Prepare acks for the above publish
568576
// If any of the publish in the batch results in force flush,
569577
// set global force flush flag. Force flush is triggered when the
@@ -577,12 +585,11 @@ impl Router {
577585
// coordinate using multiple offsets, and we don't have any idea how to do so right now.
578586
// Currently as we don't have replication, we just use a single offset, even when appending to
579587
// multiple commit logs.
580-
581588
match qos {
582589
QoS::AtLeastOnce => {
583590
let puback = PubAck {
584591
pkid,
585-
reason: PubAckReason::Success,
592+
reason: if keep { PubAckReason::Success } else { PubAckReason::PayloadFormatInvalid },
586593
};
587594

588595
let ackslog = self.ackslog.get_mut(id).unwrap();
@@ -592,7 +599,7 @@ impl Router {
592599
QoS::ExactlyOnce => {
593600
let pubrec = PubRec {
594601
pkid,
595-
reason: PubRecReason::Success,
602+
reason: if keep { PubRecReason::Success } else { PubRecReason::PayloadFormatInvalid },
596603
};
597604

598605
let ackslog = self.ackslog.get_mut(id).unwrap();
@@ -604,7 +611,9 @@ impl Router {
604611
// Do nothing
605612
}
606613
};
607-
614+
if !keep {
615+
break;
616+
}
608617
self.router_meters.total_publishes += 1;
609618

610619
// Try to append publish to commitlog

rumqttd/src/server/broker.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use std::{io, thread};
3535

3636
use crate::link::console;
3737
use crate::link::local::{self, LinkRx, LinkTx};
38-
use crate::router::{Event, Router};
38+
use crate::router::{Event, PublishFilterRef, Router};
3939
use crate::{Config, ConnectionId, ServerSettings};
4040

4141
use tokio::net::{TcpListener, TcpStream};
@@ -71,9 +71,13 @@ pub struct Broker {
7171

7272
impl Broker {
7373
pub fn new(config: Config) -> Broker {
74+
Self::with_filter(config, Vec::new())
75+
}
76+
77+
pub fn with_filter(config: Config, publish_filters: Vec<PublishFilterRef>) -> Broker {
7478
let config = Arc::new(config);
7579
let router_config = config.router.clone();
76-
let router: Router = Router::new(config.id, router_config);
80+
let router: Router = Router::new(config.id, publish_filters, router_config);
7781

7882
// Setup cluster if cluster settings are configured.
7983
match config.cluster.clone() {
@@ -96,6 +100,8 @@ impl Broker {
96100
}
97101
}
98102

103+
104+
99105
// pub fn new_local_cluster(
100106
// config: Config,
101107
// node_id: NodeId,

0 commit comments

Comments
 (0)