diff --git a/src/main.rs b/src/main.rs index 3053f61..462f12f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,7 @@ use rumqttc::{self, AsyncClient, Event, Incoming, LastWill, MqttOptions, Publish, QoS}; use serde::{Deserialize, Serialize}; use serde_json::json; -use std::time::Duration; +use std::{collections::HashMap, time::Duration}; use tokio::sync::mpsc; use tokio_modbus::prelude::*; @@ -281,22 +281,39 @@ enum RegistryCommand { Disconnect(ConnectionId), } +type RegistryDb = HashMap>; + async fn connection_registry( prefix: String, dispatcher: mpsc::Sender, mut rx: mpsc::Receiver, ) { println!("Starting connection registry..."); + let mut db: RegistryDb = HashMap::new(); + while let Some(command) = rx.recv().await { use RegistryCommand::*; match command { + Disconnect(id) => { + if let Some(handle) = db.remove(&id) { + handle.abort(); + } + } Connect { id, details } => { println!("Connection {}: {:?}", id, &details); let prefix = prefix.clone(); let dispatcher = dispatcher.clone(); - tokio::spawn(async move { - handle_connect(dispatcher, id, prefix, details).await; - }); + + if let Some(handle) = db.remove(&id) { + handle.abort(); + } + + db.insert( + id.clone(), + tokio::spawn(async move { + handle_connect(dispatcher, id, prefix, details).await; + }), + ); } _ => println!("unimplemented"), } @@ -315,25 +332,21 @@ async fn handle_connect( let slave = Slave(connect.slave); // println!("{:?}", connect); - let status = match connect.settings { + let mut modbus = match connect.settings { ModbusProto::Tcp { ref host, port } => { let socket_addr = format!("{}:{}", host, port).parse().unwrap(); - let mut modbus = tcp::connect_slave(socket_addr, slave).await.unwrap(); - ConnectStatus { - connect: connect, - status: ConnectState::Connected, - } + tcp::connect_slave(socket_addr, slave).await.unwrap() } ModbusProto::Rtu { ref tty, baud_rate } => { let builder = tokio_serial::new(tty, baud_rate); let port = tokio_serial::SerialStream::open(&builder).unwrap(); - let mut modbus = rtu::connect_slave(port, slave).await.unwrap(); - ConnectStatus { - connect: connect, - status: ConnectState::Connected, - } + rtu::connect_slave(port, slave).await.unwrap() } }; + let status = ConnectStatus { + connect: connect, + status: ConnectState::Connected, + }; dispatcher .send(DispatchCommand::Publish { topic: format!("{}/status/{}", topic_prefix, id),