From addd7bd472fc1df3ea7ae2e0fddded29c023a2e9 Mon Sep 17 00:00:00 2001 From: Bo Jeanes Date: Thu, 8 Sep 2022 18:31:05 +1000 Subject: [PATCH] Close MQTT connection only after everything else has shutdown --- modbus-mqtt/src/modbus/connection.rs | 5 +++-- modbus-mqtt/src/mqtt.rs | 24 +++++++++++---------- modbus-mqtt/src/server.rs | 31 ++++++++++------------------ 3 files changed, 27 insertions(+), 33 deletions(-) diff --git a/modbus-mqtt/src/modbus/connection.rs b/modbus-mqtt/src/modbus/connection.rs index 1561c69..f67d36e 100644 --- a/modbus-mqtt/src/modbus/connection.rs +++ b/modbus-mqtt/src/modbus/connection.rs @@ -33,7 +33,7 @@ pub(crate) async fn run( address_offset, client, mqtt: mqtt.clone(), - shutdown, + shutdown: shutdown.clone(), // Important, so that we can publish "disconnected" below rx, tx, }; @@ -45,7 +45,8 @@ pub(crate) async fn run( } // we are shutting down here, so don't care if this fails - let _ = mqtt.publish("state", "disconnected").await; + let send = mqtt.publish("state", "disconnected").await; + debug!(?config, ?send, "shutting down modbus connection"); } Err(error) => handle_tx.send(Err(error.into())).unwrap(), } diff --git a/modbus-mqtt/src/mqtt.rs b/modbus-mqtt/src/mqtt.rs index 8807758..1ed7ad4 100644 --- a/modbus-mqtt/src/mqtt.rs +++ b/modbus-mqtt/src/mqtt.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::{collections::HashMap, future::Future}; use bytes::Bytes; use rumqttc::{ @@ -11,8 +11,6 @@ use tokio::{ }; use tracing::{debug, info, warn}; -use crate::shutdown::Shutdown; - #[derive(Debug)] pub struct Payload { pub bytes: Bytes, @@ -26,7 +24,7 @@ pub enum Message { Shutdown, } -pub(crate) async fn new(options: MqttOptions, shutdown: Shutdown) -> Connection { +pub(crate) async fn new(options: MqttOptions) -> Connection { let (client, event_loop) = AsyncClient::new(options, 32); let (tx, rx) = channel(32); @@ -36,7 +34,6 @@ pub(crate) async fn new(options: MqttOptions, shutdown: Shutdown) -> Connection subscriptions: HashMap::new(), tx, rx, - shutdown, } } @@ -48,7 +45,6 @@ pub(crate) struct Connection { rx: Receiver, client: AsyncClient, event_loop: EventLoop, - shutdown: Shutdown, } impl Connection { @@ -61,14 +57,13 @@ impl Connection { request = self.rx.recv() => { match request { None => return Ok(()), - Some(Message::Shutdown) => return Ok(()), + Some(Message::Shutdown) => { + info!("MQTT connection shutting down"); + break; + } Some(req) => self.handle_request(req).await?, } } - _ = self.shutdown.recv() => { - info!("MQTT connection shutting down"); - break; - } } } @@ -223,6 +218,13 @@ impl Handle { .map_err(|_| crate::Error::SendError)?; Ok(()) } + + pub async fn shutdown(self) -> crate::Result<()> { + self.tx + .send(Message::Shutdown) + .await + .map_err(|_| crate::Error::SendError) + } } pub(crate) trait Scopable { diff --git a/modbus-mqtt/src/server.rs b/modbus-mqtt/src/server.rs index 3ec80b0..6c3242e 100644 --- a/modbus-mqtt/src/server.rs +++ b/modbus-mqtt/src/server.rs @@ -1,4 +1,7 @@ -use crate::{modbus, mqtt}; +use crate::{ + modbus, + mqtt::{self, Scopable}, +}; use rumqttc::MqttOptions; use std::{future::Future, time::Duration}; @@ -18,26 +21,18 @@ pub async fn run + Send>( let (notify_shutdown, _) = broadcast::channel(1); let (shutdown_complete_tx, mut shutdown_complete_rx) = mpsc::channel(1); - // TODO: make sure mqtt connection is last thing to shutdown, so other components can send final messages. mqtt_options.set_last_will(rumqttc::LastWill { topic: prefix.clone(), message: "offline".into(), qos: rumqttc::QoS::AtMostOnce, retain: false, }); - let mut mqtt_connection = mqtt::new( - mqtt_options, - (notify_shutdown.subscribe(), shutdown_complete_tx.clone()).into(), - ) - .await; - mqtt_connection - .handle() - .publish(prefix.clone(), "online") - .await?; - let mqtt = mqtt_connection.prefixed_handle(prefix)?; + let mut mqtt_connection = mqtt::new(mqtt_options).await; + let mqtt = mqtt_connection.handle(); + mqtt.publish(prefix.clone(), "online").await?; let mut connector = modbus::connector::new( - mqtt.clone(), + mqtt.scoped(prefix), (notify_shutdown.subscribe(), shutdown_complete_tx.clone()).into(), ); @@ -57,13 +52,9 @@ pub async fn run + Send>( drop(notify_shutdown); drop(shutdown_complete_tx); - timeout(Duration::from_secs(5), shutdown_complete_rx.recv()) - .await - .map_err(|_| { - crate::Error::Other("Shutdown didn't complete within 5 seconds; aborting".into()) - })?; - - info!("Shutdown."); + // We want MQTT to be the last thing to shutdown, so it gets shutdown after everything else + shutdown_complete_rx.recv().await; + mqtt.shutdown().await?; Ok(()) }