use super::super::SenderID; use crate::{ action::{Action, Field, MsgAction}, message::{Message, MessageAction, MessageID}, mtterror::MTTError, name::{Name, NameID, NameType, Names}, queue::router::Queue, }; use chrono::{DateTime, Utc}; use std::{ collections::{HashMap, HashSet}, default, sync::mpsc::Receiver, thread::spawn, time::Duration, }; use uuid::Uuid; #[derive(Clone, Debug, Eq, Hash)] pub enum Include { All, Just(T), } impl PartialEq for Include { fn eq(&self, other: &Self) -> bool { match self { Include::All => true, Include::Just(data) => match other { Include::All => true, Include::Just(other_data) => data == other_data, }, } } } #[cfg(test)] mod includes { use super::*; #[test] fn does_all_equal_evberything() { let a: Include = Include::All; let b: Include = Include::Just(5); let c: Include = Include::Just(7); assert!(a == a, "all should equal all"); assert!(a == b, "all should equal just"); assert!(b == a, "just should equal all"); assert!(b == b, "same just should equal"); assert!(b != c, "different justs do not equal"); } } #[derive(Clone, Debug)] pub enum RegMsg { AddRoute(Path), AddDocName(Vec), DocumentNameID(NameID), Error(MTTError), GetNameID(Name), Ok, RemoveRoute(Route), RemoveSender(SenderID), RouteID(RouteID), } #[derive(Clone, Debug)] pub struct Register { msg: RegMsg, sender_id: SenderID, } impl Register { pub fn new(sender_id: SenderID, reg_msg: RegMsg) -> Self { Self { msg: reg_msg, sender_id: sender_id, } } pub fn get_msg(&self) -> &RegMsg { &self.msg } pub fn get_sender_id(&self) -> &SenderID { &self.sender_id } pub fn response(&self, reg_msg: RegMsg) -> Self { Self { msg: reg_msg, sender_id: self.sender_id.clone(), } } } impl MessageAction for Register {} #[cfg(test)] mod registries { use super::*; use crate::name::name_id_support::test_name_id; #[test] fn does_registry_store_data() { let name_id = test_name_id(); let sender_data_id = SenderID::new(); let inputs = [ RegMsg::DocumentNameID(name_id.clone()), RegMsg::RemoveSender(sender_data_id.clone()), ]; for regmsg in inputs.iter() { let sender_id = SenderID::new(); let reg = Register::new(sender_id.clone(), regmsg.clone()); assert_eq!(reg.doc_name(), &NameType::None); assert_eq!(reg.get_sender_id(), &sender_id); match reg.get_msg() { RegMsg::DocumentNameID(data) => assert_eq!(data, &name_id), RegMsg::RemoveSender(data) => assert_eq!(data, &sender_data_id), _ => unreachable!("should have been one of the inputs"), } } } } #[derive(Clone, Debug)] pub struct Path { pub msg_id: Include, pub doc: Include, pub action: Include, } impl Path { pub fn new(id: Include, doc: Include, action: Include) -> Self { Self { msg_id: id, doc: doc, action: action, } } pub fn for_message(name: NT, action: &MsgAction) -> Self where NT: Into, { Self { msg_id: Include::Just(MessageID::new()), doc: Include::Just(name.into()), action: Include::Just(action.into()), } } } #[cfg(test)] mod paths { use super::*; use crate::{ action::{Records, Show}, name::{Name, Names}, }; #[test] fn can_create_for_message() { let input = [ ( Name::english("one"), MsgAction::Show(Show::new(Name::english("one"))), ), ( Name::english("two"), MsgAction::Records(Records::new(vec![Name::english("two")], Names::new())), ), ]; for item in input.iter() { let path = Path::for_message(item.0.clone(), &item.1); match path.doc { Include::Just(name) => assert_eq!(name, item.0.clone().into()), _ => unreachable!("should have returned document name"), } match path.action { Include::Just(action) => assert_eq!(action, item.1.clone().into()), _ => unreachable!("should have returned action type"), } } } #[test] fn message_ids_are_unique_for_message_paths() { let count = 10; let mut ids: Vec = Vec::new(); for _ in 0..count { let path = Path::for_message(NameType::None, &MsgAction::Show(Show::new(NameType::None))); let id = match path.msg_id { Include::Just(data) => data.clone(), Include::All => unreachable!("should have been a message id"), }; assert!(!ids.contains(&id), "{:?} is duplicated in {:?}", id, ids); ids.push(id); } } } #[derive(Clone, Debug, PartialEq)] pub struct Route { pub action: Include, pub doc_id: Include, pub msg_id: Include, } impl Route { pub fn new(msg_id: Include, doc: Include, action: Include) -> Self { Self { action: action, doc_id: doc, msg_id: msg_id, } } } impl Default for Route { fn default() -> Self { Self { action: Include::All, doc_id: Include::All, msg_id: Include::All, } } } impl From for Route { fn from(value: RouteID) -> Self { Self::from(&value) } } impl From<&RouteID> for Route { fn from(value: &RouteID) -> Self { Self { action: match &value.action { Some(data) => Include::Just(data.clone()), None => Include::All, }, doc_id: match &value.doc_id { Some(doc) => Include::Just(doc.clone()), None => Include::All, }, msg_id: match &value.msg_id { Some(msg) => Include::Just(msg.clone()), None => Include::All, }, } } } #[derive(Clone, Debug, Eq, Hash, PartialEq)] pub struct RouteID { action: Option, doc_id: Option, msg_id: Option, } impl From for RouteID { fn from(value: Route) -> Self { Self { action: match value.action { Include::All => None, Include::Just(action) => Some(action.clone()), }, doc_id: match value.doc_id { Include::All => None, Include::Just(doc) => Some(doc.clone()), }, msg_id: match value.msg_id { Include::All => None, Include::Just(id) => Some(id.clone()), }, } } } struct RouteStorage { data: HashMap>, } impl RouteStorage { fn new() -> Self { Self { data: HashMap::new(), } } fn add(&mut self, route: Route, sender_id: SenderID) -> RouteID { let route_id: RouteID = route.into(); let set = match self.data.get_mut(&route_id) { Some(result) => result, None => { let holder = HashSet::new(); self.data.insert(route_id.clone(), holder); self.data.get_mut(&route_id).unwrap() } }; set.insert(sender_id); route_id } fn remove_sender_id(&mut self, sender_id: &SenderID) { let mut removal: Vec = Vec::new(); for (route_id, set) in self.data.iter_mut() { set.remove(sender_id); if set.is_empty() { removal.push(route_id.clone()); } } for route_id in removal.iter() { self.data.remove(route_id); } } fn remove_route(&mut self, route: Route, sender: SenderID) { let route_id: RouteID = route.into(); let mut remove = false; match self.data.get_mut(&route_id) { Some(store) => { store.remove(&sender); if store.len() == 0 { remove = true; } } None => {} } if remove { self.data.remove(&route_id); } } fn get(&self, route: Route) -> HashSet { let mut output = HashSet::new(); for (route_id, set) in self.data.iter() { if route == route_id.into() { output = output.union(set).cloned().collect(); } } output } } #[cfg(test)] mod route_storeage { use super::*; #[test] fn can_add_routes() { let mut routes = RouteStorage::new(); let id1 = SenderID::new(); let id2 = SenderID::new(); let route1 = Route::new(Include::Just(MessageID::new()), Include::All, Include::All); let route2 = Route::new(Include::Just(MessageID::new()), Include::All, Include::All); let route_id1 = routes.add(route1.clone(), id1.clone()); let route_id2 = routes.add(route2.clone(), id2.clone()); let result1 = routes.get(route1.clone()); assert_eq!(result1.len(), 1); assert!( result1.contains(&id1), "{:?} not found in {:?}", id1, result1 ); assert_eq!(route_id1, route1.into()); let result2 = routes.get(route2.clone()); assert_eq!(result2.len(), 1); assert!( result2.contains(&id2), "{:?} not found in {:?}", id2, result2 ); assert_eq!(route_id2, route2.into()); } #[test] fn returns_empty_set_when_nothing_is_available() { let routes = RouteStorage::new(); let route = Route::new(Include::Just(MessageID::new()), Include::All, Include::All); let result = routes.get(route); assert_eq!(result.len(), 0); } #[test] fn returns_all_entries_using_the_same_route() { let count = 5; let mut routes = RouteStorage::new(); let mut ids: HashSet = HashSet::new(); while ids.len() < count { ids.insert(SenderID::new()); } let route = Route::new(Include::Just(MessageID::new()), Include::All, Include::All); for id in ids.iter() { routes.add(route.clone(), id.clone()); } let result = routes.get(route); assert_eq!(result, ids); } #[test] fn routes_are_not_duplicated() { let count = 5; let mut routes = RouteStorage::new(); let id = SenderID::new(); let route = Route::new(Include::Just(MessageID::new()), Include::All, Include::All); for _ in 0..count { routes.add(route.clone(), id.clone()); } let result = routes.get(route); assert_eq!(result.len(), 1); assert!(result.contains(&id), "{:?} not found in {:?}", id, result); } #[test] fn overlapping_routes_are_combined() { let mut routes = RouteStorage::new(); let id1 = SenderID::new(); let id2 = SenderID::new(); let route1 = Route::new(Include::Just(MessageID::new()), Include::All, Include::All); let route2 = Route::new(Include::Just(MessageID::new()), Include::All, Include::All); routes.add(route1.clone(), id1.clone()); routes.add(route2.clone(), id2.clone()); let retrieve = Route::new(Include::All, Include::All, Include::All); let result = routes.get(retrieve); assert_eq!(result.len(), 2); assert!(result.contains(&id1), "{:?} not found in {:?}", id1, result); assert!(result.contains(&id2), "{:?} not found in {:?}", id2, result); } #[test] fn can_remove_sender_id() { let mut routes = RouteStorage::new(); let count = 5; let mut ids: HashSet = HashSet::new(); while ids.len() < count { ids.insert(SenderID::new()); } let route = Route::new(Include::Just(MessageID::new()), Include::All, Include::All); for id in ids.iter() { routes.add(route.clone(), id.clone()); } let removed = ids.iter().last().unwrap().clone(); ids.remove(&removed); routes.remove_sender_id(&removed); let result = routes.get(route); assert_eq!(result, ids); } #[test] fn empty_routes_are_release_memory() { let mut routes = RouteStorage::new(); let id = SenderID::new(); let route = Route::new(Include::Just(MessageID::new()), Include::All, Include::All); routes.add(route.clone(), id.clone()); routes.remove_sender_id(&id); assert_eq!(routes.data.len(), 0); } #[test] fn can_route_be_removed() { let mut routes = RouteStorage::new(); let id = SenderID::new(); let route = Route::new(Include::Just(MessageID::new()), Include::All, Include::All); routes.add(route.clone(), id.clone()); routes.remove_route(route.clone(), id); assert_eq!(routes.data.len(), 0); } #[test] fn can_shared_route_be_removed() { let mut routes = RouteStorage::new(); let id1 = SenderID::new(); let id2 = SenderID::new(); let route = Route::new(Include::Just(MessageID::new()), Include::All, Include::All); routes.add(route.clone(), id1.clone()); routes.add(route.clone(), id2.clone()); routes.remove_route(route.clone(), id1); assert_eq!(routes.data.len(), 1); let ids = routes.get(route.clone()); let id = ids.iter().last().unwrap(); assert_eq!(id, &id2); } } #[derive(Clone, Debug)] pub struct Session { id: Field, expire_time: DateTime, } impl Session { const EXPIRE_IN: Duration = Duration::from_hours(1); pub fn new(id: F, time: DateTime) -> Self where F: Into, { Self { id: id.into(), expire_time: time + Self::EXPIRE_IN, } } pub fn id(&self) -> &Field { &self.id } pub fn set_id(&mut self, id: Field) -> Self { let mut output = self.clone(); output.id = id; output } fn extend(&mut self, time: DateTime) { self.expire_time = time + Session::EXPIRE_IN; } fn is_expired(&self, time: DateTime) -> bool { time > self.expire_time } } impl Default for Session { fn default() -> Self { Self { id: Field::None, expire_time: Utc::now(), } } } #[cfg(test)] mod session_entries { use super::*; use chrono::Utc; use std::time::Duration; #[test] fn is_there_a_default() { let session = Session::default(); assert_eq!(session.id(), &Field::None); } #[test] fn does_entry_return_id() { let id: Field = Uuid::new_v4().into(); let time = Utc::now(); let entry = Session::new(id.clone(), time.clone()); assert_eq!(entry.id, id); assert_eq!(entry.expire_time, time + Session::EXPIRE_IN); } #[test] fn can_determine_if_entry_expired() { let id: Field = Uuid::new_v4().into(); let time = Utc::now(); let entry = Session::new(id.clone(), time.clone()); assert!( !entry.is_expired(time + Session::EXPIRE_IN), "should not be expired" ); assert!( entry.is_expired(time + Session::EXPIRE_IN + Duration::from_secs(1)), "should be expired" ); } #[test] fn can_expiration_be_reset() { let id: Field = Uuid::new_v4().into(); let mut entry = Session::new(id.clone(), Utc::now()); let time = Utc::now(); entry.extend(time.clone()); assert_eq!(entry.expire_time, time + Session::EXPIRE_IN); } } struct SessionStorage { entries: HashMap, queue: Queue, } impl SessionStorage { fn new(queue: Queue) -> Self { Self { entries: HashMap::new(), queue: queue, } } fn get(&mut self, id: &Field) -> Session { let converted = match id { Field::Uuid(data) => Field::Uuid(data.clone()), Field::StaticString(data) => match Uuid::try_from(data.clone()) { Ok(id) => Field::Uuid(id.clone()), Err(_) => Field::None, }, _ => Field::None, }; match self.entries.get_mut(&converted) { Some(data) => { data.extend(self.queue.now()); data.clone() } None => { let mut new_id: Field = Uuid::new_v4().into(); while self.entries.contains_key(&new_id) { new_id = Uuid::new_v4().into(); } let output = Session::new(new_id.clone(), self.queue.now()); self.entries.insert(new_id, output.clone()); output } } } fn expire(&mut self) { let mut remove: Vec = Vec::new(); let time = self.queue.now(); for (id, session) in self.entries.iter() { if session.is_expired(time) { remove.push(id.clone()); } } for id in remove.iter() { self.entries.remove(id); } } } #[cfg(test)] mod session_storage { use super::*; use crate::{queue::TestClock, FieldType}; #[test] fn are_session_ids_unique() { let count = 10; let mut ids: Vec = Vec::new(); let clock = TestClock::new(); let queue = Queue::with_clock(clock.clone()); let mut sess = SessionStorage::new(queue.clone()); let expire_time = queue.now() + Session::EXPIRE_IN; for _ in 0..count { let result = sess.get(&Field::None); assert_eq!(result.expire_time, expire_time); let id = result.id().clone(); let id_type: FieldType = (&id).into(); assert_eq!(id_type, FieldType::Uuid); assert!(!ids.contains(&id), "{:?} is not unique in {:?}", id, ids); ids.push(id); } } #[test] fn are_valid_uuids_returned() { let clock = TestClock::new(); let queue = Queue::with_clock(clock.clone()); let mut sess = SessionStorage::new(queue.clone()); let data = sess.get(&Field::None); clock.advance(Duration::from_secs(5)); let time = queue.now(); let entry = sess.get(data.id()); assert_eq!(entry.id(), data.id()); assert_eq!(entry.expire_time, time + Session::EXPIRE_IN); } #[test] fn do_bad_ids_generate_new_ids() { let mut sess = SessionStorage::new(Queue::new()); let data: Field = Uuid::nil().into(); let result = sess.get(&data); assert_ne!(result.id(), &data); } #[test] fn can_string_ids_be_accepted() { let mut sess = SessionStorage::new(Queue::new()); let id = sess.get(&Field::None).id().clone(); let text: Field = match id { Field::Uuid(id) => id.to_string().into(), _ => unreachable!("entry id should always return a uuid"), }; let result = sess.get(&text); assert_eq!(result.id(), &id); } #[test] fn does_mismatched_string_produce_new_id() { let mut sess = SessionStorage::new(Queue::new()); let input = Uuid::nil(); let id: Field = input.to_string().into(); let test_data: Field = input.into(); let result = sess.get(&id); assert_ne!(result.id(), &test_data); } #[test] fn does_bad_string_produce_id() { let mut sess = SessionStorage::new(Queue::new()); let id: Field = "not a uuid".into(); let result = sess.get(&id); match result.id() { Field::Uuid(_) => {} _ => panic!("session id should always return a uuid field"), } } #[test] fn do_other_fields_return_uuid() { let mut sess = SessionStorage::new(Queue::new()); let id: Field = 2.into(); let result = sess.get(&id); match result.id() { Field::Uuid(_) => {} _ => panic!("session id should always return a uuid field"), } } #[test] fn are_expired_sessions_removed() { let clock = TestClock::new(); let queue = Queue::with_clock(clock.clone()); let mut sess = SessionStorage::new(queue.clone()); let data = sess.get(&Field::None); assert_eq!(sess.entries.len(), 1, "should have one entry"); clock.advance(Session::EXPIRE_IN); sess.expire(); assert_eq!( sess.entries.len(), 1, "entry should not have expired: expire: {:?}, time: {:?}", data.expire_time, queue.now() ); clock.advance(Duration::from_nanos(1)); sess.expire(); assert_eq!( sess.entries.len(), 0, "entry should have expired: expire: {:?}, time: {:?}", data.expire_time, queue.now() ); } } pub struct DocRegistry { doc_names: Names, queue: Queue, receiver: Receiver, routes: RouteStorage, sessions: SessionStorage, } impl DocRegistry { fn new(queue: Queue, rx: Receiver) -> Self { Self { doc_names: Names::new(), queue: queue.clone(), receiver: rx, routes: RouteStorage::new(), sessions: SessionStorage::new(queue), } } pub fn start(queue: Queue, rx: Receiver) { let mut doc_names = DocRegistry::new(queue, rx); spawn(move || { doc_names.listen(); }); } fn listen(&mut self) { loop { let mut msg = self.receiver.recv().unwrap(); match msg.get_action() { MsgAction::Register(data) => { let id = data.get_sender_id(); let reply = msg.set_action(self.register_action(data)); self.queue.forward(id, reply); } _ => match self.path_to_route(&msg.get_path()) { Ok(route) => { let session = self.sessions.get(msg.session_id()); msg.override_session(session.clone()); msg.set_route(route.clone()); for sender_id in self.routes.get(route).iter() { self.queue.forward(sender_id, msg.clone()); } } Err(err) => self.queue.send(msg.set_action(MsgAction::Error(err))), }, } } } fn path_to_route(&self, path: &Path) -> Result { let doc_id = match &path.doc { Include::Just(name) => match self.doc_names.get_id(name) { Ok(id) => Include::Just(id), Err(err) => return Err(err), }, Include::All => Include::All, }; Ok(Route::new(path.msg_id.clone(), doc_id, path.action.clone())) } fn register_action(&mut self, reg: &Register) -> Register { match reg.get_msg() { RegMsg::AddDocName(names) => match self.doc_names.add_names(names.clone()) { Ok(id) => reg.response(RegMsg::DocumentNameID(id.clone())), Err(err) => reg.response(RegMsg::Error(err)), }, RegMsg::AddRoute(path) => { let response = match self.path_to_route(path) { Ok(route) => { let id = self.routes.add(route, reg.get_sender_id().clone()); RegMsg::RouteID(id) } Err(err) => RegMsg::Error(err), }; reg.response(response) } RegMsg::GetNameID(name) => match self.doc_names.get_id(name) { Ok(id) => reg.response(RegMsg::DocumentNameID(id.clone())), Err(err) => reg.response(RegMsg::Error(err)), }, RegMsg::RemoveSender(sender_id) => { self.routes.remove_sender_id(sender_id); reg.response(RegMsg::Ok) } RegMsg::RemoveRoute(route) => { self.routes .remove_route(route.clone(), reg.get_sender_id().clone()); reg.response(RegMsg::Ok) } _ => reg.response(RegMsg::Ok), } } }