From 120f62f0c779549fa5f175e5bd60445b6d03530f Mon Sep 17 00:00:00 2001 From: Bo Jeanes Date: Fri, 9 Sep 2022 19:23:05 +1000 Subject: [PATCH] Inline register type to Register struct --- modbus-mqtt/CHANGELOG.md | 4 +- modbus-mqtt/README.md | 43 ++++----- .../examples/sungrow-sh5.0rs-http.json | 2 +- modbus-mqtt/examples/sungrow-sh5.0rs.json | 8 +- modbus-mqtt/src/modbus/connection.rs | 18 +--- modbus-mqtt/src/modbus/connector.rs | 56 +++++++---- modbus-mqtt/src/modbus/register.rs | 94 ++++++++----------- 7 files changed, 108 insertions(+), 117 deletions(-) diff --git a/modbus-mqtt/CHANGELOG.md b/modbus-mqtt/CHANGELOG.md index c91b471..776c95d 100644 --- a/modbus-mqtt/CHANGELOG.md +++ b/modbus-mqtt/CHANGELOG.md @@ -7,7 +7,9 @@ and this project adheres to [Semantic Versioning]. ## [Unreleased] -- ... +### Deprecated + +- Separate `holding` and `input` sections, in favour of specifying `register_type` field on the register definition to either `"input"` (default) or `"holding"`. ## [0.2.0] - 2022-09-09 diff --git a/modbus-mqtt/README.md b/modbus-mqtt/README.md index e287958..5b99fd6 100644 --- a/modbus-mqtt/README.md +++ b/modbus-mqtt/README.md @@ -103,43 +103,36 @@ Post to `$MODBUS_MQTT_TOPIC/$CONNECTION_ID/$TYPE/$ADDRESS` where `$TYPE` is one ```jsonc { - "name": null, // OPTIONAL - gives the register a name which is used in the register MQTT topics (must be a valid topic component) + "address": 5123, // REQUIRED - "interval": "1m", // OPTIONAL - how often to update the registers value to MQTT - // e.g.: 3s (every 3 seconds) - // 2m (every 2 minutes) - // 1h (every 1 hour) + "register_type": "input", // OPTIONAL - "swap_bytes": false, // OPTIONAL - "swap_words": false, // OPTIONAL + "name": null, // OPTIONAL - gives the register a name which is used in the register MQTT topics (must be a valid topic component) - "type": "s16", // OPTIONAL - // valid: s8, s16, s32, s64 (signed) - // u8, u16, u32, u64 (unsigned) - // f32, f64 (floating point) + "interval": "1m", // OPTIONAL - how often to update the registers value to MQTT + // e.g.: 3s (every 3 seconds) + // 2m (every 2 minutes) + // 1h (every 1 hour) - "scale": 0, // OPTIONAL - number in register will be multiplied by 10^(scale) - // e.g.: to turn kW into W, you would provide scale=3 - // to turn W into kW, you would provide scale=-3 + "swap_bytes": false, // OPTIONAL + "swap_words": false, // OPTIONAL - "offset": 0, // OPTIONAL - will be added to the final result (AFTER scaling) + "type": "s16", // OPTIONAL + // valid: s8, s16, s32, s64 (signed) + // u8, u16, u32, u64 (unsigned) + // f32, f64 (floating point) + "scale": 0, // OPTIONAL - number in register will be multiplied by 10^(scale) + // e.g.: to turn kW into W, you would provide scale=3 + // to turn W into kW, you would provide scale=-3 - // Additionally, "type" can be set to "array": - "type": "array", - "of": "u16" // The default array element is u16, but you can change it with the `of` field + "offset": 0, // OPTIONAL - will be added to the final result (AFTER scaling) } ``` -Further, the `type` field can additionally be set to `"array"`, in which case, a `count` field must be provided. The array elements default to `"s16"` but can be overriden in the `"of"` field. - -NOTE: this is likely to change such that there is always a `count` field (with default of 1) and if provided to be greater than 1, it will be interpreted to be an array of elements of the `type` specified. - -There is some code to accept `"string"` type (with a required `length` field) but this is experimental and untested. - ##### Register shorthand -When issuing the `connect` payload, you can optionally include `input` and/or `holding` fields as arrays containing the above register schema, as long as an `address` field is added. When present, these payloads will be replayed to the MQTT server as if the user had specified each register separately, as above. +When issuing the `connect` payload, you can optionally include a top-level `registers` array, containing the above register schema. When present, these payloads will be replayed to the MQTT server as if the user had specified each register separately, as above. This is a recommended way to specify connections, but the registers are broken out separately so that they can be dynamically added to too. diff --git a/modbus-mqtt/examples/sungrow-sh5.0rs-http.json b/modbus-mqtt/examples/sungrow-sh5.0rs-http.json index 2461328..4bd6d81 100644 --- a/modbus-mqtt/examples/sungrow-sh5.0rs-http.json +++ b/modbus-mqtt/examples/sungrow-sh5.0rs-http.json @@ -2,7 +2,7 @@ "host": "10.10.10.219", "unit": 1, "proto": "winet-s", - "input": [ + "registers": [ { "address": 13000, "type": "u16", diff --git a/modbus-mqtt/examples/sungrow-sh5.0rs.json b/modbus-mqtt/examples/sungrow-sh5.0rs.json index 4de50b7..93adafe 100644 --- a/modbus-mqtt/examples/sungrow-sh5.0rs.json +++ b/modbus-mqtt/examples/sungrow-sh5.0rs.json @@ -3,7 +3,7 @@ "unit": 1, "proto": "tcp", "address_offset": -1, - "input": [ + "registers": [ { "address": 5017, "type": "u32", @@ -87,25 +87,27 @@ "address": 5013, "name": "mppt2_current" } - ], - "holding": [ { + "register_type": "holding", "address": 13058, "name": "max_soc", "period": "90s", "scale": -1 }, { + "register_type": "holding", "address": 13059, "name": "min_soc", "period": "90s", "scale": -1 }, { + "register_type": "holding", "address": 13100, "name": "battery_reserve" }, { + "register_type": "holding", "address": 33148, "name": "forced_battery_power", "scale": 1 diff --git a/modbus-mqtt/src/modbus/connection.rs b/modbus-mqtt/src/modbus/connection.rs index 1521221..7db6e41 100644 --- a/modbus-mqtt/src/modbus/connection.rs +++ b/modbus-mqtt/src/modbus/connection.rs @@ -130,22 +130,12 @@ impl Connection { select! { Some(cmd) = self.rx.recv() => { self.process_command(cmd).await; }, - Some((reg_type, reg)) = registers_rx.recv() => { - debug!(?reg_type, ?reg); - let scope = format!( - "{}/{}", - match ®_type { - RegisterType::Input => "input", - RegisterType::Holding => "holding", - }, - reg.address - ); - let mqtt = self.mqtt.scoped(scope); + Some(register) = registers_rx.recv() => { + debug!(?register); + let mqtt = self.mqtt.scoped("registers"); let modbus = self.handle(); register::Monitor::new( - reg.register, - reg_type, - reg.address, + register, mqtt, modbus, ) diff --git a/modbus-mqtt/src/modbus/connector.rs b/modbus-mqtt/src/modbus/connector.rs index 712c71f..58efd76 100644 --- a/modbus-mqtt/src/modbus/connector.rs +++ b/modbus-mqtt/src/modbus/connector.rs @@ -2,7 +2,7 @@ use crate::modbus::{connection, register}; use crate::mqtt::{Payload, Scopable}; use crate::{mqtt, shutdown::Shutdown}; use serde::Deserialize; -use serde_json::value::RawValue as RawJSON; +use serde_json::value::Value as JSON; use tokio::select; use tracing::{debug, error, info}; @@ -81,34 +81,45 @@ async fn parse_and_connect( } Ok(()) } -async fn connect(config: Config<'_>, mqtt: mqtt::Handle, shutdown: Shutdown) -> crate::Result<()> { +async fn connect(config: Config, mqtt: mqtt::Handle, shutdown: Shutdown) -> crate::Result<()> { if shutdown.is_shutdown() { return Ok(()); } + #[allow(deprecated)] let Config { connection: settings, input, holding, + registers, } = config; let _ = connection::run(settings, mqtt.clone(), shutdown).await?; // TODO: consider waiting 1 second before sending the registers to MQTT, to ensure that the connection is listening. - for (reg_type, registers) in [("holding", holding), ("input", input)] { - let mqtt = mqtt.scoped(reg_type); + enum Type { + Holding, + Input, + Unchanged, + } + for (reg_type, registers) in [ + (Type::Holding, holding), + (Type::Input, input), + (Type::Unchanged, registers), + ] { + use register::*; + let mqtt = mqtt.scoped("registers"); for reg in registers { - if let Ok(r) = - serde_json::from_slice::(reg.get().as_bytes()) - { - let json = serde_json::to_vec(&r.register).unwrap(); // unwrap() should be fine because we JUST deserialized it successfully - mqtt.publish(r.address.to_string(), json).await?; - // if let Some(name) = r.register.name { - // r.register.name = None; - // let json = serde_json::to_vec(&r).unwrap(); // unwrap() should be fine because we JUST deserialized it successfully - // mqtt.publish(name, json).await?; - // } + if let Ok(mut reg) = serde_json::from_value::(reg) { + reg.register_type = match reg_type { + Type::Holding => RegisterType::Holding, + Type::Input => RegisterType::Input, + Type::Unchanged => reg.register_type, + }; + + let json = serde_json::to_vec(®).unwrap(); // unwrap() should be fine because we JUST deserialized it successfully + mqtt.publish(format!("{}/config", reg.path()), json).await?; } } } @@ -119,15 +130,22 @@ async fn connect(config: Config<'_>, mqtt: mqtt::Handle, shutdown: Shutdown) -> /// Wrapper around `modbus::connection::Config` that can include some registers inline, which the connector will /// re-publish to the appropriate topic once the connection is established. #[derive(Debug, Deserialize)] -struct Config<'a> { +struct Config { #[serde(flatten)] connection: connection::Config, // Allow registers to be defined inline, but capture them as raw JSON so that if they have incorrect schema, we can // still establish the Modbus connection. Valid registers will be re-emitted as individual register configs to MQTT, // to be picked up by the connection. - #[serde(default, borrow)] - pub input: Vec<&'a RawJSON>, - #[serde(alias = "hold", default, borrow)] - pub holding: Vec<&'a RawJSON>, + #[deprecated] + #[serde(default)] + input: Vec, + + #[deprecated] + #[serde(alias = "hold", default)] + holding: Vec, + + #[deprecated] + #[serde(default)] + registers: Vec, } diff --git a/modbus-mqtt/src/modbus/register.rs b/modbus-mqtt/src/modbus/register.rs index 1841165..6caaaeb 100644 --- a/modbus-mqtt/src/modbus/register.rs +++ b/modbus-mqtt/src/modbus/register.rs @@ -1,8 +1,8 @@ use super::Word; +use crate::mqtt::{self, Payload, Scopable}; use serde::{Deserialize, Serialize}; use std::time::Duration; use tokio::{ - select, sync::mpsc, time::{interval, MissedTickBehavior}, }; @@ -11,25 +11,15 @@ use tracing::{debug, warn}; pub struct Monitor { mqtt: mqtt::Handle, modbus: super::Handle, - address: u16, register: Register, - register_type: RegisterType, } impl Monitor { - pub fn new( - register: Register, - register_type: RegisterType, - address: u16, - mqtt: mqtt::Handle, - modbus: super::Handle, - ) -> Monitor { + pub fn new(register: Register, mqtt: mqtt::Handle, modbus: super::Handle) -> Monitor { Monitor { - mqtt, - register_type, - register, - address, + mqtt: mqtt.scoped(register.path()), modbus, + register, } } @@ -41,9 +31,9 @@ impl Monitor { loop { interval.tick().await; if let Ok(words) = self.read().await { - debug!(address=%self.address, "type"=?self.register_type, ?words); + debug!(address=%self.register.address, "type"=?self.register.register_type, ?words); - #[cfg(debug_assertions)] + #[cfg(feature = "raw")] self.mqtt .publish("raw", serde_json::to_vec(&words).unwrap()) .await @@ -61,54 +51,41 @@ impl Monitor { } async fn read(&self) -> crate::Result> { - match self.register_type { + let Self { ref register, .. } = self; + match register.register_type { RegisterType::Input => { self.modbus - .read_input_register(self.address, self.register.size()) + .read_input_register(register.address, register.size()) .await } RegisterType::Holding => { self.modbus - .read_holding_register(self.address, self.register.size()) + .read_holding_register(register.address, register.size()) .await } } } } -pub(crate) async fn subscribe( - mqtt: &mqtt::Handle, -) -> crate::Result> { +pub(crate) async fn subscribe(mqtt: &mqtt::Handle) -> crate::Result> { let (tx, rx) = mpsc::channel(8); - let mut input_registers = mqtt.subscribe("input/+").await?; - let mut holding_registers = mqtt.subscribe("holding/+").await?; + let mut registers = mqtt.subscribe("registers/+/config").await?; tokio::spawn(async move { - fn to_register(payload: &Payload) -> crate::Result { - let Payload { bytes, topic } = payload; - let address = topic - .rsplit('/') - .next() - .expect("subscribed topic guarantees we have a last segment") - .parse()?; - Ok(AddressedRegister { - address, - register: serde_json::from_slice(bytes)?, - }) + fn to_register(payload: &Payload) -> crate::Result { + Ok(serde_json::from_slice(&payload.bytes)?) } loop { - select! { - Some(ref payload) = input_registers.recv() => { - match to_register(payload) { - Ok(register) => if (tx.send((RegisterType::Input, register)).await).is_err() { break; }, - Err(error) => warn!(?error, def=?payload.bytes, "ignoring invalid input register definition"), + if let Some(ref payload) = registers.recv().await { + match to_register(payload) { + Ok(register) => { + if (tx.send(register).await).is_err() { + break; + } } - }, - Some(ref payload) = holding_registers.recv() => { - match to_register(payload) { - Ok(register) => if (tx.send((RegisterType::Holding, register)).await).is_err() { break; }, - Err(error) => warn!(?error, def=?payload.bytes, "ignoring invalid holding register definition"), + Err(error) => { + warn!(?error, def=?payload.bytes, "ignoring invalid input register definition") } } } @@ -118,8 +95,10 @@ pub(crate) async fn subscribe( Ok(rx) } -#[derive(Clone, Copy, Debug)] +#[derive(Deserialize, Serialize, PartialEq, Default, Clone, Copy, Debug)] +#[serde(rename_all = "lowercase")] pub enum RegisterType { + #[default] Input, Holding, } @@ -280,6 +259,11 @@ pub struct Register { #[serde(skip_serializing_if = "Option::is_none")] pub name: Option, + pub address: u16, + + #[serde(default, skip_serializing_if = "IsDefault::is_default")] + pub register_type: RegisterType, + #[serde(flatten, default, skip_serializing_if = "IsDefault::is_default")] pub parse: RegisterParse, @@ -291,13 +275,6 @@ pub struct Register { )] pub interval: Duration, } -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct AddressedRegister { - pub address: u16, - - #[serde(flatten)] - pub register: Register, -} fn default_register_interval() -> Duration { Duration::from_secs(60) @@ -501,6 +478,14 @@ impl Register { self.parse.value_type.size() } + pub fn path(&self) -> String { + if let Some(ref name) = self.name { + name.clone() + } else { + self.address.to_string() + } + } + pub fn parse_words(&self, words: &[u16]) -> serde_json::Value { self.parse.value_type.parse_words(&self.apply_swaps(words)) } @@ -525,12 +510,13 @@ impl Register { #[cfg(test)] use pretty_assertions::assert_eq; -use crate::mqtt::{self, Payload}; #[test] fn test_parse_1() { use serde_json::json; let reg = Register { + register_type: RegisterType::Input, + address: 42, name: None, interval: Default::default(), parse: RegisterParse {