1
0
Fork 0

Try to re-establish erroring Modbus connections

gh-action
Bo Jeanes 2022-09-10 08:09:45 +10:00
parent b2710949bf
commit 2ee2f597d0
4 changed files with 59 additions and 37 deletions

View File

@ -7,7 +7,7 @@
"address": 13000, "address": 13000,
"type": "u16", "type": "u16",
"name": "running_state", "name": "running_state",
"period": "3s" "period": "1s"
}, },
{ {
"address": 13022, "address": 13022,

View File

@ -4,8 +4,8 @@ use crate::mqtt::Scopable;
use crate::Error; use crate::Error;
use rust_decimal::prelude::Zero; use rust_decimal::prelude::Zero;
use serde::Deserialize; use serde::Deserialize;
use tokio::sync::oneshot; use tokio::select;
use tokio::{select, sync::mpsc}; use tokio::sync::{mpsc, oneshot, watch};
use tokio_modbus::client::{rtu, tcp, Context as ModbusClient}; use tokio_modbus::client::{rtu, tcp, Context as ModbusClient};
use tracing::{debug, error, warn}; use tracing::{debug, error, warn};
@ -18,7 +18,9 @@ pub(crate) async fn run(
mqtt: mqtt::Handle, mqtt: mqtt::Handle,
shutdown: Shutdown, shutdown: Shutdown,
) -> crate::Result<Handle> { ) -> crate::Result<Handle> {
let (handle_tx, handle_rx) = tokio::sync::oneshot::channel(); let (connection_is_ready, mut is_connection_ready) = watch::channel(());
let (mut tx, mut rx) = mpsc::channel(32);
let handle = Handle { tx: tx.clone() };
tokio::spawn(async move { tokio::spawn(async move {
// Can unwrap because if MQTT handler is bad, we have nothing to do here. // Can unwrap because if MQTT handler is bad, we have nothing to do here.
@ -26,14 +28,17 @@ pub(crate) async fn run(
let address_offset = config.address_offset; let address_offset = config.address_offset;
const MAX_WAIT: usize = 300;
const START_WAIT: usize = 2;
let mut current_wait = START_WAIT;
loop {
match config.settings.connect(config.unit).await { match config.settings.connect(config.unit).await {
Ok(client) => { Ok(client) => {
// Can unwrap because if MQTT handler is bad, we have nothing to do here. // Can unwrap because if MQTT handler is bad, we have nothing to do here.
mqtt.publish("state", "connected").await.unwrap(); mqtt.publish("state", "connected").await.unwrap();
let (tx, rx) = mpsc::channel(32); let mut conn = Connection {
let conn = Connection {
address_offset, address_offset,
client, client,
mqtt: mqtt.clone(), mqtt: mqtt.clone(),
@ -42,21 +47,34 @@ pub(crate) async fn run(
tx, tx,
}; };
handle_tx.send(Ok(conn.handle())).unwrap(); let _ = connection_is_ready.send(());
if let Err(error) = conn.run().await { if let Err(error) = conn.run().await {
error!(?error, "Modbus connection failed"); error!(?error, "Modbus connection failed");
} tokio::time::sleep(std::time::Duration::from_secs(current_wait as u64))
.await;
match conn {
Connection { rx: r, tx: t, .. } => {
rx = r;
tx = t;
}
};
current_wait = (current_wait * 2).clamp(START_WAIT, MAX_WAIT);
} else {
// we are shutting down here, so don't care if this fails // we are shutting down here, so don't care if this fails
let send = mqtt.publish("state", "disconnected").await; let send = mqtt.publish("state", "disconnected").await;
debug!(?config, ?send, "shutting down modbus connection"); debug!(?config, ?send, "shutting down modbus connection");
break;
}
}
Err(error) => error!(?error),
} }
Err(error) => handle_tx.send(Err(error)).unwrap(),
} }
}); });
handle_rx.await.map_err(|_| crate::Error::RecvError)? is_connection_ready.changed().await;
Ok(handle)
} }
struct Connection { struct Connection {
@ -123,12 +141,12 @@ enum Command {
} }
impl Connection { impl Connection {
pub async fn run(mut self) -> crate::Result<()> { pub async fn run(&mut self) -> crate::Result<()> {
let mut registers_rx = register::subscribe(&self.mqtt).await?; let mut registers_rx = register::subscribe(&self.mqtt).await?;
loop { loop {
select! { select! {
Some(cmd) = self.rx.recv() => { self.process_command(cmd).await; }, Some(cmd) = self.rx.recv() => { self.process_command(cmd).await?; },
Some(register) = registers_rx.recv() => { Some(register) = registers_rx.recv() => {
debug!(?register); debug!(?register);
@ -185,7 +203,7 @@ impl Connection {
} }
} }
async fn process_command(&mut self, cmd: Command) { async fn process_command(&mut self, cmd: Command) -> crate::Result<()> {
use tokio_modbus::prelude::Reader; use tokio_modbus::prelude::Reader;
let (tx, response) = match cmd { let (tx, response) = match cmd {
@ -233,13 +251,15 @@ impl Connection {
// Os { code: 36, kind: Uncategorized, message: "Operation now in progress" }' // Os { code: 36, kind: Uncategorized, message: "Operation now in progress" }'
// Os { code: 35, kind: WouldBlock, message: "Resource temporarily unavailable" } // Os { code: 35, kind: WouldBlock, message: "Resource temporarily unavailable" }
// //
if let Err(error) = &response { if let Err(ref error) = response {
use std::io::ErrorKind;
match error.kind() { match error.kind() {
std::io::ErrorKind::UnexpectedEof => { ErrorKind::UnexpectedEof | ErrorKind::InvalidData => {
// THIS happening feels like a bug either in how I am using tokio_modbus or in tokio_modbus. It seems // THIS happening feels like a bug either in how I am using tokio_modbus or in tokio_modbus. It seems
// like the underlying buffers get all messed up and restarting doesn't always fix it unless I wait a // like the underlying buffers get all messed up and restarting doesn't always fix it unless I wait a
// few seconds. I might need to get help from someone to figure it out. // few seconds. I might need to get help from someone to figure it out.
error!(?error, "Connection error, may not be recoverable"); error!(?error, "Connection error, may not be recoverable");
return Err(response.unwrap_err().into());
} }
_ => error!(?error), _ => error!(?error),
} }
@ -249,6 +269,8 @@ impl Connection {
if let Err(response) = tx.send(response.map_err(Into::into)) { if let Err(response) = tx.send(response.map_err(Into::into)) {
warn!(?response, "error sending response"); warn!(?response, "error sending response");
} }
Ok(())
} }
} }

View File

@ -82,10 +82,6 @@ async fn parse_and_connect(
Ok(()) Ok(())
} }
async fn connect(config: Config, mqtt: mqtt::Handle, shutdown: Shutdown) -> crate::Result<()> { async fn connect(config: Config, mqtt: mqtt::Handle, shutdown: Shutdown) -> crate::Result<()> {
if shutdown.is_shutdown() {
return Ok(());
}
#[allow(deprecated)] #[allow(deprecated)]
let Config { let Config {
connection: settings, connection: settings,

View File

@ -41,10 +41,14 @@ impl Monitor {
let value = self.register.parse_words(&words); let value = self.register.parse_words(&words);
self.mqtt if let Err(error) = self
.mqtt
.publish("state", serde_json::to_vec(&value).unwrap()) .publish("state", serde_json::to_vec(&value).unwrap())
.await .await
.unwrap(); {
warn!(?error);
break;
}
} }
} }
}); });