split bank ad socket server up

This commit is contained in:
Sebastian Hugentobler 2022-03-16 09:51:29 +01:00
parent 3d9c98eeca
commit c69654a924
22 changed files with 342 additions and 43 deletions

10
socket-server/Cargo.toml Normal file
View file

@ -0,0 +1,10 @@
[package]
name = "socket-server"
version = "0.1.0"
edition = "2021"
[dependencies]
bank = { path = "../bank" }
anyhow = "1.0.55"
log = "0.4.14"
pretty_env_logger = "0.4.0"

View file

@ -0,0 +1,33 @@
use std::io::Write;
use std::net::TcpStream;
use std::sync::{Arc, RwLock};
use anyhow::Result;
use bank::bank::Bank;
use crate::commands::Command;
use crate::protocol;
pub struct CloseAccount;
impl Command for CloseAccount {
fn execute(&self, bank: Arc<RwLock<Bank>>, data: &[u8], mut stream: &TcpStream) -> Result<usize> {
debug!("account nr bytes {:?}", data);
let nr = String::from_utf8_lossy(data).to_string();
info!("closing account {}...", nr);
let bank = bank.read().unwrap();
let written = match bank.accounts.get(&nr) {
Some(acc) => {
let mut acc = acc.write().unwrap();
stream.write(&protocol::account_passivate(acc.passivate()))?
}
None => stream.write(&protocol::account_passivate(false))?
};
Ok(written)
}
}

View file

@ -0,0 +1,28 @@
use std::io::Write;
use std::net::TcpStream;
use std::sync::{Arc, RwLock};
use anyhow::Result;
use bank::bank::Bank;
use crate::commands::Command;
use crate::protocol;
pub struct CreateAccount;
impl Command for CreateAccount {
fn execute(&self, bank: Arc<RwLock<Bank>>, data: &[u8], mut stream: &TcpStream) -> Result<usize> {
debug!("owner nr bytes {:?}", data);
let owner = String::from_utf8_lossy(data);
info!("creating new account with owner {}...", owner);
let mut bank = bank.write().unwrap();
let nr = bank.create_account(owner.into());
info!("created account {}", nr);
let written = stream.write(&protocol::account_nr(&nr))?;
Ok(written)
}
}

View file

@ -0,0 +1,38 @@
use std::io::Write;
use std::net::TcpStream;
use std::sync::{Arc, RwLock};
use anyhow::Result;
use bank::bank::Bank;
use crate::commands::Command;
use crate::protocol;
pub struct Deposit;
impl Command for Deposit {
fn execute(&self, bank: Arc<RwLock<Bank>>, data: &[u8], mut stream: &TcpStream) -> Result<usize> {
let value_bytes: [u8; 8] = <[u8; 8]>::try_from(data[..8].to_vec().as_slice())?;
debug!("value bytes {:?}", value_bytes);
let nr_bytes = &data[8..].to_vec();
debug!("nr bytes {:?}", nr_bytes);
let amount = f64::from_be_bytes(value_bytes);
let nr = String::from_utf8_lossy(nr_bytes).to_string();
info!("depositing {} into {}...", amount, nr);
let bank = bank.read().unwrap();
let written = match bank.accounts.get(&nr) {
Some(acc) => {
let mut acc = acc.write().unwrap();
stream.write(&protocol::deposit(acc.deposit(amount)))?
}
None => stream.write(&protocol::error(2))?
};
Ok(written)
}
}

View file

@ -0,0 +1,20 @@
use std::io::Write;
use std::net::TcpStream;
use std::sync::{Arc, RwLock};
use anyhow::Result;
use bank::bank::Bank;
use crate::commands::Command;
use crate::protocol;
pub struct Fail;
impl Command for Fail {
fn execute(&self, _: Arc<RwLock<Bank>>, error_code: &[u8], mut stream: &TcpStream) -> Result<usize> {
error!("sending error code {}", error_code[0]);
let written = stream.write(&protocol::error(error_code[0]))?;
Ok(written)
}
}

View file

@ -0,0 +1,33 @@
use std::io::Write;
use std::net::TcpStream;
use std::sync::{Arc, RwLock};
use anyhow::Result;
use bank::bank::Bank;
use crate::commands::Command;
use crate::protocol;
pub struct GetAccount;
impl Command for GetAccount {
fn execute(&self, bank: Arc<RwLock<Bank>>, data: &[u8], mut stream: &TcpStream) -> Result<usize> {
debug!("account nr bytes {:?}", data);
let nr = String::from_utf8_lossy(data).to_string();
info!("getting account {:?}...", nr);
let bank = bank.read().unwrap();
let written = match bank.accounts.get(&nr) {
Some(acc) => {
let acc = acc.read().unwrap();
stream.write(&protocol::account(&acc))?
}
None => stream.write(&protocol::error(2))?
};
Ok(written)
}
}

View file

@ -0,0 +1,27 @@
use std::io::Write;
use std::net::TcpStream;
use std::sync::{Arc, RwLock};
use anyhow::Result;
use bank::bank::Bank;
use crate::commands::Command;
use crate::protocol;
pub struct GetAccountNrs;
impl Command for GetAccountNrs {
fn execute(&self, bank: Arc<RwLock<Bank>>, _: &[u8], mut stream: &TcpStream) -> Result<usize> {
info!("getting account numbers...");
let bank = bank.read().unwrap();
let nrs: Vec<String> = bank.account_numbers()
.into_iter()
.collect();
let written = stream.write(&protocol::account_nrs(&nrs))?;
Ok(written)
}
}

View file

@ -0,0 +1,78 @@
use std::collections::HashMap;
use std::net::TcpStream;
use std::sync::{Arc, RwLock};
use anyhow::Result;
use bank::account::AccountError;
use bank::bank::Bank;
use crate::commands::close_account::CloseAccount;
use crate::commands::create_account::CreateAccount;
use crate::commands::deposit::Deposit;
use crate::commands::fail::Fail;
use crate::commands::get_account::GetAccount;
use crate::commands::get_account_nrs::GetAccountNrs;
use crate::commands::pong::Pong;
use crate::commands::withdraw::Withdraw;
mod pong;
mod fail;
mod close_account;
mod create_account;
mod deposit;
mod get_account;
mod get_account_nrs;
mod withdraw;
pub trait Command: Sync + Send {
fn execute(&self, bank: Arc<RwLock<Bank>>, data: &[u8], stream: &TcpStream) -> Result<usize>;
}
pub struct Commands {
cmds: HashMap<u8, Box<dyn Command>>,
}
struct WebAccountError(AccountError);
impl From<&WebAccountError> for u8 {
fn from(error: &WebAccountError) -> Self {
match error.0 {
AccountError::Overdraw() => 11,
AccountError::Inactive() => 12,
AccountError::InvalidAmount() => 13,
}
}
}
pub fn error_to_code(error: &AccountError) -> u8 {
match error {
AccountError::Overdraw() => 11,
AccountError::Inactive() => 12,
AccountError::InvalidAmount() => 13,
}
}
impl Commands {
pub fn new() -> Self {
let mut cmds = HashMap::new();
cmds.insert(1_u8, Box::new(Pong) as Box<dyn Command>);
cmds.insert(2_u8, Box::new(CreateAccount) as Box<dyn Command>);
cmds.insert(3_u8, Box::new(GetAccount) as Box<dyn Command>);
cmds.insert(4_u8, Box::new(GetAccountNrs) as Box<dyn Command>);
cmds.insert(5_u8, Box::new(CloseAccount) as Box<dyn Command>);
cmds.insert(6_u8, Box::new(Deposit) as Box<dyn Command>);
cmds.insert(7_u8, Box::new(Withdraw) as Box<dyn Command>);
Self {
cmds
}
}
pub fn run(&self, bank: Arc<RwLock<Bank>>, cmd: u8, data: &[u8], stream: &TcpStream) -> Result<usize> {
match self.cmds.get(&cmd) {
None => Fail.execute(bank, &[1_u8], stream),
Some(cmd) => cmd.execute(bank, data, stream)
}
}
}

View file

@ -0,0 +1,20 @@
use std::io::Write;
use std::net::TcpStream;
use std::sync::{Arc, RwLock};
use anyhow::Result;
use bank::bank::Bank;
use crate::commands::Command;
use crate::protocol;
pub struct Pong;
impl Command for Pong {
fn execute(&self, _: Arc<RwLock<Bank>>, _: &[u8], mut stream: &TcpStream) -> Result<usize> {
info!("sending 'pong'");
let written = stream.write(&[protocol::PONG])?;
Ok(written)
}
}

View file

@ -0,0 +1,38 @@
use std::io::Write;
use std::net::TcpStream;
use std::sync::{Arc, RwLock};
use anyhow::Result;
use bank::bank::Bank;
use crate::commands::Command;
use crate::protocol;
pub struct Withdraw;
impl Command for Withdraw {
fn execute(&self, bank: Arc<RwLock<Bank>>, data: &[u8], mut stream: &TcpStream) -> Result<usize> {
let value_bytes: [u8; 8] = <[u8; 8]>::try_from(data[..8].to_vec().as_slice())?;
debug!("value bytes {:?}", value_bytes);
let nr_bytes = &data[8..].to_vec();
debug!("nr bytes {:?}", nr_bytes);
let amount = f64::from_be_bytes(value_bytes);
let nr = String::from_utf8_lossy(nr_bytes).to_string();
info!("withdrawing {} from {}...", amount, nr);
let bank = bank.read().unwrap();
let written = match bank.accounts.get(&nr) {
Some(acc) => {
let mut acc = acc.write().unwrap();
stream.write(&protocol::withdraw(acc.withdraw(amount)))?
}
None => stream.write(&protocol::error(2))?
};
Ok(written)
}
}

15
socket-server/src/main.rs Normal file
View file

@ -0,0 +1,15 @@
#[macro_use]
extern crate log;
mod protocol;
mod server;
mod threadpool;
mod commands;
fn main() {
pretty_env_logger::init();
let host = "127.0.0.1:1234";
info!("running on {}", host);
server::run(host, 8).expect("failed to run server");
}

View file

@ -0,0 +1,88 @@
use anyhow::Result;
use bank::account::{Account, AccountError};
use crate::commands::error_to_code;
pub const START: [u8; 2] = [0xde, 0xad];
pub const PONG: u8 = 0b0010_0000;
fn to_error_code(error: anyhow::Error, default: u8) -> u8 {
match error.root_cause().downcast_ref::<AccountError>() {
Some(e) => error_to_code(e),
None => default,
}
}
pub fn account_nr(nr: &str) -> Vec<u8> {
let mut response = vec![PONG];
response.append(&mut nr.as_bytes().to_vec());
debug!("account number bytes: {:?}", response);
response
}
pub fn account_passivate(was_passivated: bool) -> Vec<u8> {
let is_active_byte: u8 = if was_passivated { 1 } else { 0 };
vec![PONG | is_active_byte]
}
pub fn deposit(result: Result<()>) -> Vec<u8> {
match result {
Err(e) => error(to_error_code(e, 10)).to_vec(),
Ok(_) => vec![PONG]
}
}
pub fn withdraw(result: Result<()>) -> Vec<u8> {
match result {
Err(e) => error(to_error_code(e, 10)).to_vec(),
Ok(_) => vec![PONG]
}
}
pub fn account_nrs(nrs: &[String]) -> Vec<u8> {
let mut response = vec![PONG];
nrs.iter().for_each(|key| {
let key_bytes = key.as_bytes();
debug!("account number bytes: {:?}", key_bytes);
response.push(key_bytes.len() as u8);
response.append(&mut key_bytes.to_vec());
});
debug!("account numbers bytes: {:?}", response);
response
}
pub fn account(account: &Account) -> Vec<u8> {
let is_active_byte: u8 = if account.is_active { 1 } else { 0 };
let mut response = vec![PONG | is_active_byte];
let nr_bytes = account.number.as_bytes();
let mut nr_bytes_sized = vec![nr_bytes.len() as u8];
nr_bytes_sized.append(&mut nr_bytes.to_vec());
debug!("nr bytes: {:?}", nr_bytes_sized);
let owner_bytes = account.owner.as_bytes();
let mut owner_bytes_sized = vec![owner_bytes.len() as u8];
owner_bytes_sized.append(&mut owner_bytes.to_vec());
debug!("owner bytes: {:?}", owner_bytes_sized);
let balance: [u8; 8] = account.balance.to_be_bytes();
debug!("balance bytes: {:?}", balance);
response.append(&mut nr_bytes_sized);
response.append(&mut owner_bytes_sized);
response.append(&mut balance.to_vec());
debug!("account response: {:?}", response);
response
}
pub fn error(code: u8) -> [u8; 1] {
[0b0100_0000 | code]
}

View file

@ -0,0 +1,61 @@
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, RwLock};
use anyhow::{bail, Result};
use bank::bank::Bank;
use crate::commands::Commands;
use crate::protocol;
use crate::threadpool::ThreadPool;
pub fn run(host: &str, threads: usize) -> Result<()> {
let listener = TcpListener::bind(host)?;
let pool = ThreadPool::new(threads);
let cmds = Arc::new(Commands::new());
let bank = Arc::new(RwLock::new(Bank::new()));
for stream in listener.incoming() {
let cmds = cmds.clone();
let bank = bank.clone();
let stream = stream?;
pool.execute(move || {
if let Err(e) = handle_connection(bank, &cmds, stream) {
error!("{}", e);
}
});
}
Ok(())
}
fn handle_connection(bank: Arc<RwLock<Bank>>, cmds: &Commands, mut stream: TcpStream) -> Result<()> {
const BUF_SIZE: usize = 64;
let mut data: Vec<u8> = vec![];
let mut buffer = [0; BUF_SIZE];
let mut read = stream.read(&mut buffer)?;
while read > 0 {
data.append(&mut buffer[..read].to_vec());
read = stream.read(&mut buffer)?;
}
if data[..2] != protocol::START {
bail!("got {:?} as first bytes, not my problem", &data[..2]);
}
let cmd: u8 = (data[2] & 0b1111_0000) >> 4;
trace!("got command {}", cmd);
cmds.run(bank, cmd, &data[3..], &stream)?;
stream.flush()?;
Ok(())
}

View file

@ -0,0 +1,94 @@
use std::sync::Arc;
use std::sync::mpsc;
use std::sync::Mutex;
use std::thread;
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Message>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
enum Message {
NewJob(Job),
Terminate,
}
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(Message::NewJob(job)).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
trace!("Sending terminate message to all workers.");
for _ in &self.workers {
self.sender.send(Message::Terminate).unwrap();
}
trace!("Shutting down all workers.");
for worker in &mut self.workers {
trace!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv().unwrap();
match message {
Message::NewJob(job) => {
trace!("Worker {} got a job; executing.", id);
job();
}
Message::Terminate => {
trace!("Worker {} was told to terminate.", id);
break;
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}