diff --git a/modbus-mqtt/examples/sungrow-sh5.0rs-http.json b/modbus-mqtt/examples/sungrow-sh5.0rs-http.json new file mode 100644 index 0000000..2461328 --- /dev/null +++ b/modbus-mqtt/examples/sungrow-sh5.0rs-http.json @@ -0,0 +1,19 @@ +{ + "host": "10.10.10.219", + "unit": 1, + "proto": "winet-s", + "input": [ + { + "address": 13000, + "type": "u16", + "name": "running_state", + "period": "3s" + }, + { + "address": 13022, + "type": "s16", + "name": "battery_power", + "period": "3s" + } + ] +} \ No newline at end of file diff --git a/modbus-mqtt/examples/sungrow-sh5.0rs.json b/modbus-mqtt/examples/sungrow-sh5.0rs.json index a8e5bb4..dc53062 100644 --- a/modbus-mqtt/examples/sungrow-sh5.0rs.json +++ b/modbus-mqtt/examples/sungrow-sh5.0rs.json @@ -8,43 +8,84 @@ "address": 5017, "type": "u32", "name": "dc_power", - "swap_words": false, - "period": "3s" + "swap_words": true, + "period": "1s" + }, + { + "address": 13034, + "type": "u32", + "name": "active_power", + "swap_words": true, + "period": "1s" }, { "address": 5008, "type": "s16", "name": "internal_temperature", - "period": "1m" + "period": "1m", + "scale": -1 }, { "address": 13008, "type": "s32", "name": "load_power", - "swap_words": false, - "period": "3s" + "swap_words": true, + "period": "1s" }, { "address": 13010, "type": "s32", "name": "export_power", - "swap_words": false, - "period": "3s" + "swap_words": true, + "period": "1s" }, { "address": 13022, "name": "battery_power", - "period": "3s" + "period": "1s" }, { "address": 13023, "name": "battery_level", - "period": "1m" + "period": "1m", + "scale": -1 }, { "address": 13024, "name": "battery_health", - "period": "10m" + "period": "10m", + "scale": -1 + }, + { + "address": 5036, + "name": "grid_frequency", + "period": "1m" + }, + { + "address": 5019, + "name": "phase_a_voltage", + "period": "1m" + }, + { + "address": 13031, + "name": "phase_a_current", + "period": "1m" + }, + { + "address": 5011, + "name": "mppt1_voltage" + }, + { + "address": 5012, + "name": "mppt1_current" + }, + { + "address": 5012, + "name": "mppt2_voltage" + }, + { + "address": 5013, + "name": "mppt2_current" } ], "holding": [ @@ -57,6 +98,15 @@ "address": 13059, "name": "min_soc", "period": "90s" + }, + { + "address": 13100, + "name": "battery_reserve" + }, + { + "address": 33148, + "name": "forced_battery_power", + "scale": 1 } ] } \ No newline at end of file diff --git a/modbus-mqtt/src/bin/run.rs b/modbus-mqtt/src/bin/run.rs index cc22f79..8f7fd74 100644 --- a/modbus-mqtt/src/bin/run.rs +++ b/modbus-mqtt/src/bin/run.rs @@ -1,6 +1,7 @@ use clap::Parser; use modbus_mqtt::{server, Result}; use rumqttc::MqttOptions; +use tokio::select; use url::Url; #[derive(Parser, Debug)] @@ -50,7 +51,30 @@ async fn main() -> Result<()> { prefix = options.client_id(); } - server::run(prefix, options, tokio::signal::ctrl_c()).await?; + let shutdown = async move { + let ctrl_c = tokio::signal::ctrl_c(); + + #[cfg(unix)] + { + use tokio::signal::unix::{signal, SignalKind}; + + let mut term = signal(SignalKind::terminate()).unwrap(); + let mut int = signal(SignalKind::interrupt()).unwrap(); + let mut hup = signal(SignalKind::hangup()).unwrap(); + + select! { + _ = ctrl_c => {}, + _ = term.recv() => {}, + _ = int.recv() => {}, + _ = hup.recv() => {}, + } + } + + #[cfg(not(unix))] + ctrl_c.await; + }; + + server::run(prefix, options, shutdown).await?; Ok(()) } diff --git a/modbus-mqtt/src/error.rs b/modbus-mqtt/src/error.rs new file mode 100644 index 0000000..372435b --- /dev/null +++ b/modbus-mqtt/src/error.rs @@ -0,0 +1,55 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +#[non_exhaustive] +pub enum Error { + #[error(transparent)] + IOError(#[from] std::io::Error), + + #[error(transparent)] + MQTTOptionError(#[from] rumqttc::OptionError), + + #[error(transparent)] + MQTTClientError(#[from] rumqttc::ClientError), + + #[error(transparent)] + MQTTConnectionError(#[from] rumqttc::ConnectionError), + + #[error(transparent)] + InvalidSocketAddr(#[from] std::net::AddrParseError), + + #[error(transparent)] + SerialError(#[from] tokio_serial::Error), + + #[error(transparent)] + ParseIntError(#[from] std::num::ParseIntError), + + #[error(transparent)] + JSONError(#[from] serde_json::Error), + + #[error("RecvError")] + RecvError, + + #[error("SendError")] + SendError, + + #[error("Unrecognised modbus protocol")] + UnrecognisedModbusProtocol, + + #[error("{0}")] + Other(std::borrow::Cow<'static, str>), + + #[error("Unknown")] + Unknown, +} + +impl From for Error { + fn from(s: String) -> Self { + Self::Other(s.into()) + } +} +impl From<&'static str> for Error { + fn from(s: &'static str) -> Self { + Self::Other(s.into()) + } +} diff --git a/modbus-mqtt/src/homeassistant.rs b/modbus-mqtt/src/homeassistant.rs index 583a715..e69de29 100644 --- a/modbus-mqtt/src/homeassistant.rs +++ b/modbus-mqtt/src/homeassistant.rs @@ -1,8 +0,0 @@ -use tokio::sync::mpsc::Sender; - -use crate::{modbus::register::Register, mqtt}; - -/// Describes the register to Home Assistant -fn configure(_register: Register, _tx: Sender) -> crate::Result<()> { - Ok(()) -} diff --git a/modbus-mqtt/src/lib.rs b/modbus-mqtt/src/lib.rs index 588c009..cc3c6a9 100644 --- a/modbus-mqtt/src/lib.rs +++ b/modbus-mqtt/src/lib.rs @@ -1,147 +1,17 @@ -use rumqttc::{self}; - -use tracing::error; - -use thiserror::Error; - mod shutdown; -pub mod homeassistant; pub mod modbus; pub mod mqtt; pub mod server; -#[derive(Error, Debug)] -#[non_exhaustive] -pub enum Error { - #[error(transparent)] - IOError(#[from] std::io::Error), +//TODO: +// pub mod homeassistant; - #[error(transparent)] - MQTTOptionError(#[from] rumqttc::OptionError), - - #[error(transparent)] - MQTTClientError(#[from] rumqttc::ClientError), - - #[error(transparent)] - MQTTConnectionError(#[from] rumqttc::ConnectionError), - - #[error(transparent)] - InvalidSocketAddr(#[from] std::net::AddrParseError), - - #[error(transparent)] - SerialError(#[from] tokio_serial::Error), - - #[error("RecvError")] - RecvError, - - #[error("SendError")] - SendError, - - #[error("Unrecognised modbus protocol")] - UnrecognisedModbusProtocol, - - #[error("{0}")] - Other(std::borrow::Cow<'static, str>), - - #[error("Unknown")] - Unknown, -} - -impl From for Error { - fn from(s: String) -> Self { - Self::Other(s.into()) - } -} -impl From<&'static str> for Error { - fn from(s: &'static str) -> Self { - Self::Other(s.into()) - } -} +mod error; +pub use error::Error; pub type Result = std::result::Result; -// tokio::spawn(async move { -// while let Some(command) = modbus_rx.recv().await { -// } -// }); - -// use itertools::Itertools; -// for (duration, registers) in &connect.input.into_iter().group_by(|r| r.interval) { -// let registers_prefix = format!("{}/input/{}", topic_prefix, id); - -// tokio::spawn(watch_registers( -// ModbusReadType::Input, -// connect.address_offset, -// duration, -// registers.collect(), -// modbus_tx.clone(), -// dispatcher.clone(), -// registers_prefix, -// )); -// } -// for (duration, registers) in &connect.hold.into_iter().group_by(|r| r.interval) { -// let registers_prefix = format!("{}/hold/{}", topic_prefix, id); - -// tokio::spawn(watch_registers( -// ModbusReadType::Hold, -// connect.address_offset, -// duration, -// registers.collect(), -// modbus_tx.clone(), -// dispatcher.clone(), -// registers_prefix, -// )); -// } -// } -// Err(err) => { -// dispatcher -// .send(DispatchCommand::Publish { -// topic: format!("{}/status/{}", topic_prefix, id), -// payload: serde_json::to_vec(&json!({ -// "status": ConnectState::Errored, -// "error": format!("Invalid config: {}", err), -// })) -// .unwrap(), -// }) -// .await -// .unwrap(); -// } -// } -// } - -// #[tracing::instrument(level = "debug")] -// async fn watch_registers( -// read_type: ModbusReadType, -// address_offset: i8, -// duration: Duration, -// registers: Vec, -// modbus: mpsc::Sender, -// dispatcher: mpsc::Sender, -// registers_prefix: String, -// ) -> ! { -// let mut interval = tokio::time::interval(duration); -// interval.set_missed_tick_behavior(MissedTickBehavior::Delay); - -// loop { -// interval.tick().await; -// for r in registers.iter() { -// let address = if address_offset >= 0 { -// r.address.checked_add(address_offset as u16) -// } else { -// r.address.checked_sub(address_offset.unsigned_abs() as u16) -// }; -// if let Some(address) = address { -// let size = r.parse.value_type.size(); -// debug!( -// name = r.name.as_ref().unwrap_or(&"".to_string()), -// address, -// size, -// register_type = ?read_type, -// value_type = r.parse.value_type.type_name(), -// "Polling register", -// ); - // let (tx, rx) = oneshot::channel(); // modbus diff --git a/modbus-mqtt/src/modbus/connection.rs b/modbus-mqtt/src/modbus/connection.rs index f67d36e..777e675 100644 --- a/modbus-mqtt/src/modbus/connection.rs +++ b/modbus-mqtt/src/modbus/connection.rs @@ -1,4 +1,6 @@ -use crate::modbus::{self}; +use super::Word; +use crate::modbus::{self, register}; +use crate::mqtt::Scopable; use crate::Error; use rust_decimal::prelude::Zero; use serde::Deserialize; @@ -9,6 +11,8 @@ use tracing::{debug, error, warn}; use crate::{mqtt, shutdown::Shutdown}; +use super::register::RegisterType; + pub(crate) async fn run( config: Config, mqtt: mqtt::Handle, @@ -83,20 +87,21 @@ impl Handle { address: u16, quantity: u8, ) -> crate::Result> { - self.read_register(ReadType::Input, address, quantity).await + self.read_register(RegisterType::Input, address, quantity) + .await } pub async fn read_holding_register( &self, address: u16, quantity: u8, ) -> crate::Result> { - self.read_register(ReadType::Holding, address, quantity) + self.read_register(RegisterType::Holding, address, quantity) .await } async fn read_register( &self, - reg_type: ReadType, + reg_type: RegisterType, address: u16, quantity: u8, ) -> crate::Result> { @@ -109,35 +114,45 @@ impl Handle { } } -type Word = u16; type Response = oneshot::Sender>>; -#[derive(Clone, Copy, Debug)] -enum ReadType { - Input, - Holding, -} - #[derive(Debug)] enum Command { - Read(ReadType, u16, u8, Response), + Read(RegisterType, u16, u8, Response), Write(u16, Vec, Response), } impl Connection { pub async fn run(mut self) -> crate::Result<()> { - let mut input_registers = self.mqtt.subscribe("input/+").await?; - let mut holding_registers = self.mqtt.subscribe("holding/+").await?; - - // TODO: if we get a new register definition for an existing register, how do we avoid redundant (and possibly - // conflicting) tasks? Should MQTT component only allow one subscriber per topic filter, replacing the old one - // when it gets a new subscribe request? + let mut registers_rx = register::subscribe(&self.mqtt).await?; loop { select! { - Some(reg) = input_registers.recv() => {}, - Some(reg) = holding_registers.recv() => {}, Some(cmd) = self.rx.recv() => { self.process_command(cmd).await; }, + + Some((reg_type, reg)) = registers_rx.recv() => { + debug!(?reg_type, ?reg); + let scope = format!( + "{}/{}", + match ®_type { + RegisterType::Input => "input", + RegisterType::Holding => "holding", + }, + reg.address + ); + let mqtt = self.mqtt.scoped(scope); + let modbus = self.handle(); + register::Monitor::new( + reg.register, + reg_type, + reg.address, + mqtt, + modbus, + ) + .run() + .await; + }, + _ = self.shutdown.recv() => { return Ok(()); } @@ -151,6 +166,11 @@ impl Connection { } } + // TODO: if we get a new register definition for an existing register, how do we avoid redundant (and possibly + // conflicting) tasks? Should MQTT component only allow one subscriber per topic filter, replacing the old one + // when it gets a new subscribe request? + // IDEA: Allow providing a subscription ID which _replaces_ any existing subscription with the same ID + /// Apply address offset to address. /// /// Panics if offset would overflow or underflow the address. @@ -175,12 +195,11 @@ impl Connection { } } - #[tracing::instrument(skip(self))] async fn process_command(&mut self, cmd: Command) { use tokio_modbus::prelude::Reader; let (tx, response) = match cmd { - Command::Read(ReadType::Input, address, count, tx) => { + Command::Read(RegisterType::Input, address, count, tx) => { let address = self.adjust_address(address); ( tx, @@ -189,7 +208,7 @@ impl Connection { .await, ) } - Command::Read(ReadType::Holding, address, count, tx) => { + Command::Read(RegisterType::Holding, address, count, tx) => { let address = self.adjust_address(address); ( tx, @@ -224,8 +243,17 @@ impl Connection { // Os { code: 36, kind: Uncategorized, message: "Operation now in progress" }' // Os { code: 35, kind: WouldBlock, message: "Resource temporarily unavailable" } // - if let Err(error) = &response { - warn!(?error, "modbus command error"); + match &response { + Err(error) => match error.kind() { + std::io::ErrorKind::UnexpectedEof => { + // THIS happening feels like a bug either in how I am using tokio_modbus or in tokio_modbus. It seems + // like the underlying buffers get all messed up and restarting doesn't always fix it unless I wait a + // few seconds. I might need to get help from someone to figure it out. + error!(?error, "Connection error, may not be recoverable"); + } + _ => error!(?error), + }, + _ => {} } // This probably just means that the register task died or is no longer monitoring the response. diff --git a/modbus-mqtt/src/modbus/connector.rs b/modbus-mqtt/src/modbus/connector.rs index 83c789f..712c71f 100644 --- a/modbus-mqtt/src/modbus/connector.rs +++ b/modbus-mqtt/src/modbus/connector.rs @@ -92,24 +92,24 @@ async fn connect(config: Config<'_>, mqtt: mqtt::Handle, shutdown: Shutdown) -> holding, } = config; - let connection_handler = connection::run(settings, mqtt.clone(), shutdown).await?; + let _ = connection::run(settings, mqtt.clone(), shutdown).await?; // TODO: consider waiting 1 second before sending the registers to MQTT, to ensure that the connection is listening. - for reg in input { - let mqtt = mqtt.scoped("input"); - if let Ok(r) = serde_json::from_slice::(reg.get().as_bytes()) { - debug!(?r); - let bytes: bytes::Bytes = reg.get().as_bytes().to_owned().into(); - mqtt.publish(r.address.to_string(), bytes).await?; - } - } - for reg in holding { - let mqtt = mqtt.scoped("holding"); - if let Ok(r) = serde_json::from_slice::(reg.get().as_bytes()) { - debug!(?r); - let bytes: bytes::Bytes = reg.get().as_bytes().to_owned().into(); - mqtt.publish(r.address.to_string(), bytes).await?; + for (reg_type, registers) in [("holding", holding), ("input", input)] { + let mqtt = mqtt.scoped(reg_type); + for reg in registers { + if let Ok(r) = + serde_json::from_slice::(reg.get().as_bytes()) + { + let json = serde_json::to_vec(&r.register).unwrap(); // unwrap() should be fine because we JUST deserialized it successfully + mqtt.publish(r.address.to_string(), json).await?; + // if let Some(name) = r.register.name { + // r.register.name = None; + // let json = serde_json::to_vec(&r).unwrap(); // unwrap() should be fine because we JUST deserialized it successfully + // mqtt.publish(name, json).await?; + // } + } } } diff --git a/modbus-mqtt/src/modbus/mod.rs b/modbus-mqtt/src/modbus/mod.rs index 624b212..7d7e8c7 100644 --- a/modbus-mqtt/src/modbus/mod.rs +++ b/modbus-mqtt/src/modbus/mod.rs @@ -1,19 +1,10 @@ -use rust_decimal::{prelude::FromPrimitive, Decimal}; -use serde::Serialize; - -use self::register::{Register, RegisterValueType}; - pub mod connection; pub mod connector; pub mod register; -#[derive(Serialize)] -#[serde(rename_all = "lowercase")] -pub enum ConnectState { - Connected, - Disconnected, - Errored, -} +pub use connection::Handle; + +type Word = u16; pub type UnitId = tokio_modbus::prelude::SlaveId; pub type Unit = tokio_modbus::prelude::Slave; diff --git a/modbus-mqtt/src/modbus/register.rs b/modbus-mqtt/src/modbus/register.rs index 738f102..c53b5b5 100644 --- a/modbus-mqtt/src/modbus/register.rs +++ b/modbus-mqtt/src/modbus/register.rs @@ -1,5 +1,130 @@ +use super::Word; use serde::{Deserialize, Serialize}; -use std::{ops::Add, time::Duration}; +use std::time::Duration; +use tokio::{ + select, + sync::mpsc, + time::{interval, MissedTickBehavior}, +}; +use tracing::{debug, warn}; + +pub struct Monitor { + mqtt: mqtt::Handle, + modbus: super::Handle, + address: u16, + register: Register, + register_type: RegisterType, +} + +impl Monitor { + pub fn new( + register: Register, + register_type: RegisterType, + address: u16, + mqtt: mqtt::Handle, + modbus: super::Handle, + ) -> Monitor { + Monitor { + mqtt, + register_type, + register, + address, + modbus, + } + } + + pub async fn run(self) { + tokio::spawn(async move { + let mut interval = interval(self.register.interval); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + loop { + interval.tick().await; + if let Ok(words) = self.read().await { + debug!(address=%self.address, "type"=?self.register_type, ?words); + + #[cfg(debug_assertions)] + self.mqtt + .publish("raw", serde_json::to_vec(&words).unwrap()) + .await + .unwrap(); + + let value = self + .register + .parse_words(&self.register.apply_swaps(&words)); + + self.mqtt + .publish("state", serde_json::to_vec(&value).unwrap()) + .await + .unwrap(); + } + } + }); + } + + async fn read(&self) -> crate::Result> { + match self.register_type { + RegisterType::Input => { + self.modbus + .read_input_register(self.address, self.register.size()) + .await + } + RegisterType::Holding => { + self.modbus + .read_holding_register(self.address, self.register.size()) + .await + } + } + } +} + +pub(crate) async fn subscribe( + mqtt: &mqtt::Handle, +) -> crate::Result> { + let (tx, rx) = mpsc::channel(8); + let mut input_registers = mqtt.subscribe("input/+").await?; + let mut holding_registers = mqtt.subscribe("holding/+").await?; + + tokio::spawn(async move { + fn to_register(payload: &Payload) -> crate::Result { + let Payload { bytes, topic } = payload; + let address = topic + .rsplit("/") + .next() + .expect("subscribed topic guarantees we have a last segment") + .parse()?; + Ok(AddressedRegister { + address, + register: serde_json::from_slice(&bytes)?, + }) + } + + loop { + select! { + Some(ref payload) = input_registers.recv() => { + match to_register(payload) { + Ok(register) => if let Err(_) = tx.send((RegisterType::Input, register)).await { break; }, + Err(error) => warn!(?error, def=?payload.bytes, "ignoring invalid input register definition"), + } + }, + Some(ref payload) = holding_registers.recv() => { + match to_register(payload) { + Ok(register) => if let Err(_) = tx.send((RegisterType::Holding, register)).await { break; }, + Err(error) => warn!(?error, def=?payload.bytes, "ignoring invalid holding register definition"), + } + } + } + } + }); + + Ok(rx) +} + +#[derive(Clone, Copy, Debug)] +pub enum RegisterType { + Input, + Holding, +} #[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "lowercase", default)] @@ -302,7 +427,7 @@ impl RegisterValueType { use serde_json::json; use RegisterValueType as T; - let bytes: Vec = words.iter().flat_map(|v| v.to_ne_bytes()).collect(); + let bytes: Vec = words.iter().flat_map(|v| v.to_be_bytes()).collect(); match *self { T::Numeric { ref of, ref adjust } => { @@ -314,44 +439,44 @@ impl RegisterValueType { N::U32 => { json!(bytes .try_into() - .map(|bytes| scale * Decimal::from(u32::from_le_bytes(bytes)) + offset) + .map(|bytes| scale * Decimal::from(u32::from_be_bytes(bytes)) + offset) .ok()) } N::U64 => { json!(bytes .try_into() - .map(|bytes| scale * Decimal::from(u64::from_le_bytes(bytes)) + offset) + .map(|bytes| scale * Decimal::from(u64::from_be_bytes(bytes)) + offset) .ok()) } N::I8 => { json!(vec![bytes[1]] .try_into() - .map(|bytes| scale * Decimal::from(i8::from_le_bytes(bytes)) + offset) + .map(|bytes| scale * Decimal::from(i8::from_be_bytes(bytes)) + offset) .ok()) } N::I16 => { json!(bytes .try_into() - .map(|bytes| scale * Decimal::from(i16::from_le_bytes(bytes)) + offset) + .map(|bytes| scale * Decimal::from(i16::from_be_bytes(bytes)) + offset) .ok()) } N::I32 => { json!(bytes .try_into() - .map(|bytes| scale * Decimal::from(i32::from_le_bytes(bytes)) + offset) + .map(|bytes| scale * Decimal::from(i32::from_be_bytes(bytes)) + offset) .ok()) } N::I64 => { json!(bytes .try_into() - .map(|bytes| scale * Decimal::from(i64::from_le_bytes(bytes)) + offset) + .map(|bytes| scale * Decimal::from(i64::from_be_bytes(bytes)) + offset) .ok()) } N::F32 => { json!(bytes .try_into() .map(|bytes| scale - * Decimal::from_f32(f32::from_le_bytes(bytes)).unwrap() + * Decimal::from_f32(f32::from_be_bytes(bytes)).unwrap() + offset) .ok()) } @@ -359,7 +484,7 @@ impl RegisterValueType { json!(bytes .try_into() .map(|bytes| scale - * Decimal::from_f64(f64::from_le_bytes(bytes)).unwrap() + * Decimal::from_f64(f64::from_be_bytes(bytes)).unwrap() + offset) .ok()) } @@ -374,6 +499,10 @@ impl RegisterValueType { } impl Register { + pub fn size(&self) -> u8 { + self.parse.value_type.size() + } + pub fn parse_words(&self, words: &[u16]) -> serde_json::Value { self.parse.value_type.parse_words(words) } @@ -397,28 +526,27 @@ impl Register { } #[cfg(test)] use pretty_assertions::assert_eq; + +use crate::mqtt::{self, Payload}; #[test] fn test_parse_1() { use serde_json::json; - let reg = AddressedRegister { - address: 42, - register: Register { - name: None, - interval: Default::default(), - parse: RegisterParse { - swap_bytes: Swap(false), - swap_words: Swap(false), - value_type: RegisterValueType::Numeric { - of: RegisterNumeric::I32, - adjust: RegisterNumericAdjustment { - scale: 0, - offset: 0, - }, + Register { + name: None, + interval: Default::default(), + parse: RegisterParse { + swap_bytes: Swap(false), + swap_words: Swap(false), + value_type: RegisterValueType::Numeric { + of: RegisterNumeric::I32, + adjust: RegisterNumericAdjustment { + scale: 0, + offset: 0, }, }, }, }; - assert_eq!(reg.register.parse_words(&[843, 0]), json!(843)); + assert_eq!(reg.parse_words(&[843, 0]), json!(843)); } diff --git a/modbus-mqtt/src/mqtt.rs b/modbus-mqtt/src/mqtt.rs index 1ed7ad4..857a45e 100644 --- a/modbus-mqtt/src/mqtt.rs +++ b/modbus-mqtt/src/mqtt.rs @@ -1,9 +1,9 @@ -use std::{collections::HashMap, future::Future}; +use std::collections::HashMap; use bytes::Bytes; use rumqttc::{ - mqttbytes::matches as matches_topic, mqttbytes::valid_topic, AsyncClient, Event, EventLoop, - MqttOptions, Publish, Subscribe, SubscribeFilter, + mqttbytes::matches as matches_topic, AsyncClient, Event, EventLoop, MqttOptions, Publish, + Subscribe, SubscribeFilter, }; use tokio::{ select, @@ -70,18 +70,6 @@ impl Connection { Ok(()) } - /// Create a handle for interacting with the MQTT server such that a pre-provided prefix is transparently added to - /// all relevant commands which use a topic. - pub fn prefixed_handle + Send>(&self, prefix: S) -> crate::Result { - let prefix = prefix.into(); - - if !valid_topic(&prefix) { - return Err("Prefix is not a valid topic".into()); - } - - Ok(self.handle().scoped(prefix)) - } - pub fn handle(&self) -> Handle { Handle { prefix: None, @@ -185,6 +173,10 @@ pub struct Handle { tx: Sender, } +// IDEA: make subscribe+publish _generic_ over the payload type, as long as it implements a Payload trait we define, +// which allows them to perform the serialization/deserialization to Bytes. For most domain types, the trait would be +// implemented to use serde_json but for Bytes and Vec it would just return itself. +// The return values may need to be crate::Result> or crate::Result>>. impl Handle { pub async fn subscribe>(&self, topic: S) -> crate::Result> { let (tx_bytes, rx) = mpsc::channel(8); diff --git a/modbus-mqtt/src/server.rs b/modbus-mqtt/src/server.rs index 6c3242e..24d0cbe 100644 --- a/modbus-mqtt/src/server.rs +++ b/modbus-mqtt/src/server.rs @@ -4,12 +4,9 @@ use crate::{ }; use rumqttc::MqttOptions; -use std::{future::Future, time::Duration}; -use tokio::{ - sync::{broadcast, mpsc}, - time::timeout, -}; -use tracing::{error, info}; +use std::future::Future; +use tokio::sync::{broadcast, mpsc}; +use tracing::error; pub async fn run + Send>( prefix: P, diff --git a/sungrow-winets/src/lib.rs b/sungrow-winets/src/lib.rs index e277c6b..9748d01 100644 --- a/sungrow-winets/src/lib.rs +++ b/sungrow-winets/src/lib.rs @@ -344,6 +344,7 @@ struct Device { dev_type: u8, // unit/slave ID + #[allow(dead_code)] #[serde(deserialize_with = "serde_aux::prelude::deserialize_number_from_string")] phys_addr: u8, // UNUSED: @@ -397,7 +398,7 @@ fn test_deserialize_device() { enum WebSocketMessage { Connect { token: String }, - DeviceList { list: Vec }, + // DeviceList { list: Vec }, // Not yet used: // State, // system state @@ -414,7 +415,7 @@ enum WebSocketMessage { #[derive(Debug, Deserialize)] struct ResultList { - count: u16, + // count: u16, #[serde(rename = "list")] items: Vec, }