add mqtt publishing

This commit is contained in:
Sebastian Hugentobler 2024-05-11 21:27:22 +02:00
parent 966373b8fa
commit 84dcea771d
Signed by: shu
GPG Key ID: BB32CF3CA052C2F0
7 changed files with 102 additions and 1 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
target target
result result
mosquitto

View File

@ -12,4 +12,20 @@ pub struct Cli {
/// Database connection string where timestamps are stored (SQLite, Postgres or MySql) /// Database connection string where timestamps are stored (SQLite, Postgres or MySql)
#[arg(short, long, default_value_t = format!("sqlite::memory:"))] #[arg(short, long, default_value_t = format!("sqlite::memory:"))]
pub connection: String, pub connection: String,
/// MQTT broker where alerts get published
#[arg(short, long)]
pub mqtt_broker: String,
/// MQTT port
#[arg(short = 'p', long, default_value_t = 1883)]
pub mqtt_port: u16,
/// MQTT user
#[arg(short = 'u', long)]
pub mqtt_user: String,
/// MQTT password
#[arg(short = 'P', long)]
pub mqtt_password: String,
} }

View File

@ -1,11 +1,25 @@
use crate::cli::Cli; use crate::cli::Cli;
/// Mqtt configuration
pub struct MqttConfig {
/// Mqtt broker host
pub broker: String,
/// Mqtt broker port
pub port: u16,
/// Mqtt user name with publishing rights
pub user: String,
/// Password for the publishing userr
pub password: String,
}
/// Application configuration. /// Application configuration.
pub struct Config { pub struct Config {
/// Update interval in seconds /// Update interval in seconds
pub interval: u64, pub interval: u64,
/// Database connection string where timestamps are stored (SQLite, Postgres or MySql) /// Database connection string where timestamps are stored (SQLite, Postgres or MySql)
pub connection: String, pub connection: String,
/// Mqtt Configuration
pub mqtt: MqttConfig,
} }
impl From<Cli> for Config { impl From<Cli> for Config {
@ -13,6 +27,12 @@ impl From<Cli> for Config {
Self { Self {
interval: value.interval, interval: value.interval,
connection: value.connection, connection: value.connection,
mqtt: MqttConfig {
broker: value.mqtt_broker,
port: value.mqtt_port,
user: value.mqtt_user,
password: value.mqtt_password,
},
} }
} }
} }

View File

@ -10,4 +10,10 @@ pub enum Error {
/// Error fetching data from alertswiss /// Error fetching data from alertswiss
#[error("data error")] #[error("data error")]
FetchError(#[from] reqwest::Error), FetchError(#[from] reqwest::Error),
/// Error publishing to MQTT
#[error("mqtt client error")]
MqttCLientError(#[from] rumqttc::ClientError),
/// Error publishing to MQTT
#[error("mqtt connection error")]
MqttConnectionError(#[from] rumqttc::ConnectionError),
} }

View File

@ -35,6 +35,7 @@ mod json {
pub mod title; pub mod title;
} }
pub mod logging; pub mod logging;
pub mod mqtt;
async fn handle_alerts(db: &Db) -> Result<Vec<Alert>, Error> { async fn handle_alerts(db: &Db) -> Result<Vec<Alert>, Error> {
info!("checking alerts"); info!("checking alerts");
@ -73,9 +74,14 @@ pub async fn run(config: Config) -> Result<(), Error> {
let mut interval = time::interval(Duration::from_secs(config.interval)); let mut interval = time::interval(Duration::from_secs(config.interval));
let db = &datastore::rdbms::connect(&config.connection).await?; let db = &datastore::rdbms::connect(&config.connection).await?;
let mqtt = mqtt::connect(&config.mqtt);
loop { loop {
interval.tick().await; interval.tick().await;
let new_alerts = handle_alerts(db).await?; let new_alerts = handle_alerts(db).await?;
if let Err(e) = mqtt.publish("chello".as_bytes()).await {
error!("failed pusblishing to mqtt: {e:?}")
}
} }
} }

53
app/src/mqtt.rs Normal file
View File

@ -0,0 +1,53 @@
use rumqttc::{AsyncClient, EventLoop, MqttOptions, QoS};
use std::{sync::Arc, time::Duration};
use tokio::sync::Mutex;
use tracing::debug;
use crate::{config::MqttConfig, error::Error};
pub struct Mqtt {
client: AsyncClient,
eventloop: Arc<Mutex<EventLoop>>,
event_loop_running: Arc<Mutex<bool>>,
}
impl Mqtt {
async fn run_event_loop(&self) {
let mut event_loop_running = self.event_loop_running.lock().await;
if !*event_loop_running {
*event_loop_running = true;
let eventloop = Arc::clone(&self.eventloop);
tokio::spawn(async move {
let mut eventloop = eventloop.lock().await;
while let Ok(notification) = eventloop.poll().await {
debug!("Received = {:?}", notification);
tokio::time::sleep(Duration::from_millis(100)).await;
}
});
}
}
pub async fn publish(&self, payload: &[u8]) -> Result<(), Error> {
self.run_event_loop().await;
self.client
.publish("alerts", QoS::AtLeastOnce, false, payload)
.await?;
Ok(())
}
}
pub fn connect(config: &MqttConfig) -> Mqtt {
let mut options = MqttOptions::new("rumqtt-async", config.broker.clone(), config.port);
let options = options
.set_credentials(config.user.clone(), config.password.clone())
.to_owned();
let (client, eventloop) = AsyncClient::new(options, 10);
Mqtt {
client,
eventloop: Arc::new(Mutex::new(eventloop)),
event_loop_running: Arc::new(Mutex::new(false)),
}
}

View File

@ -9,7 +9,6 @@
outputs = outputs =
{ {
self,
nixpkgs, nixpkgs,
naersk, naersk,
fenix, fenix,