correctly handle updated alerts

This commit is contained in:
Sebastian Hugentobler 2024-05-11 18:37:41 +02:00
parent a8c0be2c05
commit ac0ac8a10d
Signed by: shu
GPG key ID: BB32CF3CA052C2F0
34 changed files with 775 additions and 114 deletions

21
app/Cargo.toml Normal file
View file

@ -0,0 +1,21 @@
[package]
name = "alert-me"
version = "0.1.0"
edition = "2021"
[dependencies]
entity = { path = "../entity" }
migration = { path = "../migration" }
clap = { version = "4.5.4", features = ["derive"] }
config = "0.14.0"
futures = "0.3.30"
reqwest = { version = "0.12.4", features = ["json", "rustls-tls"], default-features = false }
rumqttc = "0.24.0"
sea-orm = { workspace = true, features = [ "with-time", "sqlx-sqlite", "sqlx-postgres", "runtime-tokio-rustls", "macros" ] }
serde = { workspace = true }
serde_json = "1.0.116"
thiserror = "1.0.59"
time = { workspace = true, features = ["macros", "serde", "formatting", "parsing" ] }
tokio = { version = "1.37.0", features = ["full"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }

7
app/src/alert_swiss.rs Normal file
View file

@ -0,0 +1,7 @@
use crate::json::alerts::Alerts;
const ALERT_SWISS_URL: &str = "https://www.alert.swiss/content/alertswiss-internet/en/home/_jcr_content/polyalert.alertswiss_alerts.actual.json";
pub async fn fetch_alerts() -> Result<Alerts, reqwest::Error> {
reqwest::get(ALERT_SWISS_URL).await?.json::<Alerts>().await
}

15
app/src/cli.rs Normal file
View file

@ -0,0 +1,15 @@
//! Cli interface.
use clap::Parser;
/// Push Alertswiss notifications to MQTT.
#[derive(Parser)]
#[command(version, about, long_about = None)]
pub struct Cli {
/// Update interval in seconds
#[arg(short, long, default_value_t = 10)]
pub interval: u64,
/// Database connection string where timestamps are stored (SQLite, Postgres or MySql)
#[arg(short, long, default_value_t = format!("sqlite::memory:"))]
pub connection: String,
}

18
app/src/config.rs Normal file
View file

@ -0,0 +1,18 @@
use crate::cli::Cli;
/// Application configuration.
pub struct Config {
/// Update interval in seconds
pub interval: u64,
/// Database connection string where timestamps are stored (SQLite, Postgres or MySql)
pub connection: String,
}
impl From<Cli> for Config {
fn from(value: Cli) -> Self {
Self {
interval: value.interval,
connection: value.connection,
}
}
}

View file

@ -0,0 +1,50 @@
use entity::alerts;
use migration::{Migrator, MigratorTrait};
use sea_orm::{ActiveModelTrait, Database, DatabaseConnection, DbErr, EntityTrait, Set};
use tracing::debug;
use crate::json::alert::Alert;
pub struct Db {
connection: DatabaseConnection,
}
impl Db {
pub async fn is_alert_updated(&self, alert: &Alert) -> Result<bool, DbErr> {
let db_alert = entity::alerts::Entity::find_by_id(&alert.identifier)
.one(&self.connection)
.await?;
Ok(match db_alert {
Some(db_alert) => db_alert.publish_date < alert.publish_date,
None => true,
})
}
pub async fn save_alert(&self, alert: &Alert) -> Result<(), DbErr> {
let db_alert = entity::alerts::Entity::find_by_id(&alert.identifier)
.one(&self.connection)
.await?;
let alert = alerts::ActiveModel {
id: Set(alert.identifier.to_owned()),
publish_date: Set(alert.publish_date),
};
match db_alert {
Some(_) => alert.save(&self.connection).await?,
None => alert.insert(&self.connection).await?.into(),
};
Ok(())
}
}
pub async fn connect(connection_string: &str) -> Result<Db, DbErr> {
debug!("connecting to {connection_string}...");
let connection: DatabaseConnection = Database::connect(connection_string).await?;
Migrator::up(&connection, None).await?;
Ok(Db { connection })
}

13
app/src/error.rs Normal file
View file

@ -0,0 +1,13 @@
use sea_orm::DbErr;
use thiserror::Error;
#[derive(Error, Debug)]
#[error("opds error")]
pub enum Error {
/// Error rendering OPDS.
#[error("opds error")]
DbError(#[from] DbErr),
/// Error fetching data from calibre.
#[error("data error")]
FetchError(#[from] reqwest::Error),
}

40
app/src/json/alert.rs Normal file
View file

@ -0,0 +1,40 @@
use serde::{Deserialize, Serialize};
use time::PrimitiveDateTime;
use super::{
area::Area, contact::Contact, description::Description, image::Image,
image_description::ImageDescription, image_title::ImageTitle, link::Link, text::Text,
title::Title,
};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Alert {
pub identifier: String,
pub gtrans_origin: Option<String>,
#[serde(deserialize_with = "super::datetime::deserialize_alert")]
pub sent: PrimitiveDateTime,
pub title: Title,
pub description: Description,
pub nation_wide: bool,
pub banner: bool,
pub instructions: Vec<Text>,
pub sender: String,
pub publisher_name: Option<String>,
pub link: Option<String>,
pub links: Vec<Link>,
pub contact: Contact,
pub event: String,
pub all_clear: bool,
pub severity: String,
pub test_alert: bool,
pub technical_test_alert: bool,
#[serde(deserialize_with = "super::datetime::deserialize_alert")]
pub publish_date: PrimitiveDateTime,
pub areas: Vec<Area>,
pub images: Vec<Image>,
pub image_titles: Vec<ImageTitle>,
pub image_descriptions: Vec<ImageDescription>,
pub event_icon_path: String,
pub reference: String,
}

13
app/src/json/alerts.rs Normal file
View file

@ -0,0 +1,13 @@
use super::alert::Alert;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Alerts {
pub heartbeat_age_in_millis: u64,
#[serde(deserialize_with = "super::datetime::deserialize_alerts")]
pub render_time: OffsetDateTime,
#[serde(rename = "alerts")]
pub list: Vec<Alert>,
}

13
app/src/json/area.rs Normal file
View file

@ -0,0 +1,13 @@
use serde::{Deserialize, Serialize};
use super::{circle::Circle, description::Description, polygon::Polygon, region::Region};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Area {
pub description: Description,
pub region_type: String,
pub polygons: Vec<Polygon>,
pub circles: Vec<Circle>,
pub regions: Vec<Region>,
}

10
app/src/json/circle.rs Normal file
View file

@ -0,0 +1,10 @@
use serde::{Deserialize, Serialize};
use super::coordinate::Coordinate;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Circle {
pub center_position: Coordinate,
pub radius: String,
}

8
app/src/json/contact.rs Normal file
View file

@ -0,0 +1,8 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Contact {
pub contact: String,
pub contact_origin: Option<String>,
}

View file

@ -0,0 +1,10 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Coordinate {
#[serde(rename = "0")]
pub zero: String,
#[serde(rename = "1")]
pub one: String,
}

26
app/src/json/datetime.rs Normal file
View file

@ -0,0 +1,26 @@
use serde::{Deserialize, Deserializer};
use time::format_description::FormatItem;
use time::macros::format_description;
use time::{OffsetDateTime, PrimitiveDateTime};
const ALERT_FORMAT: &[FormatItem<'_>] =
format_description!("[weekday repr:short], [day].[month].[year], [hour]:[minute]");
const ALERTS_FORMAT: &[FormatItem<'_>] = format_description!(
"[day].[month].[year] [hour]:[minute]:[second].[subsecond] [offset_hour sign:mandatory][offset_minute]"
);
pub fn deserialize_alert<'de, D>(deserializer: D) -> Result<PrimitiveDateTime, D::Error>
where
D: Deserializer<'de>,
{
let s: &str = Deserialize::deserialize(deserializer)?;
PrimitiveDateTime::parse(s, ALERT_FORMAT).map_err(serde::de::Error::custom)
}
pub fn deserialize_alerts<'de, D>(deserializer: D) -> Result<OffsetDateTime, D::Error>
where
D: Deserializer<'de>,
{
let s: &str = Deserialize::deserialize(deserializer)?;
OffsetDateTime::parse(s, ALERTS_FORMAT).map_err(serde::de::Error::custom)
}

View file

@ -0,0 +1,8 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Description {
pub description: String,
pub description_gtrans_origin: Option<String>,
}

10
app/src/json/image.rs Normal file
View file

@ -0,0 +1,10 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Image {
pub uri: String,
pub digest: String,
pub index: u64,
pub size: String,
}

View file

@ -0,0 +1,11 @@
use serde::{Deserialize, Serialize};
use super::description::Description;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ImageDescription {
pub index: u64,
#[serde(flatten)]
pub description: Description,
}

View file

@ -0,0 +1,12 @@
use serde::{Deserialize, Serialize};
use super::title::Title;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ImageTitle {
pub index: u64,
#[serde(flatten)]
pub title: Title,
pub alt_text: String,
}

8
app/src/json/link.rs Normal file
View file

@ -0,0 +1,8 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Link {
pub href: String,
pub text: String,
}

9
app/src/json/polygon.rs Normal file
View file

@ -0,0 +1,9 @@
use serde::{Deserialize, Serialize};
use super::coordinate::Coordinate;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Polygon {
pub coordinates: Vec<Coordinate>,
}

7
app/src/json/region.rs Normal file
View file

@ -0,0 +1,7 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Region {
pub region: String,
}

8
app/src/json/text.rs Normal file
View file

@ -0,0 +1,8 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Text {
pub text: String,
pub text_origin: Option<String>,
}

8
app/src/json/title.rs Normal file
View file

@ -0,0 +1,8 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Title {
pub title: String,
pub title_gtrans_origin: Option<String>,
}

81
app/src/lib.rs Normal file
View file

@ -0,0 +1,81 @@
use std::time::Duration;
use config::Config;
use datastore::rdbms::Db;
use error::Error;
use futures::{stream, StreamExt};
use tokio::time;
use tracing::{error, info};
use crate::json::alert::Alert;
pub mod alert_swiss;
pub mod cli;
pub mod config;
pub mod error;
pub mod datastore {
pub mod rdbms;
}
mod json {
pub mod alert;
pub mod alerts;
pub mod area;
pub mod circle;
pub mod contact;
pub mod coordinate;
pub mod datetime;
pub mod description;
pub mod image;
pub mod image_description;
pub mod image_title;
pub mod link;
pub mod polygon;
pub mod region;
pub mod text;
pub mod title;
}
pub mod logging;
async fn handle_alerts(db: &Db) -> Result<Vec<Alert>, Error> {
info!("checking alerts");
let alerts = alert_swiss::fetch_alerts().await?;
let alerts: Vec<Alert> = alerts
.list
.into_iter()
.filter(|alert| !alert.identifier.starts_with("TEST-"))
.collect();
let new_alerts: Vec<Alert> = stream::iter(alerts)
.filter_map(|alert| async move {
if db.is_alert_updated(&alert).await.ok().unwrap_or(false) {
Some(alert)
} else {
None
}
})
.collect()
.await;
stream::iter(new_alerts.clone())
.for_each_concurrent(None, |alert| async move {
info!("saving alert {}", alert.identifier);
match db.save_alert(&alert).await {
Ok(_) => {}
Err(e) => error!("failed to save alert: {e}"),
};
})
.await;
Ok(new_alerts)
}
pub async fn run(config: Config) -> Result<(), Error> {
let mut interval = time::interval(Duration::from_secs(config.interval));
let db = &datastore::rdbms::connect(&config.connection).await?;
loop {
interval.tick().await;
let new_alerts = handle_alerts(db).await?;
}
}

14
app/src/logging.rs Normal file
View file

@ -0,0 +1,14 @@
use tracing::debug;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
pub fn setup(bin_name: &str) {
let default_config = format!("{}=info,tower_http=info,sqlx=error", bin_name);
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| default_config.into()),
)
.with(tracing_subscriber::fmt::layer().with_target(true))
.init();
debug!("tracing/logging is setup");
}

18
app/src/main.rs Normal file
View file

@ -0,0 +1,18 @@
use std::process;
use alert_me::{cli::Cli, logging};
use clap::Parser;
use tracing::error;
#[tokio::main]
async fn main() {
logging::setup("alert_me");
let args = Cli::parse();
let config = args.into();
if let Err(e) = alert_me::run(config).await {
error!("Application error: {e}");
process::exit(1);
}
}