@ -766,7 +766,7 @@ dependencies = [
name = "modbus-mqtt"
version = "0.1.0"
version = "0.2.0"
dependencies = [

@ -1,6 +1,6 @@
name = "modbus-mqtt"
version = "0.1.0"
version = "0.2.0"
edition = "2021"
authors = ["Bo Jeanes <me@bjeanes.com>"]
description = "A bridge between Modbus devices and MQTT"

pub type Result<T> = std::result::Result<T, Error>;
pub type Result<T> = std::result::Result<T, Error>;
@ -398,10 +151,6 @@ pub type Result<T> = std::result::Result<T, Error>;
@ -1,9 +1,11 @@
use crate::modbus::{self};
use crate::Error;
use rust_decimal::prelude::Zero;
use serde::Deserialize;
use tokio::sync::oneshot;
use tokio::{select, sync::mpsc};
use tokio_modbus::client::{rtu, tcp, Context as ModbusClient};
use tracing::{debug, error};
use tracing::{debug, error, warn};
use crate::{mqtt, shutdown::Shutdown};
@ -18,25 +20,32 @@ pub(crate) async fn run(
// Can unwrap because if MQTT handler is bad, we have nothing to do here.
mqtt.publish("state", "connecting").await.unwrap();
let address_offset = config.address_offset;
match config.settings.connect(config.unit).await {
Ok(client) => {
// Can unwrap because if MQTT handler is bad, we have nothing to do here.
mqtt.publish("state", "connected").await.unwrap();
// Create handle and send to caller
let (tx, rx) = mpsc::channel(32);
handle_tx.send(Ok(Handle { tx })).unwrap();
let conn = Connection {
mqtt: mqtt.clone(),
if let Err(error) = conn.run().await {
error!(?error, "Modbus connection failed");
// we are shutting down here, so don't care if this fails
let _ = mqtt.publish("state", "disconnected").await;
Err(error) => handle_tx.send(Err(error.into())).unwrap(),
@ -47,30 +56,182 @@ pub(crate) async fn run(
struct Connection {
client: ModbusClient,
address_offset: i8,
mqtt: mqtt::Handle,
shutdown: Shutdown,
rx: mpsc::Receiver<Message>,
rx: mpsc::Receiver<Command>,
tx: mpsc::Sender<Command>,
pub struct Handle {
tx: mpsc::Sender<Message>,
tx: mpsc::Sender<Command>,
impl Handle {
pub async fn write_register(&self, address: u16, data: Vec<Word>) -> crate::Result<Vec<Word>> {
let (tx, rx) = oneshot::channel();
.send(Command::Write(address, data, tx))
.map_err(|_| Error::SendError)?;
rx.await.map_err(|_| Error::RecvError)?
pub async fn read_input_register(
address: u16,
quantity: u8,
) -> crate::Result<Vec<Word>> {
self.read_register(ReadType::Input, address, quantity).await
pub async fn read_holding_register(
address: u16,
quantity: u8,
) -> crate::Result<Vec<Word>> {
self.read_register(ReadType::Holding, address, quantity)
async fn read_register(
reg_type: ReadType,
address: u16,
quantity: u8,
) -> crate::Result<Vec<Word>> {
let (tx, rx) = oneshot::channel();
.send(Command::Read(reg_type, address, quantity, tx))
.map_err(|_| Error::SendError)?;
rx.await.map_err(|_| Error::RecvError)?
type Word = u16;
type Response = oneshot::Sender<crate::Result<Vec<Word>>>;
#[derive(Clone, Copy, Debug)]
enum ReadType {
enum Message {}
enum Command {
Read(ReadType, u16, u8, Response),
Write(u16, Vec<Word>, Response),
impl Connection {
pub async fn run(mut self) -> crate::Result<()> {
let mut input_registers = self.mqtt.subscribe("input/+").await?;
let mut holding_registers = self.mqtt.subscribe("holding/+").await?;
// TODO: if we get a new register definition for an existing register, how do we avoid redundant (and possibly
// conflicting) tasks? Should MQTT component only allow one subscriber per topic filter, replacing the old one
// when it gets a new subscribe request?
loop {
select! {
Some(msg) = self.rx.recv() => { debug!(?msg); },
Some(reg) = input_registers.recv() => {},
Some(reg) = holding_registers.recv() => {},
Some(cmd) = self.rx.recv() => { self.process_command(cmd).await; },
_ = self.shutdown.recv() => {
return Ok(());
fn handle(&self) -> Handle {
Handle {
tx: self.tx.clone(),
/// Apply address offset to address.
/// Panics if offset would overflow or underflow the address.
fn adjust_address(&self, address: u16) -> u16 {
if self.address_offset.is_zero() {
return address;
// TODO: use `checked_add_signed()` once stabilised: https://doc.rust-lang.org/std/primitive.u16.html#method.checked_add_signed
let adjusted_address = if self.address_offset >= 0 {
address.checked_add(self.address_offset as u16)
} else {
address.checked_sub(self.address_offset.unsigned_abs() as u16)
if let Some(address) = adjusted_address {
} else {
error!(address, offset = self.address_offset,);
// panic!("Address offset would underflow/overflow")
async fn process_command(&mut self, cmd: Command) {
use tokio_modbus::prelude::Reader;
let (tx, response) = match cmd {
Command::Read(ReadType::Input, address, count, tx) => {
let address = self.adjust_address(address);
.read_input_registers(address, count as u16)
Command::Read(ReadType::Holding, address, count, tx) => {
let address = self.adjust_address(address);
.read_holding_registers(address, count as u16)
Command::Write(address, data, tx) => {
let address = self.adjust_address(address);
data.len() as u16,
// This might be transient, so don't kill connection. We may be able to discriminate on the error to determine
// which errors are transient and which are conclusive.
// Some errors that we have observed:
// Error { kind: UnexpectedEof, message: "failed to fill whole buffer" }'
// Custom { kind: InvalidData, error: "Invalid data length: 0" }'
// Os { code: 36, kind: Uncategorized, message: "Operation now in progress" }'
// Os { code: 35, kind: WouldBlock, message: "Resource temporarily unavailable" }
if let Err(error) = &response {
warn!(?error, "modbus command error");
// This probably just means that the register task died or is no longer monitoring the response.
if let Err(response) = tx.send(response.map_err(Into::into)) {
warn!(?response, "error sending response");
#[derive(Debug, Deserialize)]

@ -6,6 +6,10 @@ use serde_json::value::RawValue as RawJSON;
use tokio::select;
use tracing::{debug, error, info};
NOTE: Should this be a connection _registry_ of sorts which also restarts connections which die?
/// The topic filter under the prefix to look for connection configs
const TOPIC: &str = "+/connect";
@ -88,10 +92,10 @@ async fn connect(config: Config<'_>, mqtt: mqtt::Handle, shutdown: Shutdown) ->
} = config;
mqtt.publish("state", "connecting").await?;
let connection_handler = connection::run(settings, mqtt.clone(), shutdown).await?;
// TODO: consider waiting 1 second before sending the registers to MQTT, to ensure that the connection is listening.
for reg in input {
let mqtt = mqtt.scoped("input");
if let Ok(r) = serde_json::from_slice::<register::AddressedRegister>(reg.get().as_bytes()) {

@ -164,6 +164,9 @@ impl Connection {
for filter in &filters {
let channel = channel.clone();
// NOTE: Curently allows multiple components to watch the same topic filter, but if there is no need
// for this, it might make more sense to have it _replace_ the channel, so that old (stale)
// components automatically finish running.
match self.subscriptions.get_mut(&filter.path) {
Some(channels) => channels.push(channel),
None => {