1
0
Fork 0

last will

gh-action
Bo Jeanes 2022-07-17 11:06:00 +10:00
parent ac7eedb066
commit 6b1b679350
2 changed files with 36 additions and 11 deletions

View File

@ -5,7 +5,7 @@ Very rough for now.
## Topic spec ## Topic spec
``` ```
prefix/status -> { "status": "running" } # last-will message here? prefix/status -> { "status": "running" } # last-will message here too
prefix/connect/<connection> <- { prefix/connect/<connection> <- {
"host": "localhost", "host": "localhost",

View File

@ -1,9 +1,7 @@
use rumqttc::{self, AsyncClient, Event, Incoming, MqttOptions, Publish, QoS}; use rumqttc::{self, AsyncClient, Event, Incoming, LastWill, MqttOptions, Publish, QoS};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
// use serde_json::Result; use serde_json::json;
use std::error::Error;
use std::time::Duration; use std::time::Duration;
use tokio::{task, time};
use tokio_modbus::prelude::*; use tokio_modbus::prelude::*;
use clap::Parser; use clap::Parser;
@ -76,6 +74,13 @@ struct ConnectStatus {
status: ConnectState, status: ConnectState,
} }
#[derive(Serialize)]
#[serde(rename_all = "lowercase")]
enum MainStatus {
Running,
Stopped,
}
#[tokio::main(worker_threads = 1)] #[tokio::main(worker_threads = 1)]
async fn main() { async fn main() {
let args = Cli::parse(); let args = Cli::parse();
@ -85,18 +90,39 @@ async fn main() {
mqttoptions.set_credentials(u, p); mqttoptions.set_credentials(u, p);
} }
mqttoptions.set_keep_alive(Duration::from_secs(5)); mqttoptions.set_keep_alive(Duration::from_secs(5));
mqttoptions.set_last_will(LastWill {
topic: format!("{}/status", args.mqtt_topic_prefix).to_string(),
message: serde_json::to_vec(&json!({
"status": MainStatus::Stopped,
}))
.unwrap()
.into(),
qos: QoS::AtMostOnce,
retain: false,
});
let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
client client
.subscribe(format!("{}/#", args.mqtt_topic_prefix), QoS::AtMostOnce) .subscribe(
format!("{}/connect/#", args.mqtt_topic_prefix),
QoS::AtMostOnce,
)
.await .await
.unwrap(); .unwrap();
// let socket_addr = "10.10.10.219:502".parse().unwrap(); client
// let mut modbus = tcp::connect_slave(socket_addr, Slave(0x01)).await.unwrap(); .publish(
// let data = modbus.read_input_registers(5017, 2).await.unwrap(); format!("{}/status", args.mqtt_topic_prefix).to_string(),
// println!("Response is '{:#06x?}'", data); QoS::AtMostOnce,
false,
serde_json::to_vec(&json!({
"status": MainStatus::Running,
}))
.unwrap(),
)
.await
.unwrap();
while let Ok(event) = eventloop.poll().await { while let Ok(event) = eventloop.poll().await {
match event { match event {
@ -147,7 +173,6 @@ async fn main() {
.unwrap(); .unwrap();
} }
Err(err) => { Err(err) => {
use serde_json::json;
client client
.publish( .publish(
format!("{}/status/{}", args.mqtt_topic_prefix, conn_name) format!("{}/status/{}", args.mqtt_topic_prefix, conn_name)