From 84dcea771d384663420e9e88c1041caa2e7f81a8 Mon Sep 17 00:00:00 2001 From: Sebastian Hugentobler Date: Sat, 11 May 2024 21:27:22 +0200 Subject: [PATCH] add mqtt publishing --- .gitignore | 1 + app/src/cli.rs | 16 ++++++++++++++ app/src/config.rs | 20 ++++++++++++++++++ app/src/error.rs | 6 ++++++ app/src/lib.rs | 6 ++++++ app/src/mqtt.rs | 53 +++++++++++++++++++++++++++++++++++++++++++++++ flake.nix | 1 - 7 files changed, 102 insertions(+), 1 deletion(-) create mode 100644 app/src/mqtt.rs diff --git a/.gitignore b/.gitignore index 4075fb2..d9ab711 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ target result +mosquitto diff --git a/app/src/cli.rs b/app/src/cli.rs index 745698b..0d898c0 100644 --- a/app/src/cli.rs +++ b/app/src/cli.rs @@ -12,4 +12,20 @@ pub struct Cli { /// Database connection string where timestamps are stored (SQLite, Postgres or MySql) #[arg(short, long, default_value_t = format!("sqlite::memory:"))] pub connection: String, + + /// MQTT broker where alerts get published + #[arg(short, long)] + pub mqtt_broker: String, + + /// MQTT port + #[arg(short = 'p', long, default_value_t = 1883)] + pub mqtt_port: u16, + + /// MQTT user + #[arg(short = 'u', long)] + pub mqtt_user: String, + + /// MQTT password + #[arg(short = 'P', long)] + pub mqtt_password: String, } diff --git a/app/src/config.rs b/app/src/config.rs index 08d79cf..dc22a19 100644 --- a/app/src/config.rs +++ b/app/src/config.rs @@ -1,11 +1,25 @@ use crate::cli::Cli; +/// Mqtt configuration +pub struct MqttConfig { + /// Mqtt broker host + pub broker: String, + /// Mqtt broker port + pub port: u16, + /// Mqtt user name with publishing rights + pub user: String, + /// Password for the publishing userr + pub password: String, +} + /// Application configuration. pub struct Config { /// Update interval in seconds pub interval: u64, /// Database connection string where timestamps are stored (SQLite, Postgres or MySql) pub connection: String, + /// Mqtt Configuration + pub mqtt: MqttConfig, } impl From for Config { @@ -13,6 +27,12 @@ impl From for Config { Self { interval: value.interval, connection: value.connection, + mqtt: MqttConfig { + broker: value.mqtt_broker, + port: value.mqtt_port, + user: value.mqtt_user, + password: value.mqtt_password, + }, } } } diff --git a/app/src/error.rs b/app/src/error.rs index dfc91b7..1869764 100644 --- a/app/src/error.rs +++ b/app/src/error.rs @@ -10,4 +10,10 @@ pub enum Error { /// Error fetching data from alertswiss #[error("data error")] FetchError(#[from] reqwest::Error), + /// Error publishing to MQTT + #[error("mqtt client error")] + MqttCLientError(#[from] rumqttc::ClientError), + /// Error publishing to MQTT + #[error("mqtt connection error")] + MqttConnectionError(#[from] rumqttc::ConnectionError), } diff --git a/app/src/lib.rs b/app/src/lib.rs index 7e7b5f7..159dc25 100644 --- a/app/src/lib.rs +++ b/app/src/lib.rs @@ -35,6 +35,7 @@ mod json { pub mod title; } pub mod logging; +pub mod mqtt; async fn handle_alerts(db: &Db) -> Result, Error> { info!("checking alerts"); @@ -73,9 +74,14 @@ pub async fn run(config: Config) -> Result<(), Error> { let mut interval = time::interval(Duration::from_secs(config.interval)); let db = &datastore::rdbms::connect(&config.connection).await?; + let mqtt = mqtt::connect(&config.mqtt); loop { interval.tick().await; + let new_alerts = handle_alerts(db).await?; + if let Err(e) = mqtt.publish("chello".as_bytes()).await { + error!("failed pusblishing to mqtt: {e:?}") + } } } diff --git a/app/src/mqtt.rs b/app/src/mqtt.rs new file mode 100644 index 0000000..657cf75 --- /dev/null +++ b/app/src/mqtt.rs @@ -0,0 +1,53 @@ +use rumqttc::{AsyncClient, EventLoop, MqttOptions, QoS}; +use std::{sync::Arc, time::Duration}; +use tokio::sync::Mutex; +use tracing::debug; + +use crate::{config::MqttConfig, error::Error}; + +pub struct Mqtt { + client: AsyncClient, + eventloop: Arc>, + event_loop_running: Arc>, +} + +impl Mqtt { + async fn run_event_loop(&self) { + let mut event_loop_running = self.event_loop_running.lock().await; + if !*event_loop_running { + *event_loop_running = true; + + let eventloop = Arc::clone(&self.eventloop); + tokio::spawn(async move { + let mut eventloop = eventloop.lock().await; + while let Ok(notification) = eventloop.poll().await { + debug!("Received = {:?}", notification); + tokio::time::sleep(Duration::from_millis(100)).await; + } + }); + } + } + + pub async fn publish(&self, payload: &[u8]) -> Result<(), Error> { + self.run_event_loop().await; + self.client + .publish("alerts", QoS::AtLeastOnce, false, payload) + .await?; + + Ok(()) + } +} + +pub fn connect(config: &MqttConfig) -> Mqtt { + let mut options = MqttOptions::new("rumqtt-async", config.broker.clone(), config.port); + let options = options + .set_credentials(config.user.clone(), config.password.clone()) + .to_owned(); + + let (client, eventloop) = AsyncClient::new(options, 10); + Mqtt { + client, + eventloop: Arc::new(Mutex::new(eventloop)), + event_loop_running: Arc::new(Mutex::new(false)), + } +} diff --git a/flake.nix b/flake.nix index b278be2..930ed88 100644 --- a/flake.nix +++ b/flake.nix @@ -9,7 +9,6 @@ outputs = { - self, nixpkgs, naersk, fenix,