add a tiny bit more documentation

This commit is contained in:
Sebastian Hugentobler 2024-09-24 13:00:58 +02:00
parent 949eecffb6
commit 7b884d87db
Signed by: shu
GPG Key ID: BB32CF3CA052C2F0
5 changed files with 25 additions and 0 deletions

View File

@ -1,7 +1,9 @@
use crate::json::alerts::Alerts; use crate::json::alerts::Alerts;
// TODO: maybe make this overridable?
const ALERT_SWISS_URL: &str = "https://www.alert.swiss/content/alertswiss-internet/en/home/_jcr_content/polyalert.alertswiss_alerts.actual.json"; const ALERT_SWISS_URL: &str = "https://www.alert.swiss/content/alertswiss-internet/en/home/_jcr_content/polyalert.alertswiss_alerts.actual.json";
/// Fetch alert information from alertswiss as a json document.
pub async fn fetch_alerts() -> Result<Alerts, reqwest::Error> { pub async fn fetch_alerts() -> Result<Alerts, reqwest::Error> {
reqwest::get(ALERT_SWISS_URL).await?.json::<Alerts>().await reqwest::get(ALERT_SWISS_URL).await?.json::<Alerts>().await
} }

View File

@ -9,6 +9,7 @@ pub struct Cli {
/// Update interval in seconds /// Update interval in seconds
#[arg(short, long, default_value_t = 10)] #[arg(short, long, default_value_t = 10)]
pub interval: u64, pub interval: u64,
/// Database connection string where timestamps are stored (SQLite, Postgres or MySql) /// Database connection string where timestamps are stored (SQLite, Postgres or MySql)
#[arg(short, long, default_value_t = format!("sqlite::memory:"))] #[arg(short, long, default_value_t = format!("sqlite::memory:"))]
pub connection: String, pub connection: String,

View File

@ -1,3 +1,6 @@
//! Provide functionality to interact with a relational database
//! for storing and retrieving alert information.
use entity::alerts; use entity::alerts;
use migration::{Migrator, MigratorTrait}; use migration::{Migrator, MigratorTrait};
use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, Set}; use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, Set};
@ -14,11 +17,13 @@ pub enum RdbmsError {
DbError(#[from] sea_orm::DbErr), DbError(#[from] sea_orm::DbErr),
} }
/// Wrap a database connection and allow operations on it.
pub struct Db { pub struct Db {
connection: DatabaseConnection, connection: DatabaseConnection,
} }
impl Db { impl Db {
/// Check if an alert has been updated in the database.
pub async fn is_alert_updated(&self, alert: &Alert) -> Result<bool, RdbmsError> { pub async fn is_alert_updated(&self, alert: &Alert) -> Result<bool, RdbmsError> {
let db_alert = entity::alerts::Entity::find_by_id(&alert.identifier) let db_alert = entity::alerts::Entity::find_by_id(&alert.identifier)
.one(&self.connection) .one(&self.connection)
@ -30,6 +35,7 @@ impl Db {
}) })
} }
/// Save an alert to the database.
pub async fn save_alert(&self, alert: &Alert) -> Result<(), RdbmsError> { pub async fn save_alert(&self, alert: &Alert) -> Result<(), RdbmsError> {
let db_alert = entity::alerts::Entity::find_by_id(&alert.identifier) let db_alert = entity::alerts::Entity::find_by_id(&alert.identifier)
.one(&self.connection) .one(&self.connection)
@ -49,6 +55,9 @@ impl Db {
} }
} }
/// Establish a connection to a database.
///
/// Run all migrations.
pub async fn connect(connection_string: &str) -> Result<Db, RdbmsError> { pub async fn connect(connection_string: &str) -> Result<Db, RdbmsError> {
debug!("connecting to {connection_string}..."); debug!("connecting to {connection_string}...");

View File

@ -48,6 +48,9 @@ mod json {
pub mod logging; pub mod logging;
pub mod mqtt; pub mod mqtt;
/// Check for new alerts, save them into the local database and publish them to mqtt.
///
/// Return new alerts.
async fn handle_alerts(db: &Db) -> Result<Vec<Alert>, Error> { async fn handle_alerts(db: &Db) -> Result<Vec<Alert>, Error> {
info!("checking alerts"); info!("checking alerts");
@ -56,8 +59,10 @@ async fn handle_alerts(db: &Db) -> Result<Vec<Alert>, Error> {
.list .list
.into_iter() .into_iter()
.filter(|alert| !alert.identifier.starts_with("TEST-")) .filter(|alert| !alert.identifier.starts_with("TEST-"))
// we do not need the test alerts
.collect(); .collect();
// filter for new alerts by checking with the database
let new_alerts: Vec<Alert> = stream::iter(alerts) let new_alerts: Vec<Alert> = stream::iter(alerts)
.filter_map(|alert| async move { .filter_map(|alert| async move {
if db.is_alert_updated(&alert).await.ok().unwrap_or(false) { if db.is_alert_updated(&alert).await.ok().unwrap_or(false) {
@ -69,6 +74,7 @@ async fn handle_alerts(db: &Db) -> Result<Vec<Alert>, Error> {
.collect() .collect()
.await; .await;
// persist new alerts
stream::iter(new_alerts.clone()) stream::iter(new_alerts.clone())
.for_each_concurrent(None, |alert| async move { .for_each_concurrent(None, |alert| async move {
info!("saving alert {}", alert.identifier); info!("saving alert {}", alert.identifier);
@ -81,6 +87,7 @@ async fn handle_alerts(db: &Db) -> Result<Vec<Alert>, Error> {
Ok(new_alerts) Ok(new_alerts)
} }
/// Entry point to run the alert-me application.
pub async fn run(config: Config) -> Result<(), Error> { pub async fn run(config: Config) -> Result<(), Error> {
let mut interval = time::interval(Duration::from_secs(config.interval)); let mut interval = time::interval(Duration::from_secs(config.interval));

View File

@ -1,3 +1,5 @@
//! MQTT publishing.
use rumqttc::{AsyncClient, EventLoop, MqttOptions, QoS}; use rumqttc::{AsyncClient, EventLoop, MqttOptions, QoS};
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
use tokio::sync::Mutex; use tokio::sync::Mutex;
@ -5,6 +7,7 @@ use tracing::debug;
use crate::{config::MqttConfig, error::Error}; use crate::{config::MqttConfig, error::Error};
/// Encapsulate the MQTT eventloop for publishing data.
pub struct Mqtt { pub struct Mqtt {
client: AsyncClient, client: AsyncClient,
eventloop: Arc<Mutex<EventLoop>>, eventloop: Arc<Mutex<EventLoop>>,
@ -12,6 +15,7 @@ pub struct Mqtt {
} }
impl Mqtt { impl Mqtt {
/// Run the MQTT event loop to properly receive messages.
async fn run_event_loop(&self) { async fn run_event_loop(&self) {
let mut event_loop_running = self.event_loop_running.lock().await; let mut event_loop_running = self.event_loop_running.lock().await;
if !*event_loop_running { if !*event_loop_running {
@ -28,6 +32,7 @@ impl Mqtt {
} }
} }
/// Publish data to an MQTT topic with QoS `AtLeastOnce`..
pub async fn publish(&self, payload: &[u8]) -> Result<(), Error> { pub async fn publish(&self, payload: &[u8]) -> Result<(), Error> {
self.run_event_loop().await; self.run_event_loop().await;
self.client self.client
@ -38,6 +43,7 @@ impl Mqtt {
} }
} }
/// Connect and authenticate to an MQTT broker.
pub fn connect(config: &MqttConfig) -> Mqtt { pub fn connect(config: &MqttConfig) -> Mqtt {
let mut options = MqttOptions::new("rumqtt-async", config.broker.clone(), config.port); let mut options = MqttOptions::new("rumqtt-async", config.broker.clone(), config.port);
let options = options let options = options