diff --git a/README.md b/README.md index b4bd6b488634fc48b8b76733c96cb88f813c53b4..7451d117f2f3610baf79135b389ab8c16e57fe04 100644 --- a/README.md +++ b/README.md @@ -5,3 +5,18 @@ Before building: ```sh cargo vendor ``` + +## Monitoring + +After creating the swarm: + +```sh +docker stack deploy --compose-file docker-compose.yml stackhash +python docker/monitor.py +``` + +To delete the stack: + +```sh +docker stack rm stackhash +``` diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000000000000000000000000000000000000..21c2d488e8caf522bc902f13c164ef8f8c82c1e5 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,12 @@ +version: "3.9" + +services: + server: + image: itytophile/hash-server + ports: + - "3000:3000" + slave: + image: itytophile/hash-slave + depends_on: + - server + command: ws://server:3000/ws \ No newline at end of file diff --git a/docker/monitor.py b/docker/monitor.py new file mode 100644 index 0000000000000000000000000000000000000000..7469cf97a1d8aadce2e31a1811d44b7b56e218ba --- /dev/null +++ b/docker/monitor.py @@ -0,0 +1,39 @@ +import websocket +import signal +import sys +import os +import math + +ws = websocket.create_connection("ws://127.0.0.1:3000/ws") +ws.send("queue length") +print("Successfully connected to server") + + +def handler(signum, frame): + print("\nClosing WS connection...") + # timeout=0 car il reçoit pas le closing frame ??? + ws.close(websocket.STATUS_GOING_AWAY, timeout=0) + sys.exit() + + +signal.signal(signal.SIGINT, handler) + +REQUIRED_REPLICAS_COUNT_FOR_LENGTH = [ + (2, 1), + (4, 2), + (8, 4), + (math.inf, 8), +] # (length, count) + +replicas_count = 1 + +while True: + queue_length = int.from_bytes(ws.recv(), "big") + print("queue_length =", queue_length) + for (length, count) in REQUIRED_REPLICAS_COUNT_FOR_LENGTH: + if queue_length <= length: + if replicas_count != count: + print(f"Updating slaves: {replicas_count} ->", count) + os.system(f"docker service scale stackhash_slave={count}") + replicas_count = count + break diff --git a/src/main.rs b/src/main.rs index 1c5fbd82c0932882e149ec355e680f29b8c744e8..b1293a24aad4c5e129df95168e90b4db049b1edf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,7 +15,7 @@ use futures::{ sink::SinkExt, stream::{SplitSink, SplitStream, StreamExt}, }; -use msg::Message; +use msg::ToSlaveMessage; use std::{ net::SocketAddr, sync::{Arc, Mutex}, @@ -27,13 +27,14 @@ use tracing::{debug, info, warn, Level}; struct AppState { slaves_id_order: Mutex>, request_queue: Mutex)>>, - broadcast_tx: broadcast::Sender, + tx_to_slaves: broadcast::Sender, + tx_to_listeners: broadcast::Sender, } #[derive(Parser, Debug)] #[clap(about, version, author)] struct Args { - #[clap(short, long, default_value = "127.0.0.1:3000")] + #[clap(short, long, default_value = "0.0.0.0:3000")] address: String, } @@ -51,12 +52,14 @@ async fn main() { let addr: SocketAddr = args.address.parse().expect("Can't parse provided address"); - let (broadcast_tx, _) = broadcast::channel(100); + let (tx_to_slaves, _) = broadcast::channel(100); + let (tx_to_listeners, _) = broadcast::channel(100); let app_state = Arc::new(AppState { slaves_id_order: Mutex::new(Vec::new()), request_queue: Mutex::new(Vec::new()), - broadcast_tx, + tx_to_slaves, + tx_to_listeners, }); let app = Router::new() @@ -83,10 +86,17 @@ async fn websocket_handler( } async fn websocket(stream: WebSocket, state: Arc) { - let (mut tx, mut rx) = stream.split(); + let (mut tx_to_client, mut rx_from_client) = stream.split(); - if let Some(Ok(ws::Message::Text(msg))) = rx.next().await { - if msg == "slave" { + let msg = if let Some(Ok(ws::Message::Text(msg))) = rx_from_client.next().await { + msg + } else { + warn!("Error with unknown client"); + return; + }; + + match msg.as_str() { + "slave" => { let slave_id = { // On utilise ce bloc pour drop le mutex le plus tôt possible let mut slaves_id_order = state.slaves_id_order.lock().unwrap(); @@ -98,13 +108,13 @@ async fn websocket(stream: WebSocket, state: Arc) { info!("Slave connected with id = {}", slave_id); tokio::spawn(master_to_slave_relay_task( - state.broadcast_tx.subscribe(), + state.tx_to_slaves.subscribe(), Arc::clone(&state), slave_id, - tx, + tx_to_client, )); - slave_listening_task(rx, slave_id, &state).await; + slave_listening_task(rx_from_client, slave_id, &state).await; state .slaves_id_order @@ -113,49 +123,119 @@ async fn websocket(stream: WebSocket, state: Arc) { .retain(|&id| id != slave_id); info!("Slave {} disconnected", slave_id); + } + "queue length" => { + info!("Listener connected"); - // Arrêt de la tâche d'écoute de l'esclave associé - if let Err(err) = state - .broadcast_tx - .send(Message::SlaveDisconnected(slave_id)) - { - warn!("Can't broadcast: {}", err) - }; - } else { - info!("Dude connected"); + let mut rx = state.tx_to_listeners.subscribe(); - execute_msg(msg, &state); + tokio::spawn(async move { + while let Ok(new_size) = rx.recv().await { + if let Err(err) = tx_to_client + .send(ws::Message::Binary(new_size.to_be_bytes().to_vec())) + .await + { + // Permet de jeter cette tâche qui ne sert plus + // car le listener n'est plus connecté + warn!("Can't send to listener: {}", err); + break; + } + } + }); - while let Some(Ok(msg)) = rx.next().await { + while let Some(Ok(msg)) = rx_from_client.next().await { + match msg { + ws::Message::Close(_) => break, + msg => warn!("Unknown message from listener: {:?}", msg), + } + } + + info!("Listener disconnected"); + } + "dude" => { + info!("Dude connected"); + + while let Some(Ok(msg)) = rx_from_client.next().await { match msg { // On est poli, on pong quand on a un ping ws::Message::Ping(payload) => { debug!("Ping received from dude"); - if tx.send(ws::Message::Pong(payload)).await.is_err() { + if tx_to_client.send(ws::Message::Pong(payload)).await.is_err() { break; } } - ws::Message::Text(msg) => execute_msg(msg, &state), + ws::Message::Text(msg) => match ToSlaveMessage::try_from(msg.as_str()) { + Ok(message) => { + info!("Dude wants to {:?}", message); + + match message { + ToSlaveMessage::Search(hash, range) => { + let mut request_queue = state.request_queue.lock().unwrap(); + + if request_queue.is_empty() { + broadcast_message( + &state.tx_to_slaves, + ToSlaveMessage::Search(hash.clone(), range.clone()), + ); + } else { + debug!("Search request pushed to queue"); + } + + request_queue.push((hash, range)); + + broadcast_message(&state.tx_to_listeners, request_queue.len()) + } + ToSlaveMessage::Stop => { + let mut request_queue = state.request_queue.lock().unwrap(); + if !request_queue.is_empty() { + request_queue.remove(0); + + broadcast_message( + &state.tx_to_listeners, + request_queue.len(), + ); + + broadcast_message(&state.tx_to_slaves, message); + + if !request_queue.is_empty() { + info!("Sending queued request: {:?}", request_queue[0]); + + let (hash, range) = &request_queue[0]; + broadcast_message( + &state.tx_to_slaves, + ToSlaveMessage::Search(hash.clone(), range.clone()), + ); + } + } else { + warn!("Nothing to stop") + } + } + message => broadcast_message(&state.tx_to_slaves, message), + } + } + Err(err) => warn!("{:?}", err), + }, _ => warn!("Non textual message from dude: {:?}", msg), } } info!("Dude disconnected") } - } else { - warn!("Error with unknown client"); + msg => { + warn!("Unknown type provided: {}", msg) + } } } async fn master_to_slave_relay_task( - mut broadcast_rx: broadcast::Receiver, + mut rx_from_master: broadcast::Receiver, state_clone: Arc, slave_id: u32, - mut tx: SplitSink, + mut tx_to_slave: SplitSink, ) { - while let Ok(msg) = broadcast_rx.recv().await { + while let Ok(msg) = rx_from_master.recv().await { let text = match msg { - Message::Search(hash, range) => { + ToSlaveMessage::Search(hash, range) => { let (begin, end) = (range.start, range.end); let slaves_id_order = state_clone.slaves_id_order.lock().unwrap(); let slaves_count = slaves_id_order.len(); @@ -186,18 +266,12 @@ async fn master_to_slave_relay_task( get_word_from_number(slave_end) ) } - Message::Stop => "stop".to_owned(), - Message::Exit => "exit".to_owned(), - Message::SlaveDisconnected(id) => { - if id == slave_id { - break; - } else { - continue; - } - } + ToSlaveMessage::Stop => "stop".to_owned(), + ToSlaveMessage::Exit => "exit".to_owned(), }; // arrêt de la boucle à la moindre erreur - if tx.send(ws::Message::Text(text)).await.is_err() { + if let Err(err) = tx_to_slave.send(ws::Message::Text(text)).await { + warn!("Can't send to slave: {}", err); break; } } @@ -222,23 +296,22 @@ async fn slave_listening_task( "Slave {} found the word {} behind the hash {}. Now stopping all slaves...", slave_id, word, hash ); - - if let Err(err) = state.broadcast_tx.send(Message::Stop) { - warn!("Can't broadcast: {}", err) - } + broadcast_message(&state.tx_to_slaves, ToSlaveMessage::Stop); let mut request_queue = state.request_queue.lock().unwrap(); + request_queue.remove(0); + + broadcast_message(&state.tx_to_listeners, request_queue.len()); + if !request_queue.is_empty() { info!("Sending queued request: {:?}", request_queue[0]); let (hash, range) = &request_queue[0]; - if let Err(err) = state - .broadcast_tx - .send(Message::Search(hash.clone(), range.clone())) - { - warn!("Can't broadcast: {}", err) - } + broadcast_message( + &state.tx_to_slaves, + ToSlaveMessage::Search(hash.clone(), range.clone()), + ); } } _ => warn!("Unknown request from slave {}: {}", slave_id, msg), @@ -252,31 +325,9 @@ async fn slave_listening_task( } } -fn execute_msg(msg: String, state: &Arc) { - match Message::try_from(msg.as_str()) { - Ok(message) => { - info!("Dude wants to {:?}", message); - - if let Message::Search(hash, range) = message { - let mut request_queue = state.request_queue.lock().unwrap(); - - if request_queue.is_empty() { - if let Err(err) = state - .broadcast_tx - .send(Message::Search(hash.clone(), range.clone())) - { - warn!("Can't broadcast: {}", err) - } - } else { - debug!("Search request pushed to queue"); - } - - request_queue.push((hash, range)) - } else if let Err(err) = state.broadcast_tx.send(message) { - warn!("Can't broadcast: {}", err) - } - } - Err(err) => warn!("{:?}", err), +fn broadcast_message(tx: &broadcast::Sender, message: T) { + if let Err(err) = tx.send(message) { + warn!("Can't broadcast: {}", err) } } diff --git a/src/msg.rs b/src/msg.rs index 1fca39dd0bee3b7d9b00e671f259b05907dc5fd9..0110ed50899f38f7ccabffb54df196efd083b6e2 100644 --- a/src/msg.rs +++ b/src/msg.rs @@ -1,12 +1,10 @@ use alphabet::get_number_from_word; #[derive(Debug, Clone)] -pub enum Message { +pub enum ToSlaveMessage { Search(String, std::ops::Range), Stop, Exit, - // Seulement pour le serveur, pour quitter la tâche d'écoute de l'esclave associé - SlaveDisconnected(u32), } #[derive(Debug)] @@ -15,7 +13,7 @@ pub enum ConversionError { WordProblem(String), } -impl TryFrom<&str> for Message { +impl TryFrom<&str> for ToSlaveMessage { type Error = ConversionError; fn try_from(value: &str) -> Result { @@ -24,7 +22,7 @@ impl TryFrom<&str> for Message { &["search", hash, begin, end] => { match (get_number_from_word(begin), get_number_from_word(end)) { (Ok(begin_num), Ok(end_num)) => { - Ok(Message::Search(hash.to_owned(), begin_num..end_num)) + Ok(ToSlaveMessage::Search(hash.to_owned(), begin_num..end_num)) } (Ok(_), Err(err)) => Err(ConversionError::WordProblem(format!( "Problem with end word: {}", @@ -40,8 +38,8 @@ impl TryFrom<&str> for Message { ))), } } - ["stop"] => Ok(Message::Stop), - ["exit"] => Ok(Message::Exit), + ["stop"] => Ok(ToSlaveMessage::Stop), + ["exit"] => Ok(ToSlaveMessage::Exit), _ => Err(ConversionError::UnknownRequest), } } diff --git a/src/slave.rs b/src/slave.rs index 263f9c98803819fdf61bd28498945699f41a2b51..526ff7119173d6b59429aefdf5e75d89ef95859f 100644 --- a/src/slave.rs +++ b/src/slave.rs @@ -18,7 +18,7 @@ type WebSocketSender = SplitSink>, Mes #[derive(Parser, Debug)] #[clap(about, version, author)] struct Args { - #[clap(default_value = "ws://127.0.0.1:3000/ws")] + #[clap(default_value = "ws://0.0.0.0:3000/ws")] ws_address: String, }