1
0
Fork 0

Rough connectivity and idea

gh-action
Bo Jeanes 2022-07-17 10:41:56 +10:00
parent c63072715d
commit b398d74154
4 changed files with 232 additions and 22 deletions

40
Cargo.lock generated
View File

@ -441,8 +441,11 @@ version = "0.1.0"
dependencies = [
"clap",
"rumqttc",
"serde",
"serde_json",
"tokio",
"tokio-modbus",
"tokio-serial",
]
[[package]]
@ -656,6 +659,12 @@ dependencies = [
"base64",
]
[[package]]
name = "ryu"
version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3f6f92acf49d1b98f7a81226834412ada05458b7364277387724a237f062695"
[[package]]
name = "scopeguard"
version = "1.1.0"
@ -672,6 +681,37 @@ dependencies = [
"untrusted",
]
[[package]]
name = "serde"
version = "1.0.139"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0171ebb889e45aa68b44aee0859b3eede84c6f5f5c228e6f140c0b2a0a46cad6"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.139"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc1d3230c1de7932af58ad8ffbe1d784bd55efd5a9d84ac24f69c72d83543dfb"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "serde_json"
version = "1.0.82"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82c2c1fdcd807d1098552c5b9a36e425e42e9fbd7c6a37a8425f390f781f7fa7"
dependencies = [
"itoa",
"ryu",
"serde",
]
[[package]]
name = "serialport"
version = "4.2.0"

View File

@ -8,5 +8,8 @@ edition = "2021"
[dependencies]
clap = { version = "3.2.12", features = ["derive", "env"] }
rumqttc = { version = "0.13.0", features = ["url"], git = "https://github.com/bytebeamio/rumqtt" }
serde = { version = "1.0.139", features = ["serde_derive"] }
serde_json = "1.0.82"
tokio = { version = "1.20.0", features = ["rt", "rt-multi-thread"] }
tokio-modbus = "0.5.3"
tokio-serial = "5.4.3"

36
README.md 100644
View File

@ -0,0 +1,36 @@
# ModbusMQTT
Very rough for now.
## Topic spec
```
prefix/status -> { "status": "running" } # last-will message here?
prefix/connect/<connection> <- {
"host": "localhost",
"port": 502,
"slave": 1,
}
prefix/status/<connection> -> {
"host": "localhost",
"port": 502,
"slave": 1,
"status": "connected"
}
prefix/logs/<connection> -> { "message": "log message", level: "level" }
prefix/connection/<connection>/monitor[/opt-name] <- {
"address": 5100,
"type": "holding|input",
"count": 1,
"interval": 10, // seconds
}
```
## Similar projects
https://github.com/Instathings/modbus2mqtt
https://github.com/TenySmart/ModbusTCP2MQTT - Sungrow inverter specific

View File

@ -1,7 +1,10 @@
use rumqttc::{self, AsyncClient, MqttOptions, QoS};
use rumqttc::{self, AsyncClient, Event, Incoming, MqttOptions, Publish, QoS};
use serde::{Deserialize, Serialize};
// use serde_json::Result;
use std::error::Error;
use std::time::Duration;
use tokio::{task, time};
use tokio_modbus::prelude::*;
use clap::Parser;
@ -20,10 +23,61 @@ struct Cli {
#[clap(short = 'P', long, env)]
mqtt_password: Option<String>,
#[clap(short = 't', long, default_value = "modbus-mqtt")]
// Where to listen for commands
mqtt_topic_prefix: String,
}
#[derive(Serialize, Deserialize)]
#[serde(untagged)]
enum ModbusProto {
Tcp {
host: String,
#[serde(default = "default_modbus_port")]
port: u16,
},
Rtu {
tty: String,
baud_rate: u32,
},
}
fn default_modbus_port() -> u16 {
502
}
#[derive(Serialize, Deserialize)]
struct Connect {
#[serde(flatten)]
settings: ModbusProto,
#[serde(default = "default_modbus_unit")]
slave: u8, // TODO make `Slave` but need custom deserializer I think
}
fn default_modbus_unit() -> u8 {
0
}
#[derive(Serialize)]
#[serde(rename_all = "lowercase")]
enum ConnectState {
Connected,
Disconnected,
Errored,
}
#[derive(Serialize)]
struct ConnectStatus {
#[serde(flatten)]
connect: Connect,
status: ConnectState,
}
#[tokio::main(worker_threads = 1)]
async fn main() -> Result<(), Box<dyn Error>> {
async fn main() {
let args = Cli::parse();
let mut mqttoptions = MqttOptions::new("mqtt", args.mqtt_host.as_str(), args.mqtt_port);
@ -33,31 +87,108 @@ async fn main() -> Result<(), Box<dyn Error>> {
mqttoptions.set_keep_alive(Duration::from_secs(5));
let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
task::spawn(async move {
requests(client).await;
time::sleep(Duration::from_secs(3)).await;
});
loop {
let event = eventloop.poll().await;
println!("{:?}", event.unwrap());
}
}
async fn requests(client: AsyncClient) {
client
.subscribe("hello/world", QoS::AtMostOnce)
.subscribe(format!("{}/#", args.mqtt_topic_prefix), QoS::AtMostOnce)
.await
.unwrap();
for i in 1..=10 {
client
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; i])
.await
.unwrap();
// let socket_addr = "10.10.10.219:502".parse().unwrap();
// let mut modbus = tcp::connect_slave(socket_addr, Slave(0x01)).await.unwrap();
// let data = modbus.read_input_registers(5017, 2).await.unwrap();
// println!("Response is '{:#06x?}'", data);
time::sleep(Duration::from_secs(1)).await;
while let Ok(event) = eventloop.poll().await {
match event {
Event::Outgoing(_) => (),
Event::Incoming(Incoming::ConnAck(_)) => println!("Connected to MQTT!"),
Event::Incoming(Incoming::PingResp | Incoming::SubAck(_)) => (),
Event::Incoming(Incoming::Publish(Publish { topic, payload, .. })) => {
println!("{} {:?}", &topic, &payload);
match topic.split('/').collect::<Vec<&str>>()[..] {
[prefix, "connect", conn_name] if prefix == args.mqtt_topic_prefix.as_str() => {
match serde_json::from_slice::<Connect>(&payload) {
Ok(connect) => {
let slave = Slave(connect.slave);
// println!("{:?}", connect);
let status = match connect.settings {
ModbusProto::Tcp { ref host, port } => {
let socket_addr =
format!("{}:{}", host, port).parse().unwrap();
let mut modbus =
tcp::connect_slave(socket_addr, slave).await.unwrap();
ConnectStatus {
connect: connect,
status: ConnectState::Connected,
}
}
ModbusProto::Rtu { ref tty, baud_rate } => {
let builder = tokio_serial::new(tty, baud_rate);
let port =
tokio_serial::SerialStream::open(&builder).unwrap();
let mut modbus =
rtu::connect_slave(port, slave).await.unwrap();
ConnectStatus {
connect: connect,
status: ConnectState::Connected,
}
}
};
client
.publish(
format!("{}/status/{}", args.mqtt_topic_prefix, conn_name)
.as_str(),
QoS::AtMostOnce,
false,
serde_json::to_vec(&status).unwrap(),
)
.await
.unwrap();
}
Err(err) => {
use serde_json::json;
client
.publish(
format!("{}/status/{}", args.mqtt_topic_prefix, conn_name)
.as_str(),
QoS::AtMostOnce,
false,
serde_json::to_vec(&json!({
"status": ConnectState::Errored,
"error": err.to_string(),
}))
.unwrap(),
)
.await
.unwrap();
}
}
}
_ => (),
};
}
_ => {
println!("{:?}", event);
}
}
}
time::sleep(Duration::from_secs(120)).await;
}
// async fn requests(client: AsyncClient) {
// client
// .subscribe("hello/world", QoS::AtMostOnce)
// .await
// .unwrap();
// for i in 1..=10 {
// client
// .publish("hello/world", QoS::ExactlyOnce, false, vec![1; i])
// .await
// .unwrap();
// time::sleep(Duration::from_secs(1)).await;
// }
// time::sleep(Duration::from_secs(120)).await;
// }