From 34acb69ee11e826fc0edc819ee1a98846e1bff61 Mon Sep 17 00:00:00 2001 From: Bo Jeanes Date: Thu, 8 Sep 2022 17:52:49 +1000 Subject: [PATCH] wip --- Cargo.lock | 2 +- modbus-mqtt/Cargo.toml | 2 +- modbus-mqtt/src/lib.rs | 251 --------------------------- modbus-mqtt/src/modbus/connection.rs | 177 ++++++++++++++++++- modbus-mqtt/src/modbus/connector.rs | 8 +- modbus-mqtt/src/mqtt.rs | 3 + 6 files changed, 180 insertions(+), 263 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 385c40c..8741c5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -766,7 +766,7 @@ dependencies = [ [[package]] name = "modbus-mqtt" -version = "0.1.0" +version = "0.2.0" dependencies = [ "bytes", "clap", diff --git a/modbus-mqtt/Cargo.toml b/modbus-mqtt/Cargo.toml index 01441d7..f69dacf 100644 --- a/modbus-mqtt/Cargo.toml +++ b/modbus-mqtt/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "modbus-mqtt" -version = "0.1.0" +version = "0.2.0" edition = "2021" authors = ["Bo Jeanes "] description = "A bridge between Modbus devices and MQTT" diff --git a/modbus-mqtt/src/lib.rs b/modbus-mqtt/src/lib.rs index 98cc0c3..588c009 100644 --- a/modbus-mqtt/src/lib.rs +++ b/modbus-mqtt/src/lib.rs @@ -61,255 +61,8 @@ impl From<&'static str> for Error { pub type Result = std::result::Result; -// #[derive(Debug)] -// enum DispatchCommand { -// Publish { topic: String, payload: Vec }, -// } -// #[tracing::instrument(level = "debug")] -// async fn mqtt_dispatcher( -// mut options: MqttOptions, -// prefix: String, -// registry: mpsc::Sender, -// mut rx: mpsc::Receiver, -// ) { -// info!("Connecting to MQTT broker..."); - -// options.set_last_will(LastWill { -// topic: format!("{}/status", prefix), -// message: serde_json::to_vec(&json!({ -// "status": MainStatus::Stopped, -// })) -// .unwrap() -// .into(), -// qos: QoS::AtMostOnce, -// retain: false, -// }); - -// let (client, mut eventloop) = AsyncClient::new(options, 10); - -// client -// .publish( -// format!("{}/status", prefix), -// QoS::AtMostOnce, -// false, -// serde_json::to_vec(&json!({ -// "status": MainStatus::Running, -// })) -// .unwrap(), -// ) -// .await -// .unwrap(); - -// client -// .subscribe(format!("{}/connect/#", prefix), QoS::AtMostOnce) -// .await -// .unwrap(); - -// let rx_loop_handler = { -// let client = client.clone(); -// tokio::spawn(async move { -// info!("Start dispatcher rx loop"); -// while let Some(command) = rx.recv().await { -// match command { -// DispatchCommand::Publish { topic, payload } => { -// client -// .publish(topic, QoS::AtMostOnce, false, payload) -// .await -// .unwrap(); -// } -// } -// } -// }) -// }; - -// while let Ok(event) = eventloop.poll().await { -// use Event::{Incoming as In, Outgoing as Out}; - -// match event { -// Out(_) => (), -// In(Incoming::ConnAck(_)) => info!("Connected to MQTT!"), -// In(Incoming::PingResp | Incoming::SubAck(_)) => (), - -// In(Incoming::Publish(Publish { topic, payload, .. })) => { -// debug!("{} -> {:?}", &topic, &payload); - -// match topic.split('/').collect::>()[..] { -// [p, "connect", conn_name] if p == prefix.as_str() => { -// registry -// .send(RegistryCommand::Connect { -// id: conn_name.to_string(), -// details: payload, -// }) -// .await -// .unwrap(); -// } -// _ => (), -// }; -// } -// _ => { -// debug!("{:?}", event); -// } -// } -// } - -// rx_loop_handler.await.unwrap(); -// } - -// type ConnectionId = String; - -// #[derive(Debug)] -// enum RegistryCommand { -// Connect { -// id: ConnectionId, -// details: bytes::Bytes, -// }, -// Disconnect(ConnectionId), -// } - -// type RegistryDb = HashMap>; - -// #[tracing::instrument(level = "debug")] -// async fn connection_registry( -// prefix: String, -// dispatcher: mpsc::Sender, -// mut rx: mpsc::Receiver, -// ) { -// info!("Starting connection registry..."); -// let mut db: RegistryDb = HashMap::new(); - -// while let Some(command) = rx.recv().await { -// use RegistryCommand::*; -// match command { -// Disconnect(id) => { -// if let Some(handle) = db.remove(&id) { -// handle.abort(); -// } -// } -// Connect { id, details } => { -// info!(id, payload = ?details, "Establishing connection"); -// let prefix = prefix.clone(); -// let dispatcher = dispatcher.clone(); - -// if let Some(handle) = db.remove(&id) { -// handle.abort(); -// } - -// db.insert( -// id.clone(), -// tokio::spawn(handle_connect(dispatcher, id, prefix, details)), -// ); -// } -// _ => error!("unimplemented"), -// } -// } -// } - -// #[derive(Clone, Copy, Debug)] -// enum ModbusReadType { -// Input, -// Hold, -// } - -// #[derive(Debug)] -// enum ModbusCommand { -// Read(ModbusReadType, u16, u8, ModbusResponse), -// Write(u16, Vec, ModbusResponse), -// } - -// type ModbusResponse = oneshot::Sender>>; - -// #[tracing::instrument(level = "debug")] -// async fn handle_connect( -// dispatcher: mpsc::Sender, -// id: ConnectionId, -// topic_prefix: String, -// payload: bytes::Bytes, -// ) { -// use modbus::config::*; -// use modbus::ConnectState; -// info!("Starting connection handler for {}", id); -// match serde_json::from_slice::(&payload) { -// Ok(connect) => { -// let unit = connect.unit; - -// let mut modbus: tokio_modbus::client::Context = match connect.settings { -// #[cfg(feature = "winet-s")] -// ModbusProto::SungrowWiNetS { ref host } => { -// tokio_modbus_winets::connect_slave(host, unit) -// .await -// .unwrap() -// } -// #[cfg(feature = "tcp")] -// ModbusProto::Tcp { ref host, port } => { -// let socket_addr = format!("{}:{}", host, port).parse().unwrap(); -// tcp::connect_slave(socket_addr, unit).await.unwrap() -// } -// #[cfg(feature = "rtu")] -// ModbusProto::Rtu { -// ref tty, -// baud_rate, -// data_bits, -// stop_bits, -// flow_control, -// parity, -// } => { -// let builder = tokio_serial::new(tty, baud_rate) -// .data_bits(data_bits) -// .flow_control(flow_control) -// .parity(parity) -// .stop_bits(stop_bits); -// let port = tokio_serial::SerialStream::open(&builder).unwrap(); -// rtu::connect_slave(port, unit).await.unwrap() -// } -// ModbusProto::Unknown => { -// error!("Unrecognised protocol"); -// return; -// } -// }; -// let status = modbus::ConnectStatus { -// connect: connect.clone(), -// status: ConnectState::Connected, -// }; -// dispatcher -// .send(DispatchCommand::Publish { -// topic: format!("{}/status/{}", topic_prefix, id), -// payload: serde_json::to_vec(&status).unwrap(), -// }) -// .await -// .unwrap(); - -// let (modbus_tx, mut modbus_rx) = mpsc::channel::(32); // tokio::spawn(async move { // while let Some(command) = modbus_rx.recv().await { -// match command { -// ModbusCommand::Read(read_type, address, count, responder) => { -// let response = match read_type { -// ModbusReadType::Input => { -// modbus.read_input_registers(address, count as u16) -// } -// ModbusReadType::Hold => { -// modbus.read_holding_registers(address, count as u16) -// } -// }; - -// responder.send(response.await.map_err(Into::into)).unwrap(); -// } -// ModbusCommand::Write(address, data, responder) => { -// responder -// .send( -// modbus -// .read_write_multiple_registers( -// address, -// data.len() as u16, -// address, -// &data[..], -// ) -// .await -// .map_err(Into::into), -// ) -// .unwrap(); -// } -// } // } // }); @@ -398,10 +151,6 @@ pub type Result = std::result::Result; // // FIXME: definitely getting errors here that need to be handled // // -// // thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: Error { kind: UnexpectedEof, message: "failed to fill whole buffer" }' -// // thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: Custom { kind: InvalidData, error: "Invalid data length: 0" }' -// // thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: Os { code: 36, kind: Uncategorized, message: "Operation now in progress" }' -// // thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: Os { code: 35, kind: WouldBlock, message: "Resource temporarily unavailable" } // // // // Splitting out the two awaits so I can see if all of the above panics come from the same await or some from one vs the other: // let response = rx.await.unwrap(); // await may have errorer on receiving diff --git a/modbus-mqtt/src/modbus/connection.rs b/modbus-mqtt/src/modbus/connection.rs index 5a52f0f..1561c69 100644 --- a/modbus-mqtt/src/modbus/connection.rs +++ b/modbus-mqtt/src/modbus/connection.rs @@ -1,9 +1,11 @@ use crate::modbus::{self}; use crate::Error; +use rust_decimal::prelude::Zero; use serde::Deserialize; +use tokio::sync::oneshot; use tokio::{select, sync::mpsc}; use tokio_modbus::client::{rtu, tcp, Context as ModbusClient}; -use tracing::{debug, error}; +use tracing::{debug, error, warn}; use crate::{mqtt, shutdown::Shutdown}; @@ -18,25 +20,32 @@ pub(crate) async fn run( // Can unwrap because if MQTT handler is bad, we have nothing to do here. mqtt.publish("state", "connecting").await.unwrap(); + let address_offset = config.address_offset; + match config.settings.connect(config.unit).await { Ok(client) => { // Can unwrap because if MQTT handler is bad, we have nothing to do here. mqtt.publish("state", "connected").await.unwrap(); - // Create handle and send to caller let (tx, rx) = mpsc::channel(32); - handle_tx.send(Ok(Handle { tx })).unwrap(); let conn = Connection { + address_offset, client, - mqtt, + mqtt: mqtt.clone(), shutdown, rx, + tx, }; + handle_tx.send(Ok(conn.handle())).unwrap(); + if let Err(error) = conn.run().await { error!(?error, "Modbus connection failed"); } + + // we are shutting down here, so don't care if this fails + let _ = mqtt.publish("state", "disconnected").await; } Err(error) => handle_tx.send(Err(error.into())).unwrap(), } @@ -47,30 +56,182 @@ pub(crate) async fn run( struct Connection { client: ModbusClient, + address_offset: i8, mqtt: mqtt::Handle, shutdown: Shutdown, - rx: mpsc::Receiver, + rx: mpsc::Receiver, + tx: mpsc::Sender, } #[derive(Debug)] pub struct Handle { - tx: mpsc::Sender, + tx: mpsc::Sender, +} + +impl Handle { + pub async fn write_register(&self, address: u16, data: Vec) -> crate::Result> { + let (tx, rx) = oneshot::channel(); + self.tx + .send(Command::Write(address, data, tx)) + .await + .map_err(|_| Error::SendError)?; + rx.await.map_err(|_| Error::RecvError)? + } + pub async fn read_input_register( + &self, + address: u16, + quantity: u8, + ) -> crate::Result> { + self.read_register(ReadType::Input, address, quantity).await + } + pub async fn read_holding_register( + &self, + address: u16, + quantity: u8, + ) -> crate::Result> { + self.read_register(ReadType::Holding, address, quantity) + .await + } + + async fn read_register( + &self, + reg_type: ReadType, + address: u16, + quantity: u8, + ) -> crate::Result> { + let (tx, rx) = oneshot::channel(); + self.tx + .send(Command::Read(reg_type, address, quantity, tx)) + .await + .map_err(|_| Error::SendError)?; + rx.await.map_err(|_| Error::RecvError)? + } +} + +type Word = u16; +type Response = oneshot::Sender>>; + +#[derive(Clone, Copy, Debug)] +enum ReadType { + Input, + Holding, } #[derive(Debug)] -enum Message {} +enum Command { + Read(ReadType, u16, u8, Response), + Write(u16, Vec, Response), +} impl Connection { pub async fn run(mut self) -> crate::Result<()> { + let mut input_registers = self.mqtt.subscribe("input/+").await?; + let mut holding_registers = self.mqtt.subscribe("holding/+").await?; + + // TODO: if we get a new register definition for an existing register, how do we avoid redundant (and possibly + // conflicting) tasks? Should MQTT component only allow one subscriber per topic filter, replacing the old one + // when it gets a new subscribe request? + loop { select! { - Some(msg) = self.rx.recv() => { debug!(?msg); }, + Some(reg) = input_registers.recv() => {}, + Some(reg) = holding_registers.recv() => {}, + Some(cmd) = self.rx.recv() => { self.process_command(cmd).await; }, _ = self.shutdown.recv() => { return Ok(()); } } } } + + fn handle(&self) -> Handle { + Handle { + tx: self.tx.clone(), + } + } + + /// Apply address offset to address. + /// + /// Panics if offset would overflow or underflow the address. + fn adjust_address(&self, address: u16) -> u16 { + if self.address_offset.is_zero() { + return address; + } + + // TODO: use `checked_add_signed()` once stabilised: https://doc.rust-lang.org/std/primitive.u16.html#method.checked_add_signed + let adjusted_address = if self.address_offset >= 0 { + address.checked_add(self.address_offset as u16) + } else { + address.checked_sub(self.address_offset.unsigned_abs() as u16) + }; + + if let Some(address) = adjusted_address { + address + } else { + error!(address, offset = self.address_offset,); + address + // panic!("Address offset would underflow/overflow") + } + } + + #[tracing::instrument(skip(self))] + async fn process_command(&mut self, cmd: Command) { + use tokio_modbus::prelude::Reader; + + let (tx, response) = match cmd { + Command::Read(ReadType::Input, address, count, tx) => { + let address = self.adjust_address(address); + ( + tx, + self.client + .read_input_registers(address, count as u16) + .await, + ) + } + Command::Read(ReadType::Holding, address, count, tx) => { + let address = self.adjust_address(address); + ( + tx, + self.client + .read_holding_registers(address, count as u16) + .await, + ) + } + Command::Write(address, data, tx) => { + let address = self.adjust_address(address); + ( + tx, + self.client + .read_write_multiple_registers( + address, + data.len() as u16, + address, + &data[..], + ) + .await, + ) + } + }; + + // This might be transient, so don't kill connection. We may be able to discriminate on the error to determine + // which errors are transient and which are conclusive. + // + // Some errors that we have observed: + // + // Error { kind: UnexpectedEof, message: "failed to fill whole buffer" }' + // Custom { kind: InvalidData, error: "Invalid data length: 0" }' + // Os { code: 36, kind: Uncategorized, message: "Operation now in progress" }' + // Os { code: 35, kind: WouldBlock, message: "Resource temporarily unavailable" } + // + if let Err(error) = &response { + warn!(?error, "modbus command error"); + } + + // This probably just means that the register task died or is no longer monitoring the response. + if let Err(response) = tx.send(response.map_err(Into::into)) { + warn!(?response, "error sending response"); + } + } } #[derive(Debug, Deserialize)] diff --git a/modbus-mqtt/src/modbus/connector.rs b/modbus-mqtt/src/modbus/connector.rs index 112fda4..83c789f 100644 --- a/modbus-mqtt/src/modbus/connector.rs +++ b/modbus-mqtt/src/modbus/connector.rs @@ -6,6 +6,10 @@ use serde_json::value::RawValue as RawJSON; use tokio::select; use tracing::{debug, error, info}; +/* +NOTE: Should this be a connection _registry_ of sorts which also restarts connections which die? +*/ + /// The topic filter under the prefix to look for connection configs const TOPIC: &str = "+/connect"; @@ -88,10 +92,10 @@ async fn connect(config: Config<'_>, mqtt: mqtt::Handle, shutdown: Shutdown) -> holding, } = config; - mqtt.publish("state", "connecting").await?; - let connection_handler = connection::run(settings, mqtt.clone(), shutdown).await?; + // TODO: consider waiting 1 second before sending the registers to MQTT, to ensure that the connection is listening. + for reg in input { let mqtt = mqtt.scoped("input"); if let Ok(r) = serde_json::from_slice::(reg.get().as_bytes()) { diff --git a/modbus-mqtt/src/mqtt.rs b/modbus-mqtt/src/mqtt.rs index 6391e20..8807758 100644 --- a/modbus-mqtt/src/mqtt.rs +++ b/modbus-mqtt/src/mqtt.rs @@ -164,6 +164,9 @@ impl Connection { for filter in &filters { let channel = channel.clone(); + // NOTE: Curently allows multiple components to watch the same topic filter, but if there is no need + // for this, it might make more sense to have it _replace_ the channel, so that old (stale) + // components automatically finish running. match self.subscriptions.get_mut(&filter.path) { Some(channels) => channels.push(channel), None => {