1
0
Fork 0

Refactor modbus connection starting

refactor
Bo Jeanes 2022-09-08 10:59:50 +10:00
parent cd3cc6f19c
commit 32cd2b2e78
3 changed files with 88 additions and 94 deletions

View File

@ -1,88 +1,76 @@
use crate::modbus::{self};
use crate::Error;
use serde::Deserialize;
use std::convert::TryFrom;
use tokio::{select, sync::mpsc};
use tokio_modbus::client::{rtu, tcp, Context as ModbusClient};
use tracing::error;
use tracing::{debug, error};
use crate::{mqtt, shutdown::Shutdown};
// TODO make this into run() and have it spawn the task
pub(crate) async fn new(
pub(crate) async fn run(
config: Config,
mqtt: mqtt::Handle,
shutdown: Shutdown,
) -> crate::Result<Connection> {
let client = match config.settings {
#[cfg(feature = "winet-s")]
ModbusProto::SungrowWiNetS { ref host } => {
tokio_modbus_winets::connect_slave(host, config.unit).await?
}
) -> crate::Result<Handle> {
let (handle_tx, handle_rx) = tokio::sync::oneshot::channel();
#[cfg(feature = "tcp")]
ModbusProto::Tcp { ref host, port } => {
let socket_addr = format!("{}:{}", host, port).parse()?;
tcp::connect_slave(socket_addr, config.unit).await?
}
tokio::spawn(async move {
// Can unwrap because if MQTT handler is bad, we have nothing to do here.
mqtt.publish("state", "connecting").await.unwrap();
#[cfg(feature = "rtu")]
ModbusProto::Rtu {
ref tty,
baud_rate,
data_bits,
stop_bits,
flow_control,
parity,
} => {
let builder = tokio_serial::new(tty, baud_rate)
.data_bits(data_bits)
.flow_control(flow_control)
.parity(parity)
.stop_bits(stop_bits);
let port = tokio_serial::SerialStream::open(&builder)?;
rtu::connect_slave(port, config.unit).await?
}
ModbusProto::Unknown => {
error!("Unrecognised protocol");
return Err(crate::Error::UnrecognisedModbusProtocol);
}
};
match config.settings.connect(config.unit).await {
Ok(client) => {
// Can unwrap because if MQTT handler is bad, we have nothing to do here.
mqtt.publish("state", "connected").await.unwrap();
// Create handle and send to caller
let (tx, rx) = mpsc::channel(32);
handle_tx.send(Ok(Handle { tx })).unwrap();
Ok(Connection {
rx,
let conn = Connection {
client,
mqtt,
shutdown,
})
rx,
};
if let Err(error) = conn.run().await {
error!(?error, "Modbus connection failed");
}
}
Err(error) => handle_tx.send(Err(error.into())).unwrap(),
}
});
handle_rx.await.map_err(|_| crate::Error::RecvError)?
}
pub struct Connection {
struct Connection {
client: ModbusClient,
mqtt: mqtt::Handle,
shutdown: Shutdown,
rx: mpsc::Receiver<Message>,
}
enum Message {}
#[derive(Clone)]
pub struct Handler {
#[derive(Debug)]
pub struct Handle {
tx: mpsc::Sender<Message>,
}
#[derive(Debug)]
enum Message {}
impl Connection {
pub async fn run(mut self) -> crate::Result<()> {
loop {
select! {
Some(msg) = self.rx.recv() => { debug!(?msg); },
_ = self.shutdown.recv() => {
return Ok(());
}
}
}
// pub fn handle(&self) -> Handle {}
}
}
#[derive(Debug, Deserialize)]
@ -143,6 +131,48 @@ pub(crate) enum ModbusProto {
Unknown,
}
impl ModbusProto {
// Can we use the "slave context" thing in Modbus to pass the unit later?
pub async fn connect(&self, unit: modbus::Unit) -> crate::Result<ModbusClient> {
let client = match *self {
#[cfg(feature = "winet-s")]
ModbusProto::SungrowWiNetS { ref host } => {
tokio_modbus_winets::connect_slave(host, unit).await?
}
#[cfg(feature = "tcp")]
ModbusProto::Tcp { ref host, port } => {
let socket_addr = format!("{}:{}", host, port).parse()?;
tcp::connect_slave(socket_addr, unit).await?
}
#[cfg(feature = "rtu")]
ModbusProto::Rtu {
ref tty,
baud_rate,
data_bits,
stop_bits,
flow_control,
parity,
} => {
let builder = tokio_serial::new(tty, baud_rate)
.data_bits(data_bits)
.flow_control(flow_control)
.parity(parity)
.stop_bits(stop_bits);
let port = tokio_serial::SerialStream::open(&builder)?;
rtu::connect_slave(port, unit).await?
}
ModbusProto::Unknown => {
error!("Unrecognised protocol");
Err(Error::UnrecognisedModbusProtocol)?
}
};
Ok(client)
}
}
pub(crate) fn default_modbus_port() -> u16 {
502
}
@ -166,11 +196,3 @@ pub(crate) fn default_modbus_flow_control() -> tokio_serial::FlowControl {
pub(crate) fn default_modbus_parity() -> tokio_serial::Parity {
tokio_serial::Parity::None
}
impl TryFrom<Config> for Connection {
type Error = crate::Error;
fn try_from(_value: Config) -> Result<Self, Self::Error> {
todo!()
}
}

View File

@ -90,32 +90,7 @@ async fn connect(config: Config<'_>, mqtt: mqtt::Handle, shutdown: Shutdown) ->
mqtt.publish("state", "connecting").await?;
// connection isn't able to be Send, so we have to create connection inside its run task and find out if instantiation failed by connection on a channel.
{
let mqtt = mqtt.clone();
let (tx, rx) = tokio::sync::oneshot::channel::<crate::Result<()>>();
tokio::spawn(async move {
match connection::new(settings, mqtt.clone(), shutdown).await {
Ok(connection) => {
if let Err(e) = mqtt.publish("state", "connected").await {
tx.send(Err(e))
} else {
tx.send(Ok(()))
}
.expect("unexpected closed receiver");
if let Err(error) = connection.run().await {
error!(?error, "Modbus connection quit unexpectedly");
}
}
Err(e) => {
tx.send(Err(e)).expect("unexpected closed receiver");
}
}
});
rx.await.map_err(|_| crate::Error::RecvError)??;
}
let connection_handler = connection::run(settings, mqtt.clone(), shutdown).await?;
for reg in input {
let mqtt = mqtt.scoped("input");

View File

@ -7,9 +7,6 @@ pub mod connection;
pub mod connector;
pub mod register;
pub use connection::Connection;
// pub use register::Register;
#[derive(Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ConnectState {