use chrono::prelude::*; use isolang::Language; use std::{ collections::{HashMap, HashSet}, sync::{ mpsc::{channel, Receiver, Sender}, Arc, RwLock, }, thread::spawn, time::Duration, }; use uuid::Uuid; #[cfg(test)] mod support_test { use std::time::Duration; pub static TIMEOUT: Duration = Duration::from_millis(500); } #[derive(Clone, Debug)] enum MTTError { DocumentAlreadyExists(String), DocumentFieldAlreadyExists(String, Field), DocumentFieldMissing(String), DocumentFieldNotFound(String), DocumentFieldWrongDataType(FieldType, FieldType), DocumentNotFound(String), FieldDuplicate(String, Field), NameDuplicate(Name), NameInvalidID(Uuid), NameMissingTranslation(Language), NameNotFound(Name), QueryCannotChangeData, } #[derive(Clone, Debug, Eq, Hash, PartialEq)] enum Action { Addition, Create, Delete, Error, Query, Register, Reply, Show, Update, } impl From for Action { fn from(value: MsgAction) -> Self { match value { MsgAction::Addition(_) => Action::Addition, MsgAction::Create(_) => Action::Create, MsgAction::Delete(_) => Action::Delete, MsgAction::Error(_) => Action::Error, MsgAction::Query(_) => Action::Query, MsgAction::Register(_) => Action::Register, MsgAction::Reply(_) => Action::Reply, MsgAction::Show => Action::Show, MsgAction::Update(_) => Action::Update, } } } impl From<&MsgAction> for Action { fn from(value: &MsgAction) -> Self { let action = value.clone(); Self::from(action) } } #[derive(Clone, Debug, Eq, Hash, PartialEq)] enum NameID { ID(Uuid), Name(String), None, } impl NameID { fn is_none(&self) -> bool { match self { Self::None => true, _ => false, } } } impl From<&str> for NameID { fn from(value: &str) -> Self { Self::Name(value.to_string()) } } impl From for NameID { fn from(value: String) -> Self { Self::Name(value) } } impl From for NameID { fn from(value: Uuid) -> Self { Self::ID(value) } } impl From<&NameID> for NameID { fn from(value: &NameID) -> Self { value.clone() } } #[derive(Clone, Debug)] enum MsgAction { Addition(Addition), 
Create(DocDef), // Alter // Remove Error(MTTError), Query(Query), Register(Register), Reply(Reply), Show, Delete(Delete), Update(Update), } impl From for MsgAction { fn from(value: Addition) -> Self { MsgAction::Addition(value) } } impl From for MsgAction { fn from(value: Delete) -> Self { MsgAction::Delete(value) } } impl From for MsgAction { fn from(value: DocDef) -> Self { MsgAction::Create(value) } } impl From for MsgAction { fn from(value: MTTError) -> Self { MsgAction::Error(value) } } impl From for MsgAction { fn from(value: Query) -> Self { MsgAction::Query(value) } } impl From for MsgAction { fn from(value: Register) -> Self { MsgAction::Register(value) } } impl From for MsgAction { fn from(value: Reply) -> Self { MsgAction::Reply(value) } } impl From for MsgAction { fn from(value: Update) -> Self { MsgAction::Update(value) } } #[cfg(test)] mod msgactions { use super::*; #[test] fn turn_document_definition_into_action() { let value = DocDef::new(); let result: MsgAction = value.into(); match result { MsgAction::Create(_) => {} _ => unreachable!("Got {:?}: dhould have been create", result), } } #[test] fn turn_error_into_action() { let data = "data".to_string(); let value = MTTError::DocumentAlreadyExists(data.clone()); let result: MsgAction = value.into(); match result { MsgAction::Error(result) => match result { MTTError::DocumentAlreadyExists(output) => assert_eq!(output, data), _ => unreachable!("Got {:?}: dhould have been create", result), }, _ => unreachable!("Got {:?}: dhould have been create", result), } let value = MTTError::DocumentNotFound(data.clone()); let result: MsgAction = value.into(); match result { MsgAction::Error(result) => match result { MTTError::DocumentNotFound(output) => assert_eq!(output, data), _ => unreachable!("Got {:?}: dhould have been create", result), }, _ => unreachable!("Got {:?}: dhould have been create", result), } } #[test] fn turn_query_into_action() { let value = Query::new(); let result: MsgAction = value.into(); 
match result { MsgAction::Query(_) => {} _ => unreachable!("Got {:?}: dhould have been query", result), } } #[test] fn turn_reply_into_action() { let value = Reply::new(); let result: MsgAction = value.into(); match result { MsgAction::Reply(_) => {} _ => unreachable!("Got {:?}: dhould have been reply", result), } } } #[derive(Clone, Debug)] struct Message { msg_id: Uuid, document_id: NameID, action: MsgAction, } impl Message { fn new(doc_id: D, action: A) -> Self where D: Into, A: Into, { Self { msg_id: Uuid::new_v4(), document_id: doc_id.into(), action: action.into(), } } fn get_message_id(&self) -> &Uuid { &self.msg_id } fn get_document_id(&self) -> &NameID { &self.document_id } fn get_action(&self) -> &MsgAction { &self.action } fn response(&self, action: A) -> Self where A: Into, { Self { msg_id: self.msg_id.clone(), document_id: self.document_id.clone(), action: action.into(), } } } #[cfg(test)] mod messages { use super::*; #[test] fn can_the_document_be_a_stringi_reference() { let dts = ["one", "two"]; for document in dts.into_iter() { let msg = Message::new(document, MsgAction::Create(DocDef::new())); match msg.get_document_id() { NameID::Name(data) => assert_eq!(data, document), _ => unreachable!("should have been a string id"), } match msg.get_action() { MsgAction::Create(_) => {} _ => unreachable!("should have been a create document"), } } } #[test] fn can_the_document_be_a_string() { let dts = ["one".to_string(), "two".to_string()]; for document in dts.into_iter() { let msg = Message::new(document.clone(), MsgAction::Query(Query::new())); match msg.get_document_id() { NameID::Name(data) => assert_eq!(data, &document), _ => unreachable!("should have been a string id"), } match msg.get_action() { MsgAction::Query(_) => {} _ => unreachable!("should have been an access query"), } } } #[test] fn can_the_document_be_an_id() { let document = Uuid::new_v4(); let msg = Message::new(document.clone(), MsgAction::Query(Query::new())); match msg.get_document_id() { 
NameID::ID(data) => assert_eq!(data, &document), _ => unreachable!("should have been an id"), } match msg.get_action() { MsgAction::Query(_) => {} _ => unreachable!("should have been an access query"), } } #[test] fn is_the_message_id_random() { let mut ids: Vec = Vec::new(); for _ in 0..5 { let msg = Message::new("tester", MsgAction::Create(DocDef::new())); let id = msg.get_message_id().clone(); assert!(!ids.contains(&id), "{:?} containts {}", ids, id); ids.push(id); } } #[test] fn Can_make_reply_message() { let name = "testing"; let msg = Message::new(name, MsgAction::Query(Query::new())); let responce = Reply::new(); let reply = msg.response(responce); assert_eq!(reply.get_message_id(), msg.get_message_id()); match reply.get_document_id() { NameID::Name(data) => assert_eq!(data, name), _ => unreachable!("should have been a name"), } match reply.get_action() { MsgAction::Reply(_) => {} _ => unreachable!("should have been a reply"), } } #[test] fn Can_make_error_message() { let name = "testing"; let msg = Message::new(name, MsgAction::Query(Query::new())); let err_msg = Uuid::new_v4().to_string(); let result = msg.response(MTTError::DocumentNotFound(err_msg.clone())); assert_eq!(result.get_message_id(), msg.get_message_id()); match result.get_document_id() { NameID::Name(data) => assert_eq!(data, name), _ => unreachable!("should have been a name"), } match result.get_action() { MsgAction::Error(data) => match data { MTTError::DocumentNotFound(txt) => assert_eq!(txt, &err_msg), _ => unreachable!("got {:?}, should have received not found", data), }, _ => unreachable!("should have been a reply"), } } #[test] fn can_make_a_response_message() { let doc_id = Uuid::new_v4(); let msg = Message::new(doc_id.clone(), MsgAction::Query(Query::new())); let data = Uuid::new_v4().to_string(); let result1 = msg.response(MTTError::DocumentNotFound(data.clone())); let result2 = msg.response(Reply::new()); assert_eq!(result1.get_message_id(), msg.get_message_id()); 
assert_eq!(result2.get_message_id(), msg.get_message_id()); assert_eq!(result1.get_document_id(), msg.get_document_id()); assert_eq!(result2.get_document_id(), msg.get_document_id()); let action1 = result1.get_action(); match action1 { MsgAction::Error(err) => match err { MTTError::DocumentNotFound(output) => assert_eq!(output, &data), _ => unreachable!("got {:?}: should have received document not found", err), }, _ => unreachable!("got {:?}: should have received error", action1), } let action2 = result2.get_action(); match action2 { MsgAction::Reply(data) => assert_eq!(data.len(), 0), _ => unreachable!("got {:?}: should have received a reply", action2), } } } #[derive(Clone, Debug, Eq, Hash)] enum Include { All, Some(T), } impl PartialEq for Include { fn eq(&self, other: &Self) -> bool { match self { Include::All => true, Include::Some(data) => match other { Include::All => true, Include::Some(other_data) => data == other_data, }, } } } #[cfg(test)] mod includes { use super::*; #[test] fn does_all_equal_evberything() { let a: Include = Include::All; let b: Include = Include::Some(5); let c: Include = Include::Some(7); assert!(a == a, "all should equal all"); assert!(a == b, "all should equal some"); assert!(b == a, "some should equal all"); assert!(b == b, "same some should equal"); assert!(b != c, "different somes do not equal"); } } #[derive(Clone, Eq, Hash, PartialEq)] struct RouteID { action: Option, doc_type: Option, msg_id: Option, } impl From for RouteID { fn from(value: Route) -> Self { Self { action: match value.action { Include::All => None, Include::Some(action) => Some(action.clone()), }, doc_type: match value.doc_type { Include::All => None, Include::Some(doc) => Some(doc.clone()), }, msg_id: match value.msg_id { Include::All => None, Include::Some(id) => Some(id.clone()), }, } } } #[derive(Clone, Debug, Eq, Hash, PartialEq)] struct Name { name: String, lang: Language, } impl Name { fn get_language(&self) -> &Language { &self.lang } fn english(name: 
String) -> Self { Self { name: name, lang: Language::from_639_1("en").unwrap(), } } fn japanese(name: String) -> Self { Self { name: name, lang: Language::from_639_1("ja").unwrap(), } } } impl ToString for Name { fn to_string(&self) -> String { self.name.clone() } } #[derive(Debug)] struct Names { names: HashMap, ids: HashMap>, } impl Names { fn new() -> Self { Self { names: HashMap::new(), ids: HashMap::new(), } } fn add_name(&mut self, name: Name) -> Result { if self.names.contains_key(&name) { return Err(MTTError::NameDuplicate(name)); } let mut id = Uuid::new_v4(); while self.ids.contains_key(&id) { id = Uuid::new_v4(); } self.names.insert(name.clone(), id.clone()); let mut holder: HashMap = HashMap::new(); holder.insert(name.get_language().clone(), name); self.ids.insert(id.clone(), holder); Ok(id) } fn add_translation(&mut self, name: Name, translation: Name) -> Result { let id = match self.get_id(&name) { Ok(data) => data.clone(), Err(err) => return Err(err), }; match self.get_id(&translation) { Ok(_) => return Err(MTTError::NameDuplicate(translation)), Err(_) => {} } let holder = self.ids.get_mut(&id).unwrap(); holder.insert(translation.get_language().clone(), translation.clone()); self.names.insert(translation, id); Ok(id.clone()) } fn get_name(&self, id: &Uuid, lang: &Language) -> Result { match self.ids.get(id) { Some(langdb) => match langdb.get(lang) { Some(name) => Ok(name.clone()), None => Err(MTTError::NameMissingTranslation(lang.clone())), }, None => Err(MTTError::NameInvalidID(id.clone())), } } fn get_id(&self, name: &Name) -> Result<&Uuid, MTTError> { match self.names.get(name) { Some(id) => Ok(id), None => Err(MTTError::NameNotFound(name.clone())), } } } #[cfg(test)] mod names { use super::*; #[test] fn are_name_ids_unique() { let mut names = Names::new(); let data = ["one", "two", "three", "four", "five"]; let mut ids: HashSet = HashSet::new(); for item in data.iter() { let name = Name::english(item.to_string()); 
ids.insert(names.add_name(name).unwrap()); } assert_eq!(ids.len(), data.len()); } #[test] fn does_id_return_name() { let mut names = Names::new(); let data = ["one", "two"]; let mut ids: HashMap = HashMap::new(); for item in data.iter() { let name = Name::english(item.to_string()); ids.insert(name.clone(), names.add_name(name).unwrap()); } for (name, id) in ids.iter() { assert_eq!( &names .get_name(id, &Language::from_639_1("en").unwrap()) .unwrap(), name ); assert_eq!(names.get_id(name).unwrap(), id); } } #[test] fn errors_on_name_not_found() { let mut names = Names::new(); let name = Name::english("missing".to_string()); let result = names.get_id(&name); match result { Ok(_) => unreachable!("got {:?}, should have been error", result), Err(err) => match err { MTTError::NameNotFound(output) => assert_eq!(output, name), _ => unreachable!("got {:?}, should have been name not found", err), }, } } #[test] fn errors_on_bad_id() { let mut names = Names::new(); let id = Uuid::new_v4(); let result = names.get_name(&id, &Language::from_639_1("en").unwrap()); match result { Ok(_) => unreachable!("got {:?}, should be invalid id error", result), Err(err) => match err { MTTError::NameInvalidID(data) => assert_eq!(data, id), _ => unreachable!("got {:?}, should have been invalid id", err), }, } } #[test] fn errors_on_missing_translation() { let mut names = Names::new(); let name = Name::english("task".to_string()); let lang = Language::from_639_1("ja").unwrap(); let id = names.add_name(name).unwrap(); let result = names.get_name(&id, &lang); match result { Ok(_) => unreachable!("got {:?}, should be invalid id error", result), Err(err) => match err { MTTError::NameMissingTranslation(data) => assert_eq!(data, lang), _ => unreachable!("got {:?}, should have been invalid id", err), }, } } #[test] fn errors_on_duplicate_names() { let mut names = Names::new(); let data = "test".to_string(); let name = Name::english(data.clone()); let id = names.add_name(name.clone()); let output = 
names.add_name(name.clone()); match output { Ok(_) => unreachable!( "got {:?}, should have produced duplicate name error", output ), Err(err) => match err { MTTError::NameDuplicate(result) => assert_eq!(result, name), _ => unreachable!("got {:?}, should have been duplicate name", err), }, } } #[test] fn allows_alternate_names() { let mut names = Names::new(); let data = "test".to_string(); let alt = "テスト".to_string(); let english = Name::english(data.clone()); let japanese = Name::japanese(alt.clone()); let id = names.add_name(english.clone()).unwrap(); let result = names.add_translation(english, japanese.clone()).unwrap(); assert_eq!(result, id); println!("\n{:?}", names); let output = names.get_name(&id, &Language::from_639_1("ja").unwrap()); assert_eq!(output.unwrap().to_string(), alt); assert_eq!(names.get_id(&japanese).unwrap(), &id); } #[test] fn errors_on_bad_translation() { let mut names = Names::new(); let data = "test".to_string(); let alt = "テスト".to_string(); let english = Name::english(data.clone()); let japanese = Name::japanese(alt.clone()); let result = names.add_translation(japanese.clone(), english); match result { Ok(_) => unreachable!("got {:?}, should be invalid id error", result), Err(err) => match err { MTTError::NameNotFound(output) => assert_eq!(output, japanese), _ => unreachable!("got {:?}, should have been invalid id", err), }, } } #[test] fn errors_on_translation_duplicates() { let mut names = Names::new(); let data = "test".to_string(); let alt = "テスト".to_string(); let english = Name::english(data.clone()); let japanese = Name::japanese(alt.clone()); let id = names.add_name(english.clone()).unwrap(); let id = names.add_name(japanese.clone()).unwrap(); let result = names.add_translation(english, japanese.clone()); match result { Ok(_) => unreachable!( "got {:?}, should have produced duplicate name error", result ), Err(err) => match err { MTTError::NameDuplicate(result) => assert_eq!(result, japanese), _ => unreachable!("got {:?}, should 
have been duplicate name", err), }, } } } #[derive(Clone, Debug)] enum RegMsg { AddRoute(RouteRequest), DocName(Name), Error(MTTError), Ok, } #[derive(Clone, Debug)] struct Register { msg: RegMsg, sender_id: Uuid, } impl Register { fn new(sender_id: Uuid, reg_msg: RegMsg) -> Self { Self { msg: reg_msg, sender_id: sender_id, } } fn get_msg(&self) -> &RegMsg { &self.msg } fn get_sender_id(&self) -> &Uuid { &self.sender_id } fn response(&self, reg_msg: RegMsg) -> Self { Self { msg: reg_msg, sender_id: self.sender_id.clone(), } } } #[derive(Clone, Debug, PartialEq)] struct Route { action: Include, doc_type: Include, msg_id: Include, } impl Route { fn new(msg_id: Include, doc: Include, action: Include) -> Self { Self { action: action, doc_type: doc, msg_id: msg_id, } } } impl From for Route { fn from(value: RouteID) -> Self { Self { action: match value.action { Some(data) => Include::Some(data.clone()), None => Include::All, }, doc_type: match value.doc_type { Some(doc) => Include::Some(doc.clone()), None => Include::All, }, msg_id: match value.msg_id { Some(msg) => Include::Some(msg.clone()), None => Include::All, }, } } } impl From<&RouteID> for Route { fn from(value: &RouteID) -> Self { Self { action: match &value.action { Some(data) => Include::Some(data.clone()), None => Include::All, }, doc_type: match &value.doc_type { Some(doc) => Include::Some(doc.clone()), None => Include::All, }, msg_id: match &value.msg_id { Some(msg) => Include::Some(msg.clone()), None => Include::All, }, } } } #[cfg(test)] mod routes { use super::*; #[test] fn can_a_route_set_action() { let actions = [Action::Query, Action::Reply]; for action in actions.into_iter() { let route = Route::new(Include::All, Include::All, Include::Some(action.clone())); match route.msg_id { Include::All => {} Include::Some(_) => unreachable!("should have been all"), } match route.doc_type { Include::All => {} Include::Some(_) => unreachable!("should have been all"), } match route.action { Include::All => 
unreachable!("should be a specific value"), Include::Some(result) => assert_eq!(result, action), } } } #[test] fn can_route_set_document_by_name() { let doc_id = Uuid::new_v4(); let route = Route::new(Include::All, Include::Some(doc_id.clone()), Include::All); match route.msg_id { Include::All => {} Include::Some(_) => unreachable!("should have been all"), } match route.doc_type { Include::All => unreachable!("should be a specific value"), Include::Some(result) => assert_eq!(result, doc_id), } match route.action { Include::All => {} Include::Some(_) => unreachable!("should have been all"), } } #[test] fn can_route_set_document_by_id() { let id = Uuid::new_v4(); let route = Route::new(Include::All, Include::Some(id.clone()), Include::All); match route.msg_id { Include::All => {} Include::Some(_) => unreachable!("should have been all"), } match route.doc_type { Include::All => unreachable!("should be a specific value"), Include::Some(result) => assert_eq!(result, id), } match route.action { Include::All => {} Include::Some(_) => unreachable!("should have been all"), } } #[test] fn can_route_be_set_by_message_id() { let id = Uuid::new_v4(); let route = Route::new(Include::Some(id.clone()), Include::All, Include::All); match route.msg_id { Include::All => unreachable!("should be a specific value"), Include::Some(result) => assert_eq!(result, id), } match route.doc_type { Include::All => {} Include::Some(_) => unreachable!("should have been all"), } match route.action { Include::All => {} Include::Some(_) => unreachable!("should have been all"), } } } #[derive(Clone, Debug, Eq, Hash, PartialEq)] struct RouteRequest { msg_id: Include, doc_name: Include, action: Include, } impl RouteRequest { fn new(msg_id: Include, doc_name: Include, action: Include) -> Self { Self { msg_id: msg_id, doc_name: doc_name, action: action, } } } struct QueueData { senders: HashMap>, names: HashMap, routes: HashMap>, } impl QueueData { fn new() -> Self { Self { senders: HashMap::new(), names: 
HashMap::new(), routes: HashMap::new(), } } fn get_doc_id(&self, nameid: N) -> Result where N: Into, { let sender_id = match nameid.into() { NameID::Name(name) => match self.names.get(&name) { Some(id) => id.clone(), None => return Err(MTTError::DocumentNotFound(name.clone())), }, NameID::ID(id) => id.clone(), NameID::None => unreachable!("should never be none"), }; if self.senders.contains_key(&sender_id) { Ok(sender_id) } else { Err(MTTError::DocumentNotFound(sender_id.to_string())) } } fn register( &mut self, tx: Sender, name: String, routes: Vec, ) -> Result<(), MTTError> { let mut id = Uuid::new_v4(); while self.senders.contains_key(&id) { id = Uuid::new_v4(); } match self.get_doc_id(name.clone()) { Ok(_) => return Err(MTTError::DocumentAlreadyExists(name)), Err(_) => {} } let mut holder: HashMap> = HashMap::new(); for route in routes.iter() { let doc_type = match &route.doc_name { Include::Some(doc_name) => { if doc_name == &name { Include::Some(id.clone()) } else { match self.get_doc_id(doc_name.to_string()) { Ok(doc_id) => Include::Some(doc_id), Err(err) => return Err(err), } } } Include::All => Include::All, }; let route_id: RouteID = Route::new(route.msg_id.clone(), doc_type, route.action.clone()).into(); match self.routes.get(&route_id) { Some(senders) => { let mut addition = senders.clone(); addition.push(id.clone()); holder.insert(route_id, addition); } None => { let senders = [id.clone()].to_vec(); holder.insert(route_id, senders); } } } self.senders.insert(id.clone(), tx); self.names.insert(name.clone(), id.clone()); for (route_id, senders) in holder.iter() { self.routes.insert(route_id.clone(), senders.clone()); } Ok(()) } fn send(&self, msg: Message) -> Result<(), MTTError> { let doc_id: Include = match self.get_doc_id(msg.get_document_id()) { Ok(id) => Include::Some(id.clone()), Err(err) => { let action: Action = msg.get_action().into(); if action == Action::Create { Include::Some(Uuid::nil()) } else { return Err(err); } } }; let route = 
Route::new( Include::Some(msg.get_message_id().clone()), doc_id, Include::Some(msg.get_action().into()), ); for (send_route, send_ids) in self.routes.iter() { if route == send_route.into() { for send_id in send_ids { let tx = self.senders.get(&send_id).unwrap(); tx.send(msg.clone()).unwrap(); } } } Ok(()) } } #[cfg(test)] mod queuedatas { use super::support_test::TIMEOUT; use super::*; use std::sync::mpsc::RecvTimeoutError; #[test] fn can_document_be_registered() { let mut queuedata = QueueData::new(); let (tx, rx) = channel(); let name = Uuid::new_v4().to_string(); let routes = [ RouteRequest::new( Include::All, Include::Some(name.clone()), Include::Some(Action::Query), ), RouteRequest::new( Include::All, Include::Some(name.clone()), Include::Some(Action::Reply), ), ] .to_vec(); queuedata.register(tx, name.clone(), routes).unwrap(); let msg1 = Message::new(name.clone(), MsgAction::Query(Query::new())); let msg2 = Message::new(name.clone(), MsgAction::Reply(Reply::new())); let msg3 = Message::new(name.clone(), MsgAction::Create(DocDef::new())); queuedata.send(msg1.clone()).unwrap(); queuedata.send(msg2.clone()).unwrap(); queuedata.send(msg3.clone()).unwrap(); let result1 = rx.recv_timeout(TIMEOUT).unwrap(); let result2 = rx.recv_timeout(TIMEOUT).unwrap(); match rx.recv_timeout(TIMEOUT) { Ok(_) => unreachable!("should have timed out"), Err(err) => match err { RecvTimeoutError::Timeout => {} _ => unreachable!("should have timed out"), }, } assert_eq!(result1.get_message_id(), msg1.get_message_id()); assert_eq!(result2.get_message_id(), msg2.get_message_id()); match result1.get_action() { MsgAction::Query(_) => {} _ => unreachable!("should have been a query"), } match result2.get_action() { MsgAction::Reply(_) => {} _ => unreachable!("should have been a query"), } } #[test] fn does_register_fail_on_duplicate_documents() { let mut queuedata = QueueData::new(); let (tx1, _) = channel(); let (tx2, _) = channel(); let name = Uuid::new_v4().to_string(); queuedata 
.register(tx1, name.to_string(), Vec::new()) .unwrap(); match queuedata.register(tx2, name.to_string(), Vec::new()) { Ok(_) => unreachable!("duplicates should create an error"), Err(err) => match err { MTTError::DocumentAlreadyExists(result) => assert_eq!(result, name), _ => unreachable!("should have been document does not exists"), }, } } #[test] fn does_bad_route_prevent_registration() { let mut queuedata = QueueData::new(); let (tx, rx) = channel(); let good = "good"; let bad = Uuid::new_v4().to_string(); let routes = [ RouteRequest::new( Include::All, Include::Some(good.to_string()), Include::Some(Action::Query), ), RouteRequest::new( Include::All, Include::Some(bad.clone()), Include::Some(Action::Reply), ), ] .to_vec(); match queuedata.register(tx, good.to_string(), routes) { Ok(_) => unreachable!("should produce an error"), Err(err) => match err { MTTError::DocumentNotFound(result) => assert_eq!(result, bad), _ => unreachable!("Shouuld be document not found"), }, } assert_eq!(queuedata.senders.len(), 0, "should not add to senders"); assert_eq!(queuedata.names.len(), 0, "should not add to names"); assert_eq!(queuedata.routes.len(), 0, "should nor add to routes"); } #[test] fn is_sender_only_added_once_to_routes() { let mut queuedata = QueueData::new(); let (tx, rx) = channel(); let name = "project"; let routes = [ RouteRequest::new( Include::All, Include::Some(name.to_string()), Include::Some(Action::Query), ), RouteRequest::new( Include::All, Include::Some(name.to_string()), Include::Some(Action::Query), ), ] .to_vec(); queuedata.register(tx, name.to_string(), routes).unwrap(); for senders in queuedata.routes.values() { assert_eq!(senders.len(), 1, "should be no double entries"); } } #[test] fn does_a_bad_document_name_fail() { let docname = Uuid::new_v4().to_string(); let queuedata = QueueData::new(); let msg = Message::new(docname.clone(), MsgAction::Query(Query::new())); match queuedata.send(msg) { Ok(_) => unreachable!("should have been an error"), 
Err(data) => match data { MTTError::DocumentNotFound(doc) => assert_eq!(doc, docname), _ => unreachable!("should have been a not found error"), }, } } #[test] fn is_send_okay_if_no_one_is_listening() { let mut queuedata = QueueData::new(); let name = "something"; let (tx, _) = channel(); queuedata .register(tx, name.to_string(), Vec::new()) .unwrap(); let msg = Message::new("something", MsgAction::Create(DocDef::new())); match queuedata.send(msg) { Ok(_) => {} Err(err) => unreachable!("got {:?}: should not error", err), } } #[test] fn can_more_than_one_document_respond() { let mut queuedata = QueueData::new(); let name1 = "task"; let name2 = "work"; let action = MsgAction::Query(Query::new()); let (tx1, rx1) = channel(); let (tx2, rx2) = channel(); let routes = [RouteRequest::new( Include::All, Include::Some(name1.to_string()), Include::All, )] .to_vec(); queuedata .register(tx1, name1.to_string(), routes.clone()) .unwrap(); queuedata .register(tx2, name2.to_string(), routes.clone()) .unwrap(); let msg = Message::new(name1, action.clone()); queuedata.send(msg.clone()).unwrap(); let result1 = rx1.recv_timeout(TIMEOUT).unwrap(); let result2 = rx2.recv_timeout(TIMEOUT).unwrap(); assert_eq!(result1.get_message_id(), msg.get_message_id()); assert_eq!(result1.get_message_id(), result2.get_message_id()); } } struct DocRegistry { doc_names: Vec, queue: Queue, receiver: Receiver, routes: HashMap, } impl DocRegistry { fn new(queue: Queue, rx: Receiver) -> Self { Self { doc_names: Vec::new(), queue: queue, receiver: rx, routes: HashMap::new(), } } fn start(queue: Queue, rx: Receiver) { let mut doc_names = DocRegistry::new(queue, rx); spawn(move || { doc_names.listen(); }); } fn listen(&mut self) { loop { let msg = self.receiver.recv().unwrap(); match msg.get_action() { MsgAction::Register(data) => { let id = data.get_sender_id(); let reply = msg.response(self.register_action(data)); self.queue.forward(id, reply); } _ => {} } } } fn register_action(&mut self, reg: &Register) 
-> Register { match reg.get_msg() { RegMsg::DocName(name) => { if self.doc_names.contains(name) { reg.response(RegMsg::Error(MTTError::DocumentAlreadyExists( name.to_string(), ))) } else { self.doc_names.push(name.clone()); reg.response(RegMsg::Ok) } } RegMsg::AddRoute(route) => { self.routes .insert(route.clone(), reg.get_sender_id().clone()); reg.response(RegMsg::Ok) } _ => reg.response(RegMsg::Ok), } } } struct Router { doc_registry: Sender, senders: HashMap>, } impl Router { fn new(tx: Sender) -> Self { Self { doc_registry: tx, senders: HashMap::new(), } } fn add_sender(&mut self, sender: Sender) -> Uuid { let mut id = Uuid::new_v4(); while self.senders.contains_key(&id) { id = Uuid::new_v4(); } self.senders.insert(id.clone(), sender); id } fn forward(&self, id: &Uuid, msg: Message) { self.senders.get(id).unwrap().send(msg).unwrap(); } fn send(&self, msg: Message) { self.doc_registry.send(msg).unwrap(); } } #[cfg(test)] mod routers { use super::{support_test::TIMEOUT, *}; #[test] fn can_pass_message() { let (tx, rx) = channel(); let router = Router::new(tx); let msg = Message::new("task", Query::new()); router.send(msg.clone()); let result = rx.recv_timeout(TIMEOUT).unwrap(); assert_eq!(result.get_message_id(), msg.get_message_id()); } #[test] fn can_forward_message() { let (tx, _) = channel(); let mut router = Router::new(tx); let (sender, receiver) = channel(); let id = router.add_sender(sender); let msg = Message::new("wiki", Query::new()); router.forward(&id, msg.clone()); let result = receiver.recv_timeout(TIMEOUT).unwrap(); assert_eq!(result.get_message_id(), msg.get_message_id()); } #[test] fn sender_ids_are_unique() { let (tx, _) = channel(); let mut router = Router::new(tx); let count = 10; let mut holder: HashSet = HashSet::new(); for _ in 0..count { let (tx, _) = channel(); holder.insert(router.add_sender(tx)); } assert_eq!(holder.len(), count, "had duplicate keys"); } } #[derive(Clone)] struct Queue { router: Arc>, // // // queue_data: Arc>, } impl 
Queue { fn new() -> Self { let (tx, rx) = channel(); let output = Self { router: Arc::new(RwLock::new(Router::new(tx))), // // // queue_data: Arc::new(RwLock::new(QueueData::new())), }; DocRegistry::start(output.clone(), rx); output } fn add_sender(&mut self, sender: Sender) -> Uuid { let mut router = self.router.write().unwrap(); router.add_sender(sender) } fn forward(&self, id: &Uuid, msg: Message) { let router = self.router.read().unwrap(); router.forward(id, msg); } fn send(&self, msg: Message) -> Result<(), MTTError> { let router = self.router.read().unwrap(); router.send(msg.clone()); // // // if msg.get_document_id().is_none() { Ok(()) } else { let queuedata = self.queue_data.read().unwrap(); queuedata.send(msg) } } // // // fn register( &mut self, tx: Sender, name: String, routes: Vec, ) -> Result<(), MTTError> { let mut queuedata = self.queue_data.write().unwrap(); queuedata.register(tx, name, routes) } } #[cfg(test)] mod queues { use super::{support_test::TIMEOUT, *}; use std::sync::mpsc::RecvTimeoutError; struct TestQueue { sender_id: Uuid, queue: Queue, receiver: Receiver, doc_id: HashMap, doc_rx: HashMap>, } impl TestQueue { fn new() -> Self { let mut queue = Queue::new(); let (tx, rx) = channel(); let id = queue.add_sender(tx); Self { sender_id: id, queue: queue, receiver: rx, doc_id: HashMap::new(), doc_rx: HashMap::new(), } } fn add_document(&mut self, name: String) { let (tx, rx) = channel(); let id = self.add_sender(tx); let reg_msg = Register::new(id.clone(), RegMsg::DocName(Name::english(name.clone()))); let msg = Message::new(NameID::None, reg_msg); self.send(msg.clone()).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); self.doc_id.insert(name.clone(), id); self.doc_rx.insert(name.clone(), rx); } fn get_preset_id(&self) -> &Uuid { &self.sender_id } fn get_preset_rx(&self) -> &Receiver { &self.receiver } fn get_doc_rx_id(&self, name: &str) -> &Uuid { self.doc_id.get(name).unwrap() } fn get_doc_rx(&self, name: &str) -> &Receiver { 
self.doc_rx.get(name).unwrap() } fn add_sender(&mut self, sender: Sender) -> Uuid { self.queue.add_sender(sender) } fn forward(&self, id: &Uuid, msg: Message) { self.queue.forward(id, msg); } fn send(&self, msg: Message) -> Result<(), MTTError> { self.queue.send(msg) } } #[test] fn can_forward_message() { let mut queue = TestQueue::new(); let msg = Message::new("wiki", Query::new()); queue.forward(queue.get_preset_id(), msg.clone()); let result = queue.get_preset_rx().recv_timeout(TIMEOUT).unwrap(); assert_eq!(result.get_message_id(), msg.get_message_id()); } #[test] fn sender_ids_are_unique() { let mut queue = Queue::new(); let count = 10; let mut holder: HashSet = HashSet::new(); for _ in 0..count { let (tx, _) = channel(); holder.insert(queue.add_sender(tx)); } assert_eq!(holder.len(), count, "had duplicate keys"); } #[test] fn can_register_document_name() { let mut queue = TestQueue::new(); let doc_name = Name::english(Uuid::new_v4().to_string()); let reg_msg = Register::new( queue.get_preset_id().clone(), RegMsg::DocName(doc_name.clone()), ); let msg = Message::new(NameID::None, reg_msg); queue.send(msg.clone()).unwrap(); let result = queue.get_preset_rx().recv_timeout(TIMEOUT).unwrap(); assert_eq!(result.get_message_id(), msg.get_message_id()); let action = result.get_action(); match action { MsgAction::Register(data) => match data.get_msg() { RegMsg::Ok => {} _ => unreachable!("got {:?}, should have been register ok", action), }, _ => unreachable!("got {:?}, should have been register ok", action), } } #[test] fn errors_on_duplicate_names() { let mut queue = TestQueue::new(); let receiver = queue.get_preset_rx(); let doc_name = Name::english(Uuid::new_v4().to_string()); let reg_msg = Register::new( queue.get_preset_id().clone(), RegMsg::DocName(doc_name.clone()), ); let msg = Message::new(NameID::None, reg_msg.clone()); queue.send(msg.clone()).unwrap(); receiver.recv_timeout(TIMEOUT).unwrap(); let msg2 = Message::new(NameID::None, reg_msg.clone()); 
queue.send(msg.clone()).unwrap(); let result = receiver.recv_timeout(TIMEOUT).unwrap(); assert_eq!(result.get_message_id(), msg.get_message_id()); let action = result.get_action(); match action { MsgAction::Register(data) => match data.get_msg() { RegMsg::Error(err) => match err { MTTError::DocumentAlreadyExists(name) => { assert_eq!(name.to_string(), doc_name.to_string()) } _ => unreachable!("got {:?}, should have been duplicate error", err), }, _ => unreachable!("got {:?}, should have been error", data), }, _ => unreachable!("got {:?}, should have been register ok", action), } } #[test] #[ignore] fn can_register_routes() { let mut queue = TestQueue::new(); let names = ["task", "recipe"]; for name in names.iter() { queue.add_document(name.to_string()); } let route_req = RouteRequest::new(Include::All, Include::All, Include::All); let reg_msg = RegMsg::AddRoute(route_req); let reg = Register::new(queue.get_doc_rx_id(names[0]).clone(), reg_msg); let msg = Message::new(NameID::None, reg); queue.send(msg).unwrap(); queue.get_doc_rx(names[0]).recv_timeout(TIMEOUT).unwrap(); let msg = Message::new(NameID::None, Query::new()); queue.send(msg.clone()).unwrap(); let result = queue.get_doc_rx(names[0]).recv_timeout(TIMEOUT).unwrap(); assert_eq!(result.get_message_id(), msg.get_message_id()); match queue.get_doc_rx(names[1]).recv_timeout(TIMEOUT) { Ok(msg) => unreachable!("should not receive: {:?}", msg), Err(err) => match err { RecvTimeoutError::Timeout => {} _ => unreachable!("should have timed out"), }, } } } struct CreateDoc { queue: Queue, rx: Receiver, } impl CreateDoc { fn new(queue: Queue, rx: Receiver) -> Self { Self { queue: queue, rx: rx, } } fn start(mut queue: Queue) { let (tx, rx) = channel(); let routes = [RouteRequest::new( Include::All, Include::All, Include::Some(Action::Create), )] .to_vec(); let id = queue.register(tx, "document".to_string(), routes).unwrap(); let doc = CreateDoc::new(queue, rx); spawn(move || { doc.listen(); }); } fn listen(&self) { loop 
{
            let msg = self.rx.recv().unwrap();
            DocumentFile::start(self.queue.clone(), msg);
        }
    }
}

/// The kind of value a [`Field`] holds, without the value itself.
#[derive(Clone, Debug, PartialEq)]
enum FieldType {
    Boolean,
    DateTime,
    Duration,
    Integer,
    None,
    StaticString,
    Uuid,
}

impl FieldType {
    /// Produces the default [`Field`] for this type.
    ///
    /// `DateTime` and `Uuid` are generated at call time (`Utc::now()` and
    /// `Uuid::new_v4()`), so repeated calls return different values; the
    /// other types return `false`, a zero duration, `0`, `Field::None` and
    /// the empty string respectively.
    fn get_default(&self) -> Field {
        match self {
            Self::Boolean => Field::Boolean(false),
            Self::DateTime => Field::DateTime(Utc::now()),
            Self::Duration => Field::Duration(Duration::from_secs(0)),
            Self::Integer => Field::Integer(0),
            Self::None => Field::None,
            Self::StaticString => Field::StaticString(String::new()),
            Self::Uuid => Field::Uuid(Uuid::new_v4()),
        }
    }
}

impl From<&Field> for FieldType {
    /// Maps each `Field` variant to the `FieldType` of the same name.
    fn from(value: &Field) -> Self {
        match value {
            Field::Boolean(_) => Self::Boolean,
            Field::DateTime(_) => Self::DateTime,
            Field::Duration(_) => Self::Duration,
            Field::Integer(_) => Self::Integer,
            Field::None => Self::None,
            Field::StaticString(_) => Self::StaticString,
            Field::Uuid(_) => Self::Uuid,
        }
    }
}

#[cfg(test)]
mod fieldtypes {
    use super::*;

    #[test]
    fn can_get_defaults_for_uuid() {
        let ftype = FieldType::Uuid;
        let mut ids: Vec<Uuid> = Vec::new();
        for _ in 0..5 {
            let result = ftype.get_default();
            match result {
                Field::Uuid(data) => {
                    assert!(
                        !ids.contains(&data),
                        "found duplicate id {:?} in {:?}",
                        data,
                        ids
                    );
                    ids.push(data);
                }
                _ => unreachable!("got {:?}: should have been uuid", result),
            }
        }
    }

    #[test]
    fn can_get_defaults_for_static_string() {
        let ftype = FieldType::StaticString;
        let result = ftype.get_default();
        match result {
            Field::StaticString(data) => assert_eq!(data, ""),
            _ => unreachable!("got {:?}: should have been static string", result),
        }
    }
}

/// A single typed value stored in a document.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum Field {
    Boolean(bool),
    DateTime(DateTime<Utc>),
    Duration(Duration),
    Integer(i128),
    None,
    StaticString(String),
    Uuid(Uuid),
}

impl Field {
    /// Returns the [`FieldType`] matching this value's variant.
    fn get_type(&self) -> FieldType {
        self.into()
    }
}

impl From<bool> for Field {
    fn from(value: bool) -> Self {
        Self::Boolean(value)
    }
}

impl From<DateTime<Utc>> for Field {
    fn from(value: DateTime<Utc>) -> Self {
        Self::DateTime(value)
    }
}

impl From<Duration> for Field {
    fn from(value:
Duration) -> Self {
        Self::Duration(value)
    }
}

impl From<String> for Field {
    fn from(value: String) -> Self {
        Self::StaticString(value)
    }
}

impl From<&str> for Field {
    fn from(value: &str) -> Self {
        Self::StaticString(value.to_string())
    }
}

impl From<Uuid> for Field {
    fn from(value: Uuid) -> Self {
        Self::Uuid(value)
    }
}

impl From<i128> for Field {
    fn from(value: i128) -> Self {
        Self::Integer(value)
    }
}

#[cfg(test)]
mod fields {
    use super::*;

    #[test]
    fn can_create_static_string() {
        let data = Uuid::new_v4().to_string();
        let result: Field = data.clone().into();
        match result.clone() {
            Field::StaticString(output) => assert_eq!(output, data),
            _ => unreachable!("got {:?}: should have been static string", result),
        }
        assert_eq!(result.get_type(), FieldType::StaticString);
    }

    #[test]
    fn can_create_from_str() {
        let holder = ["one", "two"];
        for data in holder.into_iter() {
            let result: Field = data.into();
            match result.clone() {
                Field::StaticString(output) => assert_eq!(output, data),
                _ => unreachable!("got {:?}: should have been static string", result),
            }
            assert_eq!(result.get_type(), FieldType::StaticString);
        }
    }

    #[test]
    fn create_from_uuid() {
        let data = Uuid::new_v4();
        let result: Field = data.into();
        match result.clone() {
            Field::Uuid(output) => assert_eq!(output, data),
            _ => unreachable!("got {:?}: should have been uuid", result),
        }
        assert_eq!(result.get_type(), FieldType::Uuid);
    }

    #[test]
    fn create_from_datatime() {
        let data = Utc::now();
        let result: Field = data.into();
        match result.clone() {
            Field::DateTime(output) => assert_eq!(output, data),
            // The failure text previously said "uuid" for a datetime check.
            _ => unreachable!("got {:?}: should have been datetime", result),
        }
        assert_eq!(result.get_type(), FieldType::DateTime);
    }
}

/// Per-field configuration: the required type and an optional default
/// expressed as a [`Calculation`].
#[derive(Clone, Debug)]
struct FieldSetting {
    fieldtype: FieldType,
    default_value: Option<Calculation>,
}

impl FieldSetting {
    /// Creates a setting for `ftype` with no default.
    fn new(ftype: FieldType) -> Self {
        Self {
            fieldtype: ftype,
            default_value: None,
        }
    }

    /// Stores `value` as the default after checking that its current result
    /// has this setting's field type.
    ///
    /// # Errors
    /// Returns the error from `validate` when the calculated value has the
    /// wrong type; the existing default is left untouched in that case.
    fn set_default(&mut self, value: Calculation) -> Result<(), MTTError> {
        let data = value.calculate();
        match self.validate(Some(data)) {
Ok(_) => {}
            Err(err) => return Err(err),
        }
        self.default_value = Some(value);
        Ok(())
    }

    /// Checks a candidate value against this setting.
    ///
    /// * `Some(data)` is returned unchanged when its type equals the
    ///   configured type, otherwise `DocumentFieldWrongDataType(expected, got)`.
    /// * `None` yields the calculated default when one is set, otherwise
    ///   `DocumentFieldMissing` with an empty name.
    fn validate(&self, value: Option<Field>) -> Result<Field, MTTError> {
        match value {
            Some(data) => {
                let vft = data.get_type();
                if vft != self.fieldtype {
                    return Err(MTTError::DocumentFieldWrongDataType(
                        self.fieldtype.clone(),
                        vft,
                    ));
                }
                Ok(data)
            }
            None => match &self.default_value {
                Some(calc) => Ok(calc.calculate()),
                None => Err(MTTError::DocumentFieldMissing("".to_string())),
            },
        }
    }
}

#[cfg(test)]
mod fieldsettings {
    use super::*;

    #[test]
    fn validates_field_type() {
        let fset = FieldSetting::new(FieldType::Uuid);
        let value: Field = Uuid::new_v4().into();
        match fset.validate(Some(value.clone())) {
            Ok(data) => assert_eq!(data, value),
            Err(err) => unreachable!("got {:?}: should have gotten a value", err),
        }
    }

    #[test]
    fn validates_for_bad_field_type() {
        let fset = FieldSetting::new(FieldType::Uuid);
        let value: Field = "text".into();
        match fset.validate(Some(value)) {
            Ok(data) => unreachable!("got {:?}: should have gotten an error", data),
            Err(err) => match err {
                MTTError::DocumentFieldWrongDataType(expected, got) => {
                    assert_eq!(expected, FieldType::Uuid);
                    assert_eq!(got, FieldType::StaticString);
                }
                _ => unreachable!("got {:?}: should have gotten a value", err),
            },
        }
    }

    #[test]
    fn no_default_returns_error() {
        let fset = FieldSetting::new(FieldType::Uuid);
        match fset.validate(None) {
            Ok(data) => unreachable!("got {:?}: should have gotten an error", data),
            Err(err) => match err {
                MTTError::DocumentFieldMissing(data) => assert_eq!(data, ""),
                _ => unreachable!("got {:?}: should have gotten a value", err),
            },
        }
    }

    #[test]
    fn returns_value_if_default_is_set() {
        let mut fset = FieldSetting::new(FieldType::StaticString);
        let mut calc = Calculation::new(Operand::Assign);
        calc.add_value(FieldType::StaticString).unwrap();
        fset.set_default(calc).unwrap();
        match fset.validate(None) {
            Ok(data) => assert_eq!(data, "".into()),
            Err(err) => unreachable!("got {:?}: should have gotten a value", err),
        }
    }

    #[test]
    fn
returns_default_value() {
        let mut fset = FieldSetting::new(FieldType::StaticString);
        let input = "fred";
        let mut calc = Calculation::new(Operand::Assign);
        calc.add_value(input).unwrap();
        fset.set_default(calc).unwrap();
        match fset.validate(None) {
            Ok(data) => assert_eq!(data, input.into()),
            Err(err) => unreachable!("got {:?}: should have gotten a value", err),
        }
    }

    #[test]
    fn can_default_be_calculated() {
        let mut fset = FieldSetting::new(FieldType::DateTime);
        let duration = Duration::from_secs(3600);
        let mut calc = Calculation::new(Operand::Add);
        calc.add_value(FieldType::DateTime).unwrap();
        calc.add_value(duration).unwrap();
        fset.set_default(calc).unwrap();
        let start = Utc::now() + duration;
        let result = match fset.validate(None).unwrap() {
            Field::DateTime(data) => data,
            _ => unreachable!("should return datetime"),
        };
        let stop = Utc::now() + duration;
        assert!(
            result > start,
            "{:?} should have been greater than {:?}",
            result,
            start
        );
        assert!(
            result < stop,
            "{:?} should have been less than {:?}",
            result,
            stop
        );
    }
}

/// Request payload carrying the [`Document`] to be added.
#[derive(Clone, Debug)]
struct Addition {
    data: Document,
}

impl Addition {
    /// Creates an addition holding an empty document.
    fn new() -> Self {
        Self {
            data: Document::new(),
        }
    }

    /// Stores `field` under `name` in the wrapped document.
    fn add_field<CV>(&mut self, name: String, field: CV)
    where
        CV: Into<CalcValue>,
    {
        self.data.add_field(name, field);
    }

    /// Looks up `name` in the wrapped document.
    fn get_field(&self, name: &str) -> Option<Field> {
        self.data.get_field(name)
    }

    /// Returns a copy of the wrapped document.
    fn get_document(&self) -> Document {
        self.data.clone()
    }
}

#[cfg(test)]
mod additions {
    use super::*;

    #[test]
    fn can_add_static_string() {
        let mut add = Addition::new();
        let name = Uuid::new_v4().to_string();
        let data = Uuid::new_v4().to_string();
        add.add_field(name.clone(), data.clone());
        let result = add.get_field(&name).unwrap();
        match result {
            Field::StaticString(result) => assert_eq!(result, data),
            _ => unreachable!("got {:?}: should have received static string", result),
        }
    }

    // This function previously lacked `#[test]`, so the harness never ran it.
    #[test]
    fn can_add_uuid() {
        let mut add = Addition::new();
        let name = Uuid::new_v4().to_string();
        let data = Uuid::new_v4();
        add.add_field(name.clone(), data);
        let result = add.get_field(&name).unwrap();
        match result {
Field::Uuid(result) => assert_eq!(result, data),
            _ => unreachable!("got {:?}: should have received uuid", result),
        }
    }

    // This function previously lacked `#[test]`, so the harness never ran it.
    #[test]
    fn can_get_document() {
        let mut add = Addition::new();
        let name = Uuid::new_v4().to_string();
        let data = Uuid::new_v4();
        add.add_field(name.clone(), data);
        let doc: Document = add.get_document();
        match doc.get_field(&name).unwrap() {
            Field::Uuid(result) => assert_eq!(result, data),
            _ => unreachable!("should have received uuid"),
        }
    }
}

/// Kind of index to build for a field.
#[derive(Clone, Debug)]
enum IndexType {
    Index,
    Unique,
}

impl IndexType {
    /// Builds an empty [`Index`]; `Unique` produces a unique index.
    fn create_index(&self) -> Index {
        match self {
            Self::Index => Index::new(),
            Self::Unique => Index::new_unique(),
        }
    }
}

/// Document definition: field settings and index types, both keyed by
/// field name.
#[derive(Clone, Debug)]
struct DocDef {
    fields: HashMap<String, FieldSetting>,
    indexes: HashMap<String, IndexType>,
}

impl DocDef {
    /// Creates a definition with no fields and no indexes.
    fn new() -> Self {
        Self {
            fields: HashMap::new(),
            indexes: HashMap::new(),
        }
    }

    /// Adds (or replaces) the field `name` with a fresh setting of `ftype`.
    fn add_field(&mut self, name: String, ftype: FieldType) {
        self.fields.insert(name, FieldSetting::new(ftype));
    }

    /// Returns the setting for `name`, or `DocumentFieldNotFound(name)`.
    fn get_field(&self, name: &str) -> Result<&FieldSetting, MTTError> {
        self.fields
            .get(name)
            .ok_or_else(|| MTTError::DocumentFieldNotFound(name.to_string()))
    }

    /// Mutable variant of [`DocDef::get_field`].
    fn get_field_mut(&mut self, field_name: &str) -> Result<&mut FieldSetting, MTTError> {
        self.fields
            .get_mut(field_name)
            .ok_or_else(|| MTTError::DocumentFieldNotFound(field_name.to_string()))
    }

    /// Returns the names of all defined fields.
    fn field_ids(&self) -> HashSet<&String> {
        self.fields.keys().collect()
    }

    /// Validates `value` against the setting of `field_name`.
    ///
    /// # Errors
    /// `DocumentFieldNotFound` for an unknown field, otherwise whatever
    /// `FieldSetting::validate` reports.
    fn validate(&self, field_name: &str, value: Option<Field>) -> Result<Field, MTTError> {
        self.get_field(field_name)?.validate(value)
    }

    /// Sets the default calculation of `field_name`.
    ///
    /// # Errors
    /// `DocumentFieldNotFound` for an unknown field, otherwise whatever
    /// `FieldSetting::set_default` reports.
    fn set_default(&mut self, field_name: &str, value: Calculation) -> Result<(), MTTError> {
        self.get_field_mut(field_name)?.set_default(value)
    }

    fn add_index(&mut self, field_name: String, index_type:
IndexType) -> Result<(), MTTError> {
        // Only fields that exist in the definition may be indexed.
        self.get_field(&field_name)?;
        self.indexes.insert(field_name, index_type);
        Ok(())
    }

    /// Builds a fresh, empty [`Indexes`] from the configured index types.
    fn create_indexes(&self) -> Indexes {
        Indexes::new(&self.indexes)
    }

    /// Iterates over `(field name, setting)` pairs.
    fn iter(&self) -> impl Iterator<Item = (&String, &FieldSetting)> {
        self.fields.iter()
    }
}

#[cfg(test)]
mod docdefs {
    use super::*;

    #[test]
    fn can_field_be_added() {
        let mut docdef = DocDef::new();
        let name = Uuid::new_v4().to_string();
        let field_type = FieldType::Uuid;
        docdef.add_field(name.clone(), field_type);
        let result = docdef.get_field(name.as_str()).unwrap();
        match result.validate(Some(Uuid::new_v4().into())) {
            Ok(_) => {}
            Err(err) => unreachable!("got {:?}: should have been a value", err),
        }
    }

    #[test]
    fn produces_error_for_bad_fields() {
        let docdef = DocDef::new();
        let name = Uuid::new_v4().to_string();
        match docdef.get_field(name.as_str()) {
            Ok(_) => unreachable!("should return non existant field error"),
            Err(err) => match err {
                MTTError::DocumentFieldNotFound(data) => assert_eq!(data, name),
                _ => unreachable!("got {:?}: should have been document field not found", err),
            },
        }
    }

    #[test]
    fn can_multiple_fields_be_added() {
        let mut docdef = DocDef::new();
        let names = ["one", "two", "three"];
        let field_type = FieldType::StaticString;
        for name in names.iter() {
            docdef.add_field(name.to_string(), field_type.clone());
        }
        for name in names.iter() {
            let result = docdef.get_field(name).unwrap();
            match result.validate(Some("".into())) {
                Ok(_) => {}
                Err(err) => unreachable!("got {:?}: should have been a value", err),
            }
        }
    }

    #[test]
    fn can_change_field_default_to_function() {
        let mut docdef = DocDef::new();
        let name = "defaultfunction";
        docdef.add_field(name.to_string(), FieldType::StaticString);
        let mut calc = Calculation::new(Operand::Assign);
        calc.add_value(FieldType::StaticString).unwrap();
        docdef.set_default(name, calc).unwrap();
        match docdef.get_field(name).unwrap().validate(None) {
            Ok(data) => match data {
                Field::StaticString(result) => assert_eq!(result,
""),
                _ => unreachable!("got {:?}: should return a static string", data),
            },
            Err(err) => unreachable!("got {:?}: should return a value", err),
        }
    }

    #[test]
    fn does_set_default_function_error_on_bad_field_name() {
        let mut docdef = DocDef::new();
        let field_name = Uuid::new_v4().to_string();
        let calc = Calculation::new(Operand::Assign);
        match docdef.set_default(field_name.as_str(), calc) {
            Ok(_) => unreachable!("should be an error"),
            Err(err) => match err {
                MTTError::DocumentFieldNotFound(data) => assert_eq!(data, field_name),
                _ => unreachable!("got {:?}: should have been field not found", err),
            },
        }
    }

    #[test]
    fn does_set_default_value_error_on_bad_field_name() {
        let mut docdef = DocDef::new();
        let field_name = Uuid::new_v4().to_string();
        let mut calc = Calculation::new(Operand::Assign);
        calc.add_value(Uuid::new_v4()).unwrap();
        match docdef.set_default(field_name.as_str(), calc) {
            Ok(_) => unreachable!("should be an error"),
            Err(err) => match err {
                MTTError::DocumentFieldNotFound(data) => assert_eq!(data, field_name),
                _ => unreachable!("got {:?}: should have been field not found", err),
            },
        }
    }

    #[test]
    fn does_set_default_value_error_on_bad_field_type() {
        let mut docdef = DocDef::new();
        let name = "defaultvalue";
        docdef.add_field(name.to_string(), FieldType::Uuid);
        let mut calc = Calculation::new(Operand::Assign);
        calc.add_value("fred").unwrap();
        match docdef.set_default(name, calc) {
            Ok(data) => unreachable!("got {:?}, should be an error", data),
            Err(err) => match err {
                MTTError::DocumentFieldWrongDataType(expected, got) => {
                    assert_eq!(expected, FieldType::Uuid);
                    assert_eq!(got, FieldType::StaticString);
                }
                _ => unreachable!("got {:?}: should have been field not found", err),
            },
        }
    }

    #[test]
    fn returns_field_ids() {
        let count = 5;
        let mut ids: HashSet<String> = HashSet::new();
        while ids.len() < count {
            ids.insert(Uuid::new_v4().to_string());
        }
        let mut docdef = DocDef::new();
        for id in ids.iter() {
            docdef.add_field(id.clone(), FieldType::Uuid);
        }
        let result = docdef.field_ids();
assert_eq!(result.len(), ids.len()); for id in result.iter() { assert!(ids.contains(id.clone())); } } } #[derive(Clone, Debug)] enum Operand { Add, Assign, Equal, } impl Operand { fn validate(&self, x: &Field, y: &Field) -> bool { match self { Self::Equal => x == y, _ => false, } } } #[cfg(test)] mod operands { use super::*; #[test] fn equals_true() { let data: Field = Uuid::new_v4().into(); assert!(Operand::Equal.validate(&data, &data)); } #[test] fn equals_false() { let x: Field = Uuid::new_v4().into(); let mut y: Field = Uuid::new_v4().into(); while x == y { y = Uuid::new_v4().into(); } assert!(!Operand::Equal.validate(&x, &y)); } } #[derive(Clone, Debug)] enum CalcValue { Calculate(Calculation), FType(FieldType), Value(Field), } impl CalcValue { fn get(&self) -> Field { match self { Self::FType(ftype) => ftype.get_default(), Self::Value(field) => field.clone(), Self::Calculate(calc) => calc.calculate(), } } } impl From for CalcValue { fn from(value: Calculation) -> Self { Self::Calculate(value) } } impl From for CalcValue { fn from(value: Field) -> Self { Self::Value(value) } } impl From for CalcValue { fn from(value: FieldType) -> Self { Self::FType(value) } } impl From for CalcValue { fn from(value: bool) -> Self { let output: Field = value.into(); Self::from(output).into() } } impl From> for CalcValue { fn from(value: DateTime) -> Self { let output: Field = value.into(); Self::from(output).into() } } impl From for CalcValue { fn from(value: Duration) -> Self { let output: Field = value.into(); Self::from(output).into() } } impl From for CalcValue { fn from(value: i128) -> Self { let output: Field = value.into(); Self::from(output).into() } } impl From<&str> for CalcValue { fn from(value: &str) -> Self { let output: Field = value.into(); Self::from(output).into() } } impl From for CalcValue { fn from(value: String) -> Self { let output: Field = value.into(); Self::from(output).into() } } impl From for CalcValue { fn from(value: Uuid) -> Self { let output: 
Field = value.into();
        // `Self::from(Field)` already yields a `CalcValue`.
        Self::from(output)
    }
}

#[cfg(test)]
mod calcvalues {
    use super::*;

    #[test]
    fn from_uuid() {
        let value = Uuid::new_v4();
        let expected: Field = value.into();
        let result: CalcValue = value.into();
        match result.clone() {
            CalcValue::Value(data) => assert_eq!(data, expected),
            _ => unreachable!("got {:?}, should have gotten a field", result),
        }
        assert_eq!(result.get(), expected);
    }

    #[test]
    fn from_str() {
        let value = "something";
        let expected: Field = value.into();
        let result: CalcValue = value.into();
        match result.clone() {
            CalcValue::Value(data) => assert_eq!(data, expected),
            _ => unreachable!("got {:?}, should have gotten a field", result),
        }
        assert_eq!(result.get(), expected);
    }

    #[test]
    fn from_string() {
        let value = "data".to_string();
        let expected: Field = value.clone().into();
        let result: CalcValue = value.into();
        match result.clone() {
            CalcValue::Value(data) => assert_eq!(data, expected),
            _ => unreachable!("got {:?}, should have gotten a field", result),
        }
        assert_eq!(result.get(), expected);
    }

    #[test]
    fn from_boolean() {
        let value = true;
        let expected: Field = value.into();
        let result: CalcValue = value.into();
        match result.clone() {
            CalcValue::Value(data) => assert_eq!(data, expected),
            _ => unreachable!("got {:?}, should have gotten a field", result),
        }
        assert_eq!(result.get(), expected);
    }

    #[test]
    fn from_datetime() {
        let value = Utc::now();
        let expected: Field = value.into();
        let result: CalcValue = value.into();
        match result.clone() {
            CalcValue::Value(data) => assert_eq!(data, expected),
            _ => unreachable!("got {:?}, should have gotten a field", result),
        }
        assert_eq!(result.get(), expected);
    }

    #[test]
    fn from_duration() {
        let value = Duration::from_secs(5);
        let expected: Field = value.into();
        let result: CalcValue = value.into();
        match result.clone() {
            CalcValue::Value(data) => assert_eq!(data, expected),
            _ => unreachable!("got {:?}, should have gotten a field", result),
        }
        assert_eq!(result.get(), expected);
}

    #[test]
    fn from_integer() {
        let value: i128 = 5;
        let expected: Field = value.into();
        let result: CalcValue = value.into();
        match result.clone() {
            CalcValue::Value(data) => assert_eq!(data, expected),
            _ => unreachable!("got {:?}, should have gotten a field", result),
        }
        assert_eq!(result.get(), expected);
    }

    #[test]
    fn from_calculation() {
        let duration = Duration::from_secs(300);
        let start = Utc::now() + duration;
        let mut calc = Calculation::new(Operand::Add);
        calc.add_value(FieldType::DateTime).unwrap();
        calc.add_value(duration).unwrap();
        let result: CalcValue = calc.into();
        let data = match result.get() {
            Field::DateTime(data) => data,
            _ => unreachable!(),
        };
        let stop = Utc::now() + duration;
        assert!(
            data > start && data < stop,
            "should be about 5 minutes ahead"
        );
    }
}

/// An operand plus the ordered list of inputs it is applied to.
#[derive(Clone, Debug)]
struct Calculation {
    operation: Operand,
    values: Vec<CalcValue>,
}

impl Calculation {
    /// Creates a calculation for `operand` with no inputs yet.
    fn new(operand: Operand) -> Self {
        Self {
            operation: operand,
            values: Vec::new(),
        }
    }

    /// Returns the operand this calculation applies.
    fn operation(&self) -> &Operand {
        &self.operation
    }

    /// Resolves every input to a concrete [`Field`], in insertion order.
    fn get_fields(&self) -> Vec<Field> {
        self.values.iter().map(CalcValue::get).collect()
    }

    /// Appends `data` when `ftype` equals the expected `base` type.
    ///
    /// # Errors
    /// `DocumentFieldWrongDataType(base, ftype)` when the types differ;
    /// nothing is appended in that case.
    fn push_value(
        &mut self,
        base: FieldType,
        ftype: FieldType,
        data: CalcValue,
    ) -> Result<(), MTTError> {
        if base != ftype {
            return Err(MTTError::DocumentFieldWrongDataType(base, ftype));
        }
        self.values.push(data);
        Ok(())
    }

    /// Adds an input to the calculation.
    ///
    /// The first input is always accepted. Later inputs must have the same
    /// type as the first one, except that for `Operand::Add` on a `DateTime`
    /// the later inputs must be `Duration`s.
    ///
    /// # Errors
    /// `DocumentFieldWrongDataType(expected, got)` when the type does not fit.
    fn add_value<CV>(&mut self, data: CV) -> Result<(), MTTError>
    where
        CV: Into<CalcValue>,
    {
        let holder: CalcValue = data.into();
        let mut base = match self.values.first() {
            Some(first) => first.get().get_type(),
            None => {
                self.values.push(holder);
                return Ok(());
            }
        };
        let ftype = holder.get().get_type();
        // Dates are shifted by durations rather than added to other dates.
        if matches!(self.operation, Operand::Add) && base == FieldType::DateTime {
            base = FieldType::Duration;
        }
        self.push_value(base, ftype, holder)
    }

    /// Evaluates the calculation over its current inputs.
    ///
    /// # Panics
    /// Indexes the inputs directly, so it panics when `Add`/`Assign` have no
    /// input or `Equal` has fewer than two, and when `Add` is used on a type
    /// other than `DateTime` or `Integer`.
    fn calculate(&self) -> Field {
        match self.operation {
Operand::Add => {
                let values = self.get_fields();
                match values[0].get_type() {
                    FieldType::DateTime => {
                        // The first input replaces this seed; every duration
                        // that follows is added onto it.
                        let mut output = Utc::now();
                        for item in values.iter() {
                            match item {
                                Field::DateTime(datetime) => output = *datetime,
                                Field::Duration(duration) => output += *duration,
                                _ => unreachable!("got {:?}, should have been a duration", item),
                            }
                        }
                        output.into()
                    }
                    FieldType::Integer => {
                        let mut output: i128 = 0;
                        for item in values.iter() {
                            match item {
                                Field::Integer(data) => output += data,
                                _ => unreachable!("got {:?} expected Integer", item),
                            }
                        }
                        output.into()
                    }
                    _ => unreachable!("{:?} does not handle addition", values[0].get_type()),
                }
            }
            Operand::Assign => self.values[0].get(),
            Operand::Equal => Field::Boolean(self.values[0].get() == self.values[1].get()),
        }
    }
}

#[cfg(test)]
mod calculations {
    use super::*;
    use rand::random;

    #[test]
    fn errors_on_different_field_types() {
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value(Uuid::nil()).unwrap();
        match calc.add_value("other") {
            Ok(_) => unreachable!("should have errored with wrong type"),
            Err(err) => match err {
                MTTError::DocumentFieldWrongDataType(expected, got) => {
                    assert_eq!(expected, FieldType::Uuid);
                    assert_eq!(got, FieldType::StaticString);
                }
                _ => unreachable!("got {:?}, expected wrong field type", err),
            },
        }
    }

    #[test]
    fn returns_reference_to_operand() {
        let calc = Calculation::new(Operand::Assign);
        match calc.operation() {
            Operand::Assign => {}
            _ => unreachable!("got {:?}, shold have gotten assign", calc.operation()),
        }
        let calc = Calculation::new(Operand::Equal);
        match calc.operation() {
            Operand::Equal => {}
            _ => unreachable!("got {:?}, shold have gotten assign", calc.operation()),
        }
    }

    #[test]
    fn can_assign_value() {
        let mut calc = Calculation::new(Operand::Assign);
        let data: Field = Uuid::new_v4().into();
        calc.add_value(data.clone()).unwrap();
        let result = calc.calculate();
        assert_eq!(result, data);
    }

    #[test]
    fn can_assign_default_function() {
        let mut calc = Calculation::new(Operand::Assign);
calc.add_value(FieldType::Uuid); let result1 = calc.calculate(); let result2 = calc.calculate(); assert_ne!(result1, result2); } #[test] fn can_equal_true() { let mut calc = Calculation::new(Operand::Equal); let data: Field = Uuid::new_v4().into(); calc.add_value(data.clone()); calc.add_value(data.clone()); let expected: Field = true.into(); let result = calc.calculate(); assert_eq!(result, expected); } #[test] fn can_equal_false() { let mut calc = Calculation::new(Operand::Equal); let value1: Field = "fred".into(); let value2: Field = "barney".into(); calc.add_value(value1); calc.add_value(value2); let expected: Field = false.into(); let result = calc.calculate(); assert_eq!(result, expected); } #[test] fn can_add_numbers() { let mut calc = Calculation::new(Operand::Add); let value1: i128 = random::().into(); let value2: i128 = random::().into(); let expected: Field = { value1 + value2 }.into(); let value1: Field = value1.into(); let value2: Field = value2.into(); calc.add_value(value1); calc.add_value(value2); let result = calc.calculate(); assert_eq!(result, expected); } #[test] fn returns_error_on_mismatch() { let mut calc = Calculation::new(Operand::Add); calc.add_value(Uuid::nil()); match calc.add_value("mismatch") { Ok(_) => unreachable!("should have returned an error"), Err(err) => match err { MTTError::DocumentFieldWrongDataType(expected, got) => { assert_eq!(got, FieldType::StaticString); assert_eq!(expected, FieldType::Uuid); } _ => unreachable!("got {:?}, expected wrong field type", err), }, } } #[test] fn datetime_accepts_duration() { let mut calc = Calculation::new(Operand::Add); let duration = Duration::from_secs(3600); let start = Utc::now() + duration; calc.add_value(FieldType::DateTime).unwrap(); match calc.add_value(duration.clone()) { Ok(_) => {} Err(err) => unreachable!("got {:?}, should have returned normally", err), } let result = calc.calculate(); let stop = Utc::now() + duration; match result { Field::DateTime(data) => { assert!(data > 
start);
                assert!(data < stop);
            }
            _ => unreachable!("got {:?}, should have been datetime", result),
        }
    }
}

/// A comparison of one named field against a fixed value.
#[derive(Clone, Debug)]
struct Operation {
    field_name: String,
    operation: Operand,
    value: Field,
}

impl Operation {
    /// Creates an operation on field `name` using `op` and `value`.
    fn new<F>(name: String, op: Operand, value: F) -> Self
    where
        F: Into<Field>,
    {
        Self {
            field_name: name,
            operation: op,
            value: value.into(),
        }
    }

    /// Returns a copy of the field name this operation targets.
    fn which_field(&self) -> String {
        self.field_name.clone()
    }

    /// Applies the operand to `field` and the stored value.
    fn validate(&self, field: &Field) -> bool {
        self.operation.validate(field, &self.value)
    }
}

/// Query criteria: one [`Calculation`] per field name.
#[derive(Clone, Debug)]
struct Query {
    data: HashMap<String, Calculation>,
}

impl Query {
    /// Creates a query with no criteria.
    fn new() -> Self {
        Self {
            data: HashMap::new(),
        }
    }

    /// Registers `operation` as the criterion for field `name`.
    ///
    /// # Errors
    /// `QueryCannotChangeData` unless the calculation's operand is `Equal`.
    fn add(&mut self, name: String, operation: Calculation) -> Result<(), MTTError> {
        match operation.operation() {
            Operand::Equal => {
                self.data.insert(name, operation);
                Ok(())
            }
            _ => Err(MTTError::QueryCannotChangeData),
        }
    }

    /// Returns a copy of the criterion stored for `name`, if any.
    fn get(&self, name: &str) -> Option<Calculation> {
        self.data.get(name).cloned()
    }

    /// Returns the names of all fields that have a criterion.
    fn field_ids(&self) -> HashSet<&String> {
        self.data.keys().collect()
    }
}

#[cfg(test)]
mod queries {
    use super::*;

    #[test]
    fn holds_calculation_to_run_query() {
        let name = Uuid::new_v4().to_string();
        let data = Uuid::new_v4();
        let mut bad_data = data;
        while bad_data == data {
            bad_data = Uuid::new_v4();
        }
        let mut query = Query::new();
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value(data).unwrap();
        query.add(name.clone(), calc).unwrap();
        match query.get(&name) {
            Some(op) => {
                let expected: Field = true.into();
                let mut holder = op.clone();
                holder.add_value(data).unwrap();
                assert_eq!(holder.calculate(), expected);
            }
            None => unreachable!("should have returned a calculation"),
        }
        match query.get(&name) {
            Some(op) => {
                let expected: Field = false.into();
                let mut holder = op.clone();
                holder.add_value(bad_data).unwrap();
                assert_eq!(holder.calculate(), expected);
            }
            None => unreachable!("should have returned a calculation"),
        }
    }

    #[test]
    fn does_not_allow_data_changes() {
        let mut calc =
Calculation::new(Operand::Assign);
        calc.add_value(Uuid::nil()).unwrap();
        let mut query = Query::new();
        match query.add("name".to_string(), calc) {
            Ok(_) => unreachable!("Should have received an error"),
            Err(err) => match err {
                MTTError::QueryCannotChangeData => {}
                _ => unreachable!("got {:?}, should have gotten cannot change data", err),
            },
        }
    }

    #[test]
    fn returns_set_of_fields() {
        let count = 5;
        let mut field_ids: HashSet<String> = HashSet::new();
        while field_ids.len() < count {
            field_ids.insert(Uuid::new_v4().to_string());
        }
        let mut query = Query::new();
        for field_id in field_ids.iter() {
            query
                .add(field_id.clone(), Calculation::new(Operand::Equal))
                .unwrap();
        }
        let result = query.field_ids();
        assert_eq!(result.len(), field_ids.len());
        for field_id in result.iter() {
            assert!(
                field_ids.contains(*field_id),
                "field id {:?} not found",
                field_id
            );
        }
    }
}

/// Ordered collection of documents sent back as a response.
#[derive(Clone, Debug)]
struct Reply {
    data: Vec<Document>,
}

impl Reply {
    /// Creates an empty reply.
    fn new() -> Self {
        Self { data: Vec::new() }
    }

    /// Appends `doc` to the reply.
    fn add(&mut self, doc: Document) {
        self.data.push(doc);
    }

    /// Returns the number of documents held.
    fn len(&self) -> usize {
        self.data.len()
    }

    /// Iterates over the documents in insertion order.
    fn iter(&self) -> impl Iterator<Item = &Document> {
        self.data.iter()
    }
}

#[cfg(test)]
mod replies {
    use super::*;

    #[test]
    fn is_new_empty() {
        let reply = Reply::new();
        assert_eq!(reply.len(), 0, "should have no records");
    }

    #[test]
    fn can_add_documents() {
        let mut reply = Reply::new();
        let doc = Document::new();
        reply.add(doc.clone());
        assert_eq!(reply.len(), 1);
        reply.add(doc.clone());
        assert_eq!(reply.len(), 2);
    }

    #[test]
    fn can_retrieve_documents() {
        let fieldname = "field".to_string();
        let mut doc1 = Document::new();
        doc1.add_field(fieldname.clone(), "one");
        let mut doc2 = Document::new();
        doc2.add_field(fieldname.clone(), "two");
        let mut reply = Reply::new();
        reply.add(doc1);
        reply.add(doc2);
        let mut reply_iter = reply.iter();
        let result1 = reply_iter.next().unwrap();
        match result1.get_field(&fieldname).unwrap() {
            Field::StaticString(output) => assert_eq!(output, "one"),
            _ => unreachable!("got {:?}: should have been static string",
result1),
        }
        let result2 = reply_iter.next().unwrap();
        match result2.get_field(&fieldname).unwrap() {
            Field::StaticString(output) => assert_eq!(output, "two"),
            _ => unreachable!("got {:?}: should have been static string", result2),
        }
        match reply_iter.next() {
            None => {}
            Some(_) => unreachable!("should be out of data"),
        }
    }
}

/// A set of named values; each value is stored as a [`CalcValue`] and
/// resolved to a [`Field`] when read.
#[derive(Clone, Debug)]
struct Document {
    data: HashMap<String, CalcValue>,
}

impl Document {
    /// Creates an empty document.
    fn new() -> Self {
        Self {
            data: HashMap::new(),
        }
    }

    /// Stores `field` under `name`, replacing any previous entry.
    fn add_field<CV>(&mut self, name: String, field: CV)
    where
        CV: Into<CalcValue>,
    {
        self.data.insert(name, field.into());
    }

    /// Resolves and returns the value stored under `name`, if present.
    fn get_field(&self, name: &str) -> Option<Field> {
        self.data.get(name).map(CalcValue::get)
    }

    /// Resolves every entry into `(name, value)` pairs; the order follows
    /// the map's iteration order.
    fn get_all(&self) -> Vec<(String, Field)> {
        self.data
            .iter()
            .map(|(key, value)| (key.clone(), value.get()))
            .collect()
    }

    /// Returns an iterator over a snapshot of the resolved entries.
    fn iter(&self) -> DocIter {
        DocIter::new(self)
    }
}

/// Owning iterator over the resolved entries of a [`Document`].
struct DocIter {
    storage: Vec<(String, Field)>,
}

impl DocIter {
    /// Snapshots all entries of `doc`.
    fn new(doc: &Document) -> Self {
        Self {
            storage: doc.get_all(),
        }
    }
}

impl Iterator for DocIter {
    type Item = (String, Field);

    /// Yields entries from the back of the snapshot.
    fn next(&mut self) -> Option<Self::Item> {
        self.storage.pop()
    }
}

#[cfg(test)]
mod documents {
    use super::*;

    #[test]
    fn can_add_static_string() {
        let mut add = Document::new();
        let name = Uuid::new_v4().to_string();
        let data = Uuid::new_v4().to_string();
        add.add_field(name.clone(), data.clone());
        let result = add.get_field(&name).unwrap();
        match result {
            Field::StaticString(result) => assert_eq!(result, data),
            _ => unreachable!("got {:?}: should have received static string", result),
        }
    }

    // This function previously lacked `#[test]`, so the harness never ran it.
    #[test]
    fn can_add_uuid() {
        let mut add = Document::new();
        let name = Uuid::new_v4().to_string();
        let data = Uuid::new_v4();
        add.add_field(name.clone(), data);
        let result = add.get_field(&name).unwrap();
        match result {
            Field::Uuid(result) => assert_eq!(result, data),
            _ => unreachable!("got {:?}: should have received uuid", result),
        }
    }
}

/// Request payload selecting documents to delete.
#[derive(Clone, Debug)]
struct Delete {
    query: Query,
}

impl Delete {
    fn new() -> Self {
Self {
            query: Query::new(),
        }
    }

    fn get_query(&self) -> &Query {
        &self.query
    }

    fn get_query_mut(&mut self) -> &mut Query {
        &mut self.query
    }
}

/// Request payload: a query selecting documents plus the new values.
#[derive(Clone, Debug)]
struct Update {
    query: Query,
    values: Document,
}

impl Update {
    /// Creates an update with an empty query and no values.
    fn new() -> Self {
        Self {
            query: Query::new(),
            values: Document::new(),
        }
    }

    fn get_query(&self) -> &Query {
        &self.query
    }

    fn get_query_mut(&mut self) -> &mut Query {
        &mut self.query
    }

    fn get_values(&self) -> &Document {
        &self.values
    }

    fn get_values_mut(&mut self) -> &mut Document {
        &mut self.values
    }
}

/// Identifier wrapping a random v4 UUID.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct Oid {
    oid: Uuid,
}

impl Oid {
    /// Generates a new random identifier.
    fn new() -> Self {
        Self {
            oid: Uuid::new_v4(),
        }
    }
}

/// Maps a field value to the set of [`Oid`]s stored under that value.
/// When `unique` is set, a value may hold at most one oid.
struct Index {
    data: HashMap<Field, HashSet<Oid>>,
    unique: bool,
}

impl Index {
    /// Creates an empty index that allows several oids per value.
    fn new() -> Self {
        Self {
            data: HashMap::new(),
            unique: false,
        }
    }

    /// Creates an empty index that rejects a second oid for the same value.
    fn new_unique() -> Self {
        Self {
            data: HashMap::new(),
            unique: true,
        }
    }

    /// Records `oid` under `field`.
    ///
    /// # Errors
    /// `FieldDuplicate("", field)` when the index is unique and `field`
    /// already has an entry.
    fn add(&mut self, field: Field, oid: Oid) -> Result<(), MTTError> {
        let oids = self.data.entry(field.clone()).or_default();
        if self.unique && !oids.is_empty() {
            return Err(MTTError::FieldDuplicate("".to_string(), field));
        }
        oids.insert(oid);
        Ok(())
    }

    /// Returns every oid whose key satisfies `spec`.
    fn get(&self, spec: &Operation) -> Vec<Oid> {
        self.data
            .iter()
            .filter(|(field, _)| spec.validate(field))
            .flat_map(|(_, oids)| oids.iter().cloned())
            .collect()
    }

    /// Returns every oid whose key makes `calc` evaluate to `true` once the
    /// key is appended as an extra input.
    ///
    /// Keys that `calc` rejects (type mismatch) cannot match and are skipped
    /// instead of being evaluated with the key missing.
    fn pull(&self, calc: &Calculation) -> HashSet<Oid> {
        let mut output = HashSet::new();
        for (key, value) in self.data.iter() {
            let mut op = calc.clone();
            if op.add_value(key.clone()).is_err() {
                continue;
            }
            if op.calculate() == Field::Boolean(true) {
                output.extend(value.iter().cloned());
            }
        }
        output
    }

    /// Removes `oid` from `field`'s entry and drops the entry once empty.
    fn remove(&mut self, field: &Field, oid: &Oid) {
        if let Some(oids) = self.data.get_mut(field) {
            oids.remove(oid);
            if oids.is_empty() {
                self.data.remove(field);
            }
        }
    }

    /// Checks whether `field` could be added without violating uniqueness.
    fn validate(&self, field: &Field) -> Result<(), MTTError> {
        if self.unique {
            match self.data.get(field) {
Some(_) => return Err(MTTError::FieldDuplicate("".to_string(), field.clone())),
                None => {}
            }
        }
        Ok(())
    }
}

/// The indexes of one document type, keyed by field name.
///
/// NOTE(review): the generic arguments of the `HashMap`/`HashSet`/`Vec` types
/// in this section appear truncated in this copy; reproduced as found.
struct Indexes {
    data: HashMap,
}

impl Indexes {
    /// Builds one index per entry of `settings` by calling `create_index()`
    /// on each value.
    fn new(settings: &HashMap) -> Self {
        Self {
            data: settings
                .iter()
                .map(|(name, kind)| (name.clone(), kind.create_index()))
                .collect(),
        }
    }

    /// Returns the names of all indexed fields.
    fn index_ids(&self) -> HashSet<&String> {
        self.data.keys().collect()
    }

    /// Returns the index for `field_id`; panics when the field has no index.
    fn get_index(&self, field_id: &str) -> &Index {
        self.data.get(field_id).unwrap()
    }

    /// Delegates to `Index::pull` on the index for `field_id`.
    fn pull(&self, field_id: &str, calc: &Calculation) -> HashSet {
        self.get_index(field_id).pull(calc)
    }

    /// Adds `oid` under `field` in the index for `field_name`; does nothing
    /// when that field has no index.
    fn add_to_index(&mut self, field_name: &str, field: Field, oid: Oid) {
        if let Some(index) = self.data.get_mut(field_name) {
            // NOTE(review): the `Result` of `Index::add` is dropped here.
            index.add(field, oid);
        }
    }

    /// Removes `oid` from under `field` in the index for `field_name`; does
    /// nothing when that field has no index.
    fn remove_from_index(&mut self, field_name: &str, field: &Field, oid: &Oid) {
        if let Some(index) = self.data.get_mut(field_name) {
            index.remove(field, oid);
        }
    }

    /// Checks `value` against the index for `field_name`; fields without an
    /// index always pass.
    fn validate(&self, field_name: &str, value: &Field) -> Result<(), MTTError> {
        match self.data.get(field_name) {
            Some(index) => index.validate(value),
            None => Ok(()),
        }
    }
}

#[cfg(test)]
mod indexes {
    use super::*;

    /// Produces `count` distinct fields built from random UUIDs.
    fn get_fields(count: usize) -> Vec {
        let mut fields = Vec::new();
        while fields.len() < count {
            let candidate: Field = Uuid::new_v4().into();
            if !fields.contains(&candidate) {
                fields.push(candidate);
            }
        }
        fields
    }

    /// Produces `count` distinct object ids.
    fn get_oids(count: usize) -> Vec {
        let mut ids = Vec::new();
        while ids.len() < count {
            let candidate = Oid::new();
            if !ids.contains(&candidate) {
                ids.push(candidate);
            }
        }
        ids
    }

    #[test]
    fn add_to_index() {
        let mut index = Index::new();
        let count = 3;
        let fields = get_fields(count);
        let oids = get_oids(count);
        for i in 0..count {
            index.add(fields[i].clone(), oids[i].clone());
        }
        for i in 0..count {
            let spec = Operation::new("stuff".to_string(), Operand::Equal, fields[i].clone());
            let result = index.get(&spec);
            assert_eq!(result.len(), 1);
assert_eq!(result[0], oids[i]);
        }
    }

    /// One value may be shared by several documents in a non-unique index.
    #[test]
    fn index_can_handle_multiple_entries() {
        let mut index = Index::new();
        let count = 3;
        let fields = get_fields(1);
        let oids = get_oids(count);
        for oid in oids.iter() {
            // Fail loudly if the set-up itself is rejected.
            index.add(fields[0].clone(), oid.clone()).unwrap();
        }
        let spec = Operation::new("unimportant".to_string(), Operand::Equal, fields[0].clone());
        let result = index.get(&spec);
        assert_eq!(result.len(), count);
        for oid in oids {
            assert!(result.contains(&oid));
        }
    }

    /// Removing one id leaves the other ids for the same value in place.
    #[test]
    fn can_remove_oid() {
        let mut index = Index::new();
        let count = 3;
        let pos = 1;
        let fields = get_fields(1);
        let oids = get_oids(count);
        for oid in oids.iter() {
            index.add(fields[0].clone(), oid.clone()).unwrap();
        }
        index.remove(&fields[0], &oids[pos]);
        let spec = Operation::new("x".to_string(), Operand::Equal, fields[0].clone());
        let result = index.get(&spec);
        assert!(!result.contains(&oids[pos]), "should have removed oid");
        assert_eq!(result.len(), count - 1);
    }

    /// A value whose last id is removed disappears from the index map.
    #[test]
    fn are_empty_indexes_removed() {
        let mut index = Index::new();
        let field: Field = Uuid::new_v4().into();
        let oid = Oid::new();
        index.add(field.clone(), oid.clone()).unwrap();
        index.remove(&field, &oid);
        assert_eq!(index.data.len(), 0);
    }

    /// A unique index rejects a second document carrying an existing value.
    #[test]
    fn do_unique_indexes_error_on_duplicates() {
        let mut index = Index::new_unique();
        let field: Field = "fred".into();
        let oids = get_oids(2);
        index.add(field.clone(), oids[0].clone()).unwrap();
        // Use the second id so the duplicate comes from a different document.
        match index.add(field.clone(), oids[1].clone()) {
            Ok(_) => unreachable!("should have been an error"),
            Err(err) => match err {
                MTTError::FieldDuplicate(field_name, value) => {
                    assert_eq!(field_name, "");
                    assert_eq!(value, field);
                }
                _ => unreachable!("got {:?}: should have been duplicate field", err),
            },
        }
    }

    /// A non-unique index accepts a value that is already present.
    #[test]
    fn index_returns_validate() {
        let mut index = Index::new();
        let field: Field = "stuff".into();
        let oid = Oid::new();
        index.add(field.clone(), oid).unwrap();
        if let Err(err) = index.validate(&field) {
            unreachable!("got {:?}: should have returned without issue", err);
        }
    }

    #[test]
    fn unique_return_duplicate_error() {
        let mut index =
Index::new_unique();
        let field: Field = "fred".into();
        let oid = Oid::new();
        index.add(field.clone(), oid).unwrap();
        match index.validate(&field) {
            Ok(_) => unreachable!("should have gotten a duplication error"),
            Err(err) => match err {
                MTTError::FieldDuplicate(field_name, value) => {
                    assert_eq!(field_name, "");
                    assert_eq!(value, field);
                }
                _ => unreachable!("got {:?}: should have been duplicate field", err),
            },
        }
    }
}

/// The stored documents of one document type, together with its definition,
/// its indexes, and the queue/receiver pair it communicates through.
///
/// NOTE(review): the generic arguments of `HashMap` and `Receiver` appear
/// truncated in this copy; reproduced as found.
struct DocumentFile {
    docdef: DocDef,
    docs: HashMap,
    indexes: Indexes,
    queue: Queue,
    rx: Receiver,
}

impl DocumentFile {
    /// Creates an empty document file whose indexes come from
    /// `docdef.create_indexes()`.
    fn new(queue: Queue, rx: Receiver, docdef: DocDef) -> Self {
        // Build the indexes while `docdef` is still borrowed, then move it in.
        let indexes = docdef.create_indexes();
        Self {
            docdef,
            docs: HashMap::new(),
            indexes,
            queue,
            rx,
        }
    }

    /// Registers a new document file with `queue` and runs it on its own thread.
    ///
    /// `msg` must carry a `MsgAction::Create`. If registration fails, the error
    /// is sent back as the response to `msg` and no thread is started;
    /// otherwise an empty `Reply` is sent once the listener has been spawned.
    fn start(mut queue: Queue, msg: Message) {
        let (tx, rx) = channel();
        let name = match msg.get_document_id() {
            NameID::Name(name) => name.clone(),
            NameID::ID(id) => id.to_string(),
            NameID::None => unreachable!("should never be none"),
        };
        // One route per action, each restricted to this document's name.
        let handled = [
            Action::Addition,
            Action::Delete,
            Action::Query,
            Action::Show,
            Action::Update,
        ];
        let routes = handled
            .iter()
            .map(|action| {
                RouteRequest::new(
                    Include::All,
                    Include::Some(name.clone()),
                    Include::Some(action.clone()),
                )
            })
            .collect::<Vec<_>>();
        if let Err(err) = queue.register(tx, name, routes) {
            let error = msg.response(err);
            queue.send(error).unwrap();
            return;
        }
        let action = msg.get_action();
        let docdef = match action {
            MsgAction::Create(data) => data.clone(),
            _ => unreachable!("got {:?}: should have been a create message", action),
        };
        let mut doc = DocumentFile::new(queue.clone(), rx, docdef);
        spawn(move || doc.listen());
        queue.send(msg.response(Reply::new())).unwrap();
    }

    fn listen(&mut self) {
        loop {
            let msg =
self.rx.recv().unwrap();
            // Actions without a dedicated handler are answered with an empty reply.
            let result = match msg.get_action() {
                MsgAction::Addition(data) => self.add_document(data),
                MsgAction::Delete(delete) => self.delete(delete),
                MsgAction::Query(query) => self.query(query),
                MsgAction::Update(update) => self.update(update),
                _ => Reply::new().into(),
            };
            self.queue.send(msg.response(result)).unwrap();
        }
    }

    /// Borrows the definition this file was created with.
    fn get_docdef(&self) -> &DocDef {
        &self.docdef
    }

    /// Iterates over the stored documents.
    fn get_documents<'a>(&self) -> impl Iterator {
        self.docs.iter()
    }

    /// Runs `value` through the document definition and then through the
    /// indexes, returning the definition's output when both accept it.
    ///
    /// NOTE(review): the generic arguments of `Option` and `Result` appear
    /// truncated in this copy; reproduced as found.
    fn validate(&self, field_name: &str, value: Option) -> Result {
        let checked = self.docdef.validate(field_name, value)?;
        self.indexes.validate(field_name, &checked)?;
        Ok(checked)
    }

    /// Puts `key` into the error variants that carry a field name
    /// (`DocumentFieldMissing`, `FieldDuplicate`); other errors pass through.
    fn add_field_to_error(key: String, err: MTTError) -> MTTError {
        match err {
            MTTError::DocumentFieldMissing(_) => MTTError::DocumentFieldMissing(key),
            MTTError::FieldDuplicate(_, field) => MTTError::FieldDuplicate(key, field),
            other => other,
        }
    }

    /// Forwards to `Indexes::add_to_index`.
    fn add_to_index(&mut self, field_name: &str, field: Field, oid: Oid) {
        self.indexes.add_to_index(field_name, field, oid)
    }

    /// Forwards to `Indexes::remove_from_index`.
    fn remove_from_index(&mut self, field_name: &str, field: &Field, oid: &Oid) {
        self.indexes.remove_from_index(field_name, field, oid);
    }

    fn add_document(&mut self, addition: &Addition) -> MsgAction {
        let mut holder = Document::new();
        let doc = addition.get_document();
        // Validate every supplied field first.
        for (key, value) in doc.iter() {
            match self.validate(&key, Some(value.clone())) {
                // NOTE(review): the validated `data` is unused; the supplied `value` is stored.
                Ok(data) => {
                    holder.add_field(key.clone(), value.clone());
                }
                Err(err) => return Self::add_field_to_error(key.to_string(), err).into(),
            }
        }
        // Then fill every defined field that was not supplied by validating `None`.
        for (key, value) in self.docdef.iter() {
            match holder.get_field(key) {
                Some(_) => {}
                None => match self.validate(key, None) {
                    Ok(data) => holder.add_field(key.clone(), data.clone()),
                    Err(err) => return Self::add_field_to_error(key.to_string(), err).into(),
                },
            }
        }
        // Draw ids until one is not already in use.
        let mut oid = Oid::new();
        while self.docs.contains_key(&oid) {
            oid = Oid::new();
        }
        self.docs.insert(oid.clone(),
holder.clone());
        for (key, value) in holder.iter() {
            self.add_to_index(&key, value.clone(), oid.clone());
        }
        let mut reply = Reply::new();
        reply.add(holder);
        reply.into()
    }

    /// Deletes every document matched by the request's query.
    ///
    /// Each matched document is removed from the store and its field values
    /// are removed from the indexes, so later indexed queries cannot return
    /// ids that no longer exist. The removed documents are returned in the
    /// reply. A query failure is returned as an error action, in the same way
    /// `query` and `update` report it.
    fn delete(&mut self, delete: &Delete) -> MsgAction {
        let oids = match self.run_query(delete.get_query()) {
            Ok(result) => result,
            Err(err) => return err.into(),
        };
        let mut reply = Reply::new();
        for oid in oids.iter() {
            if let Some(doc) = self.docs.remove(oid) {
                // `remove_from_index` is a no-op for fields without an index.
                for (key, value) in doc.iter() {
                    self.remove_from_index(&key, &value, oid);
                }
                reply.add(doc);
            }
        }
        reply.into()
    }

    fn run_query(&self, query: &Query) -> Result, MTTError> {
        let query_ids = query.field_ids();
        let doc_ids = self.docdef.field_ids();
        let index_ids = self.indexes.index_ids();
        // Reject a query that names a field the definition does not have.
        if !doc_ids.is_superset(&query_ids) {
            let missed = query_ids.difference(&doc_ids).last().unwrap();
            return Err(MTTError::DocumentFieldNotFound(missed.to_string()));
        }
        // Split the queried fields into those with an index and those without.
        let used_indexed = index_ids
            .intersection(&query_ids)
            .cloned()
            .collect::>();
        let used_unindexed = query_ids
            .difference(&index_ids)
            .cloned()
            .collect::>();
        let mut oids = HashSet::new();
        if used_indexed.is_empty() {
            // No index applies: test every document against every condition.
            'docs: for (oid, doc) in self.docs.iter() {
                for query_id in query_ids.iter() {
                    let doc_data = doc.get_field(query_id).unwrap();
                    let mut operation = query.get(query_id).unwrap();
                    match operation.add_value(doc_data.clone()) {
                        Ok(_) => {}
                        Err(err) => match err {
                            // The two type arguments are swapped before being returned.
                            MTTError::DocumentFieldWrongDataType(got, expected) => {
                                return Err(MTTError::DocumentFieldWrongDataType(expected, got))
                            }
                            _ => return Err(err),
                        },
                    }
                    if operation.calculate() == false.into() {
                        continue 'docs;
                    }
                }
                oids.insert(oid.clone());
            }
        } else {
            // Intersect the id sets pulled from each applicable index.
            let mut first_time = true;
            for field_id in used_indexed.iter() {
                let op = query.get(field_id).unwrap();
                let holder = self.indexes.pull(field_id, &op);
                if first_time {
                    oids = holder;
                } else {
                    oids = oids.intersection(&holder).cloned().collect();
                }
                first_time = false;
            }
            // Then filter the survivors on the conditions that have no index.
            for field_id in used_unindexed.iter() {
                let mut holder: HashSet = HashSet::new();
                for oid in oids.iter() {
                    let doc = self.docs.get(oid).unwrap();
                    let mut op = query.get(field_id).unwrap().clone();
op.add_value(doc.get_field(field_id).unwrap());
                    if op.calculate() == true.into() {
                        holder.insert(oid.clone());
                    }
                }
                oids = oids.intersection(&holder).cloned().collect();
            }
        }
        Ok(oids)
    }

    /// Answers a query with the matching documents, or with the query error.
    fn query(&self, query: &Query) -> MsgAction {
        let matched = match self.run_query(query) {
            Ok(result) => result,
            Err(err) => return err.into(),
        };
        let mut reply = Reply::new();
        for oid in matched.iter() {
            reply.add(self.docs.get(oid).unwrap().clone());
        }
        reply.into()
    }

    /// Applies the update's values to every document matched by its query.
    ///
    /// All changes are validated before anything is stored: each new value is
    /// checked against the definition and current indexes, and against a
    /// scratch set of indexes that accumulates the values written by this
    /// update. The first failure aborts the whole update and is returned as an
    /// error carrying the offending field name. On success the changed
    /// documents are stored, re-indexed, and returned in the reply.
    fn update(&mut self, update: &Update) -> MsgAction {
        let oids = match self.run_query(update.get_query()) {
            Ok(result) => result,
            Err(err) => return err.into(),
        };
        // For every matched id keep `[original, updated]`; both start as copies.
        let mut holder = HashMap::new();
        for oid in oids.iter() {
            let doc = self.docs.get(oid).unwrap();
            holder.insert(oid.clone(), [doc.clone(), doc.clone()]);
        }
        let mut index_holder = self.docdef.create_indexes();
        for (oid, docs) in holder.iter_mut() {
            for (key, value) in update.get_values().iter() {
                // Name the field in the error on both validation paths.
                let field = match self.validate(&key, Some(value.clone())) {
                    Ok(field) => field,
                    Err(err) => return Self::add_field_to_error(key, err).into(),
                };
                if let Err(err) = index_holder.validate(&key, &field) {
                    return Self::add_field_to_error(key, err).into();
                }
                index_holder.add_to_index(&key, field.clone(), oid.clone());
                docs[1].add_field(key, field);
            }
        }
        let mut reply = Reply::new();
        for (oid, [old, new]) in holder {
            // Swap the index entries from the original values to the updated ones.
            for (key, value) in old.iter() {
                self.remove_from_index(&key, &value, &oid);
                // `new` began as a copy of `old`, so every key of `old` is present.
                let current = new.get_field(&key).unwrap();
                self.add_to_index(&key, current, oid.clone());
            }
            self.docs.insert(oid, new.clone());
            reply.add(new);
        }
        reply.into()
    }
}

#[cfg(test)]
mod document_files {
    use super::{support_test::TIMEOUT, *};
    use std::sync::mpsc::RecvTimeoutError;

    struct TestDocument {
        docdef: DocDef,
        doc_name: String,
        queue: Queue,
        routes: Vec,
        tx: Sender,
        rx: Receiver,
    }

    impl TestDocument {
        fn new(field_types: Vec) -> Self {
            let mut docdef = DocDef::new();
            let mut count = 0;
            for field_type in field_types.iter() {
docdef.add_field(format!("field{}", count), field_type.clone()); count += 1; } let (tx, rx) = channel(); Self { docdef: docdef, doc_name: Uuid::new_v4().to_string(), queue: Queue::new(), routes: [ RouteRequest::new(Include::All, Include::All, Include::Some(Action::Reply)), RouteRequest::new(Include::All, Include::All, Include::Some(Action::Error)), ] .to_vec(), tx: tx, rx: rx, } } fn get_docdef_mut(&mut self) -> &mut DocDef { &mut self.docdef } fn get_routes_mut(&mut self) -> &mut Vec { &mut self.routes } fn get_queue(&mut self) -> Queue { self.queue.clone() } fn get_receiver(&self) -> &Receiver { &self.rx } fn get_sender(&self) -> Sender { self.tx.clone() } fn send(&self, action: A) -> Result<(), MTTError> where A: Into, { let msg = Message::new(self.doc_name.clone(), action); self.queue.send(msg) } fn start(&mut self) { let msg = Message::new(self.doc_name.clone(), self.docdef.clone()); DocumentFile::start(self.queue.clone(), msg); self.queue .register( self.tx.clone(), Uuid::new_v4().to_string(), self.routes.clone(), ) .unwrap(); } fn populate(&self, data: Vec) { let mut add = Addition::new(); let mut count = 0; for item in data.iter() { add.add_field(format!("field{}", count), item.clone()); count += 1; } self.send(add).unwrap(); self.rx.recv().unwrap(); // eat addition response. 
} } fn standard_routes() -> Vec { [ RouteRequest::new(Include::All, Include::All, Include::Some(Action::Reply)), RouteRequest::new(Include::All, Include::All, Include::Some(Action::Error)), ] .to_vec() } fn create_docdef(field_types: Vec) -> (DocDef, String) { let mut output = DocDef::new(); let mut count = 0; for field_type in field_types.iter() { output.add_field(format!("field{}", count), field_type.clone()); count += 1; } (output, format!("name-{}", Uuid::new_v4())) } fn test_doc( name: &str, docdef: DocDef, routes: Vec, ) -> (Queue, Receiver) { let (tx, rx) = channel(); let mut queue = Queue::new(); let msg = Message::new(name, docdef); DocumentFile::start(queue.clone(), msg); queue .register(tx, Uuid::new_v4().to_string(), routes) .unwrap(); (queue, rx) } #[test] fn does_not_respond_to_create() { let docdef = DocDef::new(); let name = "quiet"; let (mut queue, rx) = test_doc(name, docdef, standard_routes()); let other = "alternate"; let (tx, _) = channel(); queue.register(tx, other.to_string(), Vec::new()).unwrap(); let msg = Message::new(name, DocDef::new()); queue.send(msg).unwrap(); match rx.recv_timeout(TIMEOUT) { Ok(msg) => unreachable!("should not receive: {:?}", msg), Err(err) => match err { RecvTimeoutError::Timeout => {} _ => unreachable!("should have timed out"), }, } } #[test] fn can_show_document_details() { let docdef = DocDef::new(); let name = "first"; let (queue, rx) = test_doc(name, docdef, standard_routes()); let msg = Message::new(name, MsgAction::Show); queue.send(msg.clone()).unwrap(); let show = rx.recv_timeout(TIMEOUT).unwrap(); } #[test] fn can_query_new_document() { let docdef = DocDef::new(); let name = "second"; let (queue, rx) = test_doc(name, docdef, standard_routes()); let query = Message::new(name, Query::new()); queue.send(query).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); match result.get_action() { MsgAction::Reply(data) => assert_eq!(data.len(), 0), _ => unreachable!( "got {:?}: should have received a reply", 
result.get_action() ), } } #[test] fn only_responses_to_its_show_request() { let docdef = DocDef::new(); let name = "quiet"; let (mut queue, rx) = test_doc(name, docdef, standard_routes()); let other = "alternate"; let (tx, _) = channel(); queue.register(tx, other.to_string(), Vec::new()).unwrap(); let msg = Message::new(other, MsgAction::Show); queue.send(msg).unwrap(); match rx.recv_timeout(TIMEOUT) { Ok(msg) => unreachable!("should not receive: {:?}", msg), Err(err) => match err { RecvTimeoutError::Timeout => {} _ => unreachable!("should have timed out"), }, } } #[test] fn only_responses_to_its_query_request() { let docdef = DocDef::new(); let name = "quiet"; let (mut queue, rx) = test_doc(name, docdef, standard_routes()); let other = "alternate"; let (tx, _) = channel(); queue.register(tx, other.to_string(), Vec::new()).unwrap(); let msg = Message::new(other, Query::new()); queue.send(msg).unwrap(); match rx.recv_timeout(TIMEOUT) { Ok(msg) => unreachable!("should not receive: {:?}", msg), Err(err) => match err { RecvTimeoutError::Timeout => {} _ => unreachable!("should have timed out"), }, } } #[test] fn can_document_be_added() { let mut docdef = DocDef::new(); let name = "field"; let doc_name = "document"; let data = Uuid::new_v4(); docdef.add_field(name.to_string(), FieldType::Uuid); let (queue, rx) = test_doc(doc_name, docdef, standard_routes()); let mut new_doc = Addition::new(); new_doc.add_field(name.to_string(), data.clone()); let msg = Message::new(doc_name, new_doc); queue.send(msg.clone()).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); assert_eq!(result.get_message_id(), msg.get_message_id()); match result.get_action() { MsgAction::Reply(output) => { assert_eq!(output.len(), 1); let holder = output.iter().next().unwrap(); match holder.get_field(name) { Some(field) => match field { Field::Uuid(store) => assert_eq!(store, data), _ => unreachable!( "got {:?}: should have received uuid", holder.get_field(name).unwrap() ), }, None => 
unreachable!("{:?} did not contain field '{}'", holder, name), } } _ => unreachable!("got {:?}: should have been a reply", result), } let msg = Message::new(doc_name, Query::new()); queue.send(msg.clone()).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); assert_eq!(result.get_message_id(), msg.get_message_id()); match result.get_action() { MsgAction::Reply(output) => { assert_eq!(output.len(), 1); let holder = output.iter().next().unwrap(); match holder.get_field(name) { Some(field) => match field { Field::Uuid(store) => assert_eq!(store, data), _ => unreachable!( "got {:?}: should have received uuid", holder.get_field(name).unwrap() ), }, None => unreachable!("{:?} did not contain field '{}'", holder, name), } } _ => unreachable!("got {:?}: should have been a reply", result), } } #[test] fn only_responses_to_its_additions() { let docdef = DocDef::new(); let name = "quiet"; let (mut queue, rx) = test_doc(name, docdef, standard_routes()); let other = "alternate"; let (tx, _) = channel(); queue.register(tx, other.to_string(), Vec::new()).unwrap(); let msg = Message::new(other, Addition::new()); queue.send(msg).unwrap(); match rx.recv_timeout(TIMEOUT) { Ok(msg) => unreachable!("should not receive: {:?}", msg), Err(err) => match err { RecvTimeoutError::Timeout => {} _ => unreachable!("should have timed out"), }, } } #[test] fn can_add_multiple_documents() { let count = 4; let mut docdef = DocDef::new(); let name = "field"; let doc_name = "document"; let data = Uuid::new_v4(); docdef.add_field(name.to_string(), FieldType::Uuid); let (queue, rx) = test_doc(doc_name, docdef, standard_routes()); let mut new_doc = Addition::new(); new_doc.add_field(name.to_string(), data.clone()); for _ in 0..count { let msg = Message::new(doc_name, new_doc.clone()); queue.send(msg.clone()).unwrap(); rx.recv_timeout(TIMEOUT).unwrap(); // eats the confirmation reply. 
} let msg = Message::new(doc_name, Query::new()); queue.send(msg).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); match result.get_action() { MsgAction::Reply(data) => assert_eq!(data.len(), count), _ => unreachable!("got {:?}: should have been a reply", result.get_action()), } } #[test] fn errors_on_wrong_field_name() { let (docdef, doc_name) = create_docdef([FieldType::Uuid].to_vec()); let field_name = Uuid::new_v4().to_string(); let (queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes()); let mut addition = Addition::new(); addition.add_field(field_name.clone(), Uuid::new_v4()); let msg = Message::new(doc_name, addition); queue.send(msg).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); match result.get_action() { MsgAction::Error(err) => match err { MTTError::DocumentFieldNotFound(data) => assert_eq!(data, &field_name), _ => unreachable!("got {:?}: should have been document field not found.", err), }, _ => unreachable!("got {:?}: should have been an error", result.get_action()), } } #[test] fn errors_on_wrong_field_type() { let (docdef, doc_name) = create_docdef([FieldType::Uuid].to_vec()); let (queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes()); let mut addition = Addition::new(); addition.add_field("field0".to_string(), "astring"); let msg = Message::new(doc_name, addition); queue.send(msg).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); match result.get_action() { MsgAction::Error(err) => match err { MTTError::DocumentFieldWrongDataType(expected, got) => { assert_eq!(got, &FieldType::StaticString); assert_eq!(expected, &FieldType::Uuid); } _ => unreachable!( "got {:?}: should have been document field data mismatch.", err ), }, _ => unreachable!("got {:?}: should have been an error", result.get_action()), } } #[test] fn errors_on_missing_fields() { let (docdef, doc_name) = create_docdef([FieldType::Uuid, FieldType::Uuid].to_vec()); let (queue, rx) = test_doc(doc_name.as_str(), docdef, 
standard_routes()); let mut addition = Addition::new(); addition.add_field("field0".to_string(), Uuid::nil()); let msg = Message::new(doc_name, addition); queue.send(msg).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); match result.get_action() { MsgAction::Error(err) => match err { MTTError::DocumentFieldMissing(field) => assert_eq!(field, "field1"), _ => unreachable!("got {:?}: should have been document field missing", err), }, _ => unreachable!("got {:?}: should have been an error", result.get_action()), } } #[test] fn does_query_return_related_entries() { let mut doc = TestDocument::new([FieldType::Uuid].to_vec()); doc.start(); let count = 3; let mut values: HashSet = HashSet::new(); while values.len() < count { values.insert(Uuid::new_v4().into()); } for value in values.iter() { doc.populate([value.clone()].to_vec()); } let expected = values.iter().last().unwrap().clone(); let mut calc = Calculation::new(Operand::Equal); calc.add_value(expected.clone()); let mut query = Query::new(); query.add("field0".to_string(), calc); doc.send(query).unwrap(); let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); let action = result.get_action(); match action { MsgAction::Reply(data) => { assert_eq!( data.len(), 1, "should return one entry containing {:?} got:\n{:?}", expected, action ); for doc in data.iter() { assert_eq!(doc.get_field("field0").unwrap(), expected); } } _ => unreachable!("got {:?}: should have been a reply", action), } } #[test] fn gets_all_documents_in_query() { let mut doc = TestDocument::new([FieldType::Integer].to_vec()); doc.start(); let values = [ [1.into()].to_vec(), [2.into()].to_vec(), [1.into()].to_vec(), [3.into()].to_vec(), [1.into()].to_vec(), [4.into()].to_vec(), [1.into()].to_vec(), [5.into()].to_vec(), ]; for value in values.iter() { doc.populate(value.clone()); } let mut calc = Calculation::new(Operand::Equal); calc.add_value(1); let mut query = Query::new(); query.add("field0".to_string(), calc); 
doc.send(query).unwrap(); let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); let action = result.get_action(); let input: Field = 1.into(); match action { MsgAction::Reply(data) => { assert_eq!(data.len(), 4, "should return 4 entries"); for doc in data.iter() { assert_eq!(doc.get_field("field0").unwrap(), input); } } _ => unreachable!("got {:?}: should have been a reply", action), } } #[test] fn query_should_work_with_multiple_fields() { let mut doc = TestDocument::new([FieldType::StaticString, FieldType::StaticString].to_vec()); doc.start(); let values = [ ["a".into(), "a".into()].to_vec(), ["a".into(), "b".into()].to_vec(), ["b".into(), "a".into()].to_vec(), ["b".into(), "b".into()].to_vec(), ]; for value in values.iter() { doc.populate(value.clone()); } let mut query = Query::new(); let mut calc = Calculation::new(Operand::Equal); calc.add_value("a"); query.add("field0".to_string(), calc); let mut calc = Calculation::new(Operand::Equal); calc.add_value("b"); query.add("field1".to_string(), calc); doc.send(query).unwrap(); let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); let action = result.get_action(); match action { MsgAction::Reply(data) => { let afield: Field = "a".into(); let bfield: Field = "b".into(); assert_eq!(data.len(), 1, "should return one entry:\n{:?}", action); for doc in data.iter() { assert_eq!(doc.get_field("field0").unwrap(), afield); assert_eq!(doc.get_field("field1").unwrap(), bfield); } } _ => unreachable!("got {:?}: should have been a reply", action), } } #[test] fn query_should_work_with_multiple_inexed_fields() { let mut doc = TestDocument::new([FieldType::StaticString, FieldType::StaticString].to_vec()); let docdef = doc.get_docdef_mut(); docdef.add_index("field0".to_string(), IndexType::Index); docdef.add_index("field1".to_string(), IndexType::Index); doc.start(); let values = [ ["a".into(), "a".into()].to_vec(), ["a".into(), "b".into()].to_vec(), ["b".into(), "a".into()].to_vec(), ["b".into(), 
"b".into()].to_vec(), ]; for value in values.iter() { doc.populate(value.clone()); } let mut query = Query::new(); let mut calc = Calculation::new(Operand::Equal); calc.add_value("a"); query.add("field0".to_string(), calc); let mut calc = Calculation::new(Operand::Equal); calc.add_value("b"); query.add("field1".to_string(), calc); doc.send(query).unwrap(); let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); let action = result.get_action(); match action { MsgAction::Reply(data) => { let afield: Field = "a".into(); let bfield: Field = "b".into(); assert_eq!(data.len(), 1, "should return one entry:\n{:?}", action); for doc in data.iter() { assert_eq!(doc.get_field("field0").unwrap(), afield); assert_eq!(doc.get_field("field1").unwrap(), bfield); } } _ => unreachable!("got {:?}: should have been a reply", action), } } #[test] fn query_should_work_with_mixed_inexed_fields() { let mut doc = TestDocument::new([FieldType::StaticString, FieldType::StaticString].to_vec()); let docdef = doc.get_docdef_mut(); docdef.add_index("field0".to_string(), IndexType::Index); doc.start(); let values = [ ["a".into(), "a".into()].to_vec(), ["a".into(), "b".into()].to_vec(), ["b".into(), "a".into()].to_vec(), ["b".into(), "b".into()].to_vec(), ]; for value in values.iter() { doc.populate(value.clone()); } let mut query = Query::new(); let mut calc = Calculation::new(Operand::Equal); calc.add_value("a"); query.add("field0".to_string(), calc); let mut calc = Calculation::new(Operand::Equal); calc.add_value("b"); query.add("field1".to_string(), calc); doc.send(query).unwrap(); let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); let action = result.get_action(); match action { MsgAction::Reply(data) => { let afield: Field = "a".into(); let bfield: Field = "b".into(); assert_eq!(data.len(), 1, "should return one entry:\n{:?}", action); for doc in data.iter() { assert_eq!(doc.get_field("field0").unwrap(), afield); assert_eq!(doc.get_field("field1").unwrap(), bfield); } } 
_ => unreachable!("got {:?}: should have been a reply", action), } } #[test] fn errors_on_bad_field_name() { let (docdef, doc_name) = create_docdef(Vec::new()); let field_name = "wrong"; let (queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes()); let mut query = Query::new(); let mut calc = Calculation::new(Operand::Equal); calc.add_value("something"); query.add(field_name.to_string(), calc); let msg = Message::new(doc_name, query); queue.send(msg).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); let action = result.get_action(); match action { MsgAction::Error(data) => match data { MTTError::DocumentFieldNotFound(output) => assert_eq!(output, field_name), _ => unreachable!("got {:?}: should been field not found", data), }, _ => unreachable!("got {:?}: should have been a error", action), } } #[test] fn errors_on_bad_field_type() { let mut doc = TestDocument::new([FieldType::Uuid].to_vec()); doc.start(); doc.populate([Uuid::nil().into()].to_vec()); let mut calc = Calculation::new(Operand::Equal); calc.add_value("notUUID"); let mut query = Query::new(); query.add("field0".to_string(), calc); doc.send(query).unwrap(); let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); let action = result.get_action(); match action { MsgAction::Error(data) => match data { MTTError::DocumentFieldWrongDataType(expected, got) => { assert_eq!(expected, &FieldType::Uuid); assert_eq!(got, &FieldType::StaticString); } _ => unreachable!("got {:?}: should been field not found", data), }, _ => unreachable!("got {:?}: should have been a error", action), } } #[test] fn can_use_default_values() { let (mut docdef, doc_name) = create_docdef([FieldType::StaticString].to_vec()); let mut calc = Calculation::new(Operand::Assign); calc.add_value(FieldType::StaticString); docdef.set_default("field0", calc); let (queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes()); let new_doc = Addition::new(); let msg = Message::new(doc_name, new_doc); 
queue.send(msg).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); let action = result.get_action(); match action { MsgAction::Reply(docs) => { assert_eq!(docs.len(), 1); for doc in docs.iter() { let expected: Field = "".into(); assert_eq!(doc.get_field("field0").unwrap(), expected); } } _ => unreachable!("got {:?}: should have gotten a reply", action), } } #[test] fn can_a_default_value_be_set() { let (mut docdef, doc_name) = create_docdef([FieldType::Uuid].to_vec()); let input = Uuid::nil(); let mut calc = Calculation::new(Operand::Assign); calc.add_value(input.clone()); docdef.set_default("field0", calc); let (queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes()); let new_doc = Addition::new(); let msg = Message::new(doc_name, new_doc); queue.send(msg).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); let action = result.get_action(); match action { MsgAction::Reply(docs) => { assert_eq!(docs.len(), 1); let expected: Field = input.into(); for doc in docs.iter() { assert_eq!(doc.get_field("field0").unwrap(), expected); } } _ => unreachable!("got {:?}: should have gotten a reply", action), } } #[test] fn can_default_values_be_overridden() { let (mut docdef, doc_name) = create_docdef([FieldType::Uuid].to_vec()); let mut calc = Calculation::new(Operand::Assign); calc.add_value(FieldType::Uuid); docdef.set_default("field0", calc); let (queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes()); let mut new_doc = Addition::new(); new_doc.add_field("field0".to_string(), Uuid::nil()); let msg = Message::new(doc_name, new_doc); queue.send(msg).unwrap(); let result = rx.recv_timeout(TIMEOUT).unwrap(); let action = result.get_action(); match action { MsgAction::Reply(docs) => { assert_eq!(docs.len(), 1); let expected: Field = Uuid::nil().into(); for doc in docs.iter() { assert_eq!(doc.get_field("field0").unwrap(), expected); } } _ => unreachable!("got {:?}: should have gotten a reply", action), } } #[test] fn 
empty_update_query_results_in_zero_changes() {
        // Generate `count` distinct ids, hold one back, and populate the
        // document with the rest so the held-back id matches nothing.
        let count = 5;
        let mut ids: HashSet<Uuid> = HashSet::new();
        while ids.len() < count {
            ids.insert(Uuid::new_v4());
        }
        let id = *ids.iter().last().unwrap();
        ids.remove(&id);
        let mut doc = TestDocument::new(vec![FieldType::Uuid]);
        doc.start();
        for other in ids.iter() {
            doc.populate(vec![(*other).into()]);
        }
        // Update keyed on the id that was never stored.
        let mut update = Update::new();
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value(id);
        update.get_query_mut().add("field0".to_string(), calc);
        update
            .get_values_mut()
            .add_field("field0".to_string(), Uuid::nil());
        doc.send(update).unwrap();
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Reply(docs) => assert_eq!(docs.len(), 0),
            _ => unreachable!("got {:?}: should have gotten a reply", action),
        }
    }

    /// Sends an `Update` addressed to a second name ("alternate") registered
    /// on the same queue and expects the test document's receiver to time out.
    #[test]
    fn only_responses_to_its_update_request() {
        let mut doc = TestDocument::new(vec![FieldType::Integer]);
        doc.start();
        let alt_doc_name = "alternate";
        // The receiving half is discarded; only the registration matters here.
        let (tx, _) = channel();
        let mut queue = doc.get_queue();
        queue
            .register(tx, alt_doc_name.to_string(), Vec::new())
            .unwrap();
        let update = Update::new();
        let msg = Message::new(alt_doc_name, update);
        queue.send(msg).unwrap();
        match doc.get_receiver().recv_timeout(TIMEOUT) {
            Ok(msg) => unreachable!("should not receive: {:?}", msg),
            Err(RecvTimeoutError::Timeout) => {}
            Err(_) => unreachable!("should have timed out"),
        }
    }

    /// Updates `field1` of the single document selected by its id and checks
    /// that both the update reply and a follow-up query return the new value.
    #[test]
    fn changes_information_requested() {
        let mut doc = TestDocument::new(vec![FieldType::Uuid, FieldType::StaticString]);
        doc.start();
        let id = Uuid::new_v4();
        let old = "old";
        let new = "new";
        doc.populate(vec![id.into(), old.into()]);
        let mut update = Update::new();
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value(id);
        update
            .get_query_mut()
            .add("field0".to_string(), calc.clone());
        update.get_values_mut().add_field("field1".to_string(), new);
        doc.send(update).unwrap();
        // Collect both replies under a label so assertion failures say which
        // request produced the unexpected result.
        let mut results: HashMap<String, Message> = HashMap::new();
        results.insert(
            "update".to_string(),
            doc.get_receiver().recv_timeout(TIMEOUT).unwrap(),
        );
        let mut query = Query::new();
        query.add("field0".to_string(), calc);
        doc.send(query).unwrap();
        results.insert(
            "query".to_string(),
            doc.get_receiver().recv_timeout(TIMEOUT).unwrap(),
        );
        let expected_id: Field = id.into();
        let output: Field = new.into();
        for (key, result) in results.iter() {
            let action = result.get_action();
            match action {
                MsgAction::Reply(docs) => {
                    assert_eq!(docs.len(), 1, "{}", key);
                    for doc in docs.iter() {
                        assert_eq!(doc.get_field("field0").unwrap(), expected_id, "{}", key);
                        assert_eq!(doc.get_field("field1").unwrap(), output, "{}", key);
                    }
                }
                _ => unreachable!("got {:?}: should have gotten a reply", action),
            }
        }
    }

    /// Stores two documents, updates `field1` on only one of them, and checks
    /// that a query for the new value returns exactly that document.
    #[test]
    fn changes_only_the_queried() {
        let mut doc = TestDocument::new(vec![FieldType::Uuid, FieldType::StaticString]);
        doc.start();
        let mut ids: HashSet<Uuid> = HashSet::new();
        while ids.len() < 2 {
            ids.insert(Uuid::new_v4());
        }
        let expected = *ids.iter().last().unwrap();
        let old = "old";
        let new = "new";
        for id in ids.iter() {
            doc.populate(vec![(*id).into(), old.into()]);
        }
        let mut update = Update::new();
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value(expected);
        update.get_query_mut().add("field0".to_string(), calc);
        update.get_values_mut().add_field("field1".to_string(), new);
        doc.send(update).unwrap();
        // The update reply itself is not inspected; only consume it.
        doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value(new);
        let mut query = Query::new();
        query.add("field1".to_string(), calc);
        doc.send(query).unwrap();
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Reply(docs) => {
                assert_eq!(docs.len(), 1);
                let expected_id: Field = expected.into();
                let output: Field = new.into();
                for doc in docs.iter() {
                    assert_eq!(doc.get_field("field0").unwrap(), expected_id);
                    assert_eq!(doc.get_field("field1").unwrap(), output);
                }
            }
            _ => unreachable!("got {:?}: should have gotten a reply", action),
        }
    }

    /// Stores `count` documents sharing one id, updates them with a single
    /// request, and checks that the update reply and a follow-up query both
    /// return `count` documents carrying the new value.
    #[test]
    fn can_handle_multiple_updates() {
        let mut doc = TestDocument::new(vec![FieldType::Uuid, FieldType::StaticString]);
        doc.start();
        let count = 3;
        let id = Uuid::new_v4();
        let old = "old";
        let new = "new";
        for _ in 0..count {
            doc.populate(vec![id.into(), old.into()]);
        }
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value(id);
        let mut update = Update::new();
        update
            .get_query_mut()
            .add("field0".to_string(), calc.clone());
        update.get_values_mut().add_field("field1".to_string(), new);
        doc.send(update).unwrap();
        let mut results: HashMap<String, Message> = HashMap::new();
        results.insert(
            "update".to_string(),
            doc.get_receiver().recv_timeout(TIMEOUT).unwrap(),
        );
        let mut query = Query::new();
        query.add("field0".to_string(), calc);
        doc.send(query).unwrap();
        results.insert(
            "query".to_string(),
            doc.get_receiver().recv_timeout(TIMEOUT).unwrap(),
        );
        let expected_id: Field = id.into();
        let output: Field = new.into();
        for (key, result) in results.iter() {
            let action = result.get_action();
            match action {
                MsgAction::Reply(docs) => {
                    assert_eq!(docs.len(), count, "{}", key);
                    for doc in docs.iter() {
                        assert_eq!(doc.get_field("field0").unwrap(), expected_id, "{}", key);
                        assert_eq!(doc.get_field("field1").unwrap(), output, "{}", key);
                    }
                }
                _ => unreachable!("got {:?}: should have gotten a reply", action),
            }
        }
    }

    /// An update that sets a field name the document does not define
    /// ("wrong") must be answered with `MTTError::DocumentFieldNotFound`.
    #[test]
    fn update_errors_on_bad_field_name() {
        let mut doc = TestDocument::new(vec![FieldType::Uuid, FieldType::StaticString]);
        doc.start();
        let id = Uuid::new_v4();
        let old = "old";
        let new = "new";
        doc.populate(vec![id.into(), old.into()]);
        let mut update = Update::new();
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value(id);
        update.get_query_mut().add("field0".to_string(), calc);
        update.get_values_mut().add_field("wrong".to_string(), new);
        doc.send(update).unwrap();
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Error(err) => match err {
                MTTError::DocumentFieldNotFound(data) => assert_eq!(data, "wrong"),
                _ => unreachable!("got {:?}: should have gotten a missing field", err),
            },
            _ => unreachable!("got {:?}: should have gotten an error", action),
        }
    }

    /// An update that writes a `Uuid` into a `StaticString` field must be
    /// answered with `MTTError::DocumentFieldWrongDataType(expected, got)`.
    #[test]
    fn update_errors_on_bad_field_type() {
        let mut doc = TestDocument::new(vec![FieldType::Uuid, FieldType::StaticString]);
        doc.start();
        let id = Uuid::new_v4();
        let old = "old";
        let new = Uuid::nil();
        doc.populate(vec![id.into(), old.into()]);
        let mut update = Update::new();
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value(id);
        update.get_query_mut().add("field0".to_string(), calc);
        update.get_values_mut().add_field("field1".to_string(), new);
        doc.send(update).unwrap();
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Error(err) => match err {
                MTTError::DocumentFieldWrongDataType(expected, got) => {
                    assert_eq!(expected, &FieldType::StaticString);
                    assert_eq!(got, &FieldType::Uuid);
                }
                _ => unreachable!("got {:?}: should have gotten incorrect field type", err),
            },
            _ => unreachable!("got {:?}: should have gotten an error", action),
        }
    }

    /// Sends the same addition twice to a document with a unique index on
    /// `field0`; the second attempt must produce `MTTError::FieldDuplicate`.
    #[test]
    fn does_update_maintain_unique_fields() {
        let (mut docdef, doc_name) = create_docdef(vec![FieldType::Uuid]);
        docdef.add_index("field0".to_string(), IndexType::Unique);
        let (queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes());
        let field0 = Uuid::new_v4();
        let mut addition = Addition::new();
        addition.add_field("field0".to_string(), field0);
        let msg = Message::new(doc_name.clone(), addition.clone());
        queue.send(msg).unwrap();
        // Reply to the first addition is consumed without inspection.
        rx.recv_timeout(TIMEOUT).unwrap();
        let msg2 = Message::new(doc_name, addition);
        queue.send(msg2).unwrap();
        let result = rx.recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Error(err) => match err {
                MTTError::FieldDuplicate(key, field) => {
                    let expected: Field = field0.into();
                    assert_eq!(key, "field0");
                    assert_eq!(field, &expected);
                }
                _ => unreachable!("got {:?}: should have gotten a duplicate field", err),
            },
            _ => unreachable!("got {:?}: should have gotten an error", action),
        }
    }

    /// Sends an addition whose `field1` is a string where a `Uuid` field is
    /// declared, then re-sends the same unique `field0` value with a valid
    /// `field1`; the second addition must be answered with a reply.
    #[test]
    fn unique_value_remains_available_if_failure_occurs() {
        let (mut docdef, doc_name) = create_docdef(vec![FieldType::Uuid, FieldType::Uuid]);
        docdef.add_index("field0".to_string(), IndexType::Unique);
        let (queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes());
        let field0 = Uuid::new_v4();
        let mut bad_addition = Addition::new();
        bad_addition.add_field("field0".to_string(), field0);
        bad_addition.add_field("field1".to_string(), "");
        let msg = Message::new(doc_name.clone(), bad_addition);
        queue.send(msg).unwrap();
        // The response to the bad addition is consumed but not inspected.
        rx.recv_timeout(TIMEOUT).unwrap();
        let mut good_addition = Addition::new();
        good_addition.add_field("field0".to_string(), field0);
        good_addition.add_field("field1".to_string(), field0);
        let msg = Message::new(doc_name, good_addition);
        queue.send(msg).unwrap();
        let result = rx.recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Reply(_) => {}
            _ => unreachable!("got {:?}: should have gotten a reply", action),
        }
    }

    /// After updating a uniquely indexed field from "old" to "new", adding
    /// "old" again must succeed and adding "new" must fail as a duplicate.
    #[test]
    fn updating_unique_updates_index_entries() {
        let mut doc = TestDocument::new(vec![FieldType::StaticString]);
        doc.get_docdef_mut()
            .add_index("field0".to_string(), IndexType::Unique);
        doc.start();
        let old = "old";
        let new = "new";
        let fold: Field = old.into();
        let fnew: Field = new.into();
        doc.populate(vec![old.into()]);
        let mut update = Update::new();
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value(old);
        update.get_query_mut().add("field0".to_string(), calc);
        update.get_values_mut().add_field("field0".to_string(), new);
        doc.send(update).unwrap();
        // Update reply is consumed without inspection.
        doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let mut old_addition = Addition::new();
        old_addition.add_field("field0".to_string(), old);
        doc.send(old_addition).unwrap();
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Reply(data) => {
                assert_eq!(data.len(), 1);
                for doc in data.iter() {
                    assert_eq!(doc.get_field("field0").unwrap(), fold);
                }
            }
            _ => unreachable!("got {:?}: should have gotten a reply", action),
        }
        let mut new_addition = Addition::new();
        new_addition.add_field("field0".to_string(), new);
        doc.send(new_addition).unwrap();
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Error(err) => match err {
                MTTError::FieldDuplicate(key, field) => {
                    assert_eq!(key, "field0");
                    assert_eq!(field, &fnew);
                }
                _ => unreachable!("got {:?}: should have gotten a duplicate field", err),
            },
            _ => unreachable!("got {:?}: should have gotten an error", action),
        }
    }

    /// Tries to set the uniquely indexed `field0` of several documents to the
    /// same value, expects `MTTError::FieldDuplicate`, and then checks that an
    /// unfiltered query still returns every originally stored id.
    #[test]
    fn unique_available_after_bad_change() {
        let mut doc = TestDocument::new(vec![FieldType::Uuid, FieldType::StaticString]);
        doc.get_docdef_mut()
            .add_index("field0".to_string(), IndexType::Unique);
        doc.start();
        let count = 5;
        let data = "data";
        let mut ids: HashSet<Uuid> = HashSet::new();
        while ids.len() < count {
            ids.insert(Uuid::new_v4());
        }
        // `holder` is kept out of the document and used as the update target.
        let holder = *ids.iter().last().unwrap();
        let fholder: Field = holder.into();
        ids.remove(&holder);
        for id in ids.iter() {
            doc.populate(vec![(*id).into(), data.into()]);
        }
        let mut update = Update::new();
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value(data);
        update.get_query_mut().add("field1".to_string(), calc);
        update
            .get_values_mut()
            .add_field("field0".to_string(), holder);
        doc.send(update).unwrap();
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Error(err) => match err {
                MTTError::FieldDuplicate(key, field) => {
                    assert_eq!(key, "field0");
                    assert_eq!(field, &fholder);
                }
                _ => unreachable!("got {:?}: should have gotten a duplicate field", err),
            },
            _ => unreachable!("got {:?}: should have gotten an error", action),
        }
        let query = Query::new();
        doc.send(query).unwrap();
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Reply(data) => {
                assert_eq!(data.len(), ids.len());
                for doc in data.iter() {
                    match doc.get_field("field0").unwrap() {
                        Field::Uuid(id) => assert!(ids.contains(&id)),
                        _ => unreachable!("did not get uuid"),
                    }
                }
            }
            _ => unreachable!("got {:?}: should have gotten reply", action),
        }
    }

    /// Adds a document whose `field0` is an `Operand::Add` calculation of
    /// `FieldType::DateTime` and a 300 second duration, and checks that the
    /// stored timestamp lies between "now + duration" sampled before the send
    /// and after the reply.
    #[test]
    fn can_calculate_field_values() {
        let mut doc = TestDocument::new(vec![FieldType::DateTime]);
        doc.start();
        let duration = Duration::from_secs(300);
        let mut calc = Calculation::new(Operand::Add);
        calc.add_value(FieldType::DateTime).unwrap();
        calc.add_value(duration).unwrap();
        let mut addition = Addition::new();
        addition.add_field("field0".to_string(), calc);
        let start = Utc::now() + duration;
        doc.send(addition).unwrap();
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let stop = Utc::now() + duration;
        let action = result.get_action();
        match action {
            MsgAction::Reply(data) => {
                assert_eq!(data.len(), 1);
                for doc in data.iter() {
                    match doc.get_field("field0").unwrap() {
                        Field::DateTime(datetime) => assert!(datetime > start && datetime < stop),
                        _ => unreachable!("did not get datetime"),
                    }
                }
            }
            _ => unreachable!("got {:?}: should have gotten reply", action),
        }
    }

    /// Deletes the single stored document, expects it back in the delete
    /// reply, and then expects the same query to return no documents.
    #[test]
    fn can_delete() {
        let mut doc = TestDocument::new(vec![FieldType::Integer]);
        doc.start();
        doc.populate(vec![1.into()]);
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value(1);
        let mut query = Query::new();
        query.add("field0".to_string(), calc);
        let mut delete = Delete::new();
        // The query is reused after the delete, so the delete gets a copy.
        *delete.get_query_mut() = query.clone();
        doc.send(delete).unwrap();
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Reply(data) => {
                assert_eq!(data.len(), 1);
                for doc in data.iter() {
                    match doc.get_field("field0").unwrap() {
                        Field::Integer(num) => assert_eq!(num, 1),
                        _ => unreachable!("did not get integer"),
                    }
                }
            }
            _ => unreachable!("got {:?}: should have gotten reply", action),
        }
        doc.send(query).unwrap();
        let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Reply(data) => assert_eq!(data.len(), 0),
            _ => unreachable!("got {:?}: should have gotten reply", action),
        }
    }

    /// Sends a delete to the test document, then a `Show` message addressed
    /// to a second registered name ("other"), and expects the test document's
    /// receiver to time out.
    ///
    /// NOTE(review): the response to the delete itself is never read before
    /// the final `recv_timeout`, which may be why this test is ignored;
    /// confirm before re-enabling.
    #[test]
    #[ignore]
    fn delete_should_only_respond_to_its_own() {
        let mut doc = TestDocument::new(vec![FieldType::Integer]);
        doc.start();
        doc.populate(vec![1.into()]);
        let mut calc = Calculation::new(Operand::Equal);
        calc.add_value(1);
        let mut query = Query::new();
        query.add("field0".to_string(), calc);
        let mut delete = Delete::new();
        *delete.get_query_mut() = query;
        doc.send(delete).unwrap();
        let name = "other";
        let msg = Message::new(name.to_string(), MsgAction::Show);
        let (tx, _) = channel();
        let mut queue = doc.get_queue();
        queue.register(tx, name.to_string(), Vec::new()).unwrap();
        queue.send(msg).unwrap();
        match doc.get_receiver().recv_timeout(TIMEOUT) {
            Ok(msg) => unreachable!("should not receive: {:?}", msg),
            Err(RecvTimeoutError::Timeout) => {}
            Err(_) => unreachable!("should have timed out"),
        }
    }
}

/// Tests for document creation through `CreateDoc`.
#[cfg(test)]
mod createdocs {
    use super::support_test::TIMEOUT;
    use super::*;

    /// Builds a new `Queue`, registers a channel sender on it under a random
    /// UUID name with the given `routes`, starts `CreateDoc` on a clone of the
    /// queue, and returns the queue together with the receiving end.
    fn setup_create_doc(routes: Vec<RouteRequest>) -> (Queue, Receiver<Message>) {
        let mut queue = Queue::new();
        let (tx, rx) = channel();
        queue
            .register(tx, Uuid::new_v4().to_string(), routes)
            .unwrap();
        CreateDoc::start(queue.clone());
        (queue, rx)
    }

    /// Creates a document named "project" and then queries it; both requests
    /// must be answered with a reply carrying the request's message id and
    /// document id.
    #[test]
    fn create_document_creation() {
        let name = "project";
        let routes = vec![RouteRequest::new(
            Include::All,
            Include::All,
            Include::Some(Action::Reply),
        )];
        let (queue, rx) = setup_create_doc(routes);
        let msg1 = Message::new(name, MsgAction::Create(DocDef::new()));
        queue.send(msg1.clone()).unwrap();
        let result1 = rx.recv_timeout(TIMEOUT).unwrap();
        assert_eq!(result1.get_message_id(), msg1.get_message_id());
        assert_eq!(result1.get_document_id(), msg1.get_document_id());
        match result1.get_action() {
            MsgAction::Reply(_) => {}
            _ => unreachable!("got {:?}: should have been a reply.", result1.get_action()),
        }
        let msg2 = Message::new(name, MsgAction::Query(Query::new()));
        queue.send(msg2.clone()).unwrap();
        let result2 = rx.recv_timeout(TIMEOUT).unwrap();
        assert_eq!(result2.get_message_id(), msg2.get_message_id());
        assert_eq!(result2.get_document_id(), msg2.get_document_id());
        match result2.get_action() {
            MsgAction::Reply(_) => {}
            // Report the action that was actually inspected (result2).
            _ => unreachable!("got {:?}: should have been a reply.", result2.get_action()),
        }
    }

    /// Sends the same create message twice; the second must be answered with
    /// `MTTError::DocumentAlreadyExists` naming the document.
    #[test]
    fn does_duplicates_generate_error() {
        let name = "duplicate";
        let routes = vec![
            RouteRequest::new(Include::All, Include::All, Include::Some(Action::Reply)),
            RouteRequest::new(Include::All, Include::All, Include::Some(Action::Error)),
        ];
        let (queue, rx) = setup_create_doc(routes);
        let msg = Message::new(name, MsgAction::Create(DocDef::new()));
        queue.send(msg.clone()).unwrap();
        // Response to the first create is consumed without inspection.
        rx.recv_timeout(TIMEOUT).unwrap();
        queue.send(msg.clone()).unwrap();
        let result = rx.recv_timeout(TIMEOUT).unwrap();
        assert_eq!(result.get_message_id(), msg.get_message_id());
        assert_eq!(result.get_document_id(), msg.get_document_id());
        match result.get_action() {
            MsgAction::Error(err) => match err {
                MTTError::DocumentAlreadyExists(data) => assert_eq!(data, name),
                _ => unreachable!("got {:?}: should have been document already exists.", err),
            },
            _ => unreachable!("got {:?}: should have been an error.", result.get_action()),
        }
    }
}