diff --git a/src/main.rs b/src/main.rs index bbce7ab..9bdff31 100644 --- a/src/main.rs +++ b/src/main.rs @@ -130,7 +130,12 @@ struct Register { #[serde(skip_serializing_if = "Option::is_none")] parse: Option, - #[serde(with = "humantime_serde", default = "default_register_interval")] + #[serde( + with = "humantime_serde", + default = "default_register_interval", + alias = "period", + alias = "duration" + )] interval: Duration, } @@ -204,7 +209,7 @@ async fn main() { // Modbus connection registry let registry_handle = { let prefix = args.mqtt_topic_prefix.clone(); - tokio::spawn(async move { connection_registry(prefix, dispatcher_tx, registry_rx).await }) + tokio::spawn(connection_registry(prefix, dispatcher_tx, registry_rx)) }; // MQTT Dispatcher @@ -220,9 +225,7 @@ async fn main() { } options.set_keep_alive(Duration::from_secs(5)); // TODO: make this configurable - tokio::spawn(async move { - mqtt_dispatcher(options, prefix, registry_tx, dispatcher_rx).await; - }) + tokio::spawn(mqtt_dispatcher(options, prefix, registry_tx, dispatcher_rx)) }; registry_handle.await.unwrap(); @@ -362,9 +365,7 @@ async fn connection_registry( db.insert( id.clone(), - tokio::spawn(async move { - handle_connect(dispatcher, id, prefix, details).await; - }), + tokio::spawn(handle_connect(dispatcher, id, prefix, details)), ); } _ => println!("unimplemented"), @@ -372,7 +373,7 @@ async fn connection_registry( } } -#[derive(Debug)] +#[derive(Clone, Copy, Debug)] enum ModbusReadType { Input, Hold, @@ -396,7 +397,6 @@ async fn handle_connect( match serde_json::from_slice::(&payload) { Ok(connect) => { let unit = connect.unit; - // println!("{:?}", connect); let mut modbus = match connect.settings { ModbusProto::Tcp { ref host, port } => { @@ -464,73 +464,30 @@ async fn handle_connect( use itertools::Itertools; for (duration, registers) in &connect.input.into_iter().group_by(|r| r.interval) { - let registers: Vec = registers.collect(); - let id = id.clone(); - let modbus = modbus_tx.clone(); - let dispatcher = dispatcher.clone(); - let topic_prefix = topic_prefix.clone(); + let registers_prefix = format!("{}/input/{}", topic_prefix, id); - tokio::spawn(async move { - let mut interval = tokio::time::interval(duration); - interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + tokio::spawn(watch_registers( + ModbusReadType::Input, + connect.address_offset, + duration, + registers.collect(), + modbus_tx.clone(), + dispatcher.clone(), + registers_prefix, + )); + } + for (duration, registers) in &connect.hold.into_iter().group_by(|r| r.interval) { + let registers_prefix = format!("{}/hold/{}", topic_prefix, id); - loop { - interval.tick().await; - for ref r in registers.iter() { - let address = if connect.address_offset >= 0 { - r.range.address.checked_add(connect.address_offset as u16) - } else { - r.range - .address - .checked_sub(connect.address_offset.unsigned_abs() as u16) - }; - if let Some(address) = address { - println!("Polling {}", address); - - let (tx, rx) = oneshot::channel(); - - modbus - .send(ModbusCommand::Read( - ModbusReadType::Input, - address, - r.range.count.into(), - tx, - )) - .await - .unwrap(); - - let values = rx.await.unwrap().unwrap(); - - let payload = - serde_json::to_vec(&json!({ "raw": values, })).unwrap(); - - dispatcher - .send(DispatchCommand::Publish { - topic: format!( - "{}/registers/{}/{}", - topic_prefix, id, r.range.address - ), - payload: payload.clone(), - }) - .await - .unwrap(); - - if let Some(name) = &r.name { - dispatcher - .send(DispatchCommand::Publish { - topic: format!( - "{}/registers/{}/{}", - topic_prefix, id, name - ), - payload: payload, - }) - .await - .unwrap(); - } - } - } - } - }); + tokio::spawn(watch_registers( + ModbusReadType::Hold, + connect.address_offset, + duration, + registers.collect(), + modbus_tx.clone(), + dispatcher.clone(), + registers_prefix, + )); } } Err(err) => { @@ -549,20 +506,65 @@ async fn handle_connect( } } -// async fn requests(client: AsyncClient) { -// client -// .subscribe("hello/world", QoS::AtMostOnce) -// .await -// .unwrap(); +async fn watch_registers( + read_type: ModbusReadType, + address_offset: i8, + duration: Duration, + registers: Vec, + modbus: mpsc::Sender, + dispatcher: mpsc::Sender, + registers_prefix: String, +) { + let mut interval = tokio::time::interval(duration); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); -// for i in 1..=10 { -// client -// .publish("hello/world", QoS::ExactlyOnce, false, vec![1; i]) -// .await -// .unwrap(); + loop { + interval.tick().await; + for ref r in registers.iter() { + let address = if address_offset >= 0 { + r.range.address.checked_add(address_offset as u16) + } else { + r.range + .address + .checked_sub(address_offset.unsigned_abs() as u16) + }; + if let Some(address) = address { + println!("Polling {:?} {}", read_type, address); -// time::sleep(Duration::from_secs(1)).await; -// } + let (tx, rx) = oneshot::channel(); -// time::sleep(Duration::from_secs(120)).await; -// } + modbus + .send(ModbusCommand::Read( + read_type, + address, + r.range.count.into(), + tx, + )) + .await + .unwrap(); + + let values = rx.await.unwrap().unwrap(); + + let payload = serde_json::to_vec(&json!({ "raw": values, })).unwrap(); + + dispatcher + .send(DispatchCommand::Publish { + topic: format!("{}/{}", registers_prefix, r.range.address), + payload: payload.clone(), + }) + .await + .unwrap(); + + if let Some(name) = &r.name { + dispatcher + .send(DispatchCommand::Publish { + topic: format!("{}/{}", registers_prefix, name), + payload: payload, + }) + .await + .unwrap(); + } + } + } + } +}