diff --git a/modbus-mqtt/examples/sungrow-sh5.0rs-http.json b/modbus-mqtt/examples/sungrow-sh5.0rs-http.json index 4bd6d81..82e60c6 100644 --- a/modbus-mqtt/examples/sungrow-sh5.0rs-http.json +++ b/modbus-mqtt/examples/sungrow-sh5.0rs-http.json @@ -7,7 +7,7 @@ "address": 13000, "type": "u16", "name": "running_state", - "period": "3s" + "period": "1s" }, { "address": 13022, diff --git a/modbus-mqtt/src/modbus/connection.rs b/modbus-mqtt/src/modbus/connection.rs index 7db6e41..30d3e92 100644 --- a/modbus-mqtt/src/modbus/connection.rs +++ b/modbus-mqtt/src/modbus/connection.rs @@ -4,8 +4,8 @@ use crate::mqtt::Scopable; use crate::Error; use rust_decimal::prelude::Zero; use serde::Deserialize; -use tokio::sync::oneshot; -use tokio::{select, sync::mpsc}; +use tokio::select; +use tokio::sync::{mpsc, oneshot, watch}; use tokio_modbus::client::{rtu, tcp, Context as ModbusClient}; use tracing::{debug, error, warn}; @@ -18,7 +18,9 @@ pub(crate) async fn run( mqtt: mqtt::Handle, shutdown: Shutdown, ) -> crate::Result { - let (handle_tx, handle_rx) = tokio::sync::oneshot::channel(); + let (connection_is_ready, mut is_connection_ready) = watch::channel(()); + let (mut tx, mut rx) = mpsc::channel(32); + let handle = Handle { tx: tx.clone() }; tokio::spawn(async move { // Can unwrap because if MQTT handler is bad, we have nothing to do here. @@ -26,37 +28,53 @@ pub(crate) async fn run( let address_offset = config.address_offset; - match config.settings.connect(config.unit).await { - Ok(client) => { - // Can unwrap because if MQTT handler is bad, we have nothing to do here. - mqtt.publish("state", "connected").await.unwrap(); + const MAX_WAIT: usize = 300; + const START_WAIT: usize = 2; + let mut current_wait = START_WAIT; - let (tx, rx) = mpsc::channel(32); + loop { + match config.settings.connect(config.unit).await { + Ok(client) => { + // Can unwrap because if MQTT handler is bad, we have nothing to do here. + mqtt.publish("state", "connected").await.unwrap(); - let conn = Connection { - address_offset, - client, - mqtt: mqtt.clone(), - shutdown: shutdown.clone(), // Important, so that we can publish "disconnected" below - rx, - tx, - }; + let mut conn = Connection { + address_offset, + client, + mqtt: mqtt.clone(), + shutdown: shutdown.clone(), // Important, so that we can publish "disconnected" below + rx, + tx, + }; - handle_tx.send(Ok(conn.handle())).unwrap(); + let _ = connection_is_ready.send(()); - if let Err(error) = conn.run().await { - error!(?error, "Modbus connection failed"); + if let Err(error) = conn.run().await { + error!(?error, "Modbus connection failed"); + tokio::time::sleep(std::time::Duration::from_secs(current_wait as u64)) + .await; + + match conn { + Connection { rx: r, tx: t, .. } => { + rx = r; + tx = t; + } + }; + current_wait = (current_wait * 2).clamp(START_WAIT, MAX_WAIT); + } else { + // we are shutting down here, so don't care if this fails + let send = mqtt.publish("state", "disconnected").await; + debug!(?config, ?send, "shutting down modbus connection"); + break; + } } - - // we are shutting down here, so don't care if this fails - let send = mqtt.publish("state", "disconnected").await; - debug!(?config, ?send, "shutting down modbus connection"); + Err(error) => error!(?error), } - Err(error) => handle_tx.send(Err(error)).unwrap(), } }); - handle_rx.await.map_err(|_| crate::Error::RecvError)? + is_connection_ready.changed().await; + Ok(handle) } struct Connection { @@ -123,12 +141,12 @@ enum Command { } impl Connection { - pub async fn run(mut self) -> crate::Result<()> { + pub async fn run(&mut self) -> crate::Result<()> { let mut registers_rx = register::subscribe(&self.mqtt).await?; loop { select! { - Some(cmd) = self.rx.recv() => { self.process_command(cmd).await; }, + Some(cmd) = self.rx.recv() => { self.process_command(cmd).await?; }, Some(register) = registers_rx.recv() => { debug!(?register); @@ -185,7 +203,7 @@ impl Connection { } } - async fn process_command(&mut self, cmd: Command) { + async fn process_command(&mut self, cmd: Command) -> crate::Result<()> { use tokio_modbus::prelude::Reader; let (tx, response) = match cmd { @@ -233,13 +251,15 @@ impl Connection { // Os { code: 36, kind: Uncategorized, message: "Operation now in progress" }' // Os { code: 35, kind: WouldBlock, message: "Resource temporarily unavailable" } // - if let Err(error) = &response { + if let Err(ref error) = response { + use std::io::ErrorKind; match error.kind() { - std::io::ErrorKind::UnexpectedEof => { + ErrorKind::UnexpectedEof | ErrorKind::InvalidData => { // THIS happening feels like a bug either in how I am using tokio_modbus or in tokio_modbus. It seems // like the underlying buffers get all messed up and restarting doesn't always fix it unless I wait a // few seconds. I might need to get help from someone to figure it out. error!(?error, "Connection error, may not be recoverable"); + return Err(response.unwrap_err().into()); } _ => error!(?error), } @@ -249,6 +269,8 @@ impl Connection { if let Err(response) = tx.send(response.map_err(Into::into)) { warn!(?response, "error sending response"); } + + Ok(()) } } diff --git a/modbus-mqtt/src/modbus/connector.rs b/modbus-mqtt/src/modbus/connector.rs index 58efd76..3827f2f 100644 --- a/modbus-mqtt/src/modbus/connector.rs +++ b/modbus-mqtt/src/modbus/connector.rs @@ -82,10 +82,6 @@ async fn parse_and_connect( Ok(()) } async fn connect(config: Config, mqtt: mqtt::Handle, shutdown: Shutdown) -> crate::Result<()> { - if shutdown.is_shutdown() { - return Ok(()); - } - #[allow(deprecated)] let Config { connection: settings, diff --git a/modbus-mqtt/src/modbus/register.rs b/modbus-mqtt/src/modbus/register.rs index 6caaaeb..c24a601 100644 --- a/modbus-mqtt/src/modbus/register.rs +++ b/modbus-mqtt/src/modbus/register.rs @@ -41,10 +41,14 @@ impl Monitor { let value = self.register.parse_words(&words); - self.mqtt + if let Err(error) = self + .mqtt .publish("state", serde_json::to_vec(&value).unwrap()) .await - .unwrap(); + { + warn!(?error); + break; + } } } });