1
0
Fork 0

Inline register type to Register struct

gh-action
Bo Jeanes 2022-09-09 19:23:05 +10:00
parent bb715f30b5
commit 120f62f0c7
7 changed files with 108 additions and 117 deletions

View File

@ -7,7 +7,9 @@ and this project adheres to [Semantic Versioning].
## [Unreleased]
- ...
### Deprecated
- Separate `holding` and `input` sections, in favour of specifying `register_type` field on the register definition to either `"input"` (default) or `"holding"`.
## [0.2.0] - 2022-09-09

View File

@ -103,43 +103,36 @@ Post to `$MODBUS_MQTT_TOPIC/$CONNECTION_ID/$TYPE/$ADDRESS` where `$TYPE` is one
```jsonc
{
"name": null, // OPTIONAL - gives the register a name which is used in the register MQTT topics (must be a valid topic component)
"address": 5123, // REQUIRED
"interval": "1m", // OPTIONAL - how often to update the registers value to MQTT
// e.g.: 3s (every 3 seconds)
// 2m (every 2 minutes)
// 1h (every 1 hour)
"register_type": "input", // OPTIONAL
"swap_bytes": false, // OPTIONAL
"swap_words": false, // OPTIONAL
"name": null, // OPTIONAL - gives the register a name which is used in the register MQTT topics (must be a valid topic component)
"type": "s16", // OPTIONAL
// valid: s8, s16, s32, s64 (signed)
// u8, u16, u32, u64 (unsigned)
// f32, f64 (floating point)
"interval": "1m", // OPTIONAL - how often to update the registers value to MQTT
// e.g.: 3s (every 3 seconds)
// 2m (every 2 minutes)
// 1h (every 1 hour)
"scale": 0, // OPTIONAL - number in register will be multiplied by 10^(scale)
// e.g.: to turn kW into W, you would provide scale=3
// to turn W into kW, you would provide scale=-3
"swap_bytes": false, // OPTIONAL
"swap_words": false, // OPTIONAL
"offset": 0, // OPTIONAL - will be added to the final result (AFTER scaling)
"type": "s16", // OPTIONAL
// valid: s8, s16, s32, s64 (signed)
// u8, u16, u32, u64 (unsigned)
// f32, f64 (floating point)
"scale": 0, // OPTIONAL - number in register will be multiplied by 10^(scale)
// e.g.: to turn kW into W, you would provide scale=3
// to turn W into kW, you would provide scale=-3
// Additionally, "type" can be set to "array":
"type": "array",
"of": "u16" // The default array element is u16, but you can change it with the `of` field
"offset": 0, // OPTIONAL - will be added to the final result (AFTER scaling)
}
```
Further, the `type` field can additionally be set to `"array"`, in which case, a `count` field must be provided. The array elements default to `"s16"` but can be overriden in the `"of"` field.
NOTE: this is likely to change such that there is always a `count` field (with default of 1) and if provided to be greater than 1, it will be interpreted to be an array of elements of the `type` specified.
There is some code to accept `"string"` type (with a required `length` field) but this is experimental and untested.
##### Register shorthand
When issuing the `connect` payload, you can optionally include `input` and/or `holding` fields as arrays containing the above register schema, as long as an `address` field is added. When present, these payloads will be replayed to the MQTT server as if the user had specified each register separately, as above.
When issuing the `connect` payload, you can optionally include a top-level `registers` array, containing the above register schema. When present, these payloads will be replayed to the MQTT server as if the user had specified each register separately, as above.
This is a recommended way to specify connections, but the registers are broken out separately so that they can be dynamically added to too.

View File

@ -2,7 +2,7 @@
"host": "10.10.10.219",
"unit": 1,
"proto": "winet-s",
"input": [
"registers": [
{
"address": 13000,
"type": "u16",

View File

@ -3,7 +3,7 @@
"unit": 1,
"proto": "tcp",
"address_offset": -1,
"input": [
"registers": [
{
"address": 5017,
"type": "u32",
@ -87,25 +87,27 @@
"address": 5013,
"name": "mppt2_current"
}
],
"holding": [
{
"register_type": "holding",
"address": 13058,
"name": "max_soc",
"period": "90s",
"scale": -1
},
{
"register_type": "holding",
"address": 13059,
"name": "min_soc",
"period": "90s",
"scale": -1
},
{
"register_type": "holding",
"address": 13100,
"name": "battery_reserve"
},
{
"register_type": "holding",
"address": 33148,
"name": "forced_battery_power",
"scale": 1

View File

@ -130,22 +130,12 @@ impl Connection {
select! {
Some(cmd) = self.rx.recv() => { self.process_command(cmd).await; },
Some((reg_type, reg)) = registers_rx.recv() => {
debug!(?reg_type, ?reg);
let scope = format!(
"{}/{}",
match &reg_type {
RegisterType::Input => "input",
RegisterType::Holding => "holding",
},
reg.address
);
let mqtt = self.mqtt.scoped(scope);
Some(register) = registers_rx.recv() => {
debug!(?register);
let mqtt = self.mqtt.scoped("registers");
let modbus = self.handle();
register::Monitor::new(
reg.register,
reg_type,
reg.address,
register,
mqtt,
modbus,
)

View File

@ -2,7 +2,7 @@ use crate::modbus::{connection, register};
use crate::mqtt::{Payload, Scopable};
use crate::{mqtt, shutdown::Shutdown};
use serde::Deserialize;
use serde_json::value::RawValue as RawJSON;
use serde_json::value::Value as JSON;
use tokio::select;
use tracing::{debug, error, info};
@ -81,34 +81,45 @@ async fn parse_and_connect(
}
Ok(())
}
async fn connect(config: Config<'_>, mqtt: mqtt::Handle, shutdown: Shutdown) -> crate::Result<()> {
async fn connect(config: Config, mqtt: mqtt::Handle, shutdown: Shutdown) -> crate::Result<()> {
if shutdown.is_shutdown() {
return Ok(());
}
#[allow(deprecated)]
let Config {
connection: settings,
input,
holding,
registers,
} = config;
let _ = connection::run(settings, mqtt.clone(), shutdown).await?;
// TODO: consider waiting 1 second before sending the registers to MQTT, to ensure that the connection is listening.
for (reg_type, registers) in [("holding", holding), ("input", input)] {
let mqtt = mqtt.scoped(reg_type);
enum Type {
Holding,
Input,
Unchanged,
}
for (reg_type, registers) in [
(Type::Holding, holding),
(Type::Input, input),
(Type::Unchanged, registers),
] {
use register::*;
let mqtt = mqtt.scoped("registers");
for reg in registers {
if let Ok(r) =
serde_json::from_slice::<register::AddressedRegister>(reg.get().as_bytes())
{
let json = serde_json::to_vec(&r.register).unwrap(); // unwrap() should be fine because we JUST deserialized it successfully
mqtt.publish(r.address.to_string(), json).await?;
// if let Some(name) = r.register.name {
// r.register.name = None;
// let json = serde_json::to_vec(&r).unwrap(); // unwrap() should be fine because we JUST deserialized it successfully
// mqtt.publish(name, json).await?;
// }
if let Ok(mut reg) = serde_json::from_value::<Register>(reg) {
reg.register_type = match reg_type {
Type::Holding => RegisterType::Holding,
Type::Input => RegisterType::Input,
Type::Unchanged => reg.register_type,
};
let json = serde_json::to_vec(&reg).unwrap(); // unwrap() should be fine because we JUST deserialized it successfully
mqtt.publish(format!("{}/config", reg.path()), json).await?;
}
}
}
@ -119,15 +130,22 @@ async fn connect(config: Config<'_>, mqtt: mqtt::Handle, shutdown: Shutdown) ->
/// Wrapper around `modbus::connection::Config` that can include some registers inline, which the connector will
/// re-publish to the appropriate topic once the connection is established.
#[derive(Debug, Deserialize)]
struct Config<'a> {
struct Config {
#[serde(flatten)]
connection: connection::Config,
// Allow registers to be defined inline, but capture them as raw JSON so that if they have incorrect schema, we can
// still establish the Modbus connection. Valid registers will be re-emitted as individual register configs to MQTT,
// to be picked up by the connection.
#[serde(default, borrow)]
pub input: Vec<&'a RawJSON>,
#[serde(alias = "hold", default, borrow)]
pub holding: Vec<&'a RawJSON>,
#[deprecated]
#[serde(default)]
input: Vec<JSON>,
#[deprecated]
#[serde(alias = "hold", default)]
holding: Vec<JSON>,
#[deprecated]
#[serde(default)]
registers: Vec<JSON>,
}

View File

@ -1,8 +1,8 @@
use super::Word;
use crate::mqtt::{self, Payload, Scopable};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tokio::{
select,
sync::mpsc,
time::{interval, MissedTickBehavior},
};
@ -11,25 +11,15 @@ use tracing::{debug, warn};
pub struct Monitor {
mqtt: mqtt::Handle,
modbus: super::Handle,
address: u16,
register: Register,
register_type: RegisterType,
}
impl Monitor {
pub fn new(
register: Register,
register_type: RegisterType,
address: u16,
mqtt: mqtt::Handle,
modbus: super::Handle,
) -> Monitor {
pub fn new(register: Register, mqtt: mqtt::Handle, modbus: super::Handle) -> Monitor {
Monitor {
mqtt,
register_type,
register,
address,
mqtt: mqtt.scoped(register.path()),
modbus,
register,
}
}
@ -41,9 +31,9 @@ impl Monitor {
loop {
interval.tick().await;
if let Ok(words) = self.read().await {
debug!(address=%self.address, "type"=?self.register_type, ?words);
debug!(address=%self.register.address, "type"=?self.register.register_type, ?words);
#[cfg(debug_assertions)]
#[cfg(feature = "raw")]
self.mqtt
.publish("raw", serde_json::to_vec(&words).unwrap())
.await
@ -61,54 +51,41 @@ impl Monitor {
}
async fn read(&self) -> crate::Result<Vec<Word>> {
match self.register_type {
let Self { ref register, .. } = self;
match register.register_type {
RegisterType::Input => {
self.modbus
.read_input_register(self.address, self.register.size())
.read_input_register(register.address, register.size())
.await
}
RegisterType::Holding => {
self.modbus
.read_holding_register(self.address, self.register.size())
.read_holding_register(register.address, register.size())
.await
}
}
}
}
pub(crate) async fn subscribe(
mqtt: &mqtt::Handle,
) -> crate::Result<mpsc::Receiver<(RegisterType, AddressedRegister)>> {
pub(crate) async fn subscribe(mqtt: &mqtt::Handle) -> crate::Result<mpsc::Receiver<Register>> {
let (tx, rx) = mpsc::channel(8);
let mut input_registers = mqtt.subscribe("input/+").await?;
let mut holding_registers = mqtt.subscribe("holding/+").await?;
let mut registers = mqtt.subscribe("registers/+/config").await?;
tokio::spawn(async move {
fn to_register(payload: &Payload) -> crate::Result<AddressedRegister> {
let Payload { bytes, topic } = payload;
let address = topic
.rsplit('/')
.next()
.expect("subscribed topic guarantees we have a last segment")
.parse()?;
Ok(AddressedRegister {
address,
register: serde_json::from_slice(bytes)?,
})
fn to_register(payload: &Payload) -> crate::Result<Register> {
Ok(serde_json::from_slice(&payload.bytes)?)
}
loop {
select! {
Some(ref payload) = input_registers.recv() => {
match to_register(payload) {
Ok(register) => if (tx.send((RegisterType::Input, register)).await).is_err() { break; },
Err(error) => warn!(?error, def=?payload.bytes, "ignoring invalid input register definition"),
if let Some(ref payload) = registers.recv().await {
match to_register(payload) {
Ok(register) => {
if (tx.send(register).await).is_err() {
break;
}
}
},
Some(ref payload) = holding_registers.recv() => {
match to_register(payload) {
Ok(register) => if (tx.send((RegisterType::Holding, register)).await).is_err() { break; },
Err(error) => warn!(?error, def=?payload.bytes, "ignoring invalid holding register definition"),
Err(error) => {
warn!(?error, def=?payload.bytes, "ignoring invalid input register definition")
}
}
}
@ -118,8 +95,10 @@ pub(crate) async fn subscribe(
Ok(rx)
}
#[derive(Clone, Copy, Debug)]
#[derive(Deserialize, Serialize, PartialEq, Default, Clone, Copy, Debug)]
#[serde(rename_all = "lowercase")]
pub enum RegisterType {
#[default]
Input,
Holding,
}
@ -280,6 +259,11 @@ pub struct Register {
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
pub address: u16,
#[serde(default, skip_serializing_if = "IsDefault::is_default")]
pub register_type: RegisterType,
#[serde(flatten, default, skip_serializing_if = "IsDefault::is_default")]
pub parse: RegisterParse,
@ -291,13 +275,6 @@ pub struct Register {
)]
pub interval: Duration,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AddressedRegister {
pub address: u16,
#[serde(flatten)]
pub register: Register,
}
fn default_register_interval() -> Duration {
Duration::from_secs(60)
@ -501,6 +478,14 @@ impl Register {
self.parse.value_type.size()
}
pub fn path(&self) -> String {
if let Some(ref name) = self.name {
name.clone()
} else {
self.address.to_string()
}
}
pub fn parse_words(&self, words: &[u16]) -> serde_json::Value {
self.parse.value_type.parse_words(&self.apply_swaps(words))
}
@ -525,12 +510,13 @@ impl Register {
#[cfg(test)]
use pretty_assertions::assert_eq;
use crate::mqtt::{self, Payload};
#[test]
fn test_parse_1() {
use serde_json::json;
let reg = Register {
register_type: RegisterType::Input,
address: 42,
name: None,
interval: Default::default(),
parse: RegisterParse {