From 6c82b32dc02219a00be62b3ea68186f03b0d4add Mon Sep 17 00:00:00 2001 From: Imran Tierce Date: Sat, 25 Dec 2021 23:01:51 +0100 Subject: [PATCH 01/11] Queued request after stop --- src/main.rs | 73 +++++++++++++++++++++++++++++++---------------------- 1 file changed, 43 insertions(+), 30 deletions(-) diff --git a/src/main.rs b/src/main.rs index 1c5fbd8..179ff1c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -115,12 +115,7 @@ async fn websocket(stream: WebSocket, state: Arc) { info!("Slave {} disconnected", slave_id); // Arrêt de la tâche d'écoute de l'esclave associé - if let Err(err) = state - .broadcast_tx - .send(Message::SlaveDisconnected(slave_id)) - { - warn!("Can't broadcast: {}", err) - }; + broadcast_message(&state.broadcast_tx, Message::SlaveDisconnected(slave_id)); } else { info!("Dude connected"); @@ -222,10 +217,7 @@ async fn slave_listening_task( "Slave {} found the word {} behind the hash {}. Now stopping all slaves...", slave_id, word, hash ); - - if let Err(err) = state.broadcast_tx.send(Message::Stop) { - warn!("Can't broadcast: {}", err) - } + broadcast_message(&state.broadcast_tx, Message::Stop); let mut request_queue = state.request_queue.lock().unwrap(); request_queue.remove(0); @@ -233,12 +225,10 @@ async fn slave_listening_task( info!("Sending queued request: {:?}", request_queue[0]); let (hash, range) = &request_queue[0]; - if let Err(err) = state - .broadcast_tx - .send(Message::Search(hash.clone(), range.clone())) - { - warn!("Can't broadcast: {}", err) - } + broadcast_message( + &state.broadcast_tx, + Message::Search(hash.clone(), range.clone()), + ); } } _ => warn!("Unknown request from slave {}: {}", slave_id, msg), @@ -257,29 +247,52 @@ fn execute_msg(msg: String, state: &Arc) { Ok(message) => { info!("Dude wants to {:?}", message); - if let Message::Search(hash, range) = message { - let mut request_queue = state.request_queue.lock().unwrap(); + match message { + Message::Search(hash, range) => { + let mut request_queue = state.request_queue.lock().unwrap(); - if request_queue.is_empty() { - if let Err(err) = state - .broadcast_tx - .send(Message::Search(hash.clone(), range.clone())) - { - warn!("Can't broadcast: {}", err) + if request_queue.is_empty() { + broadcast_message( + &state.broadcast_tx, + Message::Search(hash.clone(), range.clone()), + ); + } else { + debug!("Search request pushed to queue"); } - } else { - debug!("Search request pushed to queue"); - } - request_queue.push((hash, range)) - } else if let Err(err) = state.broadcast_tx.send(message) { - warn!("Can't broadcast: {}", err) + request_queue.push((hash, range)) + } + Message::Stop => { + let mut request_queue = state.request_queue.lock().unwrap(); + if !request_queue.is_empty() { + request_queue.remove(0); + broadcast_message(&state.broadcast_tx, message); + if !request_queue.is_empty() { + info!("Sending queued request: {:?}", request_queue[0]); + + let (hash, range) = &request_queue[0]; + broadcast_message( + &state.broadcast_tx, + Message::Search(hash.clone(), range.clone()), + ); + } + } else { + warn!("Nothing to stop") + } + } + message => broadcast_message(&state.broadcast_tx, message), } } Err(err) => warn!("{:?}", err), } } +fn broadcast_message(tx: &broadcast::Sender, message: Message) { + if let Err(err) = tx.send(message) { + warn!("Can't broadcast: {}", err) + } +} + // Include utf-8 file at **compile** time. async fn index() -> Html<&'static str> { Html(std::include_str!("index.html")) -- GitLab From 9a49b9f261374977d52fd01fa7ddba12d8721710 Mon Sep 17 00:00:00 2001 From: Imran Tierce Date: Sun, 26 Dec 2021 15:00:32 +0100 Subject: [PATCH 02/11] Add listener entry --- src/main.rs | 79 +++++++++++++++++++++++++++++++++++------------------ 1 file changed, 53 insertions(+), 26 deletions(-) diff --git a/src/main.rs b/src/main.rs index 179ff1c..ddcdfd5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -27,7 +27,8 @@ use tracing::{debug, info, warn, Level}; struct AppState { slaves_id_order: Mutex>, request_queue: Mutex)>>, - broadcast_tx: broadcast::Sender, + tx_to_slaves: broadcast::Sender, + tx_to_listeners: broadcast::Sender, } #[derive(Parser, Debug)] @@ -51,12 +52,14 @@ async fn main() { let addr: SocketAddr = args.address.parse().expect("Can't parse provided address"); - let (broadcast_tx, _) = broadcast::channel(100); + let (tx_to_slaves, _) = broadcast::channel(100); + let (tx_to_listeners, _) = broadcast::channel(100); let app_state = Arc::new(AppState { slaves_id_order: Mutex::new(Vec::new()), request_queue: Mutex::new(Vec::new()), - broadcast_tx, + tx_to_slaves, + tx_to_listeners, }); let app = Router::new() @@ -83,10 +86,17 @@ async fn websocket_handler( } async fn websocket(stream: WebSocket, state: Arc) { - let (mut tx, mut rx) = stream.split(); + let (mut tx_to_client, mut rx_from_client) = stream.split(); - if let Some(Ok(ws::Message::Text(msg))) = rx.next().await { - if msg == "slave" { + let msg = if let Some(Ok(ws::Message::Text(msg))) = rx_from_client.next().await { + msg + } else { + warn!("Error with unknown client"); + return; + }; + + match msg.as_str() { + "slave" => { let slave_id = { // On utilise ce bloc pour drop le mutex le plus tôt possible let mut slaves_id_order = state.slaves_id_order.lock().unwrap(); @@ -98,13 +108,13 @@ async fn websocket(stream: WebSocket, state: Arc) { info!("Slave connected with id = {}", slave_id); tokio::spawn(master_to_slave_relay_task( - state.broadcast_tx.subscribe(), + state.tx_to_slaves.subscribe(), Arc::clone(&state), slave_id, - tx, + tx_to_client, )); - slave_listening_task(rx, slave_id, &state).await; + slave_listening_task(rx_from_client, slave_id, &state).await; state .slaves_id_order @@ -115,18 +125,36 @@ async fn websocket(stream: WebSocket, state: Arc) { info!("Slave {} disconnected", slave_id); // Arrêt de la tâche d'écoute de l'esclave associé - broadcast_message(&state.broadcast_tx, Message::SlaveDisconnected(slave_id)); - } else { + broadcast_message(&state.tx_to_slaves, Message::SlaveDisconnected(slave_id)); + } + "queue length" => { + info!("Listener connected"); + + let mut rx = state.tx_to_listeners.subscribe(); + + while let Ok(new_size) = rx.recv().await { + if let Err(err) = tx_to_client + .send(ws::Message::Binary(new_size.to_be_bytes().to_vec())) + .await + { + warn!("Can't send to listener: {}", err); + break; + } + } + + info!("Listener disconnected"); + } + _ => { info!("Dude connected"); execute_msg(msg, &state); - while let Some(Ok(msg)) = rx.next().await { + while let Some(Ok(msg)) = rx_from_client.next().await { match msg { // On est poli, on pong quand on a un ping ws::Message::Ping(payload) => { debug!("Ping received from dude"); - if tx.send(ws::Message::Pong(payload)).await.is_err() { + if tx_to_client.send(ws::Message::Pong(payload)).await.is_err() { break; } } @@ -137,18 +165,16 @@ async fn websocket(stream: WebSocket, state: Arc) { info!("Dude disconnected") } - } else { - warn!("Error with unknown client"); } } async fn master_to_slave_relay_task( - mut broadcast_rx: broadcast::Receiver, + mut rx_from_master: broadcast::Receiver, state_clone: Arc, slave_id: u32, - mut tx: SplitSink, + mut tx_to_slave: SplitSink, ) { - while let Ok(msg) = broadcast_rx.recv().await { + while let Ok(msg) = rx_from_master.recv().await { let text = match msg { Message::Search(hash, range) => { let (begin, end) = (range.start, range.end); @@ -192,7 +218,8 @@ async fn master_to_slave_relay_task( } }; // arrêt de la boucle à la moindre erreur - if tx.send(ws::Message::Text(text)).await.is_err() { + if let Err(err) = tx_to_slave.send(ws::Message::Text(text)).await { + warn!("Can't send to slave: {}", err); break; } } @@ -217,7 +244,7 @@ async fn slave_listening_task( "Slave {} found the word {} behind the hash {}. Now stopping all slaves...", slave_id, word, hash ); - broadcast_message(&state.broadcast_tx, Message::Stop); + broadcast_message(&state.tx_to_slaves, Message::Stop); let mut request_queue = state.request_queue.lock().unwrap(); request_queue.remove(0); @@ -226,7 +253,7 @@ async fn slave_listening_task( let (hash, range) = &request_queue[0]; broadcast_message( - &state.broadcast_tx, + &state.tx_to_slaves, Message::Search(hash.clone(), range.clone()), ); } @@ -253,7 +280,7 @@ fn execute_msg(msg: String, state: &Arc) { if request_queue.is_empty() { broadcast_message( - &state.broadcast_tx, + &state.tx_to_slaves, Message::Search(hash.clone(), range.clone()), ); } else { @@ -266,13 +293,13 @@ fn execute_msg(msg: String, state: &Arc) { let mut request_queue = state.request_queue.lock().unwrap(); if !request_queue.is_empty() { request_queue.remove(0); - broadcast_message(&state.broadcast_tx, message); + broadcast_message(&state.tx_to_slaves, message); if !request_queue.is_empty() { info!("Sending queued request: {:?}", request_queue[0]); - + let (hash, range) = &request_queue[0]; broadcast_message( - &state.broadcast_tx, + &state.tx_to_slaves, Message::Search(hash.clone(), range.clone()), ); } @@ -280,7 +307,7 @@ fn execute_msg(msg: String, state: &Arc) { warn!("Nothing to stop") } } - message => broadcast_message(&state.broadcast_tx, message), + message => broadcast_message(&state.tx_to_slaves, message), } } Err(err) => warn!("{:?}", err), -- GitLab From 8423ed675d7d58980198ab081f8e92971e6dcb2f Mon Sep 17 00:00:00 2001 From: Imran Tierce Date: Sun, 26 Dec 2021 15:09:41 +0100 Subject: [PATCH 03/11] Dude has to tell its type --- src/main.rs | 95 ++++++++++++++++++++++++++--------------------------- 1 file changed, 46 insertions(+), 49 deletions(-) diff --git a/src/main.rs b/src/main.rs index ddcdfd5..58cb249 100644 --- a/src/main.rs +++ b/src/main.rs @@ -144,11 +144,9 @@ async fn websocket(stream: WebSocket, state: Arc) { info!("Listener disconnected"); } - _ => { + "dude" => { info!("Dude connected"); - execute_msg(msg, &state); - while let Some(Ok(msg)) = rx_from_client.next().await { match msg { // On est poli, on pong quand on a un ping @@ -158,13 +156,57 @@ async fn websocket(stream: WebSocket, state: Arc) { break; } } - ws::Message::Text(msg) => execute_msg(msg, &state), + ws::Message::Text(msg) => match Message::try_from(msg.as_str()) { + Ok(message) => { + info!("Dude wants to {:?}", message); + + match message { + Message::Search(hash, range) => { + let mut request_queue = state.request_queue.lock().unwrap(); + + if request_queue.is_empty() { + broadcast_message( + &state.tx_to_slaves, + Message::Search(hash.clone(), range.clone()), + ); + } else { + debug!("Search request pushed to queue"); + } + + request_queue.push((hash, range)) + } + Message::Stop => { + let mut request_queue = state.request_queue.lock().unwrap(); + if !request_queue.is_empty() { + request_queue.remove(0); + broadcast_message(&state.tx_to_slaves, message); + if !request_queue.is_empty() { + info!("Sending queued request: {:?}", request_queue[0]); + + let (hash, range) = &request_queue[0]; + broadcast_message( + &state.tx_to_slaves, + Message::Search(hash.clone(), range.clone()), + ); + } + } else { + warn!("Nothing to stop") + } + } + message => broadcast_message(&state.tx_to_slaves, message), + } + } + Err(err) => warn!("{:?}", err), + }, _ => warn!("Non textual message from dude: {:?}", msg), } } info!("Dude disconnected") } + msg => { + warn!("Unknown type provided: {}", msg) + } } } @@ -269,51 +311,6 @@ async fn slave_listening_task( } } -fn execute_msg(msg: String, state: &Arc) { - match Message::try_from(msg.as_str()) { - Ok(message) => { - info!("Dude wants to {:?}", message); - - match message { - Message::Search(hash, range) => { - let mut request_queue = state.request_queue.lock().unwrap(); - - if request_queue.is_empty() { - broadcast_message( - &state.tx_to_slaves, - Message::Search(hash.clone(), range.clone()), - ); - } else { - debug!("Search request pushed to queue"); - } - - request_queue.push((hash, range)) - } - Message::Stop => { - let mut request_queue = state.request_queue.lock().unwrap(); - if !request_queue.is_empty() { - request_queue.remove(0); - broadcast_message(&state.tx_to_slaves, message); - if !request_queue.is_empty() { - info!("Sending queued request: {:?}", request_queue[0]); - - let (hash, range) = &request_queue[0]; - broadcast_message( - &state.tx_to_slaves, - Message::Search(hash.clone(), range.clone()), - ); - } - } else { - warn!("Nothing to stop") - } - } - message => broadcast_message(&state.tx_to_slaves, message), - } - } - Err(err) => warn!("{:?}", err), - } -} - fn broadcast_message(tx: &broadcast::Sender, message: Message) { if let Err(err) = tx.send(message) { warn!("Can't broadcast: {}", err) -- GitLab From 68702265006bb7db788aace92c4b916b05602235 Mon Sep 17 00:00:00 2001 From: Imran Tierce Date: Sun, 26 Dec 2021 15:27:52 +0100 Subject: [PATCH 04/11] Rename Message to ToSlaveMessage --- src/main.rs | 38 ++++++++++++++------------------------ src/msg.rs | 12 +++++------- 2 files changed, 19 insertions(+), 31 deletions(-) diff --git a/src/main.rs b/src/main.rs index 58cb249..bb01ea6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,7 +15,7 @@ use futures::{ sink::SinkExt, stream::{SplitSink, SplitStream, StreamExt}, }; -use msg::Message; +use msg::ToSlaveMessage; use std::{ net::SocketAddr, sync::{Arc, Mutex}, @@ -27,7 +27,7 @@ use tracing::{debug, info, warn, Level}; struct AppState { slaves_id_order: Mutex>, request_queue: Mutex)>>, - tx_to_slaves: broadcast::Sender, + tx_to_slaves: broadcast::Sender, tx_to_listeners: broadcast::Sender, } @@ -123,9 +123,6 @@ async fn websocket(stream: WebSocket, state: Arc) { .retain(|&id| id != slave_id); info!("Slave {} disconnected", slave_id); - - // Arrêt de la tâche d'écoute de l'esclave associé - broadcast_message(&state.tx_to_slaves, Message::SlaveDisconnected(slave_id)); } "queue length" => { info!("Listener connected"); @@ -156,18 +153,18 @@ async fn websocket(stream: WebSocket, state: Arc) { break; } } - ws::Message::Text(msg) => match Message::try_from(msg.as_str()) { + ws::Message::Text(msg) => match ToSlaveMessage::try_from(msg.as_str()) { Ok(message) => { info!("Dude wants to {:?}", message); match message { - Message::Search(hash, range) => { + ToSlaveMessage::Search(hash, range) => { let mut request_queue = state.request_queue.lock().unwrap(); if request_queue.is_empty() { broadcast_message( &state.tx_to_slaves, - Message::Search(hash.clone(), range.clone()), + ToSlaveMessage::Search(hash.clone(), range.clone()), ); } else { debug!("Search request pushed to queue"); @@ -175,7 +172,7 @@ async fn websocket(stream: WebSocket, state: Arc) { request_queue.push((hash, range)) } - Message::Stop => { + ToSlaveMessage::Stop => { let mut request_queue = state.request_queue.lock().unwrap(); if !request_queue.is_empty() { request_queue.remove(0); @@ -186,7 +183,7 @@ async fn websocket(stream: WebSocket, state: Arc) { let (hash, range) = &request_queue[0]; broadcast_message( &state.tx_to_slaves, - Message::Search(hash.clone(), range.clone()), + ToSlaveMessage::Search(hash.clone(), range.clone()), ); } } else { @@ -211,14 +208,14 @@ async fn websocket(stream: WebSocket, state: Arc) { } async fn master_to_slave_relay_task( - mut rx_from_master: broadcast::Receiver, + mut rx_from_master: broadcast::Receiver, state_clone: Arc, slave_id: u32, mut tx_to_slave: SplitSink, ) { while let Ok(msg) = rx_from_master.recv().await { let text = match msg { - Message::Search(hash, range) => { + ToSlaveMessage::Search(hash, range) => { let (begin, end) = (range.start, range.end); let slaves_id_order = state_clone.slaves_id_order.lock().unwrap(); let slaves_count = slaves_id_order.len(); @@ -249,15 +246,8 @@ async fn master_to_slave_relay_task( get_word_from_number(slave_end) ) } - Message::Stop => "stop".to_owned(), - Message::Exit => "exit".to_owned(), - Message::SlaveDisconnected(id) => { - if id == slave_id { - break; - } else { - continue; - } - } + ToSlaveMessage::Stop => "stop".to_owned(), + ToSlaveMessage::Exit => "exit".to_owned(), }; // arrêt de la boucle à la moindre erreur if let Err(err) = tx_to_slave.send(ws::Message::Text(text)).await { @@ -286,7 +276,7 @@ async fn slave_listening_task( "Slave {} found the word {} behind the hash {}. Now stopping all slaves...", slave_id, word, hash ); - broadcast_message(&state.tx_to_slaves, Message::Stop); + broadcast_message(&state.tx_to_slaves, ToSlaveMessage::Stop); let mut request_queue = state.request_queue.lock().unwrap(); request_queue.remove(0); @@ -296,7 +286,7 @@ async fn slave_listening_task( let (hash, range) = &request_queue[0]; broadcast_message( &state.tx_to_slaves, - Message::Search(hash.clone(), range.clone()), + ToSlaveMessage::Search(hash.clone(), range.clone()), ); } } @@ -311,7 +301,7 @@ async fn slave_listening_task( } } -fn broadcast_message(tx: &broadcast::Sender, message: Message) { +fn broadcast_message(tx: &broadcast::Sender, message: ToSlaveMessage) { if let Err(err) = tx.send(message) { warn!("Can't broadcast: {}", err) } diff --git a/src/msg.rs b/src/msg.rs index 1fca39d..0110ed5 100644 --- a/src/msg.rs +++ b/src/msg.rs @@ -1,12 +1,10 @@ use alphabet::get_number_from_word; #[derive(Debug, Clone)] -pub enum Message { +pub enum ToSlaveMessage { Search(String, std::ops::Range), Stop, Exit, - // Seulement pour le serveur, pour quitter la tâche d'écoute de l'esclave associé - SlaveDisconnected(u32), } #[derive(Debug)] @@ -15,7 +13,7 @@ pub enum ConversionError { WordProblem(String), } -impl TryFrom<&str> for Message { +impl TryFrom<&str> for ToSlaveMessage { type Error = ConversionError; fn try_from(value: &str) -> Result { @@ -24,7 +22,7 @@ impl TryFrom<&str> for Message { &["search", hash, begin, end] => { match (get_number_from_word(begin), get_number_from_word(end)) { (Ok(begin_num), Ok(end_num)) => { - Ok(Message::Search(hash.to_owned(), begin_num..end_num)) + Ok(ToSlaveMessage::Search(hash.to_owned(), begin_num..end_num)) } (Ok(_), Err(err)) => Err(ConversionError::WordProblem(format!( "Problem with end word: {}", @@ -40,8 +38,8 @@ impl TryFrom<&str> for Message { ))), } } - ["stop"] => Ok(Message::Stop), - ["exit"] => Ok(Message::Exit), + ["stop"] => Ok(ToSlaveMessage::Stop), + ["exit"] => Ok(ToSlaveMessage::Exit), _ => Err(ConversionError::UnknownRequest), } } -- GitLab From 3240ea1fa29c623067aff1b2f0691d8e1a4f660a Mon Sep 17 00:00:00 2001 From: Imran Tierce Date: Sun, 26 Dec 2021 16:16:39 +0100 Subject: [PATCH 05/11] Send requests count updates --- src/main.rs | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/main.rs b/src/main.rs index bb01ea6..c65f0da 100644 --- a/src/main.rs +++ b/src/main.rs @@ -170,13 +170,22 @@ async fn websocket(stream: WebSocket, state: Arc) { debug!("Search request pushed to queue"); } - request_queue.push((hash, range)) + request_queue.push((hash, range)); + + broadcast_message(&state.tx_to_listeners, request_queue.len()) } ToSlaveMessage::Stop => { let mut request_queue = state.request_queue.lock().unwrap(); if !request_queue.is_empty() { request_queue.remove(0); + + broadcast_message( + &state.tx_to_listeners, + request_queue.len(), + ); + broadcast_message(&state.tx_to_slaves, message); + if !request_queue.is_empty() { info!("Sending queued request: {:?}", request_queue[0]); @@ -279,7 +288,11 @@ async fn slave_listening_task( broadcast_message(&state.tx_to_slaves, ToSlaveMessage::Stop); let mut request_queue = state.request_queue.lock().unwrap(); + request_queue.remove(0); + + broadcast_message(&state.tx_to_listeners, request_queue.len()); + if !request_queue.is_empty() { info!("Sending queued request: {:?}", request_queue[0]); @@ -301,7 +314,7 @@ async fn slave_listening_task( } } -fn broadcast_message(tx: &broadcast::Sender, message: ToSlaveMessage) { +fn broadcast_message(tx: &broadcast::Sender, message: T) { if let Err(err) = tx.send(message) { warn!("Can't broadcast: {}", err) } -- GitLab From b936c89252b95c40360a6659f50c90ef319a854d Mon Sep 17 00:00:00 2001 From: Imran Tierce Date: Sun, 26 Dec 2021 18:17:30 +0100 Subject: [PATCH 06/11] Add monitor script --- docker/monitor.py | 20 ++++++++++++++++++++ src/main.rs | 25 ++++++++++++++++++------- 2 files changed, 38 insertions(+), 7 deletions(-) create mode 100644 docker/monitor.py diff --git a/docker/monitor.py b/docker/monitor.py new file mode 100644 index 0000000..463b619 --- /dev/null +++ b/docker/monitor.py @@ -0,0 +1,20 @@ +import websocket +import signal +import sys + +ws = websocket.create_connection("ws://127.0.0.1:3000/ws") +ws.send("queue length") + + +def handler(signum, frame): + print("\nClosing WS connection...") + # timeout=0 car il reçoit pas le closing frame ??? + ws.close(websocket.STATUS_GOING_AWAY, timeout=0) + sys.exit() + + +signal.signal(signal.SIGINT, handler) + +while True: + queue_length = int.from_bytes(ws.recv(), "big") + print(queue_length) diff --git a/src/main.rs b/src/main.rs index c65f0da..38d87a3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -129,13 +129,24 @@ async fn websocket(stream: WebSocket, state: Arc) { let mut rx = state.tx_to_listeners.subscribe(); - while let Ok(new_size) = rx.recv().await { - if let Err(err) = tx_to_client - .send(ws::Message::Binary(new_size.to_be_bytes().to_vec())) - .await - { - warn!("Can't send to listener: {}", err); - break; + tokio::spawn(async move { + while let Ok(new_size) = rx.recv().await { + if let Err(err) = tx_to_client + .send(ws::Message::Binary(new_size.to_be_bytes().to_vec())) + .await + { + // Permet de jeter cette tâche qui ne sert plus + // car le listener n'est plus connecté + warn!("Can't send to listener: {}", err); + break; + } + } + }); + + while let Some(Ok(msg)) = rx_from_client.next().await { + match msg { + ws::Message::Close(_) => break, + msg => warn!("Unknown message from listener: {:?}", msg), } } -- GitLab From 56366e8639f138ae1fee0c377d634cf4636a22c2 Mon Sep 17 00:00:00 2001 From: itytophile Date: Sun, 26 Dec 2021 19:47:57 +0100 Subject: [PATCH 07/11] Change default addresses --- src/main.rs | 2 +- src/slave.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main.rs b/src/main.rs index 38d87a3..b1293a2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -34,7 +34,7 @@ struct AppState { #[derive(Parser, Debug)] #[clap(about, version, author)] struct Args { - #[clap(short, long, default_value = "127.0.0.1:3000")] + #[clap(short, long, default_value = "0.0.0.0:3000")] address: String, } diff --git a/src/slave.rs b/src/slave.rs index 263f9c9..526ff71 100644 --- a/src/slave.rs +++ b/src/slave.rs @@ -18,7 +18,7 @@ type WebSocketSender = SplitSink>, Mes #[derive(Parser, Debug)] #[clap(about, version, author)] struct Args { - #[clap(default_value = "ws://127.0.0.1:3000/ws")] + #[clap(default_value = "ws://0.0.0.0:3000/ws")] ws_address: String, } -- GitLab From f15dcd79bce3d11ff6cc99c8a3dbafcc581447b8 Mon Sep 17 00:00:00 2001 From: itytophile Date: Sun, 26 Dec 2021 21:00:19 +0100 Subject: [PATCH 08/11] Add docker-compose --- docker-compose.yml | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 docker-compose.yml diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..21c2d48 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,12 @@ +version: "3.9" + +services: + server: + image: itytophile/hash-server + ports: + - "3000:3000" + slave: + image: itytophile/hash-slave + depends_on: + - server + command: ws://server:3000/ws \ No newline at end of file -- GitLab From 4be7c16aca2f42111009ab0ab4d213f1aea18b46 Mon Sep 17 00:00:00 2001 From: itytophile Date: Mon, 27 Dec 2021 00:21:02 +0100 Subject: [PATCH 09/11] The monitor can scale --- README.md | 7 +++++++ docker/monitor.py | 22 +++++++++++++++++++++- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index b4bd6b4..c39dd6f 100644 --- a/README.md +++ b/README.md @@ -5,3 +5,10 @@ Before building: ```sh cargo vendor ``` + +## Scaling + +```sh +docker compose up +python docker/monitor.py +``` diff --git a/docker/monitor.py b/docker/monitor.py index 463b619..459d159 100644 --- a/docker/monitor.py +++ b/docker/monitor.py @@ -1,9 +1,12 @@ import websocket import signal import sys +import os +import math ws = websocket.create_connection("ws://127.0.0.1:3000/ws") ws.send("queue length") +print("Successfully connected to server") def handler(signum, frame): @@ -15,6 +18,23 @@ def handler(signum, frame): signal.signal(signal.SIGINT, handler) +REQUIRED_REPLICAS_COUNT_FOR_LENGTH = [ + (2, 1), + (4, 2), + (8, 4), + (math.inf, 8), +] # (length, count) + +replicas_count = 1 + while True: queue_length = int.from_bytes(ws.recv(), "big") - print(queue_length) + print("queue_length =", queue_length) + for (length, count) in REQUIRED_REPLICAS_COUNT_FOR_LENGTH: + if queue_length <= length: + if replicas_count != count: + print(f"Updating slaves: {replicas_count} ->", count) + # peux pas utiliser --no-recreate https://github.com/docker/compose/issues/8940 + os.system(f"docker compose up -d --scale slave={count}") + replicas_count = count + break -- GitLab From 1678c048d2d3d5af4309c55bb7d994967c9d7c8b Mon Sep 17 00:00:00 2001 From: itytophile Date: Tue, 28 Dec 2021 00:34:55 +0100 Subject: [PATCH 10/11] Docker stack --- README.md | 12 ++++++++++-- docker/monitor.py | 2 +- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index c39dd6f..7451d11 100644 --- a/README.md +++ b/README.md @@ -6,9 +6,17 @@ Before building: cargo vendor ``` -## Scaling +## Monitoring + +After creating the swarm: ```sh -docker compose up +docker stack deploy --compose-file docker-compose.yml stackhash python docker/monitor.py ``` + +To delete the stack: + +```sh +docker stack rm stackhash +``` diff --git a/docker/monitor.py b/docker/monitor.py index 459d159..5640a09 100644 --- a/docker/monitor.py +++ b/docker/monitor.py @@ -35,6 +35,6 @@ while True: if replicas_count != count: print(f"Updating slaves: {replicas_count} ->", count) # peux pas utiliser --no-recreate https://github.com/docker/compose/issues/8940 - os.system(f"docker compose up -d --scale slave={count}") + os.system(f"docker service scale stackhash_slave={count}") replicas_count = count break -- GitLab From c087aac035c24a08834bd6d8720b9ec59e13ab86 Mon Sep 17 00:00:00 2001 From: Imran Tierce Date: Tue, 28 Dec 2021 00:49:06 +0100 Subject: [PATCH 11/11] Comment removed --- docker/monitor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/docker/monitor.py b/docker/monitor.py index 5640a09..7469cf9 100644 --- a/docker/monitor.py +++ b/docker/monitor.py @@ -34,7 +34,6 @@ while True: if queue_length <= length: if replicas_count != count: print(f"Updating slaves: {replicas_count} ->", count) - # peux pas utiliser --no-recreate https://github.com/docker/compose/issues/8940 os.system(f"docker service scale stackhash_slave={count}") replicas_count = count break -- GitLab