From b398d7415446e95c70d774ab2c6e49e741c5a3e1 Mon Sep 17 00:00:00 2001 From: Bo Jeanes Date: Sun, 17 Jul 2022 10:41:56 +1000 Subject: [PATCH] Rough connectivity and idea --- Cargo.lock | 40 ++++++++++++ Cargo.toml | 3 + README.md | 36 +++++++++++ src/main.rs | 175 +++++++++++++++++++++++++++++++++++++++++++++------- 4 files changed, 232 insertions(+), 22 deletions(-) create mode 100644 README.md diff --git a/Cargo.lock b/Cargo.lock index 1f8add2..80d2930 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -441,8 +441,11 @@ version = "0.1.0" dependencies = [ "clap", "rumqttc", + "serde", + "serde_json", "tokio", "tokio-modbus", + "tokio-serial", ] [[package]] @@ -656,6 +659,12 @@ dependencies = [ "base64", ] +[[package]] +name = "ryu" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3f6f92acf49d1b98f7a81226834412ada05458b7364277387724a237f062695" + [[package]] name = "scopeguard" version = "1.1.0" @@ -672,6 +681,37 @@ dependencies = [ "untrusted", ] +[[package]] +name = "serde" +version = "1.0.139" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0171ebb889e45aa68b44aee0859b3eede84c6f5f5c228e6f140c0b2a0a46cad6" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.139" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc1d3230c1de7932af58ad8ffbe1d784bd55efd5a9d84ac24f69c72d83543dfb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.82" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82c2c1fdcd807d1098552c5b9a36e425e42e9fbd7c6a37a8425f390f781f7fa7" +dependencies = [ + "itoa", + "ryu", + "serde", +] + [[package]] name = "serialport" version = "4.2.0" diff --git a/Cargo.toml b/Cargo.toml index da56ea1..4593a20 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,5 +8,8 @@ edition = "2021" [dependencies] clap = { version = "3.2.12", features = ["derive", "env"] } rumqttc = { version = "0.13.0", features = ["url"], git = "https://github.com/bytebeamio/rumqtt" } +serde = { version = "1.0.139", features = ["serde_derive"] } +serde_json = "1.0.82" tokio = { version = "1.20.0", features = ["rt", "rt-multi-thread"] } tokio-modbus = "0.5.3" +tokio-serial = "5.4.3" diff --git a/README.md b/README.md new file mode 100644 index 0000000..f1adb9a --- /dev/null +++ b/README.md @@ -0,0 +1,36 @@ +# ModbusMQTT + +Very rough for now. + +## Topic spec + +``` +prefix/status -> { "status": "running" } # last-will message here? + +prefix/connect/ <- { + "host": "localhost", + "port": 502, + "slave": 1, + } + +prefix/status/ -> { + "host": "localhost", + "port": 502, + "slave": 1, + "status": "connected" + } + +prefix/logs/ -> { "message": "log message", level: "level" } + +prefix/connection//monitor[/opt-name] <- { + "address": 5100, + "type": "holding|input", + "count": 1, + "interval": 10, // seconds +} +``` + +## Similar projects + +https://github.com/Instathings/modbus2mqtt +https://github.com/TenySmart/ModbusTCP2MQTT - Sungrow inverter specific \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 9252c8b..2aba662 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,10 @@ -use rumqttc::{self, AsyncClient, MqttOptions, QoS}; +use rumqttc::{self, AsyncClient, Event, Incoming, MqttOptions, Publish, QoS}; +use serde::{Deserialize, Serialize}; +// use serde_json::Result; use std::error::Error; use std::time::Duration; use tokio::{task, time}; +use tokio_modbus::prelude::*; use clap::Parser; @@ -20,10 +23,61 @@ struct Cli { #[clap(short = 'P', long, env)] mqtt_password: Option, + + #[clap(short = 't', long, default_value = "modbus-mqtt")] + // Where to listen for commands + mqtt_topic_prefix: String, +} + +#[derive(Serialize, Deserialize)] +#[serde(untagged)] +enum ModbusProto { + Tcp { + host: String, + + #[serde(default = "default_modbus_port")] + port: u16, + }, + Rtu { + tty: String, + baud_rate: u32, + }, +} + +fn default_modbus_port() -> u16 { + 502 +} + +#[derive(Serialize, Deserialize)] +struct Connect { + #[serde(flatten)] + settings: ModbusProto, + + #[serde(default = "default_modbus_unit")] + slave: u8, // TODO make `Slave` but need custom deserializer I think +} + +fn default_modbus_unit() -> u8 { + 0 +} + +#[derive(Serialize)] +#[serde(rename_all = "lowercase")] +enum ConnectState { + Connected, + Disconnected, + Errored, +} + +#[derive(Serialize)] +struct ConnectStatus { + #[serde(flatten)] + connect: Connect, + status: ConnectState, } #[tokio::main(worker_threads = 1)] -async fn main() -> Result<(), Box> { +async fn main() { let args = Cli::parse(); let mut mqttoptions = MqttOptions::new("mqtt", args.mqtt_host.as_str(), args.mqtt_port); @@ -33,31 +87,108 @@ async fn main() -> Result<(), Box> { mqttoptions.set_keep_alive(Duration::from_secs(5)); let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); - task::spawn(async move { - requests(client).await; - time::sleep(Duration::from_secs(3)).await; - }); - loop { - let event = eventloop.poll().await; - println!("{:?}", event.unwrap()); - } -} - -async fn requests(client: AsyncClient) { client - .subscribe("hello/world", QoS::AtMostOnce) + .subscribe(format!("{}/#", args.mqtt_topic_prefix), QoS::AtMostOnce) .await .unwrap(); - for i in 1..=10 { - client - .publish("hello/world", QoS::ExactlyOnce, false, vec![1; i]) - .await - .unwrap(); + // let socket_addr = "10.10.10.219:502".parse().unwrap(); + // let mut modbus = tcp::connect_slave(socket_addr, Slave(0x01)).await.unwrap(); + // let data = modbus.read_input_registers(5017, 2).await.unwrap(); + // println!("Response is '{:#06x?}'", data); - time::sleep(Duration::from_secs(1)).await; + while let Ok(event) = eventloop.poll().await { + match event { + Event::Outgoing(_) => (), + Event::Incoming(Incoming::ConnAck(_)) => println!("Connected to MQTT!"), + Event::Incoming(Incoming::PingResp | Incoming::SubAck(_)) => (), + + Event::Incoming(Incoming::Publish(Publish { topic, payload, .. })) => { + println!("{} {:?}", &topic, &payload); + match topic.split('/').collect::>()[..] { + [prefix, "connect", conn_name] if prefix == args.mqtt_topic_prefix.as_str() => { + match serde_json::from_slice::(&payload) { + Ok(connect) => { + let slave = Slave(connect.slave); + // println!("{:?}", connect); + let status = match connect.settings { + ModbusProto::Tcp { ref host, port } => { + let socket_addr = + format!("{}:{}", host, port).parse().unwrap(); + let mut modbus = + tcp::connect_slave(socket_addr, slave).await.unwrap(); + ConnectStatus { + connect: connect, + status: ConnectState::Connected, + } + } + ModbusProto::Rtu { ref tty, baud_rate } => { + let builder = tokio_serial::new(tty, baud_rate); + let port = + tokio_serial::SerialStream::open(&builder).unwrap(); + let mut modbus = + rtu::connect_slave(port, slave).await.unwrap(); + ConnectStatus { + connect: connect, + status: ConnectState::Connected, + } + } + }; + client + .publish( + format!("{}/status/{}", args.mqtt_topic_prefix, conn_name) + .as_str(), + QoS::AtMostOnce, + false, + serde_json::to_vec(&status).unwrap(), + ) + .await + .unwrap(); + } + Err(err) => { + use serde_json::json; + client + .publish( + format!("{}/status/{}", args.mqtt_topic_prefix, conn_name) + .as_str(), + QoS::AtMostOnce, + false, + serde_json::to_vec(&json!({ + "status": ConnectState::Errored, + "error": err.to_string(), + })) + .unwrap(), + ) + .await + .unwrap(); + } + } + } + _ => (), + }; + } + _ => { + println!("{:?}", event); + } + } } - - time::sleep(Duration::from_secs(120)).await; } + +// async fn requests(client: AsyncClient) { +// client +// .subscribe("hello/world", QoS::AtMostOnce) +// .await +// .unwrap(); + +// for i in 1..=10 { +// client +// .publish("hello/world", QoS::ExactlyOnce, false, vec![1; i]) +// .await +// .unwrap(); + +// time::sleep(Duration::from_secs(1)).await; +// } + +// time::sleep(Duration::from_secs(120)).await; +// }