1
0
Fork 0

Store connection handler in a map db

gh-action
Bo Jeanes 2022-07-17 18:40:10 +10:00
parent ba09d6a76a
commit fa867cbfe5
1 changed files with 28 additions and 15 deletions

View File

@ -1,7 +1,7 @@
use rumqttc::{self, AsyncClient, Event, Incoming, LastWill, MqttOptions, Publish, QoS}; use rumqttc::{self, AsyncClient, Event, Incoming, LastWill, MqttOptions, Publish, QoS};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::json; use serde_json::json;
use std::time::Duration; use std::{collections::HashMap, time::Duration};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio_modbus::prelude::*; use tokio_modbus::prelude::*;
@ -281,22 +281,39 @@ enum RegistryCommand {
Disconnect(ConnectionId), Disconnect(ConnectionId),
} }
type RegistryDb = HashMap<ConnectionId, tokio::task::JoinHandle<()>>;
async fn connection_registry( async fn connection_registry(
prefix: String, prefix: String,
dispatcher: mpsc::Sender<DispatchCommand>, dispatcher: mpsc::Sender<DispatchCommand>,
mut rx: mpsc::Receiver<RegistryCommand>, mut rx: mpsc::Receiver<RegistryCommand>,
) { ) {
println!("Starting connection registry..."); println!("Starting connection registry...");
let mut db: RegistryDb = HashMap::new();
while let Some(command) = rx.recv().await { while let Some(command) = rx.recv().await {
use RegistryCommand::*; use RegistryCommand::*;
match command { match command {
Disconnect(id) => {
if let Some(handle) = db.remove(&id) {
handle.abort();
}
}
Connect { id, details } => { Connect { id, details } => {
println!("Connection {}: {:?}", id, &details); println!("Connection {}: {:?}", id, &details);
let prefix = prefix.clone(); let prefix = prefix.clone();
let dispatcher = dispatcher.clone(); let dispatcher = dispatcher.clone();
tokio::spawn(async move {
handle_connect(dispatcher, id, prefix, details).await; if let Some(handle) = db.remove(&id) {
}); handle.abort();
}
db.insert(
id.clone(),
tokio::spawn(async move {
handle_connect(dispatcher, id, prefix, details).await;
}),
);
} }
_ => println!("unimplemented"), _ => println!("unimplemented"),
} }
@ -315,25 +332,21 @@ async fn handle_connect(
let slave = Slave(connect.slave); let slave = Slave(connect.slave);
// println!("{:?}", connect); // println!("{:?}", connect);
let status = match connect.settings { let mut modbus = match connect.settings {
ModbusProto::Tcp { ref host, port } => { ModbusProto::Tcp { ref host, port } => {
let socket_addr = format!("{}:{}", host, port).parse().unwrap(); let socket_addr = format!("{}:{}", host, port).parse().unwrap();
let mut modbus = tcp::connect_slave(socket_addr, slave).await.unwrap(); tcp::connect_slave(socket_addr, slave).await.unwrap()
ConnectStatus {
connect: connect,
status: ConnectState::Connected,
}
} }
ModbusProto::Rtu { ref tty, baud_rate } => { ModbusProto::Rtu { ref tty, baud_rate } => {
let builder = tokio_serial::new(tty, baud_rate); let builder = tokio_serial::new(tty, baud_rate);
let port = tokio_serial::SerialStream::open(&builder).unwrap(); let port = tokio_serial::SerialStream::open(&builder).unwrap();
let mut modbus = rtu::connect_slave(port, slave).await.unwrap(); rtu::connect_slave(port, slave).await.unwrap()
ConnectStatus {
connect: connect,
status: ConnectState::Connected,
}
} }
}; };
let status = ConnectStatus {
connect: connect,
status: ConnectState::Connected,
};
dispatcher dispatcher
.send(DispatchCommand::Publish { .send(DispatchCommand::Publish {
topic: format!("{}/status/{}", topic_prefix, id), topic: format!("{}/status/{}", topic_prefix, id),