1
0
Fork 0

Close MQTT connection only after everything else has shutdown

refactor
Bo Jeanes 2022-09-08 18:31:05 +10:00
parent 34acb69ee1
commit addd7bd472
3 changed files with 27 additions and 33 deletions

View File

@ -33,7 +33,7 @@ pub(crate) async fn run(
address_offset,
client,
mqtt: mqtt.clone(),
shutdown,
shutdown: shutdown.clone(), // Important, so that we can publish "disconnected" below
rx,
tx,
};
@ -45,7 +45,8 @@ pub(crate) async fn run(
}
// we are shutting down here, so don't care if this fails
let _ = mqtt.publish("state", "disconnected").await;
let send = mqtt.publish("state", "disconnected").await;
debug!(?config, ?send, "shutting down modbus connection");
}
Err(error) => handle_tx.send(Err(error.into())).unwrap(),
}

View File

@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::{collections::HashMap, future::Future};
use bytes::Bytes;
use rumqttc::{
@ -11,8 +11,6 @@ use tokio::{
};
use tracing::{debug, info, warn};
use crate::shutdown::Shutdown;
#[derive(Debug)]
pub struct Payload {
pub bytes: Bytes,
@ -26,7 +24,7 @@ pub enum Message {
Shutdown,
}
pub(crate) async fn new(options: MqttOptions, shutdown: Shutdown) -> Connection {
pub(crate) async fn new(options: MqttOptions) -> Connection {
let (client, event_loop) = AsyncClient::new(options, 32);
let (tx, rx) = channel(32);
@ -36,7 +34,6 @@ pub(crate) async fn new(options: MqttOptions, shutdown: Shutdown) -> Connection
subscriptions: HashMap::new(),
tx,
rx,
shutdown,
}
}
@ -48,7 +45,6 @@ pub(crate) struct Connection {
rx: Receiver<Message>,
client: AsyncClient,
event_loop: EventLoop,
shutdown: Shutdown,
}
impl Connection {
@ -61,14 +57,13 @@ impl Connection {
request = self.rx.recv() => {
match request {
None => return Ok(()),
Some(Message::Shutdown) => return Ok(()),
Some(Message::Shutdown) => {
info!("MQTT connection shutting down");
break;
}
Some(req) => self.handle_request(req).await?,
}
}
_ = self.shutdown.recv() => {
info!("MQTT connection shutting down");
break;
}
}
}
@ -223,6 +218,13 @@ impl Handle {
.map_err(|_| crate::Error::SendError)?;
Ok(())
}
pub async fn shutdown(self) -> crate::Result<()> {
self.tx
.send(Message::Shutdown)
.await
.map_err(|_| crate::Error::SendError)
}
}
pub(crate) trait Scopable {

View File

@ -1,4 +1,7 @@
use crate::{modbus, mqtt};
use crate::{
modbus,
mqtt::{self, Scopable},
};
use rumqttc::MqttOptions;
use std::{future::Future, time::Duration};
@ -18,26 +21,18 @@ pub async fn run<P: Into<String> + Send>(
let (notify_shutdown, _) = broadcast::channel(1);
let (shutdown_complete_tx, mut shutdown_complete_rx) = mpsc::channel(1);
// TODO: make sure mqtt connection is last thing to shutdown, so other components can send final messages.
mqtt_options.set_last_will(rumqttc::LastWill {
topic: prefix.clone(),
message: "offline".into(),
qos: rumqttc::QoS::AtMostOnce,
retain: false,
});
let mut mqtt_connection = mqtt::new(
mqtt_options,
(notify_shutdown.subscribe(), shutdown_complete_tx.clone()).into(),
)
.await;
mqtt_connection
.handle()
.publish(prefix.clone(), "online")
.await?;
let mqtt = mqtt_connection.prefixed_handle(prefix)?;
let mut mqtt_connection = mqtt::new(mqtt_options).await;
let mqtt = mqtt_connection.handle();
mqtt.publish(prefix.clone(), "online").await?;
let mut connector = modbus::connector::new(
mqtt.clone(),
mqtt.scoped(prefix),
(notify_shutdown.subscribe(), shutdown_complete_tx.clone()).into(),
);
@ -57,13 +52,9 @@ pub async fn run<P: Into<String> + Send>(
drop(notify_shutdown);
drop(shutdown_complete_tx);
timeout(Duration::from_secs(5), shutdown_complete_rx.recv())
.await
.map_err(|_| {
crate::Error::Other("Shutdown didn't complete within 5 seconds; aborting".into())
})?;
info!("Shutdown.");
// We want MQTT to be the last thing to shutdown, so it gets shutdown after everything else
shutdown_complete_rx.recv().await;
mqtt.shutdown().await?;
Ok(())
}