Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,8 @@ jobs:
AWS_DEFAULT_REGION: ${{ secrets.MGMT_AWS_DEFAULT_REGION }}
AWS_ACCESS_KEY_ID: ${{ secrets.MGMT_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.MGMT_AWS_SECRET_ACCESS_KEY }}
SUITE_ID: greb3uy2wtq3
THING_ARN: arn:aws:iot:eu-west-1:274906834921:thing/mqttrust
CERTIFICATE_ARN: arn:aws:iot:eu-west-1:274906834921:cert/e7280d8d316b58da3058037a2c1730d9eb15de50e96f4d47e54ea655266b76db
SUITE_ID: 1gaev57dq6i5
THING_ARN: arn:aws:iot:eu-west-1:411974994697:thing/mqttrust
steps:
- name: Checkout
uses: actions/checkout@v1
Expand All @@ -89,7 +88,7 @@ jobs:
- name: Get AWS_HOSTNAME
id: hostname
run: |
hostname=$(aws iotdeviceadvisor get-endpoint --output text --query endpoint)
hostname=$(aws iotdeviceadvisor get-endpoint --thing-arn ${{ env.THING_ARN }} --output text --query endpoint)
ret=$?
echo "::set-output name=AWS_HOSTNAME::$hostname"
exit $ret
Expand All @@ -105,7 +104,7 @@ jobs:
- name: Start test suite
id: test_suite
run: |
suite_id=$(aws iotdeviceadvisor start-suite-run --suite-definition-id ${{ env.SUITE_ID }} --suite-run-configuration "primaryDevice={thingArn=${{ env.THING_ARN }},certificateArn=${{ env.CERTIFICATE_ARN }}}" --output text --query suiteRunId)
suite_id=$(aws iotdeviceadvisor start-suite-run --suite-definition-id ${{ env.SUITE_ID }} --suite-run-configuration "primaryDevice={thingArn=${{ env.THING_ARN }}},parallelRun=true" --output text --query suiteRunId)
ret=$?
echo "::set-output name=SUITE_RUN_ID::$suite_id"
exit $ret
Expand All @@ -121,9 +120,9 @@ jobs:

- name: Monitor test run
run: |
chmod +x ./.github/scripts/da_monitor.sh
chmod +x ./scripts/da_monitor.sh
echo ${{ env.SUITE_ID }} ${{ steps.test_suite.outputs.SUITE_RUN_ID }} ${{ steps.binary.outputs.PID }}
./.github/scripts/da_monitor.sh ${{ env.SUITE_ID }} ${{ steps.test_suite.outputs.SUITE_RUN_ID }} ${{ steps.binary.outputs.PID }}
./scripts/da_monitor.sh ${{ env.SUITE_ID }} ${{ steps.test_suite.outputs.SUITE_RUN_ID }} ${{ steps.binary.outputs.PID }}

- name: Kill test binary process
if: ${{ always() }}
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
.gdb_history
Cargo.lock
target/
device_advisor_integration.log
7 changes: 5 additions & 2 deletions mqttrust_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,14 @@ defmt = { version = "^0.3", optional = true }
[dev-dependencies]
native-tls = { version = "^0.2" }
dns-lookup = "1.0.3"
env_logger = "0.9.0"
env_logger = "0.11"
static_cell = "2.1"

[features]
default = []

std = []

defmt-impl = ["defmt", "mqttrust/defmt-impl", "heapless/defmt-impl", "fugit/defmt"]
log = ["dep:log", "mqttrust/log"]

defmt-impl = ["dep:defmt", "mqttrust/defmt-impl", "heapless/defmt-impl", "fugit/defmt"]
67 changes: 45 additions & 22 deletions mqttrust_core/examples/aws_device_advisor.rs
Original file line number Diff line number Diff line change
@@ -1,56 +1,77 @@
mod common;

use mqttrust::encoding::v4::{Connack, ConnectReturnCode};
use mqttrust::{Mqtt, QoS, SubscribeTopic};
use mqttrust_core::bbqueue::BBBuffer;
use mqttrust_core::{EventLoop, MqttOptions, Notification};

use common::clock::SysClock;
use common::network::Network;
use native_tls::TlsConnector;
use static_cell::StaticCell;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::thread;

use crate::common::credentials;

static mut Q: BBBuffer<{ 1024 * 6 }> = BBBuffer::new();
static mut Q: BBBuffer<{ 1024 * 60 }> = BBBuffer::new();

fn main() {
env_logger::init();

let (p, c) = unsafe { Q.try_split_framed().unwrap() };

let hostname = credentials::HOSTNAME.unwrap();
static HOSTNAME: StaticCell<String> = StaticCell::new();
let hostname = HOSTNAME.init(credentials::hostname());

log::info!(
"Starting device advisor test on endpoint {}",
hostname.as_str()
);

let connector = TlsConnector::builder()
.identity(credentials::identity())
.add_root_certificate(credentials::root_ca())
.build()
.unwrap();

let mut network = Network::new_tls(connector, String::from(hostname));
let mut network = Network::new_tls(connector, hostname.clone());

let thing_name = "mqttrust";

let mut mqtt_eventloop = EventLoop::new(
c,
SysClock::new(),
MqttOptions::new(thing_name, hostname.into(), 8883),
MqttOptions::new(thing_name, hostname.as_str().into(), 8883),
);

let mqtt_client = mqttrust_core::Client::new(p, thing_name);

let connected = Arc::new(AtomicBool::new(false));
let con = connected.clone();

thread::Builder::new()
.name("eventloop".to_string())
.spawn(move || loop {
match nb::block!(mqtt_eventloop.connect(&mut network)) {
Err(_) => continue,
Ok(true) => {
log::info!("Successfully connected to broker");
Ok(Some(Notification::ConnAck(Connack {
session_present,
code: ConnectReturnCode::Accepted,
}))) => {
log::info!(
"Successfully connected to broker. session_present: {}",
session_present
);
con.store(true, std::sync::atomic::Ordering::Release);
}
Ok(n) => {
log::info!("Received {:?} during connect", n);
}
Ok(false) => {}
}

match mqtt_eventloop.yield_event(&mut network) {
match nb::block!(mqtt_eventloop.yield_event(&mut network)) {
Ok(Notification::Publish(_)) => {}
Ok(n) => {
log::trace!("{:?}", n);
Expand All @@ -62,19 +83,21 @@ fn main() {

loop {
thread::sleep(std::time::Duration::from_millis(5000));
mqtt_client
.subscribe(&[SubscribeTopic {
topic_path: format!("{}/device/advisor", thing_name).as_str(),
qos: QoS::AtLeastOnce,
}])
.unwrap();

mqtt_client
.publish(
format!("{}/device/advisor/hello", thing_name).as_str(),
format!("Hello from {}", thing_name).as_bytes(),
QoS::AtLeastOnce,
)
.unwrap();
if connected.load(std::sync::atomic::Ordering::Acquire) {
mqtt_client
.subscribe(&[SubscribeTopic {
topic_path: format!("plc/output/{}", thing_name).as_str(),
qos: QoS::AtLeastOnce,
}])
.unwrap();

mqtt_client
.publish(
format!("plc/input/{}", thing_name).as_str(),
format!("Hello from {}", thing_name).as_bytes(),
QoS::AtLeastOnce,
)
.unwrap();
}
}
}
2 changes: 0 additions & 2 deletions mqttrust_core/examples/common/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@ use std::{
};
pub struct SysClock {
start_time: u32,
countdown_end: Option<u32>,
}

impl SysClock {
pub fn new() -> Self {
Self {
start_time: Self::epoch(),
countdown_end: None,
}
}

Expand Down
4 changes: 3 additions & 1 deletion mqttrust_core/examples/common/credentials.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ pub fn root_ca() -> Certificate {
Certificate::from_pem(include_bytes!("../secrets/root-ca.pem")).unwrap()
}

pub const HOSTNAME: Option<&'static str> = option_env!("AWS_HOSTNAME");
pub fn hostname() -> String {
env::var("AWS_HOSTNAME").unwrap()
}
Binary file modified mqttrust_core/examples/secrets/identity.pfx
Binary file not shown.
54 changes: 41 additions & 13 deletions mqttrust_core/src/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use core::convert::Infallible;
use core::ops::DerefMut;
use core::ops::RangeTo;
use embedded_nal::{AddrType, Dns, SocketAddr, TcpClientStack};
use fugit::ExtU32;
use fugit::{ExtU32, TimerDurationU32};
use heapless::{String, Vec};
use mqttrust::encoding::v4::{decode_slice, encode_slice, Connect, Packet, Protocol};

Expand All @@ -24,6 +24,7 @@ where
/// Request stream
pub(crate) requests: Option<FrameConsumer<'a, L>>,
network_handle: NetworkHandle<S>,
connect_counter: u8,
}

impl<'a, 'b, S, O, const TIMER_HZ: u32, const L: usize> EventLoop<'a, 'b, S, O, TIMER_HZ, L>
Expand All @@ -41,6 +42,7 @@ where
options,
requests: Some(requests),
network_handle: NetworkHandle::new(),
connect_counter: 0,
}
}

Expand All @@ -54,7 +56,7 @@ where
pub fn connect<N: Dns + TcpClientStack<TcpSocket = S> + ?Sized>(
&mut self,
network: &mut N,
) -> nb::Result<bool, EventError> {
) -> nb::Result<Option<Notification>, EventError> {
// connect to the broker
match self.network_handle.is_connected(network) {
Ok(false) => {
Expand Down Expand Up @@ -216,16 +218,23 @@ where
}
}

fn backoff(&self) -> TimerDurationU32<TIMER_HZ> {
let base_time_ms: u32 = 200;
let backoff = base_time_ms.saturating_mul(u32::pow(2, self.connect_counter as u32));

core::cmp::min(50.secs(), backoff.millis())
}

fn mqtt_connect<N: TcpClientStack<TcpSocket = S> + ?Sized>(
&mut self,
network: &mut N,
) -> nb::Result<bool, EventError> {
) -> nb::Result<Option<Notification>, EventError> {
match self.state.connection_status {
MqttConnectionStatus::Connected => Ok(false),
MqttConnectionStatus::Connected => Ok(None),
MqttConnectionStatus::Disconnected => {
info!("MQTT connecting..");
let now = self.last_outgoing_timer.now();
self.state.last_ping_entry().insert(now);
self.connect_counter += 1;

self.state.await_pingresp = false;
self.network_handle.rx_buf.init();
Expand All @@ -242,6 +251,12 @@ where
password,
});

info!(
"MQTT connecting.. Attempt: {}. Backoff time: {}",
self.connect_counter,
self.backoff().to_millis()
);

// mqtt connection with timeout
self.network_handle.send_packet(network, &connect)?;
self.state.handle_outgoing_connect();
Expand All @@ -250,25 +265,37 @@ where
MqttConnectionStatus::Handshake => {
let now = self.last_outgoing_timer.now();

let backoff_time = self.backoff();

if self
.state
.last_ping_entry()
.or_insert(now)
.has_elapsed(&now, 50.secs())
.has_elapsed(&now, backoff_time)
{
warn!("Timed out waiting for connect packet!");
return Err(nb::Error::Other(EventError::Timeout));
}

self.network_handle
let res = self
.network_handle
.receive(network)
.map_err(|e| e.map(EventError::Network))?
.decode(&mut self.state)
.and_then(|(n, p)| {
if n.is_none() && p.is_none() {
return Err(nb::Error::WouldBlock);
}
Ok(n.map(|n| n == Notification::ConnAck).unwrap_or(false))
})
Ok(n)
});

match res {
Ok(r) => {
self.connect_counter = 0;
Ok(r)
}
Err(e) => Err(e),
}
}
}
}
Expand Down Expand Up @@ -417,7 +444,7 @@ impl<S> NetworkHandle<S> {
#[derive(Debug)]
struct PacketBuffer {
range: RangeTo<usize>,
buffer: Vec<u8, 4096>,
buffer: Vec<u8, { 1024 * 16 }>,
}

impl PacketBuffer {
Expand Down Expand Up @@ -536,6 +563,7 @@ impl<'a> PacketDecoder<'a> {
Err(EventError::Encoding(e).into())
}
Ok(Some(packet)) => {
warn!("Got packet! {:?}", packet);
self.is_err.replace(false);
state
.handle_incoming_packet(packet)
Expand Down Expand Up @@ -699,12 +727,12 @@ mod tests {
rx_buf.range.end += connack_len;
let publish_len = encode_slice(&Packet::from(publish.clone()), rx_buf.buffer()).unwrap();
rx_buf.range.end += publish_len;
assert_eq!(rx_buf.range.end, rx_buf.buffer.capacity());
assert_eq!(rx_buf.range.end, 4096);

// Decode the first Connack packet on the Handshake state.
state.connection_status = MqttConnectionStatus::Handshake;
let (n, p) = PacketDecoder::new(&mut rx_buf).decode(&mut state).unwrap();
assert_eq!(n, Some(Notification::ConnAck));
assert!(matches!(n, Some(Notification::ConnAck(_))));
assert_eq!(p, None);

let mut pkg = SerializedPacket(&mut rx_buf.buffer[rx_buf.range]);
Expand Down Expand Up @@ -750,7 +778,7 @@ mod tests {
rx_buf.range.end += connack_malformed_len;
let publish_len = encode_slice(&Packet::from(publish.clone()), rx_buf.buffer()).unwrap();
rx_buf.range.end += publish_len;
assert_eq!(rx_buf.range.end, rx_buf.buffer.capacity());
assert_eq!(rx_buf.range.end, 4096);

// When a packet is malformed, we cannot tell its length. The decoder
// discards the entire buffer.
Expand Down
3 changes: 2 additions & 1 deletion mqttrust_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub use client::Client;
use core::convert::TryFrom;
pub use eventloop::EventLoop;
use heapless::{String, Vec};
use mqttrust::encoding::v4::Connack;
pub use mqttrust::encoding::v4::{Pid, Publish, QoS, QosPid, Suback};
pub use mqttrust::*;
pub use options::{Broker, MqttOptions};
Expand All @@ -39,7 +40,7 @@ pub struct PublishNotification {
// #[cfg_attr(feature = "defmt-impl", derive(defmt::Format))]
pub enum Notification {
/// Incoming connection acknowledge
ConnAck,
ConnAck(Connack),
/// Incoming publish from the broker
#[cfg(not(feature = "std"))]
Publish(heapless::pool::singleton::Box<state::BoxedPublish, heapless::pool::Init>),
Expand Down
Loading