1
0
Fork 0

Updating to MQTT with parsed state working

However, definitely getting some TCP issues to Modbus client, likely due
to unsafe polling of a task or something. If the connection establishes,
it seems rock solid, but sometimes it will get a "could not fill buffer"
error early on and then it just doesn't work.

It seems to come in waves. I think I'll need to handle it specifically
and restart the connection, if I can't identify the root cause.
refactor
Bo Jeanes 2022-09-09 16:12:56 +10:00
parent 31da5cdd4b
commit 2d8612348f
13 changed files with 400 additions and 253 deletions

View File

@ -0,0 +1,19 @@
{
"host": "10.10.10.219",
"unit": 1,
"proto": "winet-s",
"input": [
{
"address": 13000,
"type": "u16",
"name": "running_state",
"period": "3s"
},
{
"address": 13022,
"type": "s16",
"name": "battery_power",
"period": "3s"
}
]
}

View File

@ -8,43 +8,84 @@
"address": 5017, "address": 5017,
"type": "u32", "type": "u32",
"name": "dc_power", "name": "dc_power",
"swap_words": false, "swap_words": true,
"period": "3s" "period": "1s"
},
{
"address": 13034,
"type": "u32",
"name": "active_power",
"swap_words": true,
"period": "1s"
}, },
{ {
"address": 5008, "address": 5008,
"type": "s16", "type": "s16",
"name": "internal_temperature", "name": "internal_temperature",
"period": "1m" "period": "1m",
"scale": -1
}, },
{ {
"address": 13008, "address": 13008,
"type": "s32", "type": "s32",
"name": "load_power", "name": "load_power",
"swap_words": false, "swap_words": true,
"period": "3s" "period": "1s"
}, },
{ {
"address": 13010, "address": 13010,
"type": "s32", "type": "s32",
"name": "export_power", "name": "export_power",
"swap_words": false, "swap_words": true,
"period": "3s" "period": "1s"
}, },
{ {
"address": 13022, "address": 13022,
"name": "battery_power", "name": "battery_power",
"period": "3s" "period": "1s"
}, },
{ {
"address": 13023, "address": 13023,
"name": "battery_level", "name": "battery_level",
"period": "1m" "period": "1m",
"scale": -1
}, },
{ {
"address": 13024, "address": 13024,
"name": "battery_health", "name": "battery_health",
"period": "10m" "period": "10m",
"scale": -1
},
{
"address": 5036,
"name": "grid_frequency",
"period": "1m"
},
{
"address": 5019,
"name": "phase_a_voltage",
"period": "1m"
},
{
"address": 13031,
"name": "phase_a_current",
"period": "1m"
},
{
"address": 5011,
"name": "mppt1_voltage"
},
{
"address": 5012,
"name": "mppt1_current"
},
{
"address": 5012,
"name": "mppt2_voltage"
},
{
"address": 5013,
"name": "mppt2_current"
} }
], ],
"holding": [ "holding": [
@ -57,6 +98,15 @@
"address": 13059, "address": 13059,
"name": "min_soc", "name": "min_soc",
"period": "90s" "period": "90s"
},
{
"address": 13100,
"name": "battery_reserve"
},
{
"address": 33148,
"name": "forced_battery_power",
"scale": 1
} }
] ]
} }

View File

@ -1,6 +1,7 @@
use clap::Parser; use clap::Parser;
use modbus_mqtt::{server, Result}; use modbus_mqtt::{server, Result};
use rumqttc::MqttOptions; use rumqttc::MqttOptions;
use tokio::select;
use url::Url; use url::Url;
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
@ -50,7 +51,30 @@ async fn main() -> Result<()> {
prefix = options.client_id(); prefix = options.client_id();
} }
server::run(prefix, options, tokio::signal::ctrl_c()).await?; let shutdown = async move {
let ctrl_c = tokio::signal::ctrl_c();
#[cfg(unix)]
{
use tokio::signal::unix::{signal, SignalKind};
let mut term = signal(SignalKind::terminate()).unwrap();
let mut int = signal(SignalKind::interrupt()).unwrap();
let mut hup = signal(SignalKind::hangup()).unwrap();
select! {
_ = ctrl_c => {},
_ = term.recv() => {},
_ = int.recv() => {},
_ = hup.recv() => {},
}
}
#[cfg(not(unix))]
ctrl_c.await;
};
server::run(prefix, options, shutdown).await?;
Ok(()) Ok(())
} }

View File

@ -0,0 +1,55 @@
use thiserror::Error;
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum Error {
#[error(transparent)]
IOError(#[from] std::io::Error),
#[error(transparent)]
MQTTOptionError(#[from] rumqttc::OptionError),
#[error(transparent)]
MQTTClientError(#[from] rumqttc::ClientError),
#[error(transparent)]
MQTTConnectionError(#[from] rumqttc::ConnectionError),
#[error(transparent)]
InvalidSocketAddr(#[from] std::net::AddrParseError),
#[error(transparent)]
SerialError(#[from] tokio_serial::Error),
#[error(transparent)]
ParseIntError(#[from] std::num::ParseIntError),
#[error(transparent)]
JSONError(#[from] serde_json::Error),
#[error("RecvError")]
RecvError,
#[error("SendError")]
SendError,
#[error("Unrecognised modbus protocol")]
UnrecognisedModbusProtocol,
#[error("{0}")]
Other(std::borrow::Cow<'static, str>),
#[error("Unknown")]
Unknown,
}
impl From<String> for Error {
fn from(s: String) -> Self {
Self::Other(s.into())
}
}
impl From<&'static str> for Error {
fn from(s: &'static str) -> Self {
Self::Other(s.into())
}
}

View File

@ -1,8 +0,0 @@
use tokio::sync::mpsc::Sender;
use crate::{modbus::register::Register, mqtt};
/// Describes the register to Home Assistant
fn configure(_register: Register, _tx: Sender<mqtt::Message>) -> crate::Result<()> {
Ok(())
}

View File

@ -1,147 +1,17 @@
use rumqttc::{self};
use tracing::error;
use thiserror::Error;
mod shutdown; mod shutdown;
pub mod homeassistant;
pub mod modbus; pub mod modbus;
pub mod mqtt; pub mod mqtt;
pub mod server; pub mod server;
#[derive(Error, Debug)] //TODO:
#[non_exhaustive] // pub mod homeassistant;
pub enum Error {
#[error(transparent)]
IOError(#[from] std::io::Error),
#[error(transparent)] mod error;
MQTTOptionError(#[from] rumqttc::OptionError), pub use error::Error;
#[error(transparent)]
MQTTClientError(#[from] rumqttc::ClientError),
#[error(transparent)]
MQTTConnectionError(#[from] rumqttc::ConnectionError),
#[error(transparent)]
InvalidSocketAddr(#[from] std::net::AddrParseError),
#[error(transparent)]
SerialError(#[from] tokio_serial::Error),
#[error("RecvError")]
RecvError,
#[error("SendError")]
SendError,
#[error("Unrecognised modbus protocol")]
UnrecognisedModbusProtocol,
#[error("{0}")]
Other(std::borrow::Cow<'static, str>),
#[error("Unknown")]
Unknown,
}
impl From<String> for Error {
fn from(s: String) -> Self {
Self::Other(s.into())
}
}
impl From<&'static str> for Error {
fn from(s: &'static str) -> Self {
Self::Other(s.into())
}
}
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
// tokio::spawn(async move {
// while let Some(command) = modbus_rx.recv().await {
// }
// });
// use itertools::Itertools;
// for (duration, registers) in &connect.input.into_iter().group_by(|r| r.interval) {
// let registers_prefix = format!("{}/input/{}", topic_prefix, id);
// tokio::spawn(watch_registers(
// ModbusReadType::Input,
// connect.address_offset,
// duration,
// registers.collect(),
// modbus_tx.clone(),
// dispatcher.clone(),
// registers_prefix,
// ));
// }
// for (duration, registers) in &connect.hold.into_iter().group_by(|r| r.interval) {
// let registers_prefix = format!("{}/hold/{}", topic_prefix, id);
// tokio::spawn(watch_registers(
// ModbusReadType::Hold,
// connect.address_offset,
// duration,
// registers.collect(),
// modbus_tx.clone(),
// dispatcher.clone(),
// registers_prefix,
// ));
// }
// }
// Err(err) => {
// dispatcher
// .send(DispatchCommand::Publish {
// topic: format!("{}/status/{}", topic_prefix, id),
// payload: serde_json::to_vec(&json!({
// "status": ConnectState::Errored,
// "error": format!("Invalid config: {}", err),
// }))
// .unwrap(),
// })
// .await
// .unwrap();
// }
// }
// }
// #[tracing::instrument(level = "debug")]
// async fn watch_registers(
// read_type: ModbusReadType,
// address_offset: i8,
// duration: Duration,
// registers: Vec<modbus::config::Register>,
// modbus: mpsc::Sender<ModbusCommand>,
// dispatcher: mpsc::Sender<DispatchCommand>,
// registers_prefix: String,
// ) -> ! {
// let mut interval = tokio::time::interval(duration);
// interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
// loop {
// interval.tick().await;
// for r in registers.iter() {
// let address = if address_offset >= 0 {
// r.address.checked_add(address_offset as u16)
// } else {
// r.address.checked_sub(address_offset.unsigned_abs() as u16)
// };
// if let Some(address) = address {
// let size = r.parse.value_type.size();
// debug!(
// name = r.name.as_ref().unwrap_or(&"".to_string()),
// address,
// size,
// register_type = ?read_type,
// value_type = r.parse.value_type.type_name(),
// "Polling register",
// );
// let (tx, rx) = oneshot::channel(); // let (tx, rx) = oneshot::channel();
// modbus // modbus

View File

@ -1,4 +1,6 @@
use crate::modbus::{self}; use super::Word;
use crate::modbus::{self, register};
use crate::mqtt::Scopable;
use crate::Error; use crate::Error;
use rust_decimal::prelude::Zero; use rust_decimal::prelude::Zero;
use serde::Deserialize; use serde::Deserialize;
@ -9,6 +11,8 @@ use tracing::{debug, error, warn};
use crate::{mqtt, shutdown::Shutdown}; use crate::{mqtt, shutdown::Shutdown};
use super::register::RegisterType;
pub(crate) async fn run( pub(crate) async fn run(
config: Config, config: Config,
mqtt: mqtt::Handle, mqtt: mqtt::Handle,
@ -83,20 +87,21 @@ impl Handle {
address: u16, address: u16,
quantity: u8, quantity: u8,
) -> crate::Result<Vec<Word>> { ) -> crate::Result<Vec<Word>> {
self.read_register(ReadType::Input, address, quantity).await self.read_register(RegisterType::Input, address, quantity)
.await
} }
pub async fn read_holding_register( pub async fn read_holding_register(
&self, &self,
address: u16, address: u16,
quantity: u8, quantity: u8,
) -> crate::Result<Vec<Word>> { ) -> crate::Result<Vec<Word>> {
self.read_register(ReadType::Holding, address, quantity) self.read_register(RegisterType::Holding, address, quantity)
.await .await
} }
async fn read_register( async fn read_register(
&self, &self,
reg_type: ReadType, reg_type: RegisterType,
address: u16, address: u16,
quantity: u8, quantity: u8,
) -> crate::Result<Vec<Word>> { ) -> crate::Result<Vec<Word>> {
@ -109,35 +114,45 @@ impl Handle {
} }
} }
type Word = u16;
type Response = oneshot::Sender<crate::Result<Vec<Word>>>; type Response = oneshot::Sender<crate::Result<Vec<Word>>>;
#[derive(Clone, Copy, Debug)]
enum ReadType {
Input,
Holding,
}
#[derive(Debug)] #[derive(Debug)]
enum Command { enum Command {
Read(ReadType, u16, u8, Response), Read(RegisterType, u16, u8, Response),
Write(u16, Vec<Word>, Response), Write(u16, Vec<Word>, Response),
} }
impl Connection { impl Connection {
pub async fn run(mut self) -> crate::Result<()> { pub async fn run(mut self) -> crate::Result<()> {
let mut input_registers = self.mqtt.subscribe("input/+").await?; let mut registers_rx = register::subscribe(&self.mqtt).await?;
let mut holding_registers = self.mqtt.subscribe("holding/+").await?;
// TODO: if we get a new register definition for an existing register, how do we avoid redundant (and possibly
// conflicting) tasks? Should MQTT component only allow one subscriber per topic filter, replacing the old one
// when it gets a new subscribe request?
loop { loop {
select! { select! {
Some(reg) = input_registers.recv() => {},
Some(reg) = holding_registers.recv() => {},
Some(cmd) = self.rx.recv() => { self.process_command(cmd).await; }, Some(cmd) = self.rx.recv() => { self.process_command(cmd).await; },
Some((reg_type, reg)) = registers_rx.recv() => {
debug!(?reg_type, ?reg);
let scope = format!(
"{}/{}",
match &reg_type {
RegisterType::Input => "input",
RegisterType::Holding => "holding",
},
reg.address
);
let mqtt = self.mqtt.scoped(scope);
let modbus = self.handle();
register::Monitor::new(
reg.register,
reg_type,
reg.address,
mqtt,
modbus,
)
.run()
.await;
},
_ = self.shutdown.recv() => { _ = self.shutdown.recv() => {
return Ok(()); return Ok(());
} }
@ -151,6 +166,11 @@ impl Connection {
} }
} }
// TODO: if we get a new register definition for an existing register, how do we avoid redundant (and possibly
// conflicting) tasks? Should MQTT component only allow one subscriber per topic filter, replacing the old one
// when it gets a new subscribe request?
// IDEA: Allow providing a subscription ID which _replaces_ any existing subscription with the same ID
/// Apply address offset to address. /// Apply address offset to address.
/// ///
/// Panics if offset would overflow or underflow the address. /// Panics if offset would overflow or underflow the address.
@ -175,12 +195,11 @@ impl Connection {
} }
} }
#[tracing::instrument(skip(self))]
async fn process_command(&mut self, cmd: Command) { async fn process_command(&mut self, cmd: Command) {
use tokio_modbus::prelude::Reader; use tokio_modbus::prelude::Reader;
let (tx, response) = match cmd { let (tx, response) = match cmd {
Command::Read(ReadType::Input, address, count, tx) => { Command::Read(RegisterType::Input, address, count, tx) => {
let address = self.adjust_address(address); let address = self.adjust_address(address);
( (
tx, tx,
@ -189,7 +208,7 @@ impl Connection {
.await, .await,
) )
} }
Command::Read(ReadType::Holding, address, count, tx) => { Command::Read(RegisterType::Holding, address, count, tx) => {
let address = self.adjust_address(address); let address = self.adjust_address(address);
( (
tx, tx,
@ -224,8 +243,17 @@ impl Connection {
// Os { code: 36, kind: Uncategorized, message: "Operation now in progress" }' // Os { code: 36, kind: Uncategorized, message: "Operation now in progress" }'
// Os { code: 35, kind: WouldBlock, message: "Resource temporarily unavailable" } // Os { code: 35, kind: WouldBlock, message: "Resource temporarily unavailable" }
// //
if let Err(error) = &response { match &response {
warn!(?error, "modbus command error"); Err(error) => match error.kind() {
std::io::ErrorKind::UnexpectedEof => {
// THIS happening feels like a bug either in how I am using tokio_modbus or in tokio_modbus. It seems
// like the underlying buffers get all messed up and restarting doesn't always fix it unless I wait a
// few seconds. I might need to get help from someone to figure it out.
error!(?error, "Connection error, may not be recoverable");
}
_ => error!(?error),
},
_ => {}
} }
// This probably just means that the register task died or is no longer monitoring the response. // This probably just means that the register task died or is no longer monitoring the response.

View File

@ -92,24 +92,24 @@ async fn connect(config: Config<'_>, mqtt: mqtt::Handle, shutdown: Shutdown) ->
holding, holding,
} = config; } = config;
let connection_handler = connection::run(settings, mqtt.clone(), shutdown).await?; let _ = connection::run(settings, mqtt.clone(), shutdown).await?;
// TODO: consider waiting 1 second before sending the registers to MQTT, to ensure that the connection is listening. // TODO: consider waiting 1 second before sending the registers to MQTT, to ensure that the connection is listening.
for reg in input { for (reg_type, registers) in [("holding", holding), ("input", input)] {
let mqtt = mqtt.scoped("input"); let mqtt = mqtt.scoped(reg_type);
if let Ok(r) = serde_json::from_slice::<register::AddressedRegister>(reg.get().as_bytes()) { for reg in registers {
debug!(?r); if let Ok(r) =
let bytes: bytes::Bytes = reg.get().as_bytes().to_owned().into(); serde_json::from_slice::<register::AddressedRegister>(reg.get().as_bytes())
mqtt.publish(r.address.to_string(), bytes).await?; {
} let json = serde_json::to_vec(&r.register).unwrap(); // unwrap() should be fine because we JUST deserialized it successfully
} mqtt.publish(r.address.to_string(), json).await?;
for reg in holding { // if let Some(name) = r.register.name {
let mqtt = mqtt.scoped("holding"); // r.register.name = None;
if let Ok(r) = serde_json::from_slice::<register::AddressedRegister>(reg.get().as_bytes()) { // let json = serde_json::to_vec(&r).unwrap(); // unwrap() should be fine because we JUST deserialized it successfully
debug!(?r); // mqtt.publish(name, json).await?;
let bytes: bytes::Bytes = reg.get().as_bytes().to_owned().into(); // }
mqtt.publish(r.address.to_string(), bytes).await?; }
} }
} }

View File

@ -1,19 +1,10 @@
use rust_decimal::{prelude::FromPrimitive, Decimal};
use serde::Serialize;
use self::register::{Register, RegisterValueType};
pub mod connection; pub mod connection;
pub mod connector; pub mod connector;
pub mod register; pub mod register;
#[derive(Serialize)] pub use connection::Handle;
#[serde(rename_all = "lowercase")]
pub enum ConnectState { type Word = u16;
Connected,
Disconnected,
Errored,
}
pub type UnitId = tokio_modbus::prelude::SlaveId; pub type UnitId = tokio_modbus::prelude::SlaveId;
pub type Unit = tokio_modbus::prelude::Slave; pub type Unit = tokio_modbus::prelude::Slave;

View File

@ -1,5 +1,130 @@
use super::Word;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{ops::Add, time::Duration}; use std::time::Duration;
use tokio::{
select,
sync::mpsc,
time::{interval, MissedTickBehavior},
};
use tracing::{debug, warn};
pub struct Monitor {
mqtt: mqtt::Handle,
modbus: super::Handle,
address: u16,
register: Register,
register_type: RegisterType,
}
impl Monitor {
pub fn new(
register: Register,
register_type: RegisterType,
address: u16,
mqtt: mqtt::Handle,
modbus: super::Handle,
) -> Monitor {
Monitor {
mqtt,
register_type,
register,
address,
modbus,
}
}
pub async fn run(self) {
tokio::spawn(async move {
let mut interval = interval(self.register.interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
interval.tick().await;
if let Ok(words) = self.read().await {
debug!(address=%self.address, "type"=?self.register_type, ?words);
#[cfg(debug_assertions)]
self.mqtt
.publish("raw", serde_json::to_vec(&words).unwrap())
.await
.unwrap();
let value = self
.register
.parse_words(&self.register.apply_swaps(&words));
self.mqtt
.publish("state", serde_json::to_vec(&value).unwrap())
.await
.unwrap();
}
}
});
}
async fn read(&self) -> crate::Result<Vec<Word>> {
match self.register_type {
RegisterType::Input => {
self.modbus
.read_input_register(self.address, self.register.size())
.await
}
RegisterType::Holding => {
self.modbus
.read_holding_register(self.address, self.register.size())
.await
}
}
}
}
pub(crate) async fn subscribe(
mqtt: &mqtt::Handle,
) -> crate::Result<mpsc::Receiver<(RegisterType, AddressedRegister)>> {
let (tx, rx) = mpsc::channel(8);
let mut input_registers = mqtt.subscribe("input/+").await?;
let mut holding_registers = mqtt.subscribe("holding/+").await?;
tokio::spawn(async move {
fn to_register(payload: &Payload) -> crate::Result<AddressedRegister> {
let Payload { bytes, topic } = payload;
let address = topic
.rsplit("/")
.next()
.expect("subscribed topic guarantees we have a last segment")
.parse()?;
Ok(AddressedRegister {
address,
register: serde_json::from_slice(&bytes)?,
})
}
loop {
select! {
Some(ref payload) = input_registers.recv() => {
match to_register(payload) {
Ok(register) => if let Err(_) = tx.send((RegisterType::Input, register)).await { break; },
Err(error) => warn!(?error, def=?payload.bytes, "ignoring invalid input register definition"),
}
},
Some(ref payload) = holding_registers.recv() => {
match to_register(payload) {
Ok(register) => if let Err(_) = tx.send((RegisterType::Holding, register)).await { break; },
Err(error) => warn!(?error, def=?payload.bytes, "ignoring invalid holding register definition"),
}
}
}
}
});
Ok(rx)
}
#[derive(Clone, Copy, Debug)]
pub enum RegisterType {
Input,
Holding,
}
#[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)] #[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase", default)] #[serde(rename_all = "lowercase", default)]
@ -302,7 +427,7 @@ impl RegisterValueType {
use serde_json::json; use serde_json::json;
use RegisterValueType as T; use RegisterValueType as T;
let bytes: Vec<u8> = words.iter().flat_map(|v| v.to_ne_bytes()).collect(); let bytes: Vec<u8> = words.iter().flat_map(|v| v.to_be_bytes()).collect();
match *self { match *self {
T::Numeric { ref of, ref adjust } => { T::Numeric { ref of, ref adjust } => {
@ -314,44 +439,44 @@ impl RegisterValueType {
N::U32 => { N::U32 => {
json!(bytes json!(bytes
.try_into() .try_into()
.map(|bytes| scale * Decimal::from(u32::from_le_bytes(bytes)) + offset) .map(|bytes| scale * Decimal::from(u32::from_be_bytes(bytes)) + offset)
.ok()) .ok())
} }
N::U64 => { N::U64 => {
json!(bytes json!(bytes
.try_into() .try_into()
.map(|bytes| scale * Decimal::from(u64::from_le_bytes(bytes)) + offset) .map(|bytes| scale * Decimal::from(u64::from_be_bytes(bytes)) + offset)
.ok()) .ok())
} }
N::I8 => { N::I8 => {
json!(vec![bytes[1]] json!(vec![bytes[1]]
.try_into() .try_into()
.map(|bytes| scale * Decimal::from(i8::from_le_bytes(bytes)) + offset) .map(|bytes| scale * Decimal::from(i8::from_be_bytes(bytes)) + offset)
.ok()) .ok())
} }
N::I16 => { N::I16 => {
json!(bytes json!(bytes
.try_into() .try_into()
.map(|bytes| scale * Decimal::from(i16::from_le_bytes(bytes)) + offset) .map(|bytes| scale * Decimal::from(i16::from_be_bytes(bytes)) + offset)
.ok()) .ok())
} }
N::I32 => { N::I32 => {
json!(bytes json!(bytes
.try_into() .try_into()
.map(|bytes| scale * Decimal::from(i32::from_le_bytes(bytes)) + offset) .map(|bytes| scale * Decimal::from(i32::from_be_bytes(bytes)) + offset)
.ok()) .ok())
} }
N::I64 => { N::I64 => {
json!(bytes json!(bytes
.try_into() .try_into()
.map(|bytes| scale * Decimal::from(i64::from_le_bytes(bytes)) + offset) .map(|bytes| scale * Decimal::from(i64::from_be_bytes(bytes)) + offset)
.ok()) .ok())
} }
N::F32 => { N::F32 => {
json!(bytes json!(bytes
.try_into() .try_into()
.map(|bytes| scale .map(|bytes| scale
* Decimal::from_f32(f32::from_le_bytes(bytes)).unwrap() * Decimal::from_f32(f32::from_be_bytes(bytes)).unwrap()
+ offset) + offset)
.ok()) .ok())
} }
@ -359,7 +484,7 @@ impl RegisterValueType {
json!(bytes json!(bytes
.try_into() .try_into()
.map(|bytes| scale .map(|bytes| scale
* Decimal::from_f64(f64::from_le_bytes(bytes)).unwrap() * Decimal::from_f64(f64::from_be_bytes(bytes)).unwrap()
+ offset) + offset)
.ok()) .ok())
} }
@ -374,6 +499,10 @@ impl RegisterValueType {
} }
impl Register { impl Register {
pub fn size(&self) -> u8 {
self.parse.value_type.size()
}
pub fn parse_words(&self, words: &[u16]) -> serde_json::Value { pub fn parse_words(&self, words: &[u16]) -> serde_json::Value {
self.parse.value_type.parse_words(words) self.parse.value_type.parse_words(words)
} }
@ -397,28 +526,27 @@ impl Register {
} }
#[cfg(test)] #[cfg(test)]
use pretty_assertions::assert_eq; use pretty_assertions::assert_eq;
use crate::mqtt::{self, Payload};
#[test] #[test]
fn test_parse_1() { fn test_parse_1() {
use serde_json::json; use serde_json::json;
let reg = AddressedRegister { Register {
address: 42, name: None,
register: Register { interval: Default::default(),
name: None, parse: RegisterParse {
interval: Default::default(), swap_bytes: Swap(false),
parse: RegisterParse { swap_words: Swap(false),
swap_bytes: Swap(false), value_type: RegisterValueType::Numeric {
swap_words: Swap(false), of: RegisterNumeric::I32,
value_type: RegisterValueType::Numeric { adjust: RegisterNumericAdjustment {
of: RegisterNumeric::I32, scale: 0,
adjust: RegisterNumericAdjustment { offset: 0,
scale: 0,
offset: 0,
},
}, },
}, },
}, },
}; };
assert_eq!(reg.register.parse_words(&[843, 0]), json!(843)); assert_eq!(reg.parse_words(&[843, 0]), json!(843));
} }

View File

@ -1,9 +1,9 @@
use std::{collections::HashMap, future::Future}; use std::collections::HashMap;
use bytes::Bytes; use bytes::Bytes;
use rumqttc::{ use rumqttc::{
mqttbytes::matches as matches_topic, mqttbytes::valid_topic, AsyncClient, Event, EventLoop, mqttbytes::matches as matches_topic, AsyncClient, Event, EventLoop, MqttOptions, Publish,
MqttOptions, Publish, Subscribe, SubscribeFilter, Subscribe, SubscribeFilter,
}; };
use tokio::{ use tokio::{
select, select,
@ -70,18 +70,6 @@ impl Connection {
Ok(()) Ok(())
} }
/// Create a handle for interacting with the MQTT server such that a pre-provided prefix is transparently added to
/// all relevant commands which use a topic.
pub fn prefixed_handle<S: Into<String> + Send>(&self, prefix: S) -> crate::Result<Handle> {
let prefix = prefix.into();
if !valid_topic(&prefix) {
return Err("Prefix is not a valid topic".into());
}
Ok(self.handle().scoped(prefix))
}
pub fn handle(&self) -> Handle { pub fn handle(&self) -> Handle {
Handle { Handle {
prefix: None, prefix: None,
@ -185,6 +173,10 @@ pub struct Handle {
tx: Sender<Message>, tx: Sender<Message>,
} }
// IDEA: make subscribe+publish _generic_ over the payload type, as long as it implements a Payload trait we define,
// which allows them to perform the serialization/deserialization to Bytes. For most domain types, the trait would be
// implemented to use serde_json but for Bytes and Vec<u8> it would just return itself.
// The return values may need to be crate::Result<Receiver<Option<T>> or crate::Result<Receiver<crate::Result<T>>>.
impl Handle { impl Handle {
pub async fn subscribe<S: Into<String>>(&self, topic: S) -> crate::Result<Receiver<Payload>> { pub async fn subscribe<S: Into<String>>(&self, topic: S) -> crate::Result<Receiver<Payload>> {
let (tx_bytes, rx) = mpsc::channel(8); let (tx_bytes, rx) = mpsc::channel(8);

View File

@ -4,12 +4,9 @@ use crate::{
}; };
use rumqttc::MqttOptions; use rumqttc::MqttOptions;
use std::{future::Future, time::Duration}; use std::future::Future;
use tokio::{ use tokio::sync::{broadcast, mpsc};
sync::{broadcast, mpsc}, use tracing::error;
time::timeout,
};
use tracing::{error, info};
pub async fn run<P: Into<String> + Send>( pub async fn run<P: Into<String> + Send>(
prefix: P, prefix: P,

View File

@ -344,6 +344,7 @@ struct Device {
dev_type: u8, dev_type: u8,
// unit/slave ID // unit/slave ID
#[allow(dead_code)]
#[serde(deserialize_with = "serde_aux::prelude::deserialize_number_from_string")] #[serde(deserialize_with = "serde_aux::prelude::deserialize_number_from_string")]
phys_addr: u8, phys_addr: u8,
// UNUSED: // UNUSED:
@ -397,7 +398,7 @@ fn test_deserialize_device() {
enum WebSocketMessage { enum WebSocketMessage {
Connect { token: String }, Connect { token: String },
DeviceList { list: Vec<Device> }, // DeviceList { list: Vec<Device> },
// Not yet used: // Not yet used:
// State, // system state // State, // system state
@ -414,7 +415,7 @@ enum WebSocketMessage {
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
struct ResultList<T> { struct ResultList<T> {
count: u16, // count: u16,
#[serde(rename = "list")] #[serde(rename = "list")]
items: Vec<T>, items: Vec<T>,
} }