alert-me/app/src/mqtt.rs

54 lines
1.6 KiB
Rust

use rumqttc::{AsyncClient, EventLoop, MqttOptions, QoS};
use std::{sync::Arc, time::Duration};
use tokio::sync::Mutex;
use tracing::debug;
use crate::{config::MqttConfig, error::Error};
pub struct Mqtt {
client: AsyncClient,
eventloop: Arc<Mutex<EventLoop>>,
event_loop_running: Arc<Mutex<bool>>,
}
impl Mqtt {
async fn run_event_loop(&self) {
let mut event_loop_running = self.event_loop_running.lock().await;
if !*event_loop_running {
*event_loop_running = true;
let eventloop = Arc::clone(&self.eventloop);
tokio::spawn(async move {
let mut eventloop = eventloop.lock().await;
while let Ok(notification) = eventloop.poll().await {
debug!("Received = {:?}", notification);
tokio::time::sleep(Duration::from_millis(100)).await;
}
});
}
}
pub async fn publish(&self, payload: &[u8]) -> Result<(), Error> {
self.run_event_loop().await;
self.client
.publish("alerts", QoS::AtLeastOnce, false, payload)
.await?;
Ok(())
}
}
pub fn connect(config: &MqttConfig) -> Mqtt {
let mut options = MqttOptions::new("rumqtt-async", config.broker.clone(), config.port);
let options = options
.set_credentials(config.user.clone(), config.password.clone())
.to_owned();
let (client, eventloop) = AsyncClient::new(options, 10);
Mqtt {
client,
eventloop: Arc::new(Mutex::new(eventloop)),
event_loop_running: Arc::new(Mutex::new(false)),
}
}