diff --git a/app/src/alert_swiss.rs b/app/src/alert_swiss.rs index 058a55c..6469a8c 100644 --- a/app/src/alert_swiss.rs +++ b/app/src/alert_swiss.rs @@ -1,7 +1,9 @@ use crate::json::alerts::Alerts; +// TODO: maybe make this overridable? const ALERT_SWISS_URL: &str = "https://www.alert.swiss/content/alertswiss-internet/en/home/_jcr_content/polyalert.alertswiss_alerts.actual.json"; +/// Fetch alert information from alertswiss as a json document. pub async fn fetch_alerts() -> Result { reqwest::get(ALERT_SWISS_URL).await?.json::().await } diff --git a/app/src/cli.rs b/app/src/cli.rs index 0d898c0..c6badc8 100644 --- a/app/src/cli.rs +++ b/app/src/cli.rs @@ -9,6 +9,7 @@ pub struct Cli { /// Update interval in seconds #[arg(short, long, default_value_t = 10)] pub interval: u64, + /// Database connection string where timestamps are stored (SQLite, Postgres or MySql) #[arg(short, long, default_value_t = format!("sqlite::memory:"))] pub connection: String, diff --git a/app/src/datastore/rdbms.rs b/app/src/datastore/rdbms.rs index 09ace30..ced9f0f 100644 --- a/app/src/datastore/rdbms.rs +++ b/app/src/datastore/rdbms.rs @@ -1,3 +1,6 @@ +//! Provide functionality to interact with a relational database +//! for storing and retrieving alert information. + use entity::alerts; use migration::{Migrator, MigratorTrait}; use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, Set}; @@ -14,11 +17,13 @@ pub enum RdbmsError { DbError(#[from] sea_orm::DbErr), } +/// Wrap a database connection and allow operations on it. pub struct Db { connection: DatabaseConnection, } impl Db { + /// Check if an alert has been updated in the database. pub async fn is_alert_updated(&self, alert: &Alert) -> Result { let db_alert = entity::alerts::Entity::find_by_id(&alert.identifier) .one(&self.connection) @@ -30,6 +35,7 @@ impl Db { }) } + /// Save an alert to the database. pub async fn save_alert(&self, alert: &Alert) -> Result<(), RdbmsError> { let db_alert = entity::alerts::Entity::find_by_id(&alert.identifier) .one(&self.connection) @@ -49,6 +55,9 @@ impl Db { } } +/// Establish a connection to a database. +/// +/// Run all migrations. pub async fn connect(connection_string: &str) -> Result { debug!("connecting to {connection_string}..."); diff --git a/app/src/lib.rs b/app/src/lib.rs index 46c997f..cc769e1 100644 --- a/app/src/lib.rs +++ b/app/src/lib.rs @@ -48,6 +48,9 @@ mod json { pub mod logging; pub mod mqtt; +/// Check for new alerts, save them into the local database and publish them to mqtt. +/// +/// Return new alerts. async fn handle_alerts(db: &Db) -> Result, Error> { info!("checking alerts"); @@ -56,8 +59,10 @@ async fn handle_alerts(db: &Db) -> Result, Error> { .list .into_iter() .filter(|alert| !alert.identifier.starts_with("TEST-")) + // we do not need the test alerts .collect(); + // filter for new alerts by checking with the database let new_alerts: Vec = stream::iter(alerts) .filter_map(|alert| async move { if db.is_alert_updated(&alert).await.ok().unwrap_or(false) { @@ -69,6 +74,7 @@ async fn handle_alerts(db: &Db) -> Result, Error> { .collect() .await; + // persist new alerts stream::iter(new_alerts.clone()) .for_each_concurrent(None, |alert| async move { info!("saving alert {}", alert.identifier); @@ -81,6 +87,7 @@ async fn handle_alerts(db: &Db) -> Result, Error> { Ok(new_alerts) } +/// Entry point to run the alert-me application. pub async fn run(config: Config) -> Result<(), Error> { let mut interval = time::interval(Duration::from_secs(config.interval)); diff --git a/app/src/mqtt.rs b/app/src/mqtt.rs index 657cf75..45e8640 100644 --- a/app/src/mqtt.rs +++ b/app/src/mqtt.rs @@ -1,3 +1,5 @@ +//! MQTT publishing. + use rumqttc::{AsyncClient, EventLoop, MqttOptions, QoS}; use std::{sync::Arc, time::Duration}; use tokio::sync::Mutex; @@ -5,6 +7,7 @@ use tracing::debug; use crate::{config::MqttConfig, error::Error}; +/// Encapsulate the MQTT eventloop for publishing data. pub struct Mqtt { client: AsyncClient, eventloop: Arc>, @@ -12,6 +15,7 @@ pub struct Mqtt { } impl Mqtt { + /// Run the MQTT event loop to properly receive messages. async fn run_event_loop(&self) { let mut event_loop_running = self.event_loop_running.lock().await; if !*event_loop_running { @@ -28,6 +32,7 @@ impl Mqtt { } } + /// Publish data to an MQTT topic with QoS `AtLeastOnce`.. pub async fn publish(&self, payload: &[u8]) -> Result<(), Error> { self.run_event_loop().await; self.client @@ -38,6 +43,7 @@ impl Mqtt { } } +/// Connect and authenticate to an MQTT broker. pub fn connect(config: &MqttConfig) -> Mqtt { let mut options = MqttOptions::new("rumqtt-async", config.broker.clone(), config.port); let options = options