1
0
Fork 0

Implement monitoring holding registers

gh-action
Bo Jeanes 2022-07-19 21:35:33 +10:00
parent cb56bcc66c
commit a183f62ed3
1 changed files with 91 additions and 89 deletions

View File

@ -130,7 +130,12 @@ struct Register {
#[serde(skip_serializing_if = "Option::is_none")]
parse: Option<RegisterParse>,
#[serde(with = "humantime_serde", default = "default_register_interval")]
#[serde(
with = "humantime_serde",
default = "default_register_interval",
alias = "period",
alias = "duration"
)]
interval: Duration,
}
@ -204,7 +209,7 @@ async fn main() {
// Modbus connection registry
let registry_handle = {
let prefix = args.mqtt_topic_prefix.clone();
tokio::spawn(async move { connection_registry(prefix, dispatcher_tx, registry_rx).await })
tokio::spawn(connection_registry(prefix, dispatcher_tx, registry_rx))
};
// MQTT Dispatcher
@ -220,9 +225,7 @@ async fn main() {
}
options.set_keep_alive(Duration::from_secs(5)); // TODO: make this configurable
tokio::spawn(async move {
mqtt_dispatcher(options, prefix, registry_tx, dispatcher_rx).await;
})
tokio::spawn(mqtt_dispatcher(options, prefix, registry_tx, dispatcher_rx))
};
registry_handle.await.unwrap();
@ -362,9 +365,7 @@ async fn connection_registry(
db.insert(
id.clone(),
tokio::spawn(async move {
handle_connect(dispatcher, id, prefix, details).await;
}),
tokio::spawn(handle_connect(dispatcher, id, prefix, details)),
);
}
_ => println!("unimplemented"),
@ -372,7 +373,7 @@ async fn connection_registry(
}
}
#[derive(Debug)]
#[derive(Clone, Copy, Debug)]
enum ModbusReadType {
Input,
Hold,
@ -396,7 +397,6 @@ async fn handle_connect(
match serde_json::from_slice::<Connect>(&payload) {
Ok(connect) => {
let unit = connect.unit;
// println!("{:?}", connect);
let mut modbus = match connect.settings {
ModbusProto::Tcp { ref host, port } => {
@ -464,73 +464,30 @@ async fn handle_connect(
use itertools::Itertools;
for (duration, registers) in &connect.input.into_iter().group_by(|r| r.interval) {
let registers: Vec<Register> = registers.collect();
let id = id.clone();
let modbus = modbus_tx.clone();
let dispatcher = dispatcher.clone();
let topic_prefix = topic_prefix.clone();
let registers_prefix = format!("{}/input/{}", topic_prefix, id);
tokio::spawn(async move {
let mut interval = tokio::time::interval(duration);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
interval.tick().await;
for ref r in registers.iter() {
let address = if connect.address_offset >= 0 {
r.range.address.checked_add(connect.address_offset as u16)
} else {
r.range
.address
.checked_sub(connect.address_offset.unsigned_abs() as u16)
};
if let Some(address) = address {
println!("Polling {}", address);
let (tx, rx) = oneshot::channel();
modbus
.send(ModbusCommand::Read(
tokio::spawn(watch_registers(
ModbusReadType::Input,
address,
r.range.count.into(),
tx,
))
.await
.unwrap();
let values = rx.await.unwrap().unwrap();
let payload =
serde_json::to_vec(&json!({ "raw": values, })).unwrap();
dispatcher
.send(DispatchCommand::Publish {
topic: format!(
"{}/registers/{}/{}",
topic_prefix, id, r.range.address
),
payload: payload.clone(),
})
.await
.unwrap();
if let Some(name) = &r.name {
dispatcher
.send(DispatchCommand::Publish {
topic: format!(
"{}/registers/{}/{}",
topic_prefix, id, name
),
payload: payload,
})
.await
.unwrap();
connect.address_offset,
duration,
registers.collect(),
modbus_tx.clone(),
dispatcher.clone(),
registers_prefix,
));
}
}
}
}
});
for (duration, registers) in &connect.hold.into_iter().group_by(|r| r.interval) {
let registers_prefix = format!("{}/hold/{}", topic_prefix, id);
tokio::spawn(watch_registers(
ModbusReadType::Hold,
connect.address_offset,
duration,
registers.collect(),
modbus_tx.clone(),
dispatcher.clone(),
registers_prefix,
));
}
}
Err(err) => {
@ -549,20 +506,65 @@ async fn handle_connect(
}
}
// async fn requests(client: AsyncClient) {
// client
// .subscribe("hello/world", QoS::AtMostOnce)
// .await
// .unwrap();
async fn watch_registers(
read_type: ModbusReadType,
address_offset: i8,
duration: Duration,
registers: Vec<Register>,
modbus: mpsc::Sender<ModbusCommand>,
dispatcher: mpsc::Sender<DispatchCommand>,
registers_prefix: String,
) {
let mut interval = tokio::time::interval(duration);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
// for i in 1..=10 {
// client
// .publish("hello/world", QoS::ExactlyOnce, false, vec![1; i])
// .await
// .unwrap();
loop {
interval.tick().await;
for ref r in registers.iter() {
let address = if address_offset >= 0 {
r.range.address.checked_add(address_offset as u16)
} else {
r.range
.address
.checked_sub(address_offset.unsigned_abs() as u16)
};
if let Some(address) = address {
println!("Polling {:?} {}", read_type, address);
// time::sleep(Duration::from_secs(1)).await;
// }
let (tx, rx) = oneshot::channel();
// time::sleep(Duration::from_secs(120)).await;
// }
modbus
.send(ModbusCommand::Read(
read_type,
address,
r.range.count.into(),
tx,
))
.await
.unwrap();
let values = rx.await.unwrap().unwrap();
let payload = serde_json::to_vec(&json!({ "raw": values, })).unwrap();
dispatcher
.send(DispatchCommand::Publish {
topic: format!("{}/{}", registers_prefix, r.range.address),
payload: payload.clone(),
})
.await
.unwrap();
if let Some(name) = &r.name {
dispatcher
.send(DispatchCommand::Publish {
topic: format!("{}/{}", registers_prefix, name),
payload: payload,
})
.await
.unwrap();
}
}
}
}
}