diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d05767e..1526897 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -72,9 +72,8 @@ jobs: AWS_DEFAULT_REGION: ${{ secrets.MGMT_AWS_DEFAULT_REGION }} AWS_ACCESS_KEY_ID: ${{ secrets.MGMT_AWS_ACCESS_KEY_ID }} AWS_SECRET_ACCESS_KEY: ${{ secrets.MGMT_AWS_SECRET_ACCESS_KEY }} - SUITE_ID: greb3uy2wtq3 - THING_ARN: arn:aws:iot:eu-west-1:274906834921:thing/mqttrust - CERTIFICATE_ARN: arn:aws:iot:eu-west-1:274906834921:cert/e7280d8d316b58da3058037a2c1730d9eb15de50e96f4d47e54ea655266b76db + SUITE_ID: 1gaev57dq6i5 + THING_ARN: arn:aws:iot:eu-west-1:411974994697:thing/mqttrust steps: - name: Checkout uses: actions/checkout@v1 @@ -89,7 +88,7 @@ jobs: - name: Get AWS_HOSTNAME id: hostname run: | - hostname=$(aws iotdeviceadvisor get-endpoint --output text --query endpoint) + hostname=$(aws iotdeviceadvisor get-endpoint --thing-arn ${{ env.THING_ARN }} --output text --query endpoint) ret=$? echo "::set-output name=AWS_HOSTNAME::$hostname" exit $ret @@ -105,7 +104,7 @@ jobs: - name: Start test suite id: test_suite run: | - suite_id=$(aws iotdeviceadvisor start-suite-run --suite-definition-id ${{ env.SUITE_ID }} --suite-run-configuration "primaryDevice={thingArn=${{ env.THING_ARN }},certificateArn=${{ env.CERTIFICATE_ARN }}}" --output text --query suiteRunId) + suite_id=$(aws iotdeviceadvisor start-suite-run --suite-definition-id ${{ env.SUITE_ID }} --suite-run-configuration "primaryDevice={thingArn=${{ env.THING_ARN }}},parallelRun=true" --output text --query suiteRunId) ret=$? echo "::set-output name=SUITE_RUN_ID::$suite_id" exit $ret @@ -121,9 +120,9 @@ jobs: - name: Monitor test run run: | - chmod +x ./.github/scripts/da_monitor.sh + chmod +x ./scripts/da_monitor.sh echo ${{ env.SUITE_ID }} ${{ steps.test_suite.outputs.SUITE_RUN_ID }} ${{ steps.binary.outputs.PID }} - ./.github/scripts/da_monitor.sh ${{ env.SUITE_ID }} ${{ steps.test_suite.outputs.SUITE_RUN_ID }} ${{ steps.binary.outputs.PID }} + ./scripts/da_monitor.sh ${{ env.SUITE_ID }} ${{ steps.test_suite.outputs.SUITE_RUN_ID }} ${{ steps.binary.outputs.PID }} - name: Kill test binary process if: ${{ always() }} diff --git a/.gitignore b/.gitignore index 59a4524..0d3b7c6 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ .gdb_history Cargo.lock target/ +device_advisor_integration.log \ No newline at end of file diff --git a/mqttrust_core/Cargo.toml b/mqttrust_core/Cargo.toml index 16cade3..f6c54af 100644 --- a/mqttrust_core/Cargo.toml +++ b/mqttrust_core/Cargo.toml @@ -40,11 +40,14 @@ defmt = { version = "^0.3", optional = true } [dev-dependencies] native-tls = { version = "^0.2" } dns-lookup = "1.0.3" -env_logger = "0.9.0" +env_logger = "0.11" +static_cell = "2.1" [features] default = [] std = [] -defmt-impl = ["defmt", "mqttrust/defmt-impl", "heapless/defmt-impl", "fugit/defmt"] +log = ["dep:log", "mqttrust/log"] + +defmt-impl = ["dep:defmt", "mqttrust/defmt-impl", "heapless/defmt-impl", "fugit/defmt"] diff --git a/mqttrust_core/examples/aws_device_advisor.rs b/mqttrust_core/examples/aws_device_advisor.rs index 0d86e29..9840706 100644 --- a/mqttrust_core/examples/aws_device_advisor.rs +++ b/mqttrust_core/examples/aws_device_advisor.rs @@ -1,5 +1,6 @@ mod common; +use mqttrust::encoding::v4::{Connack, ConnectReturnCode}; use mqttrust::{Mqtt, QoS, SubscribeTopic}; use mqttrust_core::bbqueue::BBBuffer; use mqttrust_core::{EventLoop, MqttOptions, Notification}; @@ -7,19 +8,27 @@ use mqttrust_core::{EventLoop, MqttOptions, Notification}; use common::clock::SysClock; use common::network::Network; use native_tls::TlsConnector; +use static_cell::StaticCell; +use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::thread; use crate::common::credentials; -static mut Q: BBBuffer<{ 1024 * 6 }> = BBBuffer::new(); +static mut Q: BBBuffer<{ 1024 * 60 }> = BBBuffer::new(); fn main() { env_logger::init(); let (p, c) = unsafe { Q.try_split_framed().unwrap() }; - let hostname = credentials::HOSTNAME.unwrap(); + static HOSTNAME: StaticCell = StaticCell::new(); + let hostname = HOSTNAME.init(credentials::hostname()); + + log::info!( + "Starting device advisor test on endpoint {}", + hostname.as_str() + ); let connector = TlsConnector::builder() .identity(credentials::identity()) @@ -27,30 +36,42 @@ fn main() { .build() .unwrap(); - let mut network = Network::new_tls(connector, String::from(hostname)); + let mut network = Network::new_tls(connector, hostname.clone()); let thing_name = "mqttrust"; let mut mqtt_eventloop = EventLoop::new( c, SysClock::new(), - MqttOptions::new(thing_name, hostname.into(), 8883), + MqttOptions::new(thing_name, hostname.as_str().into(), 8883), ); let mqtt_client = mqttrust_core::Client::new(p, thing_name); + let connected = Arc::new(AtomicBool::new(false)); + let con = connected.clone(); + thread::Builder::new() .name("eventloop".to_string()) .spawn(move || loop { match nb::block!(mqtt_eventloop.connect(&mut network)) { Err(_) => continue, - Ok(true) => { - log::info!("Successfully connected to broker"); + Ok(Some(Notification::ConnAck(Connack { + session_present, + code: ConnectReturnCode::Accepted, + }))) => { + log::info!( + "Successfully connected to broker. session_present: {}", + session_present + ); + con.store(true, std::sync::atomic::Ordering::Release); + } + Ok(n) => { + log::info!("Received {:?} during connect", n); } - Ok(false) => {} } - match mqtt_eventloop.yield_event(&mut network) { + match nb::block!(mqtt_eventloop.yield_event(&mut network)) { Ok(Notification::Publish(_)) => {} Ok(n) => { log::trace!("{:?}", n); @@ -62,19 +83,21 @@ fn main() { loop { thread::sleep(std::time::Duration::from_millis(5000)); - mqtt_client - .subscribe(&[SubscribeTopic { - topic_path: format!("{}/device/advisor", thing_name).as_str(), - qos: QoS::AtLeastOnce, - }]) - .unwrap(); - - mqtt_client - .publish( - format!("{}/device/advisor/hello", thing_name).as_str(), - format!("Hello from {}", thing_name).as_bytes(), - QoS::AtLeastOnce, - ) - .unwrap(); + if connected.load(std::sync::atomic::Ordering::Acquire) { + mqtt_client + .subscribe(&[SubscribeTopic { + topic_path: format!("plc/output/{}", thing_name).as_str(), + qos: QoS::AtLeastOnce, + }]) + .unwrap(); + + mqtt_client + .publish( + format!("plc/input/{}", thing_name).as_str(), + format!("Hello from {}", thing_name).as_bytes(), + QoS::AtLeastOnce, + ) + .unwrap(); + } } } diff --git a/mqttrust_core/examples/common/clock.rs b/mqttrust_core/examples/common/clock.rs index 227f6e6..2d19c5e 100644 --- a/mqttrust_core/examples/common/clock.rs +++ b/mqttrust_core/examples/common/clock.rs @@ -5,14 +5,12 @@ use std::{ }; pub struct SysClock { start_time: u32, - countdown_end: Option, } impl SysClock { pub fn new() -> Self { Self { start_time: Self::epoch(), - countdown_end: None, } } diff --git a/mqttrust_core/examples/common/credentials.rs b/mqttrust_core/examples/common/credentials.rs index 3057593..22bacb9 100644 --- a/mqttrust_core/examples/common/credentials.rs +++ b/mqttrust_core/examples/common/credentials.rs @@ -11,4 +11,6 @@ pub fn root_ca() -> Certificate { Certificate::from_pem(include_bytes!("../secrets/root-ca.pem")).unwrap() } -pub const HOSTNAME: Option<&'static str> = option_env!("AWS_HOSTNAME"); +pub fn hostname() -> String { + env::var("AWS_HOSTNAME").unwrap() +} diff --git a/mqttrust_core/examples/secrets/identity.pfx b/mqttrust_core/examples/secrets/identity.pfx index 625ee6f..3fb464e 100644 Binary files a/mqttrust_core/examples/secrets/identity.pfx and b/mqttrust_core/examples/secrets/identity.pfx differ diff --git a/mqttrust_core/src/eventloop.rs b/mqttrust_core/src/eventloop.rs index b534663..a49c211 100644 --- a/mqttrust_core/src/eventloop.rs +++ b/mqttrust_core/src/eventloop.rs @@ -7,7 +7,7 @@ use core::convert::Infallible; use core::ops::DerefMut; use core::ops::RangeTo; use embedded_nal::{AddrType, Dns, SocketAddr, TcpClientStack}; -use fugit::ExtU32; +use fugit::{ExtU32, TimerDurationU32}; use heapless::{String, Vec}; use mqttrust::encoding::v4::{decode_slice, encode_slice, Connect, Packet, Protocol}; @@ -24,6 +24,7 @@ where /// Request stream pub(crate) requests: Option>, network_handle: NetworkHandle, + connect_counter: u8, } impl<'a, 'b, S, O, const TIMER_HZ: u32, const L: usize> EventLoop<'a, 'b, S, O, TIMER_HZ, L> @@ -41,6 +42,7 @@ where options, requests: Some(requests), network_handle: NetworkHandle::new(), + connect_counter: 0, } } @@ -54,7 +56,7 @@ where pub fn connect + ?Sized>( &mut self, network: &mut N, - ) -> nb::Result { + ) -> nb::Result, EventError> { // connect to the broker match self.network_handle.is_connected(network) { Ok(false) => { @@ -216,16 +218,23 @@ where } } + fn backoff(&self) -> TimerDurationU32 { + let base_time_ms: u32 = 200; + let backoff = base_time_ms.saturating_mul(u32::pow(2, self.connect_counter as u32)); + + core::cmp::min(50.secs(), backoff.millis()) + } + fn mqtt_connect + ?Sized>( &mut self, network: &mut N, - ) -> nb::Result { + ) -> nb::Result, EventError> { match self.state.connection_status { - MqttConnectionStatus::Connected => Ok(false), + MqttConnectionStatus::Connected => Ok(None), MqttConnectionStatus::Disconnected => { - info!("MQTT connecting.."); let now = self.last_outgoing_timer.now(); self.state.last_ping_entry().insert(now); + self.connect_counter += 1; self.state.await_pingresp = false; self.network_handle.rx_buf.init(); @@ -242,6 +251,12 @@ where password, }); + info!( + "MQTT connecting.. Attempt: {}. Backoff time: {}", + self.connect_counter, + self.backoff().to_millis() + ); + // mqtt connection with timeout self.network_handle.send_packet(network, &connect)?; self.state.handle_outgoing_connect(); @@ -250,16 +265,20 @@ where MqttConnectionStatus::Handshake => { let now = self.last_outgoing_timer.now(); + let backoff_time = self.backoff(); + if self .state .last_ping_entry() .or_insert(now) - .has_elapsed(&now, 50.secs()) + .has_elapsed(&now, backoff_time) { + warn!("Timed out waiting for connect packet!"); return Err(nb::Error::Other(EventError::Timeout)); } - self.network_handle + let res = self + .network_handle .receive(network) .map_err(|e| e.map(EventError::Network))? .decode(&mut self.state) @@ -267,8 +286,16 @@ where if n.is_none() && p.is_none() { return Err(nb::Error::WouldBlock); } - Ok(n.map(|n| n == Notification::ConnAck).unwrap_or(false)) - }) + Ok(n) + }); + + match res { + Ok(r) => { + self.connect_counter = 0; + Ok(r) + } + Err(e) => Err(e), + } } } } @@ -417,7 +444,7 @@ impl NetworkHandle { #[derive(Debug)] struct PacketBuffer { range: RangeTo, - buffer: Vec, + buffer: Vec, } impl PacketBuffer { @@ -536,6 +563,7 @@ impl<'a> PacketDecoder<'a> { Err(EventError::Encoding(e).into()) } Ok(Some(packet)) => { + warn!("Got packet! {:?}", packet); self.is_err.replace(false); state .handle_incoming_packet(packet) @@ -699,12 +727,12 @@ mod tests { rx_buf.range.end += connack_len; let publish_len = encode_slice(&Packet::from(publish.clone()), rx_buf.buffer()).unwrap(); rx_buf.range.end += publish_len; - assert_eq!(rx_buf.range.end, rx_buf.buffer.capacity()); + assert_eq!(rx_buf.range.end, 4096); // Decode the first Connack packet on the Handshake state. state.connection_status = MqttConnectionStatus::Handshake; let (n, p) = PacketDecoder::new(&mut rx_buf).decode(&mut state).unwrap(); - assert_eq!(n, Some(Notification::ConnAck)); + assert!(matches!(n, Some(Notification::ConnAck(_)))); assert_eq!(p, None); let mut pkg = SerializedPacket(&mut rx_buf.buffer[rx_buf.range]); @@ -750,7 +778,7 @@ mod tests { rx_buf.range.end += connack_malformed_len; let publish_len = encode_slice(&Packet::from(publish.clone()), rx_buf.buffer()).unwrap(); rx_buf.range.end += publish_len; - assert_eq!(rx_buf.range.end, rx_buf.buffer.capacity()); + assert_eq!(rx_buf.range.end, 4096); // When a packet is malformed, we cannot tell its length. The decoder // discards the entire buffer. diff --git a/mqttrust_core/src/lib.rs b/mqttrust_core/src/lib.rs index df72d34..285bb57 100644 --- a/mqttrust_core/src/lib.rs +++ b/mqttrust_core/src/lib.rs @@ -18,6 +18,7 @@ pub use client::Client; use core::convert::TryFrom; pub use eventloop::EventLoop; use heapless::{String, Vec}; +use mqttrust::encoding::v4::Connack; pub use mqttrust::encoding::v4::{Pid, Publish, QoS, QosPid, Suback}; pub use mqttrust::*; pub use options::{Broker, MqttOptions}; @@ -39,7 +40,7 @@ pub struct PublishNotification { // #[cfg_attr(feature = "defmt-impl", derive(defmt::Format))] pub enum Notification { /// Incoming connection acknowledge - ConnAck, + ConnAck(Connack), /// Incoming publish from the broker #[cfg(not(feature = "std"))] Publish(heapless::pool::singleton::Box), diff --git a/mqttrust_core/src/state.rs b/mqttrust_core/src/state.rs index f4d1087..441c472 100644 --- a/mqttrust_core/src/state.rs +++ b/mqttrust_core/src/state.rs @@ -145,7 +145,7 @@ impl MqttState { match packet { Packet::Connack(connack) => self .handle_incoming_connack(connack) - .map(|()| (Notification::ConnAck.into(), None)), + .map(|()| (Notification::ConnAck(connack).into(), None)), Packet::Pingresp => self.handle_incoming_pingresp(), Packet::Publish(publish) => self.handle_incoming_publish(publish), Packet::Suback(suback) => self.handle_incoming_suback(suback), @@ -269,12 +269,23 @@ impl MqttState { let qospid = (publish.qos, publish.pid); #[cfg(not(feature = "std"))] - let boxed_publish = BoxedPublish::alloc().unwrap(); - #[cfg(not(feature = "std"))] - let notification = Notification::Publish(boxed_publish.init(publish.try_into().unwrap())); + let notification = if publish.payload.len() > 4096 { + error!( + "Received payload larger the {}! Sending ACK but discarding payload notification!", + 4096 + ); + None + } else { + let boxed_publish = BoxedPublish::alloc().unwrap(); + Some(Notification::Publish( + boxed_publish.init(publish.try_into().unwrap()), + )) + }; #[cfg(feature = "std")] - let notification = Notification::Publish(std::boxed::Box::new(publish.try_into().unwrap())); + let notification = Some(Notification::Publish(std::boxed::Box::new( + publish.try_into().unwrap(), + ))); let request = match qospid { (QoS::AtMostOnce, _) => None, @@ -289,7 +300,7 @@ impl MqttState { } _ => return Err(StateError::InvalidHeader), }; - Ok((Some(notification), request)) + Ok((notification, request)) } fn handle_incoming_pubrel( diff --git a/.github/scripts/da_monitor.sh b/scripts/da_monitor.sh old mode 100644 new mode 100755 similarity index 90% rename from .github/scripts/da_monitor.sh rename to scripts/da_monitor.sh index 508b298..44ab76a --- a/.github/scripts/da_monitor.sh +++ b/scripts/da_monitor.sh @@ -44,6 +44,15 @@ function report_status { done } +function check_pid { + if [ -n "$pid" ]; then + ps -p $pid > /dev/null; + return $? + else + return 0 + fi +} + while test ${IN_PROGRESS} == 1; do # Fetch the current status and stash in /tmp so we can use it throughout the status fetch process. @@ -68,13 +77,16 @@ while test ${IN_PROGRESS} == 1; do elif test x"${overall_status}" == x${STATUS_PASS}; then MONITOR_STATUS=0 IN_PROGRESS=0 + elif test x"${overall_status}" == x${STATUS_PASS_WITH_WARNINGS}; then + MONITOR_STATUS=0 + IN_PROGRESS=0 elif test x"${overall_status}" == x${STATUS_STOPPING}; then MONITOR_STATUS=1 IN_PROGRESS=0 elif test x"${overall_status}" == x${STATUS_STOPPED}; then MONITOR_STATUS=1 IN_PROGRESS=0 - elif { ps -p $pid > /dev/null; }; [ "$?" = 1 ]; then + elif check_pid; [ "$?" = 1 ]; then echo Binary is not running any more? MONITOR_STATUS=1 diff --git a/scripts/da_run_test.sh b/scripts/da_run_test.sh new file mode 100755 index 0000000..8f350ea --- /dev/null +++ b/scripts/da_run_test.sh @@ -0,0 +1,49 @@ +#!/usr/bin/env bash + +if [[ -z "$DEVICE_ADVISOR_PASSWORD" ]]; then + echo "DEVICE_ADVISOR_PASSWORD environment variable must be set!" + return 1; +fi + +set -e + +DAEMONIZE=true + +THING_NAME="mqttrust" +SUITE_ID="1gaev57dq6i5" +export RUST_LOG=debug + +THING_ARN="arn:aws:iot:eu-west-1:411974994697:thing/$THING_NAME" +SCRIPT_DIR="$(dirname "$(readlink -f "$0")")" + + +export AWS_HOSTNAME=$(aws iotdeviceadvisor get-endpoint --thing-arn $THING_ARN --output text --query endpoint) + +cargo build --features=log --example aws_device_advisor --release + +SUITE_RUN_ID=$(aws iotdeviceadvisor start-suite-run --suite-definition-id $SUITE_ID --suite-run-configuration "primaryDevice={thingArn=$THING_ARN},parallelRun=true" --output text --query suiteRunId) +if $DAEMONIZE; then + nohup ./target/release/examples/aws_device_advisor > device_advisor_integration.log & + PID=$! +else + echo "You can now run 'DEVICE_ADVISOR_PASSWORD=$DEVICE_ADVISOR_PASSWORD AWS_HOSTNAME=$AWS_HOSTNAME ./target/release/examples/aws_device_advisor' in a seperate terminal" +fi + +always() { + kill $PID || true + cat device_advisor_integration.log +} + +on_failure() { + if $DAEMONIZE; then + always || true + fi + aws iotdeviceadvisor stop-suite-run --suite-definition-id $SUITE_ID --suite-run-id $SUITE_RUN_ID +} + +trap "on_failure" ERR INT + +$SCRIPT_DIR/da_monitor.sh $SUITE_ID $SUITE_RUN_ID $PID + +always + diff --git a/scripts/rotate_secrets.sh b/scripts/rotate_secrets.sh new file mode 100755 index 0000000..5a06044 --- /dev/null +++ b/scripts/rotate_secrets.sh @@ -0,0 +1,33 @@ +#!/usr/bin/env bash + +if [[ -z "$DEVICE_ADVISOR_PASSWORD" ]]; then + echo "DEVICE_ADVISOR_PASSWORD environment variable must be set!" + return 1; +fi + +SCRIPT_DIR="$(dirname "$(readlink -f "$0")")" +SECRETS_DIR="$SCRIPT_DIR/../mqttrust_core/examples/secrets" +THING_NAME="mqttrust" + +CERT_PATH="$SECRETS_DIR/cert.pem" +PRIV_KEY_PATH="$SECRETS_DIR/priv.key.pem" + +CERT_ARN=$(aws iot create-keys-and-certificate --set-as-active --certificate-pem-outfile $CERT_PATH --private-key-outfile $PRIV_KEY_PATH | jq -r .certificateArn); +for OLD_CERT in $(aws iot list-thing-principals --thing-name $THING_NAME | jq -r '.principals[]' | xargs); do + CERT_ID=$(echo $OLD_CERT | cut -d "/" -f 2) + aws iot detach-thing-principal --thing-name $THING_NAME --principal $OLD_CERT + aws iot update-certificate --new-status INACTIVE --certificate-id $CERT_ID + aws iot delete-certificate --certificate-id $CERT_ID --force-delete +done +aws iot attach-thing-principal --thing-name $THING_NAME --principal $CERT_ARN > /dev/null 2>&1 +aws iot attach-policy --policy-name Connect --target $CERT_ARN > /dev/null 2>&1 +aws iot attach-policy --policy-name Input --target $CERT_ARN > /dev/null 2>&1 +aws iot attach-policy --policy-name Output --target $CERT_ARN > /dev/null 2>&1 + +rm $SECRETS_DIR/identity.pfx + +# Generate new identity.pfx +openssl pkcs12 -export -passout pass:"$DEVICE_ADVISOR_PASSWORD" -out $SECRETS_DIR/identity.pfx -inkey $PRIV_KEY_PATH -in $CERT_PATH + +rm $CERT_PATH +rm $PRIV_KEY_PATH \ No newline at end of file