From ba09d6a76a9f445474bc78d6de734a180edf3b79 Mon Sep 17 00:00:00 2001 From: Bo Jeanes Date: Sun, 17 Jul 2022 18:14:55 +1000 Subject: [PATCH] Awkwardly put into async tasks O_o --- Cargo.lock | 1 + Cargo.toml | 1 + src/main.rs | 318 +++++++++++++++++++++++++++++++++++++++------------- 3 files changed, 240 insertions(+), 80 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 80d2930..ba6077f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -439,6 +439,7 @@ dependencies = [ name = "modbus-mqtt" version = "0.1.0" dependencies = [ + "bytes", "clap", "rumqttc", "serde", diff --git a/Cargo.toml b/Cargo.toml index 4593a20..2a06ad4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +bytes = "1.1.0" clap = { version = "3.2.12", features = ["derive", "env"] } rumqttc = { version = "0.13.0", features = ["url"], git = "https://github.com/bytebeamio/rumqtt" } serde = { version = "1.0.139", features = ["serde_derive"] } diff --git a/src/main.rs b/src/main.rs index 0f9c41a..3053f61 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ use rumqttc::{self, AsyncClient, Event, Incoming, LastWill, MqttOptions, Publish use serde::{Deserialize, Serialize}; use serde_json::json; use std::time::Duration; +use tokio::sync::mpsc; use tokio_modbus::prelude::*; use clap::Parser; @@ -36,9 +37,15 @@ enum ModbusProto { #[serde(default = "default_modbus_port")] port: u16, }, + #[serde(rename_all = "lowercase")] Rtu { + // tty: std::path::PathBuf, tty: String, baud_rate: u32, + // data_bits: tokio_serial::DataBits, // TODO: allow this to be represented as a number instead of string + // stop_bits: tokio_serial::StopBits, // TODO: allow this to be represented as a number instead of string + // flow_control: tokio_se&rial::FlowControl, + // parity: tokio_serial::Parity, }, } @@ -46,18 +53,71 @@ fn default_modbus_port() -> u16 { 502 } +#[derive(Serialize, Deserialize)] +struct Range { + address: u16, + size: u16, +} + +// TODO: `scale`, `offset`, `precision` +enum RegisterValueType { + U8, + U16, + U32, + U64, + I8, + I16, + I32, + I64, + F32, + F64, + String, +} + +#[derive(Serialize, Deserialize)] +struct RegisterParse { + #[serde(default = "default_swap")] + swap_bytes: bool, + + #[serde(default = "default_swap")] + swap_words: bool, +} + +fn default_swap() -> bool { + false +} + +#[derive(Serialize, Deserialize)] +struct Register { + #[serde(flatten)] + range: Range, + + #[serde(skip_serializing_if = "Option::is_none")] + name: Option, + + parse: Option, +} + #[derive(Serialize, Deserialize)] struct Connect { #[serde(flatten)] settings: ModbusProto, + // input_ranges: Vec, + // hold_ranges: Vec, #[serde(default = "default_modbus_unit")] slave: u8, // TODO make `Slave` but need custom deserializer I think + + #[serde(default = "default_address_offset")] + address_offset: i8, } fn default_modbus_unit() -> u8 { 0 } +fn default_address_offset() -> i8 { + 0 +} #[derive(Serialize)] #[serde(rename_all = "lowercase")] @@ -85,13 +145,52 @@ enum MainStatus { async fn main() { let args = Cli::parse(); - let mut mqttoptions = MqttOptions::new("mqtt", args.mqtt_host.as_str(), args.mqtt_port); - if let (Some(u), Some(p)) = (args.mqtt_user, args.mqtt_password) { - mqttoptions.set_credentials(u, p); - } - mqttoptions.set_keep_alive(Duration::from_secs(5)); - mqttoptions.set_last_will(LastWill { - topic: format!("{}/status", args.mqtt_topic_prefix).to_string(), + let (registry_tx, mut registry_rx) = mpsc::channel::(32); + let (dispatcher_tx, mut dispatcher_rx) = mpsc::channel::(32); + + // Modbus connection registry + let registry_handle = { + let prefix = args.mqtt_topic_prefix.clone(); + tokio::spawn(async move { connection_registry(prefix, dispatcher_tx, registry_rx).await }) + }; + + // MQTT Dispatcher + let dispatcher_handle = { + let prefix = args.mqtt_topic_prefix.clone(); + let mut options = MqttOptions::new( + env!("CARGO_PKG_NAME"), + args.mqtt_host.as_str(), + args.mqtt_port, + ); + if let (Some(u), Some(p)) = (args.mqtt_user, args.mqtt_password) { + options.set_credentials(u, p); + } + options.set_keep_alive(Duration::from_secs(5)); // TODO: make this configurable + + tokio::spawn(async move { + mqtt_dispatcher(options, prefix, registry_tx, dispatcher_rx).await; + }) + }; + + registry_handle.await.unwrap(); + dispatcher_handle.await.unwrap(); +} + +#[derive(Debug)] +enum DispatchCommand { + Publish { topic: String, payload: Vec }, +} + +async fn mqtt_dispatcher( + mut options: MqttOptions, + prefix: String, + registry: mpsc::Sender, + mut rx: mpsc::Receiver, +) { + println!("Connecting to MQTT broker..."); + + options.set_last_will(LastWill { + topic: format!("{}/status", prefix).to_string(), message: serde_json::to_vec(&json!({ "status": MainStatus::Stopped, })) @@ -101,19 +200,11 @@ async fn main() { retain: false, }); - let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); - - client - .subscribe( - format!("{}/connect/#", args.mqtt_topic_prefix), - QoS::AtMostOnce, - ) - .await - .unwrap(); + let (client, mut eventloop) = AsyncClient::new(options, 10); client .publish( - format!("{}/status", args.mqtt_topic_prefix).to_string(), + format!("{}/status", prefix).to_string(), QoS::AtMostOnce, false, serde_json::to_vec(&json!({ @@ -124,71 +215,48 @@ async fn main() { .await .unwrap(); - while let Ok(event) = eventloop.poll().await { - match event { - Event::Outgoing(_) => (), - Event::Incoming(Incoming::ConnAck(_)) => println!("Connected to MQTT!"), - Event::Incoming(Incoming::PingResp | Incoming::SubAck(_)) => (), + client + .subscribe(format!("{}/connect/#", prefix), QoS::AtMostOnce) + .await + .unwrap(); + + let rx_loop_handler = { + let client = client.clone(); + tokio::spawn(async move { + println!("Start dispatcher rx loop"); + while let Some(command) = rx.recv().await { + match command { + DispatchCommand::Publish { topic, payload } => { + client + .publish(topic, QoS::AtMostOnce, false, payload) + .await + .unwrap(); + } + } + } + }) + }; + + while let Ok(event) = eventloop.poll().await { + use Event::{Incoming as In, Outgoing as Out}; + + match event { + Out(_) => (), + In(Incoming::ConnAck(_)) => println!("Connected to MQTT!"), + In(Incoming::PingResp | Incoming::SubAck(_)) => (), + + In(Incoming::Publish(Publish { topic, payload, .. })) => { + println!("{} -> {:?}", &topic, &payload); - Event::Incoming(Incoming::Publish(Publish { topic, payload, .. })) => { - println!("{} {:?}", &topic, &payload); match topic.split('/').collect::>()[..] { - [prefix, "connect", conn_name] if prefix == args.mqtt_topic_prefix.as_str() => { - match serde_json::from_slice::(&payload) { - Ok(connect) => { - let slave = Slave(connect.slave); - // println!("{:?}", connect); - let status = match connect.settings { - ModbusProto::Tcp { ref host, port } => { - let socket_addr = - format!("{}:{}", host, port).parse().unwrap(); - let mut modbus = - tcp::connect_slave(socket_addr, slave).await.unwrap(); - ConnectStatus { - connect: connect, - status: ConnectState::Connected, - } - } - ModbusProto::Rtu { ref tty, baud_rate } => { - let builder = tokio_serial::new(tty, baud_rate); - let port = - tokio_serial::SerialStream::open(&builder).unwrap(); - let mut modbus = - rtu::connect_slave(port, slave).await.unwrap(); - ConnectStatus { - connect: connect, - status: ConnectState::Connected, - } - } - }; - client - .publish( - format!("{}/status/{}", args.mqtt_topic_prefix, conn_name) - .as_str(), - QoS::AtMostOnce, - false, - serde_json::to_vec(&status).unwrap(), - ) - .await - .unwrap(); - } - Err(err) => { - client - .publish( - format!("{}/status/{}", args.mqtt_topic_prefix, conn_name) - .as_str(), - QoS::AtMostOnce, - false, - serde_json::to_vec(&json!({ - "status": ConnectState::Errored, - "error": err.to_string(), - })) - .unwrap(), - ) - .await - .unwrap(); - } - } + [p, "connect", conn_name] if p == prefix.as_str() => { + registry + .send(RegistryCommand::Connect { + id: conn_name.to_string(), + details: payload, + }) + .await + .unwrap(); } _ => (), }; @@ -198,6 +266,96 @@ async fn main() { } } } + + rx_loop_handler.await.unwrap(); +} + +type ConnectionId = String; + +#[derive(Debug)] +enum RegistryCommand { + Connect { + id: ConnectionId, + details: bytes::Bytes, + }, + Disconnect(ConnectionId), +} + +async fn connection_registry( + prefix: String, + dispatcher: mpsc::Sender, + mut rx: mpsc::Receiver, +) { + println!("Starting connection registry..."); + while let Some(command) = rx.recv().await { + use RegistryCommand::*; + match command { + Connect { id, details } => { + println!("Connection {}: {:?}", id, &details); + let prefix = prefix.clone(); + let dispatcher = dispatcher.clone(); + tokio::spawn(async move { + handle_connect(dispatcher, id, prefix, details).await; + }); + } + _ => println!("unimplemented"), + } + } +} + +async fn handle_connect( + dispatcher: mpsc::Sender, + id: ConnectionId, + topic_prefix: String, + payload: bytes::Bytes, +) { + println!("Starting connection handler for {}", id); + match serde_json::from_slice::(&payload) { + Ok(connect) => { + let slave = Slave(connect.slave); + // println!("{:?}", connect); + + let status = match connect.settings { + ModbusProto::Tcp { ref host, port } => { + let socket_addr = format!("{}:{}", host, port).parse().unwrap(); + let mut modbus = tcp::connect_slave(socket_addr, slave).await.unwrap(); + ConnectStatus { + connect: connect, + status: ConnectState::Connected, + } + } + ModbusProto::Rtu { ref tty, baud_rate } => { + let builder = tokio_serial::new(tty, baud_rate); + let port = tokio_serial::SerialStream::open(&builder).unwrap(); + let mut modbus = rtu::connect_slave(port, slave).await.unwrap(); + ConnectStatus { + connect: connect, + status: ConnectState::Connected, + } + } + }; + dispatcher + .send(DispatchCommand::Publish { + topic: format!("{}/status/{}", topic_prefix, id), + payload: serde_json::to_vec(&status).unwrap(), + }) + .await + .unwrap(); + } + Err(err) => { + dispatcher + .send(DispatchCommand::Publish { + topic: format!("{}/status/{}", topic_prefix, id), + payload: serde_json::to_vec(&json!({ + "status": ConnectState::Errored, + "error": format!("Invalid config: {}", err.to_string()), + })) + .unwrap(), + }) + .await + .unwrap(); + } + } } // async fn requests(client: AsyncClient) {