1
0
Fork 0

Start grouping into modules

gh-action
Bo Jeanes 2022-07-21 17:57:41 +10:00
parent ca5639b6e4
commit 6bcb0420ec
3 changed files with 289 additions and 225 deletions

View File

@ -1,5 +1,5 @@
use rumqttc::{self, AsyncClient, Event, Incoming, LastWill, MqttOptions, Publish, QoS}; use rumqttc::{self, AsyncClient, Event, Incoming, LastWill, MqttOptions, Publish, QoS};
use serde::{Deserialize, Serialize}; use serde::Serialize;
use serde_json::json; use serde_json::json;
use std::{collections::HashMap, time::Duration}; use std::{collections::HashMap, time::Duration};
use tokio::{sync::mpsc, sync::oneshot, time::MissedTickBehavior}; use tokio::{sync::mpsc, sync::oneshot, time::MissedTickBehavior};
@ -7,6 +7,8 @@ use tokio_modbus::prelude::*;
use clap::Parser; use clap::Parser;
mod modbus;
#[derive(Parser)] #[derive(Parser)]
struct Cli { struct Cli {
mqtt_host: String, mqtt_host: String,
@ -28,223 +30,6 @@ struct Cli {
mqtt_topic_prefix: String, mqtt_topic_prefix: String,
} }
#[derive(Clone, Serialize, Deserialize)]
#[serde(untagged)]
enum ModbusProto {
Tcp {
host: String,
#[serde(default = "default_modbus_port")]
port: u16,
},
#[serde(rename_all = "lowercase")]
Rtu {
// tty: std::path::PathBuf,
tty: String,
baud_rate: u32,
#[serde(default = "default_modbus_data_bits")]
data_bits: tokio_serial::DataBits, // TODO: allow this to be represented as a number instead of string
#[serde(default = "default_modbus_stop_bits")]
stop_bits: tokio_serial::StopBits, // TODO: allow this to be represented as a number instead of string
#[serde(default = "default_modbus_flow_control")]
flow_control: tokio_serial::FlowControl,
#[serde(default = "default_modbus_parity")]
parity: tokio_serial::Parity,
},
}
fn default_modbus_port() -> u16 {
502
}
fn default_modbus_data_bits() -> tokio_serial::DataBits {
tokio_serial::DataBits::Eight
}
fn default_modbus_stop_bits() -> tokio_serial::StopBits {
tokio_serial::StopBits::One
}
fn default_modbus_flow_control() -> tokio_serial::FlowControl {
tokio_serial::FlowControl::None
}
fn default_modbus_parity() -> tokio_serial::Parity {
tokio_serial::Parity::None
}
#[derive(Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
// TODO: `scale`, `offset`, `precision`
enum RegisterFixedValueType {
U8,
U16,
U32,
U64,
I8,
I16,
I32,
I64,
F32,
F64,
}
impl RegisterFixedValueType {
// Modbus limits sequential reads to 125 apparently, so 8-bit should be fine - https://github.com/slowtec/tokio-modbus/issues/112#issuecomment-1095316069=
fn size(&self) -> u8 {
use RegisterFixedValueType::*;
// Each Modbus register holds 16-bits, so count is half what the byte count would be
match self {
U8 => 1,
U16 => 1,
U32 => 2,
U64 => 4,
I8 => 1,
I16 => 1,
I32 => 2,
I64 => 4,
F32 => 2,
F64 => 4,
}
}
}
#[derive(Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
enum RegisterVariableValueType {
String,
Array(RegisterFixedValueType),
}
#[derive(Clone, Serialize, Deserialize)]
#[serde(untagged, rename_all = "lowercase")]
enum RegisterValueType {
Fixed(RegisterFixedValueType),
Variable(RegisterVariableValueType, u8),
}
impl RegisterValueType {
// Modbus limits sequential reads to 125 apparently, so 8-bit should be fine - https://github.com/slowtec/tokio-modbus/issues/112#issuecomment-1095316069=
fn size(&self) -> u8 {
use RegisterValueType::*;
use RegisterVariableValueType::*;
match self {
Fixed(fixed) => fixed.size(),
Variable(variable, count) => match variable {
String => *count,
Array(fixed) => *count * fixed.size(),
},
}
}
}
#[derive(Clone, Serialize, Deserialize)]
struct RegisterParse {
#[serde(default = "default_swap")]
swap_bytes: bool,
#[serde(default = "default_swap")]
swap_words: bool,
#[serde(rename = "type", default = "default_value_type")]
value_type: RegisterValueType,
}
fn default_swap() -> bool {
false
}
fn default_value_type() -> RegisterValueType {
RegisterValueType::Fixed(RegisterFixedValueType::U16)
}
#[derive(Clone, Serialize, Deserialize)]
struct Register {
address: u16,
#[serde(skip_serializing_if = "Option::is_none")]
name: Option<String>,
#[serde(flatten, default = "default_register_parse")]
parse: RegisterParse,
#[serde(
with = "humantime_serde",
default = "default_register_interval",
alias = "period",
alias = "duration"
)]
interval: Duration,
}
fn default_register_interval() -> Duration {
Duration::from_secs(60)
}
fn default_register_parse() -> RegisterParse {
RegisterParse {
swap_bytes: default_swap(),
swap_words: default_swap(),
value_type: default_value_type(),
}
}
#[derive(Clone, Serialize, Deserialize)]
struct Connect {
#[serde(flatten)]
settings: ModbusProto,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
input: Vec<Register>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
hold: Vec<Register>,
#[serde(alias = "slave", default = "default_modbus_unit", with = "ext::Unit")]
unit: Unit,
#[serde(default = "default_address_offset")]
address_offset: i8,
}
fn default_modbus_unit() -> Unit {
Slave(0)
}
fn default_address_offset() -> i8 {
0
}
type UnitId = SlaveId;
type Unit = Slave;
mod ext {
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
#[serde(remote = "tokio_modbus::slave::Slave")]
pub struct Unit(pub crate::UnitId);
}
#[derive(Serialize)]
#[serde(rename_all = "lowercase")]
enum ConnectState {
Connected,
Disconnected,
Errored,
}
#[derive(Serialize)]
struct ConnectStatus {
#[serde(flatten)]
connect: Connect,
status: ConnectState,
}
#[derive(Serialize)] #[derive(Serialize)]
#[serde(rename_all = "lowercase")] #[serde(rename_all = "lowercase")]
enum MainStatus { enum MainStatus {
@ -446,6 +231,8 @@ async fn handle_connect(
topic_prefix: String, topic_prefix: String,
payload: bytes::Bytes, payload: bytes::Bytes,
) { ) {
use modbus::config::*;
use modbus::ConnectState;
println!("Starting connection handler for {}", id); println!("Starting connection handler for {}", id);
match serde_json::from_slice::<Connect>(&payload) { match serde_json::from_slice::<Connect>(&payload) {
Ok(connect) => { Ok(connect) => {
@ -473,7 +260,7 @@ async fn handle_connect(
rtu::connect_slave(port, unit).await.unwrap() rtu::connect_slave(port, unit).await.unwrap()
} }
}; };
let status = ConnectStatus { let status = modbus::ConnectStatus {
connect: connect.clone(), connect: connect.clone(),
status: ConnectState::Connected, status: ConnectState::Connected,
}; };
@ -505,9 +292,13 @@ async fn handle_connect(
responder responder
.send( .send(
modbus modbus
.write_multiple_registers(address, &data[..]) .read_write_multiple_registers(
.await address,
.map(|_| vec![]), data.len() as u16,
address,
&data[..],
)
.await,
) )
.unwrap(); .unwrap();
} }
@ -563,11 +354,11 @@ async fn watch_registers(
read_type: ModbusReadType, read_type: ModbusReadType,
address_offset: i8, address_offset: i8,
duration: Duration, duration: Duration,
registers: Vec<Register>, registers: Vec<modbus::config::Register>,
modbus: mpsc::Sender<ModbusCommand>, modbus: mpsc::Sender<ModbusCommand>,
dispatcher: mpsc::Sender<DispatchCommand>, dispatcher: mpsc::Sender<DispatchCommand>,
registers_prefix: String, registers_prefix: String,
) { ) -> ! {
let mut interval = tokio::time::interval(duration); let mut interval = tokio::time::interval(duration);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay); interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
@ -601,7 +392,55 @@ async fn watch_registers(
let values = rx.await.unwrap().unwrap(); let values = rx.await.unwrap().unwrap();
let payload = serde_json::to_vec(&json!({ "raw": values, })).unwrap(); let swapped_values = if r.parse.swap_bytes {
values.iter().map(|v| v.swap_bytes()).collect()
} else {
values.clone()
};
let swapped_values = if r.parse.swap_words {
swapped_values
.chunks_exact(2)
.flat_map(|chunk| vec![chunk[1], chunk[0]])
.collect()
} else {
swapped_values
};
let bytes: Vec<u8> = swapped_values
.iter()
.flat_map(|v| v.to_ne_bytes())
.collect();
use crate::modbus::config::RegisterFixedValueType::*;
use crate::modbus::config::RegisterValueType::*;
use crate::modbus::config::RegisterVariableValueType as Var;
let value = match r.parse.value_type {
Fixed(ref fixed) => match fixed {
U8 => json!(bytes[1]), // or is it 0?
U16 => json!(swapped_values[0]),
U32 => json!(bytes.try_into().map(|bytes| u32::from_le_bytes(bytes)).ok()),
U64 => json!(bytes.try_into().map(|bytes| u64::from_le_bytes(bytes)).ok()),
I8 => json!(vec![bytes[1]]
.try_into()
.map(|bytes| i8::from_le_bytes(bytes))),
I16 => json!(bytes.try_into().map(|bytes| i16::from_le_bytes(bytes)).ok()),
I32 => json!(bytes.try_into().map(|bytes| i32::from_le_bytes(bytes)).ok()),
I64 => json!(bytes.try_into().map(|bytes| i64::from_le_bytes(bytes)).ok()),
F32 => json!(bytes.try_into().map(|bytes| f32::from_le_bytes(bytes)).ok()),
F64 => json!(bytes.try_into().map(|bytes| f64::from_le_bytes(bytes)).ok()),
},
Variable(ref var, _count) => match var {
Var::String => json!(String::from_utf16_lossy(&swapped_values)),
Var::Array(_) => todo!(),
},
};
let payload = serde_json::to_vec(
&json!({ "raw": values, "swapped": swapped_values, "value": value }),
)
.unwrap();
dispatcher dispatcher
.send(DispatchCommand::Publish { .send(DispatchCommand::Publish {

View File

@ -0,0 +1,204 @@
use serde::{Deserialize, Serialize};
use std::time::Duration;
#[derive(Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ModbusProto {
Tcp {
host: String,
#[serde(default = "default_modbus_port")]
port: u16,
},
#[serde(rename_all = "lowercase")]
Rtu {
// tty: std::path::PathBuf,
tty: String,
baud_rate: u32,
#[serde(default = "default_modbus_data_bits")]
data_bits: tokio_serial::DataBits, // TODO: allow this to be represented as a number instead of string
#[serde(default = "default_modbus_stop_bits")]
stop_bits: tokio_serial::StopBits, // TODO: allow this to be represented as a number instead of string
#[serde(default = "default_modbus_flow_control")]
flow_control: tokio_serial::FlowControl,
#[serde(default = "default_modbus_parity")]
parity: tokio_serial::Parity,
},
}
fn default_modbus_port() -> u16 {
502
}
fn default_modbus_data_bits() -> tokio_serial::DataBits {
tokio_serial::DataBits::Eight
}
fn default_modbus_stop_bits() -> tokio_serial::StopBits {
tokio_serial::StopBits::One
}
fn default_modbus_flow_control() -> tokio_serial::FlowControl {
tokio_serial::FlowControl::None
}
fn default_modbus_parity() -> tokio_serial::Parity {
tokio_serial::Parity::None
}
#[derive(Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
// TODO: `scale`, `offset`, `precision`
pub enum RegisterFixedValueType {
U8,
U16,
U32,
U64,
#[serde(alias = "s8")]
I8,
#[serde(alias = "s16")]
I16,
#[serde(alias = "s32")]
I32,
#[serde(alias = "s64")]
I64,
F32,
F64,
}
impl RegisterFixedValueType {
// Modbus limits sequential reads to 125 apparently, so 8-bit should be fine - https://github.com/slowtec/tokio-modbus/issues/112#issuecomment-1095316069=
fn size(&self) -> u8 {
use RegisterFixedValueType::*;
// Each Modbus register holds 16-bits, so count is half what the byte count would be
match self {
U8 => 1,
U16 => 1,
U32 => 2,
U64 => 4,
I8 => 1,
I16 => 1,
I32 => 2,
I64 => 4,
F32 => 2,
F64 => 4,
}
}
}
#[derive(Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum RegisterVariableValueType {
String,
Array(RegisterFixedValueType),
}
#[derive(Clone, Serialize, Deserialize)]
#[serde(untagged, rename_all = "lowercase")]
pub enum RegisterValueType {
Fixed(RegisterFixedValueType),
Variable(RegisterVariableValueType, u8),
}
impl RegisterValueType {
// Modbus limits sequential reads to 125 apparently, so 8-bit should be fine - https://github.com/slowtec/tokio-modbus/issues/112#issuecomment-1095316069=
pub fn size(&self) -> u8 {
use RegisterValueType::*;
use RegisterVariableValueType::*;
match self {
Fixed(fixed) => fixed.size(),
Variable(variable, count) => match variable {
String => *count,
Array(fixed) => *count * fixed.size(),
},
}
}
}
#[derive(Clone, Serialize, Deserialize)]
pub struct RegisterParse {
#[serde(default = "default_swap")]
pub swap_bytes: bool,
#[serde(default = "default_swap")]
pub swap_words: bool,
#[serde(rename = "type", default = "default_value_type")]
pub value_type: RegisterValueType,
}
fn default_swap() -> bool {
false
}
fn default_value_type() -> RegisterValueType {
RegisterValueType::Fixed(RegisterFixedValueType::U16)
}
#[derive(Clone, Serialize, Deserialize)]
pub struct Register {
pub address: u16,
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
#[serde(flatten, default = "default_register_parse")]
pub parse: RegisterParse,
#[serde(
with = "humantime_serde",
default = "default_register_interval",
alias = "period",
alias = "duration"
)]
pub interval: Duration,
}
fn default_register_interval() -> Duration {
Duration::from_secs(60)
}
fn default_register_parse() -> RegisterParse {
RegisterParse {
swap_bytes: default_swap(),
swap_words: default_swap(),
value_type: default_value_type(),
}
}
#[derive(Clone, Serialize, Deserialize)]
pub struct Connect {
#[serde(flatten)]
pub settings: ModbusProto,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub input: Vec<Register>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub hold: Vec<Register>,
#[serde(
alias = "slave",
default = "tokio_modbus::slave::Slave::broadcast",
with = "Unit"
)]
pub unit: crate::modbus::Unit,
#[serde(default = "default_address_offset")]
pub address_offset: i8,
}
#[derive(Serialize, Deserialize)]
#[serde(remote = "tokio_modbus::slave::Slave")]
struct Unit(crate::modbus::UnitId);
fn default_address_offset() -> i8 {
0
}

21
src/modbus/mod.rs 100644
View File

@ -0,0 +1,21 @@
use serde::Serialize;
pub mod config;
#[derive(Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ConnectState {
Connected,
Disconnected,
Errored,
}
#[derive(Serialize)]
pub struct ConnectStatus {
#[serde(flatten)]
pub connect: config::Connect,
pub status: ConnectState,
}
pub type UnitId = tokio_modbus::prelude::SlaveId;
pub type Unit = tokio_modbus::prelude::Slave;