From e58123f4961e7f3e7900f92bc2010aed05bfe525 Mon Sep 17 00:00:00 2001 From: Compositr Date: Sat, 2 Nov 2024 21:05:03 +1100 Subject: [PATCH] feat: multithreading! --- src/handlers.rs | 82 ++++++++++++++++++++++++++++++------------------- src/main.rs | 9 +++--- src/threads.rs | 81 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 137 insertions(+), 35 deletions(-) create mode 100644 src/threads.rs diff --git a/src/handlers.rs b/src/handlers.rs index 008a95a..6cd8042 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -1,18 +1,23 @@ -use crate::http::{ - requests::{Request, RequestStatus, URL}, - responses::{Body, Response, UnitOrBoxedError}, +use crate::{ + http::{ + requests::{Request, RequestStatus, URL}, + responses::{Body, Response, UnitOrBoxedError}, + }, + threads::ThreadPool, }; use std::{ collections::HashMap, net::{Shutdown, TcpListener}, + sync::Arc, }; -type DynHandlerFn = dyn Fn(Request) -> Response; +type DynHandlerFn = dyn Fn(Request) -> Response + Send + Sync; type BoxedHandlerFn = Box; +type HandlersMap = HashMap>; // Collection of handlers for requests pub struct Handlers { - matchers: HashMap, + matchers: HandlersMap, } impl Handlers { @@ -31,17 +36,28 @@ impl Handlers { /// /// path: Path to match (no trailing /) /// handler: Function to handle the request. Must return a Response - pub fn add_handler(&mut self, path: &str, handler: impl Fn(Request) -> Response + 'static) { - self.matchers.insert(path.to_string(), Box::from(handler)); + pub fn add_handler( + &mut self, + path: &str, + handler: impl Fn(Request) -> Response + Send + Sync + 'static, + ) { + self.matchers + .insert(path.to_string(), Arc::new(Box::from(handler))); } - /// Bind these handlers to a listener in order to handle incoming requests - /// You will need to pass in a TcpListener + /// Bind these handlers to a listener in order to handle incoming requests. + /// You will need to pass in a TcpListener. + /// This method creates a ThreadPool to handle incoming requests so no multithreading is needed on the part of the caller. /// /// !! Call this *after* adding all your handlers with add_handler !! /// /// listener: TcpListener to bind to - pub fn bind(&self, listener: TcpListener) { + pub fn bind(&self, listener: TcpListener) -> UnitOrBoxedError { + let pool = match ThreadPool::build(4) { + Some(pool) => pool, + None => return Err(Box::from("Failed to create ThreadPool")), + }; + for stream in listener.incoming() { match stream { Ok(stream) => { @@ -51,40 +67,55 @@ impl Handlers { stream.shutdown(Shutdown::Both).unwrap_or_else(|_| { eprintln!("Failed to close malformed HTTP stream") }); - return; + continue; } }; - self.handle_req(request) - .unwrap_or_else(|_| eprintln!("Failed to send handle request")); + let handler = match Handlers::match_handler(&self.matchers, &request.url) { + Some(handler) => handler.clone(), + None => { + Response::new(request, 404, Body::Static("Not Found")).send()?; + continue; + } + }; + + pool.execute(move || { + handler.as_ref()(request) + .send() + .unwrap_or_else(|_| eprintln!("Failed to send response")); + }) + .unwrap_or_else(|_| { + eprintln!("Failed to send job to ThreadPool"); + }); } Err(e) => { eprintln!("Failed to establish connection: {}", e); - return; } - } + }; } + + return Ok(()); } - fn match_handler(&self, url: &URL) -> Option<&BoxedHandlerFn> { - 'matching_loop: for (path, handler) in self.matchers.iter() { + fn match_handler<'a>(matchers: &'a HandlersMap, url: &URL) -> Option<&'a Arc> { + 'matching_loop: for (path, handler) in matchers.iter() { // Exact match if path == &url.path { return Some(handler); }; // Segment matching - let url_segements = url.path.split('/').collect::>(); + let url_segments = url.path.split('/').collect::>(); let path_segments = path.split('/').collect::>(); // If the URL has more segments than the path, it can't match // or if the path has no segments, it can't match - if (url_segements.len() != path_segments.len()) || (path_segments.len() == 0) { + if (url_segments.len() != path_segments.len()) || (path_segments.len() == 0) { continue; } // Check each segment of the url - for (url_segment, path_segment) in url_segements.iter().zip(path_segments.iter()) { + for (url_segment, path_segment) in url_segments.iter().zip(path_segments.iter()) { if path_segment.starts_with('[') { // e.g. /path/[id] if path_segment.ends_with(']') { @@ -115,15 +146,4 @@ impl Handlers { None } - - fn handle_req(&self, req: Request) -> UnitOrBoxedError { - match self.match_handler(&req.url) { - Some(handler) => handler(req).send()?, - None => { - return Response::new(req, 404, Body::Static("Not Found")).send(); - } - }; - - Ok(()) - } } diff --git a/src/main.rs b/src/main.rs index b3808b6..bebff72 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,4 @@ -use std::{net::TcpListener, process, rc::Rc}; +use std::{net::TcpListener, process, sync::Arc}; use ctrlc; use http::responses::{Body, Response}; @@ -13,6 +13,7 @@ use indoc::indoc; mod handlers; mod http; mod icons; +mod threads; fn main() { println!("luciders starting..."); @@ -31,7 +32,7 @@ fn main() { }) .unwrap_or_else(|_| fatal("Failed to set termination signal handler")); - let icons_rc = Rc::new(match icons::Icons::build() { + let icons_rc = Arc::new(match icons::Icons::build() { Ok(icons) => icons, Err(e) => fatal(e), }); @@ -79,7 +80,7 @@ fn main() { }); handlers.add_handler("/icons/[icon].svg", { - let icons = Rc::clone(&icons_rc); + let icons = Arc::clone(&icons_rc); move |req| { if req.method != "GET" { return Response::new(req, 405, Body::Static("Method Not Allowed")); @@ -109,7 +110,7 @@ fn main() { }); handlers.add_handler("/icons/[icon].png", { - let icons = Rc::clone(&icons_rc); + let icons = Arc::clone(&icons_rc); move |req| { if req.method != "GET" { return Response::new(req, 405, Body::Static("Method Not Allowed")); diff --git a/src/threads.rs b/src/threads.rs new file mode 100644 index 0000000..f8e8d03 --- /dev/null +++ b/src/threads.rs @@ -0,0 +1,81 @@ +use std::{ + sync::{mpsc, Arc, Mutex}, + thread, +}; + +type Job = Box; + +pub struct ThreadPool { + sender: Option>, + workers: Vec, +} + +impl ThreadPool { + /// Create a new ThreadPool + /// + /// size: Number of threads to spawn + pub fn build(size: usize) -> Option { + let (sender, receiver) = mpsc::channel(); + // Wrap the receiver in an Arc and Mutex so we can share it among threads and only have one user at a time + let receiver = Arc::new(Mutex::new(receiver)); + + let mut workers = Vec::with_capacity(size); + for id in 0..size { + match Worker::build(id, receiver.clone()) { + Some(worker) => workers.push(worker), + None => return None, + } + } + + Some(ThreadPool { sender: Some(sender), workers }) + } + + pub fn execute(&self, f: F) -> Result<(), mpsc::SendError> + where + F: FnOnce() + Send + 'static, + { + let job = Box::new(f); + + match self.sender.as_ref() { + Some(sender) => sender.send(job), + None => Err(mpsc::SendError(job)), + } + } +} + +impl Drop for ThreadPool { + fn drop(&mut self) { + drop(self.sender.take()); + + for worker in &mut self.workers { + if let Some(thread) = worker.thread.take() { + thread.join().expect("Failed to join worker thread on shutdown!") + } + } + } +} + +struct Worker { + id: usize, + thread: Option>, +} + +impl Worker { + fn build(id: usize, receiver: Arc>>) -> Option { + let builder = thread::Builder::new().name(format!("worker-{}", id)); + + let thread = match builder.spawn(move || loop { + let job = match receiver.lock().expect("Failed to lock receiver! Maybe the Mutex is poisoned?").recv() { + Ok(job) => job, + Err(_) => break, + }; + + job(); + }) { + Ok(thread) => Some(thread), + Err(_) => return None, + }; + + Some(Worker { id, thread }) + } +}