diff --git a/src/main.rs b/src/main.rs index 017bb30..be52632 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,5 @@ use rumqttc::{self, AsyncClient, Event, Incoming, LastWill, MqttOptions, Publish, QoS}; -use serde::{Deserialize, Serialize}; +use serde::Serialize; use serde_json::json; use std::{collections::HashMap, time::Duration}; use tokio::{sync::mpsc, sync::oneshot, time::MissedTickBehavior}; @@ -7,6 +7,8 @@ use tokio_modbus::prelude::*; use clap::Parser; +mod modbus; + #[derive(Parser)] struct Cli { mqtt_host: String, @@ -28,223 +30,6 @@ struct Cli { mqtt_topic_prefix: String, } -#[derive(Clone, Serialize, Deserialize)] -#[serde(untagged)] -enum ModbusProto { - Tcp { - host: String, - - #[serde(default = "default_modbus_port")] - port: u16, - }, - #[serde(rename_all = "lowercase")] - Rtu { - // tty: std::path::PathBuf, - tty: String, - baud_rate: u32, - - #[serde(default = "default_modbus_data_bits")] - data_bits: tokio_serial::DataBits, // TODO: allow this to be represented as a number instead of string - - #[serde(default = "default_modbus_stop_bits")] - stop_bits: tokio_serial::StopBits, // TODO: allow this to be represented as a number instead of string - - #[serde(default = "default_modbus_flow_control")] - flow_control: tokio_serial::FlowControl, - - #[serde(default = "default_modbus_parity")] - parity: tokio_serial::Parity, - }, -} - -fn default_modbus_port() -> u16 { - 502 -} - -fn default_modbus_data_bits() -> tokio_serial::DataBits { - tokio_serial::DataBits::Eight -} - -fn default_modbus_stop_bits() -> tokio_serial::StopBits { - tokio_serial::StopBits::One -} - -fn default_modbus_flow_control() -> tokio_serial::FlowControl { - tokio_serial::FlowControl::None -} - -fn default_modbus_parity() -> tokio_serial::Parity { - tokio_serial::Parity::None -} - -#[derive(Clone, Serialize, Deserialize)] -#[serde(rename_all = "lowercase")] -// TODO: `scale`, `offset`, `precision` -enum RegisterFixedValueType { - U8, - U16, - U32, - U64, - - I8, - I16, - I32, - I64, - - F32, - F64, -} - -impl RegisterFixedValueType { - // Modbus limits sequential reads to 125 apparently, so 8-bit should be fine - https://github.com/slowtec/tokio-modbus/issues/112#issuecomment-1095316069= - fn size(&self) -> u8 { - use RegisterFixedValueType::*; - // Each Modbus register holds 16-bits, so count is half what the byte count would be - match self { - U8 => 1, - U16 => 1, - U32 => 2, - U64 => 4, - I8 => 1, - I16 => 1, - I32 => 2, - I64 => 4, - F32 => 2, - F64 => 4, - } - } -} - -#[derive(Clone, Serialize, Deserialize)] -#[serde(rename_all = "lowercase")] -enum RegisterVariableValueType { - String, - Array(RegisterFixedValueType), -} - -#[derive(Clone, Serialize, Deserialize)] -#[serde(untagged, rename_all = "lowercase")] -enum RegisterValueType { - Fixed(RegisterFixedValueType), - Variable(RegisterVariableValueType, u8), -} - -impl RegisterValueType { - // Modbus limits sequential reads to 125 apparently, so 8-bit should be fine - https://github.com/slowtec/tokio-modbus/issues/112#issuecomment-1095316069= - fn size(&self) -> u8 { - use RegisterValueType::*; - use RegisterVariableValueType::*; - - match self { - Fixed(fixed) => fixed.size(), - Variable(variable, count) => match variable { - String => *count, - Array(fixed) => *count * fixed.size(), - }, - } - } -} - -#[derive(Clone, Serialize, Deserialize)] -struct RegisterParse { - #[serde(default = "default_swap")] - swap_bytes: bool, - - #[serde(default = "default_swap")] - swap_words: bool, - - #[serde(rename = "type", default = "default_value_type")] - value_type: RegisterValueType, -} - -fn default_swap() -> bool { - false -} - -fn default_value_type() -> RegisterValueType { - RegisterValueType::Fixed(RegisterFixedValueType::U16) -} - -#[derive(Clone, Serialize, Deserialize)] -struct Register { - address: u16, - - #[serde(skip_serializing_if = "Option::is_none")] - name: Option, - - #[serde(flatten, default = "default_register_parse")] - parse: RegisterParse, - - #[serde( - with = "humantime_serde", - default = "default_register_interval", - alias = "period", - alias = "duration" - )] - interval: Duration, -} - -fn default_register_interval() -> Duration { - Duration::from_secs(60) -} - -fn default_register_parse() -> RegisterParse { - RegisterParse { - swap_bytes: default_swap(), - swap_words: default_swap(), - value_type: default_value_type(), - } -} - -#[derive(Clone, Serialize, Deserialize)] -struct Connect { - #[serde(flatten)] - settings: ModbusProto, - - #[serde(default, skip_serializing_if = "Vec::is_empty")] - input: Vec, - - #[serde(default, skip_serializing_if = "Vec::is_empty")] - hold: Vec, - - #[serde(alias = "slave", default = "default_modbus_unit", with = "ext::Unit")] - unit: Unit, - - #[serde(default = "default_address_offset")] - address_offset: i8, -} - -fn default_modbus_unit() -> Unit { - Slave(0) -} -fn default_address_offset() -> i8 { - 0 -} - -type UnitId = SlaveId; -type Unit = Slave; -mod ext { - use serde::{Deserialize, Serialize}; - #[derive(Serialize, Deserialize)] - #[serde(remote = "tokio_modbus::slave::Slave")] - pub struct Unit(pub crate::UnitId); -} - -#[derive(Serialize)] -#[serde(rename_all = "lowercase")] -enum ConnectState { - Connected, - Disconnected, - Errored, -} - -#[derive(Serialize)] -struct ConnectStatus { - #[serde(flatten)] - connect: Connect, - status: ConnectState, -} - #[derive(Serialize)] #[serde(rename_all = "lowercase")] enum MainStatus { @@ -446,6 +231,8 @@ async fn handle_connect( topic_prefix: String, payload: bytes::Bytes, ) { + use modbus::config::*; + use modbus::ConnectState; println!("Starting connection handler for {}", id); match serde_json::from_slice::(&payload) { Ok(connect) => { @@ -473,7 +260,7 @@ async fn handle_connect( rtu::connect_slave(port, unit).await.unwrap() } }; - let status = ConnectStatus { + let status = modbus::ConnectStatus { connect: connect.clone(), status: ConnectState::Connected, }; @@ -505,9 +292,13 @@ async fn handle_connect( responder .send( modbus - .write_multiple_registers(address, &data[..]) - .await - .map(|_| vec![]), + .read_write_multiple_registers( + address, + data.len() as u16, + address, + &data[..], + ) + .await, ) .unwrap(); } @@ -563,11 +354,11 @@ async fn watch_registers( read_type: ModbusReadType, address_offset: i8, duration: Duration, - registers: Vec, + registers: Vec, modbus: mpsc::Sender, dispatcher: mpsc::Sender, registers_prefix: String, -) { +) -> ! { let mut interval = tokio::time::interval(duration); interval.set_missed_tick_behavior(MissedTickBehavior::Delay); @@ -601,7 +392,55 @@ async fn watch_registers( let values = rx.await.unwrap().unwrap(); - let payload = serde_json::to_vec(&json!({ "raw": values, })).unwrap(); + let swapped_values = if r.parse.swap_bytes { + values.iter().map(|v| v.swap_bytes()).collect() + } else { + values.clone() + }; + + let swapped_values = if r.parse.swap_words { + swapped_values + .chunks_exact(2) + .flat_map(|chunk| vec![chunk[1], chunk[0]]) + .collect() + } else { + swapped_values + }; + + let bytes: Vec = swapped_values + .iter() + .flat_map(|v| v.to_ne_bytes()) + .collect(); + + use crate::modbus::config::RegisterFixedValueType::*; + use crate::modbus::config::RegisterValueType::*; + use crate::modbus::config::RegisterVariableValueType as Var; + + let value = match r.parse.value_type { + Fixed(ref fixed) => match fixed { + U8 => json!(bytes[1]), // or is it 0? + U16 => json!(swapped_values[0]), + U32 => json!(bytes.try_into().map(|bytes| u32::from_le_bytes(bytes)).ok()), + U64 => json!(bytes.try_into().map(|bytes| u64::from_le_bytes(bytes)).ok()), + I8 => json!(vec![bytes[1]] + .try_into() + .map(|bytes| i8::from_le_bytes(bytes))), + I16 => json!(bytes.try_into().map(|bytes| i16::from_le_bytes(bytes)).ok()), + I32 => json!(bytes.try_into().map(|bytes| i32::from_le_bytes(bytes)).ok()), + I64 => json!(bytes.try_into().map(|bytes| i64::from_le_bytes(bytes)).ok()), + F32 => json!(bytes.try_into().map(|bytes| f32::from_le_bytes(bytes)).ok()), + F64 => json!(bytes.try_into().map(|bytes| f64::from_le_bytes(bytes)).ok()), + }, + Variable(ref var, _count) => match var { + Var::String => json!(String::from_utf16_lossy(&swapped_values)), + Var::Array(_) => todo!(), + }, + }; + + let payload = serde_json::to_vec( + &json!({ "raw": values, "swapped": swapped_values, "value": value }), + ) + .unwrap(); dispatcher .send(DispatchCommand::Publish { diff --git a/src/modbus/config.rs b/src/modbus/config.rs new file mode 100644 index 0000000..f369b83 --- /dev/null +++ b/src/modbus/config.rs @@ -0,0 +1,204 @@ +use serde::{Deserialize, Serialize}; +use std::time::Duration; + +#[derive(Clone, Serialize, Deserialize)] +#[serde(untagged)] +pub enum ModbusProto { + Tcp { + host: String, + + #[serde(default = "default_modbus_port")] + port: u16, + }, + #[serde(rename_all = "lowercase")] + Rtu { + // tty: std::path::PathBuf, + tty: String, + baud_rate: u32, + + #[serde(default = "default_modbus_data_bits")] + data_bits: tokio_serial::DataBits, // TODO: allow this to be represented as a number instead of string + + #[serde(default = "default_modbus_stop_bits")] + stop_bits: tokio_serial::StopBits, // TODO: allow this to be represented as a number instead of string + + #[serde(default = "default_modbus_flow_control")] + flow_control: tokio_serial::FlowControl, + + #[serde(default = "default_modbus_parity")] + parity: tokio_serial::Parity, + }, +} + +fn default_modbus_port() -> u16 { + 502 +} + +fn default_modbus_data_bits() -> tokio_serial::DataBits { + tokio_serial::DataBits::Eight +} + +fn default_modbus_stop_bits() -> tokio_serial::StopBits { + tokio_serial::StopBits::One +} + +fn default_modbus_flow_control() -> tokio_serial::FlowControl { + tokio_serial::FlowControl::None +} + +fn default_modbus_parity() -> tokio_serial::Parity { + tokio_serial::Parity::None +} + +#[derive(Clone, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +// TODO: `scale`, `offset`, `precision` +pub enum RegisterFixedValueType { + U8, + U16, + U32, + U64, + + #[serde(alias = "s8")] + I8, + #[serde(alias = "s16")] + I16, + #[serde(alias = "s32")] + I32, + #[serde(alias = "s64")] + I64, + + F32, + F64, +} + +impl RegisterFixedValueType { + // Modbus limits sequential reads to 125 apparently, so 8-bit should be fine - https://github.com/slowtec/tokio-modbus/issues/112#issuecomment-1095316069= + fn size(&self) -> u8 { + use RegisterFixedValueType::*; + // Each Modbus register holds 16-bits, so count is half what the byte count would be + match self { + U8 => 1, + U16 => 1, + U32 => 2, + U64 => 4, + I8 => 1, + I16 => 1, + I32 => 2, + I64 => 4, + F32 => 2, + F64 => 4, + } + } +} + +#[derive(Clone, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum RegisterVariableValueType { + String, + Array(RegisterFixedValueType), +} + +#[derive(Clone, Serialize, Deserialize)] +#[serde(untagged, rename_all = "lowercase")] +pub enum RegisterValueType { + Fixed(RegisterFixedValueType), + Variable(RegisterVariableValueType, u8), +} + +impl RegisterValueType { + // Modbus limits sequential reads to 125 apparently, so 8-bit should be fine - https://github.com/slowtec/tokio-modbus/issues/112#issuecomment-1095316069= + pub fn size(&self) -> u8 { + use RegisterValueType::*; + use RegisterVariableValueType::*; + + match self { + Fixed(fixed) => fixed.size(), + Variable(variable, count) => match variable { + String => *count, + Array(fixed) => *count * fixed.size(), + }, + } + } +} + +#[derive(Clone, Serialize, Deserialize)] +pub struct RegisterParse { + #[serde(default = "default_swap")] + pub swap_bytes: bool, + + #[serde(default = "default_swap")] + pub swap_words: bool, + + #[serde(rename = "type", default = "default_value_type")] + pub value_type: RegisterValueType, +} + +fn default_swap() -> bool { + false +} + +fn default_value_type() -> RegisterValueType { + RegisterValueType::Fixed(RegisterFixedValueType::U16) +} + +#[derive(Clone, Serialize, Deserialize)] +pub struct Register { + pub address: u16, + + #[serde(skip_serializing_if = "Option::is_none")] + pub name: Option, + + #[serde(flatten, default = "default_register_parse")] + pub parse: RegisterParse, + + #[serde( + with = "humantime_serde", + default = "default_register_interval", + alias = "period", + alias = "duration" + )] + pub interval: Duration, +} + +fn default_register_interval() -> Duration { + Duration::from_secs(60) +} + +fn default_register_parse() -> RegisterParse { + RegisterParse { + swap_bytes: default_swap(), + swap_words: default_swap(), + value_type: default_value_type(), + } +} + +#[derive(Clone, Serialize, Deserialize)] +pub struct Connect { + #[serde(flatten)] + pub settings: ModbusProto, + + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub input: Vec, + + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub hold: Vec, + + #[serde( + alias = "slave", + default = "tokio_modbus::slave::Slave::broadcast", + with = "Unit" + )] + pub unit: crate::modbus::Unit, + + #[serde(default = "default_address_offset")] + pub address_offset: i8, +} + +#[derive(Serialize, Deserialize)] +#[serde(remote = "tokio_modbus::slave::Slave")] +struct Unit(crate::modbus::UnitId); + +fn default_address_offset() -> i8 { + 0 +} diff --git a/src/modbus/mod.rs b/src/modbus/mod.rs new file mode 100644 index 0000000..5a39435 --- /dev/null +++ b/src/modbus/mod.rs @@ -0,0 +1,21 @@ +use serde::Serialize; + +pub mod config; + +#[derive(Serialize)] +#[serde(rename_all = "lowercase")] +pub enum ConnectState { + Connected, + Disconnected, + Errored, +} + +#[derive(Serialize)] +pub struct ConnectStatus { + #[serde(flatten)] + pub connect: config::Connect, + pub status: ConnectState, +} + +pub type UnitId = tokio_modbus::prelude::SlaveId; +pub type Unit = tokio_modbus::prelude::Slave;