use crate::{ message::{ action::{Action, MsgAction}, wrapper::Message, }, mtterror::MTTError, name::{Name, NameType, Names}, queue::router::Queue, }; use std::{ collections::{HashMap, HashSet}, sync::mpsc::Receiver, thread::spawn, }; use uuid::Uuid; #[derive(Clone, Debug, Eq, Hash)] pub enum Include { All, Just(T), } impl PartialEq for Include { fn eq(&self, other: &Self) -> bool { match self { Include::All => true, Include::Just(data) => match other { Include::All => true, Include::Just(other_data) => data == other_data, }, } } } #[cfg(test)] mod includes { use super::*; #[test] fn does_all_equal_evberything() { let a: Include = Include::All; let b: Include = Include::Just(5); let c: Include = Include::Just(7); assert!(a == a, "all should equal all"); assert!(a == b, "all should equal just"); assert!(b == a, "just should equal all"); assert!(b == b, "same just should equal"); assert!(b != c, "different justs do not equal"); } } #[derive(Clone, Debug)] pub enum RegMsg { AddRoute(Path), AddDocName(Vec), DocumentNameID(Uuid), Error(MTTError), GetNameID(Name), Ok, RemoveSender(Uuid), RouteID(RouteID), } #[derive(Clone, Debug)] pub struct Register { msg: RegMsg, sender_id: Uuid, } impl Register { pub fn new(sender_id: Uuid, reg_msg: RegMsg) -> Self { Self { msg: reg_msg, sender_id: sender_id, } } pub fn get_msg(&self) -> &RegMsg { &self.msg } pub fn get_sender_id(&self) -> &Uuid { &self.sender_id } pub fn response(&self, reg_msg: RegMsg) -> Self { Self { msg: reg_msg, sender_id: self.sender_id.clone(), } } } #[derive(Clone, Debug)] pub struct Path { pub msg_id: Include, pub doc: Include, pub action: Include, } impl Path { pub fn new(id: Include, doc: Include, action: Include) -> Self { Self { msg_id: id, doc: doc, action: action, } } pub fn for_message(name: NT, action: &MsgAction) -> Self where NT: Into, { Self { msg_id: Include::Just(Uuid::new_v4()), doc: Include::Just(name.into()), action: Include::Just(action.into()), } } } #[cfg(test)] mod paths { use super::*; use crate::{ message::{action::MsgAction, wrapper::Records}, name::{Name, Names}, }; #[test] fn can_create_for_message() { let input = [ (Name::english("one"), MsgAction::Show), ( Name::english("two"), MsgAction::Records(Records::new(Names::new())), ), ]; for item in input.iter() { let path = Path::for_message(item.0.clone(), &item.1); match path.doc { Include::Just(name) => assert_eq!(name, item.0.clone().into()), _ => unreachable!("should have returned document name"), } match path.action { Include::Just(action) => assert_eq!(action, item.1.clone().into()), _ => unreachable!("should have returned action type"), } } } #[test] fn message_ids_are_unique_for_message_paths() { let count = 10; let mut ids: Vec = Vec::new(); for _ in 0..count { let path = Path::for_message(NameType::None, &MsgAction::Show); let id = match path.msg_id { Include::Just(data) => data.clone(), Include::All => unreachable!("should have been a message id"), }; assert!(!ids.contains(&id), "{:?} is duplicated in {:?}", id, ids); ids.push(id); } } } #[derive(Clone, Debug, PartialEq)] pub struct Route { pub action: Include, pub doc_id: Include, pub msg_id: Include, } impl Route { pub fn new(msg_id: Include, doc: Include, action: Include) -> Self { Self { action: action, doc_id: doc, msg_id: msg_id, } } } impl Default for Route { fn default() -> Self { Self { action: Include::All, doc_id: Include::All, msg_id: Include::All, } } } impl From for Route { fn from(value: RouteID) -> Self { Self::from(&value) } } impl From<&RouteID> for Route { fn from(value: &RouteID) -> Self { Self { action: match &value.action { Some(data) => Include::Just(data.clone()), None => Include::All, }, doc_id: match &value.doc_id { Some(doc) => Include::Just(doc.clone()), None => Include::All, }, msg_id: match &value.msg_id { Some(msg) => Include::Just(msg.clone()), None => Include::All, }, } } } #[derive(Clone, Debug, Eq, Hash, PartialEq)] pub struct RouteID { action: Option, doc_id: Option, msg_id: Option, } impl From for RouteID { fn from(value: Route) -> Self { Self { action: match value.action { Include::All => None, Include::Just(action) => Some(action.clone()), }, doc_id: match value.doc_id { Include::All => None, Include::Just(doc) => Some(doc.clone()), }, msg_id: match value.msg_id { Include::All => None, Include::Just(id) => Some(id.clone()), }, } } } struct RouteStorage { data: HashMap>, } impl RouteStorage { fn new() -> Self { Self { data: HashMap::new(), } } fn add(&mut self, route: Route, sender_id: Uuid) -> RouteID { let route_id: RouteID = route.into(); let set = match self.data.get_mut(&route_id) { Some(result) => result, None => { let holder = HashSet::new(); self.data.insert(route_id.clone(), holder); self.data.get_mut(&route_id).unwrap() } }; set.insert(sender_id); route_id } fn remove_sender_id(&mut self, sender_id: &Uuid) { let mut removal: Vec = Vec::new(); for (route_id, set) in self.data.iter_mut() { set.remove(sender_id); if set.is_empty() { removal.push(route_id.clone()); } } for route_id in removal.iter() { self.data.remove(route_id); } } fn get(&self, route: Route) -> HashSet { let mut output = HashSet::new(); for (route_id, set) in self.data.iter() { if route == route_id.into() { output = output.union(set).cloned().collect(); } } output } } #[cfg(test)] mod route_storeage { use super::*; #[test] fn can_add_routes() { let mut routes = RouteStorage::new(); let id1 = Uuid::new_v4(); let id2 = Uuid::new_v4(); let route1 = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All); let route2 = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All); let route_id1 = routes.add(route1.clone(), id1.clone()); let route_id2 = routes.add(route2.clone(), id2.clone()); let result1 = routes.get(route1.clone()); assert_eq!(result1.len(), 1); assert!( result1.contains(&id1), "{:?} not found in {:?}", id1, result1 ); assert_eq!(route_id1, route1.into()); let result2 = routes.get(route2.clone()); assert_eq!(result2.len(), 1); assert!( result2.contains(&id2), "{:?} not found in {:?}", id2, result2 ); assert_eq!(route_id2, route2.into()); } #[test] fn returns_empty_set_when_nothing_is_available() { let routes = RouteStorage::new(); let route = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All); let result = routes.get(route); assert_eq!(result.len(), 0); } #[test] fn returns_all_entries_using_the_same_route() { let count = 5; let mut routes = RouteStorage::new(); let mut ids: HashSet = HashSet::new(); while ids.len() < count { ids.insert(Uuid::new_v4()); } let route = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All); for id in ids.iter() { routes.add(route.clone(), id.clone()); } let result = routes.get(route); assert_eq!(result, ids); } #[test] fn routes_are_not_duplicated() { let count = 5; let mut routes = RouteStorage::new(); let id = Uuid::new_v4(); let route = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All); for _ in 0..count { routes.add(route.clone(), id.clone()); } let result = routes.get(route); assert_eq!(result.len(), 1); assert!(result.contains(&id), "{:?} not found in {:?}", id, result); } #[test] fn overlapping_routes_are_combined() { let mut routes = RouteStorage::new(); let id1 = Uuid::new_v4(); let id2 = Uuid::new_v4(); let route1 = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All); let route2 = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All); routes.add(route1.clone(), id1.clone()); routes.add(route2.clone(), id2.clone()); let retrieve = Route::new(Include::All, Include::All, Include::All); let result = routes.get(retrieve); assert_eq!(result.len(), 2); assert!(result.contains(&id1), "{:?} not found in {:?}", id1, result); assert!(result.contains(&id2), "{:?} not found in {:?}", id2, result); } #[test] fn can_remove_sender_id() { let mut routes = RouteStorage::new(); let count = 5; let mut ids: HashSet = HashSet::new(); while ids.len() < count { ids.insert(Uuid::new_v4()); } let route = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All); for id in ids.iter() { routes.add(route.clone(), id.clone()); } let removed = ids.iter().last().unwrap().clone(); ids.remove(&removed); routes.remove_sender_id(&removed); let result = routes.get(route); assert_eq!(result, ids); } #[test] fn empty_routes_are_release_memory() { let mut routes = RouteStorage::new(); let id = Uuid::new_v4(); let route = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All); routes.add(route.clone(), id.clone()); routes.remove_sender_id(&id); assert_eq!(routes.data.len(), 0); } } pub struct DocRegistry { doc_names: Names, queue: Queue, receiver: Receiver, routes: RouteStorage, } impl DocRegistry { fn new(queue: Queue, rx: Receiver) -> Self { Self { doc_names: Names::new(), queue: queue, receiver: rx, routes: RouteStorage::new(), } } pub fn start(queue: Queue, rx: Receiver) { let mut doc_names = DocRegistry::new(queue, rx); spawn(move || { doc_names.listen(); }); } fn listen(&mut self) { loop { let mut msg = self.receiver.recv().unwrap(); match msg.get_action() { MsgAction::Register(data) => { let id = data.get_sender_id(); let reply = msg.response(self.register_action(data)); self.queue.forward(id, reply); } _ => match self.path_to_route(&msg.get_path()) { Ok(route) => { msg.set_route(route.clone()); for sender_id in self.routes.get(route).iter() { self.queue.forward(sender_id, msg.clone()); } } Err(err) => self.queue.send(msg.response(MsgAction::Error(err))), }, } } } fn path_to_route(&self, path: &Path) -> Result { let doc_id = match &path.doc { Include::Just(name) => match self.doc_names.get_id(name) { Ok(id) => Include::Just(id), Err(err) => return Err(err), }, Include::All => Include::All, }; Ok(Route::new(path.msg_id.clone(), doc_id, path.action.clone())) } fn register_action(&mut self, reg: &Register) -> Register { match reg.get_msg() { RegMsg::AddDocName(names) => match self.doc_names.add_names(names.clone()) { Ok(id) => reg.response(RegMsg::DocumentNameID(id.clone())), Err(err) => reg.response(RegMsg::Error(err)), }, RegMsg::AddRoute(path) => { let response = match self.path_to_route(path) { Ok(route) => { let id = self.routes.add(route, reg.get_sender_id().clone()); RegMsg::RouteID(id) } Err(err) => RegMsg::Error(err), }; reg.response(response) } RegMsg::GetNameID(name) => match self.doc_names.get_id(name) { Ok(id) => reg.response(RegMsg::DocumentNameID(id.clone())), Err(err) => reg.response(RegMsg::Error(err)), }, RegMsg::RemoveSender(sender_id) => { self.routes.remove_sender_id(sender_id); reg.response(RegMsg::Ok) } _ => reg.response(RegMsg::Ok), } } }