feat: multithreading!

This commit is contained in:
Compositr 2024-11-02 21:05:03 +11:00
parent ea57e2842d
commit e58123f496
3 changed files with 137 additions and 35 deletions

View file

@ -1,18 +1,23 @@
use crate::http::{ use crate::{
http::{
requests::{Request, RequestStatus, URL}, requests::{Request, RequestStatus, URL},
responses::{Body, Response, UnitOrBoxedError}, responses::{Body, Response, UnitOrBoxedError},
},
threads::ThreadPool,
}; };
use std::{ use std::{
collections::HashMap, collections::HashMap,
net::{Shutdown, TcpListener}, net::{Shutdown, TcpListener},
sync::Arc,
}; };
type DynHandlerFn = dyn Fn(Request) -> Response; type DynHandlerFn = dyn Fn(Request) -> Response + Send + Sync;
type BoxedHandlerFn = Box<DynHandlerFn>; type BoxedHandlerFn = Box<DynHandlerFn>;
type HandlersMap = HashMap<String, Arc<BoxedHandlerFn>>;
// Collection of handlers for requests // Collection of handlers for requests
pub struct Handlers { pub struct Handlers {
matchers: HashMap<String, BoxedHandlerFn>, matchers: HandlersMap,
} }
impl Handlers { impl Handlers {
@ -31,17 +36,28 @@ impl Handlers {
/// ///
/// path: Path to match (no trailing /) /// path: Path to match (no trailing /)
/// handler: Function to handle the request. Must return a Response /// handler: Function to handle the request. Must return a Response
pub fn add_handler(&mut self, path: &str, handler: impl Fn(Request) -> Response + 'static) { pub fn add_handler(
self.matchers.insert(path.to_string(), Box::from(handler)); &mut self,
path: &str,
handler: impl Fn(Request) -> Response + Send + Sync + 'static,
) {
self.matchers
.insert(path.to_string(), Arc::new(Box::from(handler)));
} }
/// Bind these handlers to a listener in order to handle incoming requests /// Bind these handlers to a listener in order to handle incoming requests.
/// You will need to pass in a TcpListener /// You will need to pass in a TcpListener.
/// This method creates a ThreadPool to handle incoming requests so no multithreading is needed on the part of the caller.
/// ///
/// !! Call this *after* adding all your handlers with add_handler !! /// !! Call this *after* adding all your handlers with add_handler !!
/// ///
/// listener: TcpListener to bind to /// listener: TcpListener to bind to
pub fn bind(&self, listener: TcpListener) { pub fn bind(&self, listener: TcpListener) -> UnitOrBoxedError {
let pool = match ThreadPool::build(4) {
Some(pool) => pool,
None => return Err(Box::from("Failed to create ThreadPool")),
};
for stream in listener.incoming() { for stream in listener.incoming() {
match stream { match stream {
Ok(stream) => { Ok(stream) => {
@ -51,40 +67,55 @@ impl Handlers {
stream.shutdown(Shutdown::Both).unwrap_or_else(|_| { stream.shutdown(Shutdown::Both).unwrap_or_else(|_| {
eprintln!("Failed to close malformed HTTP stream") eprintln!("Failed to close malformed HTTP stream")
}); });
return; continue;
} }
}; };
self.handle_req(request) let handler = match Handlers::match_handler(&self.matchers, &request.url) {
.unwrap_or_else(|_| eprintln!("Failed to send handle request")); Some(handler) => handler.clone(),
None => {
Response::new(request, 404, Body::Static("Not Found")).send()?;
continue;
}
};
pool.execute(move || {
handler.as_ref()(request)
.send()
.unwrap_or_else(|_| eprintln!("Failed to send response"));
})
.unwrap_or_else(|_| {
eprintln!("Failed to send job to ThreadPool");
});
} }
Err(e) => { Err(e) => {
eprintln!("Failed to establish connection: {}", e); eprintln!("Failed to establish connection: {}", e);
return;
}
}
} }
};
} }
fn match_handler(&self, url: &URL) -> Option<&BoxedHandlerFn> { return Ok(());
'matching_loop: for (path, handler) in self.matchers.iter() { }
fn match_handler<'a>(matchers: &'a HandlersMap, url: &URL) -> Option<&'a Arc<BoxedHandlerFn>> {
'matching_loop: for (path, handler) in matchers.iter() {
// Exact match // Exact match
if path == &url.path { if path == &url.path {
return Some(handler); return Some(handler);
}; };
// Segment matching // Segment matching
let url_segements = url.path.split('/').collect::<Vec<&str>>(); let url_segments = url.path.split('/').collect::<Vec<&str>>();
let path_segments = path.split('/').collect::<Vec<&str>>(); let path_segments = path.split('/').collect::<Vec<&str>>();
// If the URL has more segments than the path, it can't match // If the URL has more segments than the path, it can't match
// or if the path has no segments, it can't match // or if the path has no segments, it can't match
if (url_segements.len() != path_segments.len()) || (path_segments.len() == 0) { if (url_segments.len() != path_segments.len()) || (path_segments.len() == 0) {
continue; continue;
} }
// Check each segment of the url // Check each segment of the url
for (url_segment, path_segment) in url_segements.iter().zip(path_segments.iter()) { for (url_segment, path_segment) in url_segments.iter().zip(path_segments.iter()) {
if path_segment.starts_with('[') { if path_segment.starts_with('[') {
// e.g. /path/[id] // e.g. /path/[id]
if path_segment.ends_with(']') { if path_segment.ends_with(']') {
@ -115,15 +146,4 @@ impl Handlers {
None None
} }
fn handle_req(&self, req: Request) -> UnitOrBoxedError {
match self.match_handler(&req.url) {
Some(handler) => handler(req).send()?,
None => {
return Response::new(req, 404, Body::Static("Not Found")).send();
}
};
Ok(())
}
} }

View file

@ -1,4 +1,4 @@
use std::{net::TcpListener, process, rc::Rc}; use std::{net::TcpListener, process, sync::Arc};
use ctrlc; use ctrlc;
use http::responses::{Body, Response}; use http::responses::{Body, Response};
@ -13,6 +13,7 @@ use indoc::indoc;
mod handlers; mod handlers;
mod http; mod http;
mod icons; mod icons;
mod threads;
fn main() { fn main() {
println!("luciders starting..."); println!("luciders starting...");
@ -31,7 +32,7 @@ fn main() {
}) })
.unwrap_or_else(|_| fatal("Failed to set termination signal handler")); .unwrap_or_else(|_| fatal("Failed to set termination signal handler"));
let icons_rc = Rc::new(match icons::Icons::build() { let icons_rc = Arc::new(match icons::Icons::build() {
Ok(icons) => icons, Ok(icons) => icons,
Err(e) => fatal(e), Err(e) => fatal(e),
}); });
@ -79,7 +80,7 @@ fn main() {
}); });
handlers.add_handler("/icons/[icon].svg", { handlers.add_handler("/icons/[icon].svg", {
let icons = Rc::clone(&icons_rc); let icons = Arc::clone(&icons_rc);
move |req| { move |req| {
if req.method != "GET" { if req.method != "GET" {
return Response::new(req, 405, Body::Static("Method Not Allowed")); return Response::new(req, 405, Body::Static("Method Not Allowed"));
@ -109,7 +110,7 @@ fn main() {
}); });
handlers.add_handler("/icons/[icon].png", { handlers.add_handler("/icons/[icon].png", {
let icons = Rc::clone(&icons_rc); let icons = Arc::clone(&icons_rc);
move |req| { move |req| {
if req.method != "GET" { if req.method != "GET" {
return Response::new(req, 405, Body::Static("Method Not Allowed")); return Response::new(req, 405, Body::Static("Method Not Allowed"));

81
src/threads.rs Normal file
View file

@ -0,0 +1,81 @@
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
type Job = Box<dyn FnOnce() + Send + 'static>;
pub struct ThreadPool {
sender: Option<mpsc::Sender<Job>>,
workers: Vec<Worker>,
}
impl ThreadPool {
/// Create a new ThreadPool
///
/// size: Number of threads to spawn
pub fn build(size: usize) -> Option<Self> {
let (sender, receiver) = mpsc::channel();
// Wrap the receiver in an Arc and Mutex so we can share it among threads and only have one user at a time
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
match Worker::build(id, receiver.clone()) {
Some(worker) => workers.push(worker),
None => return None,
}
}
Some(ThreadPool { sender: Some(sender), workers })
}
pub fn execute<F>(&self, f: F) -> Result<(), mpsc::SendError<Job>>
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
match self.sender.as_ref() {
Some(sender) => sender.send(job),
None => Err(mpsc::SendError(job)),
}
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in &mut self.workers {
if let Some(thread) = worker.thread.take() {
thread.join().expect("Failed to join worker thread on shutdown!")
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn build(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Option<Self> {
let builder = thread::Builder::new().name(format!("worker-{}", id));
let thread = match builder.spawn(move || loop {
let job = match receiver.lock().expect("Failed to lock receiver! Maybe the Mutex is poisoned?").recv() {
Ok(job) => job,
Err(_) => break,
};
job();
}) {
Ok(thread) => Some(thread),
Err(_) => return None,
};
Some(Worker { id, thread })
}
}