initial commit
This commit is contained in:
commit
17a8e573c0
40 changed files with 4009 additions and 0 deletions
8
mqtt-protocol/Cargo.toml
Normal file
8
mqtt-protocol/Cargo.toml
Normal file
|
@ -0,0 +1,8 @@
|
|||
[package]
|
||||
name = "mqtt-protocol"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
description.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
128
mqtt-protocol/src/connect.rs
Normal file
128
mqtt-protocol/src/connect.rs
Normal file
|
@ -0,0 +1,128 @@
|
|||
use crate::fixed_header::FixedHeader;
|
||||
|
||||
// [MQTT 5.0: 3.1.3.1](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc385349242)
|
||||
const ALLOWED_CLIENT_ID_CHARS: &str =
|
||||
"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
|
||||
// [MQTT 5.0: 3.1.3.1](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc385349242)
|
||||
const MAX_CLIENT_ID_SIZE: usize = 23;
|
||||
|
||||
// [MQTT 5.0: 3.1.1](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_CONNECT_Fixed_Header)
|
||||
const FIXED_CONNECT_TYPE: u8 = 0x01;
|
||||
// [MQTT 5.0: 3.1.1](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_CONNECT_Fixed_Header)
|
||||
const FIXED_CONNECT_FLAGS: u8 = 0x00;
|
||||
|
||||
// "MQTT" [MQTT 5.0: 3.1.2.1](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc385349225)
|
||||
const PROTOCOL_NAME: [u8; 6] = [0x00, 0x04, 0x4d, 0x51, 0x54, 0x54];
|
||||
// "version 5" [MQTT 5.0: 3.1.2.2](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc385349227)
|
||||
const PROTOCOL_VERSION: [u8; 1] = [0x05];
|
||||
// no user name, no will qos, no will, no clean start [MQTT 5.0: 3.1.2.3 - 3.1.2.9](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc385349229)
|
||||
const CONNECT_FLAGS: [u8; 1] = [0x00];
|
||||
// turn off keep alive mechanism [MQTT 5.0: 3.1.2.10](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Keep_Alive_1)
|
||||
const KEEP_ALIVE: [u8; 2] = [0x00, 0x00];
|
||||
// no properties [MQTT 5.0: 3.1.2.11](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc511988523)
|
||||
const PROPERTIES: [u8; 1] = [0x00];
|
||||
|
||||
// calculate position in the data array (those that are constant)
|
||||
const PROTOCOL_NAME_IDX: usize = 0;
|
||||
const PROTOCOL_VERSION_IDX: usize = PROTOCOL_NAME_IDX + PROTOCOL_NAME.len();
|
||||
const CONNECT_FLAGS_IDX: usize = PROTOCOL_VERSION_IDX + PROTOCOL_VERSION.len();
|
||||
const KEEP_ALIVE_IDX: usize = CONNECT_FLAGS_IDX + CONNECT_FLAGS.len();
|
||||
const PROPERTIES_IDX: usize = KEEP_ALIVE_IDX + KEEP_ALIVE.len();
|
||||
const PAYLOAD_IDX: usize = PROPERTIES_IDX + PROPERTIES.len();
|
||||
|
||||
// max size the variable header + payload of a connect packet
|
||||
const MAX_PACKET_LENGTH: usize = PROTOCOL_NAME.len()
|
||||
+ PROTOCOL_VERSION.len()
|
||||
+ CONNECT_FLAGS.len()
|
||||
+ KEEP_ALIVE.len()
|
||||
+ PROPERTIES.len()
|
||||
+ MAX_CLIENT_ID_SIZE
|
||||
+ 2; // two bytes for the client id length
|
||||
|
||||
// max size a whole connect packet can be
|
||||
const MAX_CONNECT_LENGTH: usize = MAX_PACKET_LENGTH + 5; // 5 is the max length of the fixed header
|
||||
|
||||
pub struct Connect {
|
||||
// byte representation of the connect packet
|
||||
pub data: [u8; MAX_CONNECT_LENGTH],
|
||||
// actual length of the whole connect packet
|
||||
pub length: usize,
|
||||
}
|
||||
|
||||
impl Connect {
|
||||
/// Create an MQTT connect packet.
|
||||
///
|
||||
/// See [MQTT 5.0: 3.1](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_CONNECT_%E2%80%93_Connection)
|
||||
/// for the byte structure.
|
||||
///
|
||||
/// `client_id` is how the client identifies itself. According to
|
||||
/// [MQTT 5.0: 3.1.3.1]((https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc385349242)
|
||||
/// this is not strictly necessary but a server does not have to accept this.
|
||||
///
|
||||
/// The same holds for the limitation of 23 bytes for the `client_id`.
|
||||
pub fn new(client_id: &str) -> Self {
|
||||
assert!(client_id
|
||||
.chars()
|
||||
.all(|x| ALLOWED_CLIENT_ID_CHARS.contains(x)));
|
||||
|
||||
let client_id_data = client_id.as_bytes();
|
||||
assert!(!client_id_data.is_empty() && client_id_data.len() <= MAX_CLIENT_ID_SIZE);
|
||||
|
||||
let client_id_length = client_id_data.len() as u16;
|
||||
let high_byte = ((client_id_length & 0xFF00) >> 8) as u8;
|
||||
let low_byte = (client_id_length & 0x00FF) as u8;
|
||||
|
||||
let mut payload_data = [0u8; MAX_CLIENT_ID_SIZE + 2];
|
||||
payload_data[0] = high_byte;
|
||||
payload_data[1] = low_byte;
|
||||
payload_data[2..2 + client_id_data.len()].copy_from_slice(client_id_data);
|
||||
|
||||
let packet_length = MAX_PACKET_LENGTH - MAX_CLIENT_ID_SIZE + client_id_data.len();
|
||||
let fixed_header = FixedHeader::new(
|
||||
FIXED_CONNECT_TYPE,
|
||||
FIXED_CONNECT_FLAGS,
|
||||
packet_length as u32,
|
||||
);
|
||||
|
||||
let mut data = [0u8; MAX_CONNECT_LENGTH];
|
||||
data[0] = fixed_header.type_flags;
|
||||
data[1..1 + fixed_header.length]
|
||||
.copy_from_slice(&fixed_header.remaining_length[0..fixed_header.length]);
|
||||
let fixed_offset = fixed_header.length + 1;
|
||||
|
||||
data[fixed_offset + PROTOCOL_NAME_IDX..fixed_offset + PROTOCOL_VERSION_IDX]
|
||||
.copy_from_slice(&PROTOCOL_NAME);
|
||||
data[fixed_offset + PROTOCOL_VERSION_IDX..fixed_offset + CONNECT_FLAGS_IDX]
|
||||
.copy_from_slice(&PROTOCOL_VERSION);
|
||||
data[fixed_offset + CONNECT_FLAGS_IDX..fixed_offset + KEEP_ALIVE_IDX]
|
||||
.copy_from_slice(&CONNECT_FLAGS);
|
||||
data[fixed_offset + KEEP_ALIVE_IDX..fixed_offset + PROPERTIES_IDX]
|
||||
.copy_from_slice(&KEEP_ALIVE);
|
||||
data[fixed_offset + PROPERTIES_IDX..fixed_offset + PAYLOAD_IDX]
|
||||
.copy_from_slice(&PROPERTIES);
|
||||
data[fixed_offset + PAYLOAD_IDX..fixed_offset + PAYLOAD_IDX + client_id_data.len() + 2]
|
||||
.copy_from_slice(&payload_data[0..client_id_data.len() + 2]);
|
||||
|
||||
Self {
|
||||
data,
|
||||
length: packet_length + fixed_header.length + 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::Connect;
|
||||
|
||||
#[test]
|
||||
fn test_encode_decode_length_1_byte() {
|
||||
let connect = Connect::new("Tomato");
|
||||
assert_eq!(
|
||||
connect.data[0..connect.length],
|
||||
[
|
||||
0x10, 0x13, 0x00, 0x04, 0x4d, 0x51, 0x54, 0x54, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
0x06, 0x54, 0x6f, 0x6d, 0x61, 0x74, 0x6f
|
||||
]
|
||||
)
|
||||
}
|
||||
}
|
28
mqtt-protocol/src/fixed_header.rs
Normal file
28
mqtt-protocol/src/fixed_header.rs
Normal file
|
@ -0,0 +1,28 @@
|
|||
use super::variable_length::encode_length;
|
||||
|
||||
pub struct FixedHeader {
|
||||
pub type_flags: u8,
|
||||
// length of variable header + payload, encoded as a variable byte integer
|
||||
// ([MQTT 5.0: 1.5.5](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc473619950))
|
||||
pub remaining_length: [u8; 4],
|
||||
// how many bytes in the remaining length field are relevant (left to right)
|
||||
pub length: usize,
|
||||
}
|
||||
|
||||
impl FixedHeader {
|
||||
/// Create a fixed header ([MQTT 5.0: 2.1.1](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc511988498)) for an MQTT packet.
|
||||
///
|
||||
/// - `packet_type`: [MQTT 5.0: 2.1.2](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc353481061)
|
||||
/// - `flags`: [MQTT 5.0: 2.1.3](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc353481062)
|
||||
/// - `remaining_length`: Length of the variable header + payload.
|
||||
pub fn new(packet_type: u8, flags: u8, remaining_length: u32) -> Self {
|
||||
let mut remaining_length_data = [0u8; 4];
|
||||
let length = encode_length(remaining_length, &mut remaining_length_data);
|
||||
|
||||
Self {
|
||||
type_flags: packet_type << 4 | (flags & 0x0F),
|
||||
remaining_length: remaining_length_data,
|
||||
length: length.into(),
|
||||
}
|
||||
}
|
||||
}
|
10
mqtt-protocol/src/lib.rs
Normal file
10
mqtt-protocol/src/lib.rs
Normal file
|
@ -0,0 +1,10 @@
|
|||
//! Implement just enough of the MQTT protocol to be able to publish data with
|
||||
//! QOS 0.
|
||||
//! All formats are taken from [MQTT Version 5.0](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html).
|
||||
|
||||
#![cfg_attr(not(test), no_std)]
|
||||
|
||||
pub mod connect;
|
||||
pub mod fixed_header;
|
||||
pub mod publish;
|
||||
pub mod variable_length;
|
80
mqtt-protocol/src/publish.rs
Normal file
80
mqtt-protocol/src/publish.rs
Normal file
|
@ -0,0 +1,80 @@
|
|||
use crate::fixed_header::FixedHeader;
|
||||
|
||||
// [MQTT 5.0: 3.3.1](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc359155681)
|
||||
const FIXED_PUBLISH_TYPE: u8 = 0x03;
|
||||
// no flags set [MQTT 5.0: 3.3.1](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc359155681)
|
||||
const FIXED_PUBLISH_FLAGS: u8 = 0x00;
|
||||
|
||||
// no properties [MQTT 5.0: 3.3.2.3](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc511988586)
|
||||
const PROPERTIES: [u8; 1] = [0x00];
|
||||
|
||||
// technically this is 65535 (see https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_UTF-8_Encoded_String)
|
||||
// but we do not need that here
|
||||
pub const MAX_TOPIC_LENGTH: usize = 254;
|
||||
|
||||
// this is again arbitrarilly constricted, technically it could be as long as the variable length
|
||||
// encoding allows (minus fixed and variable header)
|
||||
const MAX_PAYLOAD_LENGTH: usize = 128;
|
||||
// 7 is 5 + 2 which is max length of the fixed header and the two bytes used to encode the topic
|
||||
// length
|
||||
const MAX_PUBLISH_LENGTH: usize = MAX_TOPIC_LENGTH + 7 + MAX_PAYLOAD_LENGTH;
|
||||
|
||||
/// Encode a string to an MQTT length encoded string
|
||||
/// [MQTT 5.0: 1.5.4](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc462729066)
|
||||
fn encode_mqtt_str(topic: &str, encode_buffer: &mut [u8; MAX_TOPIC_LENGTH]) -> usize {
|
||||
let topic_data = topic.as_bytes();
|
||||
assert!(topic_data.len() <= MAX_TOPIC_LENGTH);
|
||||
|
||||
encode_buffer[0] = 0; // no need for the high bits as our max topic length is too small
|
||||
encode_buffer[1] = topic_data.len() as u8;
|
||||
encode_buffer[2..2 + topic_data.len()].copy_from_slice(topic_data);
|
||||
|
||||
topic_data.len() + 2
|
||||
}
|
||||
|
||||
pub struct Publish {
|
||||
// byte representation of the publish packet
|
||||
pub data: [u8; MAX_PUBLISH_LENGTH],
|
||||
// actual length of the whole publish packet
|
||||
pub length: usize,
|
||||
}
|
||||
|
||||
impl Publish {
|
||||
/// Create an MQTT publish packet.
|
||||
///
|
||||
/// See [MQTT 5.0: 3.3](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc384800410)
|
||||
/// for the byte structure.
|
||||
///
|
||||
/// `topic`: defines the topic to publish to
|
||||
/// `payload` value to publish as is
|
||||
pub fn new(topic: &str, payload: &[u8]) -> Self {
|
||||
let mut topic_data = [0u8; MAX_TOPIC_LENGTH];
|
||||
let topic_length = encode_mqtt_str(topic, &mut topic_data);
|
||||
|
||||
let packet_length = PROPERTIES.len() + topic_length + 2 + payload.len();
|
||||
let fixed_header = FixedHeader::new(
|
||||
FIXED_PUBLISH_TYPE,
|
||||
FIXED_PUBLISH_FLAGS,
|
||||
packet_length as u32,
|
||||
);
|
||||
|
||||
let mut data = [0u8; MAX_PUBLISH_LENGTH];
|
||||
data[0] = fixed_header.type_flags;
|
||||
data[1..1 + fixed_header.length]
|
||||
.copy_from_slice(&fixed_header.remaining_length[0..fixed_header.length]);
|
||||
let fixed_offset = fixed_header.length + 1;
|
||||
|
||||
let topic_idx = fixed_offset;
|
||||
let properties_idx = topic_idx + topic_length;
|
||||
let payload_idx = properties_idx + PROPERTIES.len();
|
||||
|
||||
data[topic_idx..properties_idx].copy_from_slice(&topic_data[0..topic_length]);
|
||||
data[properties_idx..payload_idx].copy_from_slice(&PROPERTIES);
|
||||
data[payload_idx..payload_idx + payload.len()].copy_from_slice(payload);
|
||||
|
||||
Self {
|
||||
data,
|
||||
length: packet_length + fixed_header.length + 1,
|
||||
}
|
||||
}
|
||||
}
|
94
mqtt-protocol/src/variable_length.rs
Normal file
94
mqtt-protocol/src/variable_length.rs
Normal file
|
@ -0,0 +1,94 @@
|
|||
/// Encode an integer as a variable byte integer
|
||||
/// [MQTT 5.0: 1.5.5](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc473619950)
|
||||
pub fn encode_length(length: u32, encoded_length: &mut [u8; 4]) -> u8 {
|
||||
let mut length = length;
|
||||
|
||||
let mut idx = 0;
|
||||
while idx < encoded_length.len() && length > 0 {
|
||||
let mut encoded_byte = (length % 128) as u8;
|
||||
length /= 128;
|
||||
|
||||
if length > 0 {
|
||||
encoded_byte |= 0x80; // Set the highest bit
|
||||
}
|
||||
encoded_length[idx] = encoded_byte;
|
||||
|
||||
idx += 1;
|
||||
}
|
||||
|
||||
idx as u8
|
||||
}
|
||||
|
||||
/// Decode a variable byte integer as an integer.
|
||||
/// [MQTT 5.0: 1.5.5](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc473619950)
|
||||
pub fn decode_length(bytes: [u8; 4]) -> u32 {
|
||||
let mut length = 0u32;
|
||||
let mut multiplier = 1u32;
|
||||
|
||||
let mut idx = 0;
|
||||
let mut byte = bytes[idx];
|
||||
|
||||
while idx < bytes.len() && byte & 0x80 != 0 {
|
||||
length += ((byte & 0x7F) as u32) * multiplier;
|
||||
multiplier *= 128;
|
||||
|
||||
idx += 1;
|
||||
byte = bytes[idx]
|
||||
}
|
||||
length += ((byte & 0x7F) as u32) * multiplier;
|
||||
|
||||
length
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::variable_length::{decode_length, encode_length};
|
||||
|
||||
#[test]
|
||||
fn test_encode_decode_length_1_byte() {
|
||||
let mut length = [0u8; 4];
|
||||
let byte_length = encode_length(16, &mut length);
|
||||
|
||||
assert_eq!(byte_length, 1);
|
||||
assert_eq!(length, [0x10, 0x00, 0x00, 0x00]);
|
||||
|
||||
let length = decode_length(length);
|
||||
assert_eq!(length, 16);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_encode_decode_length_2_bytes() {
|
||||
let mut length = [0u8; 4];
|
||||
let byte_length = encode_length(568, &mut length);
|
||||
|
||||
assert_eq!(byte_length, 2);
|
||||
assert_eq!(length, [0xb8, 0x04, 0x00, 0x00]);
|
||||
|
||||
let length = decode_length(length);
|
||||
assert_eq!(length, 568);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_encode_decode_length_3_bytes() {
|
||||
let mut length = [0u8; 4];
|
||||
let byte_length = encode_length(85734, &mut length);
|
||||
|
||||
assert_eq!(byte_length, 3);
|
||||
assert_eq!(length, [0xe6, 0x9d, 0x05, 0x00]);
|
||||
|
||||
let length = decode_length(length);
|
||||
assert_eq!(length, 85734);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_encode_length_4_bytes() {
|
||||
let mut length = [0u8; 4];
|
||||
let byte_length = encode_length(8573471, &mut length);
|
||||
|
||||
assert_eq!(byte_length, 4);
|
||||
assert_eq!(length, [0x9f, 0xa4, 0x8b, 0x04]);
|
||||
|
||||
let length = decode_length(length);
|
||||
assert_eq!(length, 8573471);
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue