1
0
Fork 0

Awkwardly put into async tasks O_o

gh-action
Bo Jeanes 2022-07-17 18:14:55 +10:00
parent 6b1b679350
commit ba09d6a76a
3 changed files with 240 additions and 80 deletions

1
Cargo.lock generated
View File

@ -439,6 +439,7 @@ dependencies = [
name = "modbus-mqtt"
version = "0.1.0"
dependencies = [
"bytes",
"clap",
"rumqttc",
"serde",

View File

@ -6,6 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
bytes = "1.1.0"
clap = { version = "3.2.12", features = ["derive", "env"] }
rumqttc = { version = "0.13.0", features = ["url"], git = "https://github.com/bytebeamio/rumqtt" }
serde = { version = "1.0.139", features = ["serde_derive"] }

View File

@ -2,6 +2,7 @@ use rumqttc::{self, AsyncClient, Event, Incoming, LastWill, MqttOptions, Publish
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_modbus::prelude::*;
use clap::Parser;
@ -36,9 +37,15 @@ enum ModbusProto {
#[serde(default = "default_modbus_port")]
port: u16,
},
#[serde(rename_all = "lowercase")]
Rtu {
// tty: std::path::PathBuf,
tty: String,
baud_rate: u32,
// data_bits: tokio_serial::DataBits, // TODO: allow this to be represented as a number instead of string
// stop_bits: tokio_serial::StopBits, // TODO: allow this to be represented as a number instead of string
// flow_control: tokio_se&rial::FlowControl,
// parity: tokio_serial::Parity,
},
}
@ -46,18 +53,71 @@ fn default_modbus_port() -> u16 {
502
}
#[derive(Serialize, Deserialize)]
struct Range {
address: u16,
size: u16,
}
// TODO: `scale`, `offset`, `precision`
enum RegisterValueType {
U8,
U16,
U32,
U64,
I8,
I16,
I32,
I64,
F32,
F64,
String,
}
#[derive(Serialize, Deserialize)]
struct RegisterParse {
#[serde(default = "default_swap")]
swap_bytes: bool,
#[serde(default = "default_swap")]
swap_words: bool,
}
fn default_swap() -> bool {
false
}
#[derive(Serialize, Deserialize)]
struct Register {
#[serde(flatten)]
range: Range,
#[serde(skip_serializing_if = "Option::is_none")]
name: Option<String>,
parse: Option<RegisterParse>,
}
#[derive(Serialize, Deserialize)]
struct Connect {
#[serde(flatten)]
settings: ModbusProto,
// input_ranges: Vec<Register>,
// hold_ranges: Vec<Register>,
#[serde(default = "default_modbus_unit")]
slave: u8, // TODO make `Slave` but need custom deserializer I think
#[serde(default = "default_address_offset")]
address_offset: i8,
}
fn default_modbus_unit() -> u8 {
0
}
fn default_address_offset() -> i8 {
0
}
#[derive(Serialize)]
#[serde(rename_all = "lowercase")]
@ -85,13 +145,52 @@ enum MainStatus {
async fn main() {
let args = Cli::parse();
let mut mqttoptions = MqttOptions::new("mqtt", args.mqtt_host.as_str(), args.mqtt_port);
let (registry_tx, mut registry_rx) = mpsc::channel::<RegistryCommand>(32);
let (dispatcher_tx, mut dispatcher_rx) = mpsc::channel::<DispatchCommand>(32);
// Modbus connection registry
let registry_handle = {
let prefix = args.mqtt_topic_prefix.clone();
tokio::spawn(async move { connection_registry(prefix, dispatcher_tx, registry_rx).await })
};
// MQTT Dispatcher
let dispatcher_handle = {
let prefix = args.mqtt_topic_prefix.clone();
let mut options = MqttOptions::new(
env!("CARGO_PKG_NAME"),
args.mqtt_host.as_str(),
args.mqtt_port,
);
if let (Some(u), Some(p)) = (args.mqtt_user, args.mqtt_password) {
mqttoptions.set_credentials(u, p);
options.set_credentials(u, p);
}
mqttoptions.set_keep_alive(Duration::from_secs(5));
mqttoptions.set_last_will(LastWill {
topic: format!("{}/status", args.mqtt_topic_prefix).to_string(),
options.set_keep_alive(Duration::from_secs(5)); // TODO: make this configurable
tokio::spawn(async move {
mqtt_dispatcher(options, prefix, registry_tx, dispatcher_rx).await;
})
};
registry_handle.await.unwrap();
dispatcher_handle.await.unwrap();
}
#[derive(Debug)]
enum DispatchCommand {
Publish { topic: String, payload: Vec<u8> },
}
async fn mqtt_dispatcher(
mut options: MqttOptions,
prefix: String,
registry: mpsc::Sender<RegistryCommand>,
mut rx: mpsc::Receiver<DispatchCommand>,
) {
println!("Connecting to MQTT broker...");
options.set_last_will(LastWill {
topic: format!("{}/status", prefix).to_string(),
message: serde_json::to_vec(&json!({
"status": MainStatus::Stopped,
}))
@ -101,19 +200,11 @@ async fn main() {
retain: false,
});
let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
client
.subscribe(
format!("{}/connect/#", args.mqtt_topic_prefix),
QoS::AtMostOnce,
)
.await
.unwrap();
let (client, mut eventloop) = AsyncClient::new(options, 10);
client
.publish(
format!("{}/status", args.mqtt_topic_prefix).to_string(),
format!("{}/status", prefix).to_string(),
QoS::AtMostOnce,
false,
serde_json::to_vec(&json!({
@ -124,72 +215,49 @@ async fn main() {
.await
.unwrap();
while let Ok(event) = eventloop.poll().await {
match event {
Event::Outgoing(_) => (),
Event::Incoming(Incoming::ConnAck(_)) => println!("Connected to MQTT!"),
Event::Incoming(Incoming::PingResp | Incoming::SubAck(_)) => (),
client
.subscribe(format!("{}/connect/#", prefix), QoS::AtMostOnce)
.await
.unwrap();
Event::Incoming(Incoming::Publish(Publish { topic, payload, .. })) => {
println!("{} {:?}", &topic, &payload);
match topic.split('/').collect::<Vec<&str>>()[..] {
[prefix, "connect", conn_name] if prefix == args.mqtt_topic_prefix.as_str() => {
match serde_json::from_slice::<Connect>(&payload) {
Ok(connect) => {
let slave = Slave(connect.slave);
// println!("{:?}", connect);
let status = match connect.settings {
ModbusProto::Tcp { ref host, port } => {
let socket_addr =
format!("{}:{}", host, port).parse().unwrap();
let mut modbus =
tcp::connect_slave(socket_addr, slave).await.unwrap();
ConnectStatus {
connect: connect,
status: ConnectState::Connected,
let rx_loop_handler = {
let client = client.clone();
tokio::spawn(async move {
println!("Start dispatcher rx loop");
while let Some(command) = rx.recv().await {
match command {
DispatchCommand::Publish { topic, payload } => {
client
.publish(topic, QoS::AtMostOnce, false, payload)
.await
.unwrap();
}
}
ModbusProto::Rtu { ref tty, baud_rate } => {
let builder = tokio_serial::new(tty, baud_rate);
let port =
tokio_serial::SerialStream::open(&builder).unwrap();
let mut modbus =
rtu::connect_slave(port, slave).await.unwrap();
ConnectStatus {
connect: connect,
status: ConnectState::Connected,
}
}
})
};
client
.publish(
format!("{}/status/{}", args.mqtt_topic_prefix, conn_name)
.as_str(),
QoS::AtMostOnce,
false,
serde_json::to_vec(&status).unwrap(),
)
while let Ok(event) = eventloop.poll().await {
use Event::{Incoming as In, Outgoing as Out};
match event {
Out(_) => (),
In(Incoming::ConnAck(_)) => println!("Connected to MQTT!"),
In(Incoming::PingResp | Incoming::SubAck(_)) => (),
In(Incoming::Publish(Publish { topic, payload, .. })) => {
println!("{} -> {:?}", &topic, &payload);
match topic.split('/').collect::<Vec<&str>>()[..] {
[p, "connect", conn_name] if p == prefix.as_str() => {
registry
.send(RegistryCommand::Connect {
id: conn_name.to_string(),
details: payload,
})
.await
.unwrap();
}
Err(err) => {
client
.publish(
format!("{}/status/{}", args.mqtt_topic_prefix, conn_name)
.as_str(),
QoS::AtMostOnce,
false,
serde_json::to_vec(&json!({
"status": ConnectState::Errored,
"error": err.to_string(),
}))
.unwrap(),
)
.await
.unwrap();
}
}
}
_ => (),
};
}
@ -198,6 +266,96 @@ async fn main() {
}
}
}
rx_loop_handler.await.unwrap();
}
type ConnectionId = String;
#[derive(Debug)]
enum RegistryCommand {
Connect {
id: ConnectionId,
details: bytes::Bytes,
},
Disconnect(ConnectionId),
}
async fn connection_registry(
prefix: String,
dispatcher: mpsc::Sender<DispatchCommand>,
mut rx: mpsc::Receiver<RegistryCommand>,
) {
println!("Starting connection registry...");
while let Some(command) = rx.recv().await {
use RegistryCommand::*;
match command {
Connect { id, details } => {
println!("Connection {}: {:?}", id, &details);
let prefix = prefix.clone();
let dispatcher = dispatcher.clone();
tokio::spawn(async move {
handle_connect(dispatcher, id, prefix, details).await;
});
}
_ => println!("unimplemented"),
}
}
}
async fn handle_connect(
dispatcher: mpsc::Sender<DispatchCommand>,
id: ConnectionId,
topic_prefix: String,
payload: bytes::Bytes,
) {
println!("Starting connection handler for {}", id);
match serde_json::from_slice::<Connect>(&payload) {
Ok(connect) => {
let slave = Slave(connect.slave);
// println!("{:?}", connect);
let status = match connect.settings {
ModbusProto::Tcp { ref host, port } => {
let socket_addr = format!("{}:{}", host, port).parse().unwrap();
let mut modbus = tcp::connect_slave(socket_addr, slave).await.unwrap();
ConnectStatus {
connect: connect,
status: ConnectState::Connected,
}
}
ModbusProto::Rtu { ref tty, baud_rate } => {
let builder = tokio_serial::new(tty, baud_rate);
let port = tokio_serial::SerialStream::open(&builder).unwrap();
let mut modbus = rtu::connect_slave(port, slave).await.unwrap();
ConnectStatus {
connect: connect,
status: ConnectState::Connected,
}
}
};
dispatcher
.send(DispatchCommand::Publish {
topic: format!("{}/status/{}", topic_prefix, id),
payload: serde_json::to_vec(&status).unwrap(),
})
.await
.unwrap();
}
Err(err) => {
dispatcher
.send(DispatchCommand::Publish {
topic: format!("{}/status/{}", topic_prefix, id),
payload: serde_json::to_vec(&json!({
"status": ConnectState::Errored,
"error": format!("Invalid config: {}", err.to_string()),
}))
.unwrap(),
})
.await
.unwrap();
}
}
}
// async fn requests(client: AsyncClient) {