use std::{ collections::HashMap, sync::{ mpsc::{channel, Receiver, Sender}, Arc, RwLock, }, thread::spawn, }; use uuid::Uuid; #[cfg(test)] mod support_test { use std::time::Duration; pub static TIMEOUT: Duration = Duration::from_millis(500); } #[derive(Clone, Debug)] enum MTTError { DocumentAlreadyExists(String), DocumentFieldMissing(String), DocumentFieldNotFound(String), DocumentFieldWrongDataType(FieldType, FieldType), DocumentNotFound(String), } #[derive(Clone, Debug, Eq, Hash, PartialEq)] enum Action { Addition, Create, Error, Query, Reply, Show, } impl From for Action { fn from(value: MsgAction) -> Self { match value { MsgAction::Addition(_) => Action::Addition, MsgAction::Create(_) => Action::Create, MsgAction::Error(_) => Action::Error, MsgAction::Query(_) => Action::Query, MsgAction::Reply(_) => Action::Reply, MsgAction::Show => Action::Show, } } } impl From<&MsgAction> for Action { fn from(value: &MsgAction) -> Self { let action = value.clone(); Self::from(action) } } #[derive(Clone, Debug, Eq, Hash, PartialEq)] enum NameID { ID(Uuid), Name(String), } impl From<&str> for NameID { fn from(value: &str) -> Self { Self::Name(value.to_string()) } } impl From for NameID { fn from(value: String) -> Self { Self::Name(value) } } impl From for NameID { fn from(value: Uuid) -> Self { Self::ID(value) } } impl From<&NameID> for NameID { fn from(value: &NameID) -> Self { value.clone() } } #[derive(Clone, Debug)] enum MsgAction { Addition(Addition), Create(DocDef), // Alter // Remove Error(MTTError), Query(Query), Reply(Reply), Show, // Delete } impl From for MsgAction { fn from(value: Addition) -> Self { MsgAction::Addition(value) } } impl From for MsgAction { fn from(value: DocDef) -> Self { MsgAction::Create(value) } } impl From for MsgAction { fn from(value: MTTError) -> Self { MsgAction::Error(value) } } impl From for MsgAction { fn from(value: Query) -> Self { MsgAction::Query(value) } } impl From for MsgAction { fn from(value: Reply) -> Self { 
MsgAction::Reply(value) } } #[cfg(test)] mod msgactions { use super::*; #[test] fn turn_document_definition_into_action() { let value = DocDef::new(); let result: MsgAction = value.into(); match result { MsgAction::Create(_) => {} _ => unreachable!("Got {:?}: dhould have been create", result), } } #[test] fn turn_error_into_action() { let data = "data".to_string(); let value = MTTError::DocumentAlreadyExists(data.clone()); let result: MsgAction = value.into(); match result { MsgAction::Error(result) => match result { MTTError::DocumentAlreadyExists(output) => assert_eq!(output, data), _ => unreachable!("Got {:?}: dhould have been create", result), }, _ => unreachable!("Got {:?}: dhould have been create", result), } let value = MTTError::DocumentNotFound(data.clone()); let result: MsgAction = value.into(); match result { MsgAction::Error(result) => match result { MTTError::DocumentNotFound(output) => assert_eq!(output, data), _ => unreachable!("Got {:?}: dhould have been create", result), }, _ => unreachable!("Got {:?}: dhould have been create", result), } } #[test] fn turn_query_into_action() { let value = Query::new(); let result: MsgAction = value.into(); match result { MsgAction::Query(_) => {} _ => unreachable!("Got {:?}: dhould have been query", result), } } #[test] fn turn_reply_into_action() { let value = Reply::new(); let result: MsgAction = value.into(); match result { MsgAction::Reply(_) => {} _ => unreachable!("Got {:?}: dhould have been reply", result), } } } #[derive(Clone, Debug)] struct Message { msg_id: Uuid, document_id: NameID, action: MsgAction, } impl Message { fn new(doc_id: D, action: A) -> Self where D: Into, A: Into, { Self { msg_id: Uuid::new_v4(), document_id: doc_id.into(), action: action.into(), } } fn get_message_id(&self) -> &Uuid { &self.msg_id } fn get_document_id(&self) -> &NameID { &self.document_id } fn get_action(&self) -> &MsgAction { &self.action } fn response(&self, action: A) -> Self where A: Into, { Self { msg_id: 
self.msg_id.clone(), document_id: self.document_id.clone(), action: action.into(), } } fn reply(&self, resp: Reply) -> Self { Self { msg_id: self.msg_id.clone(), document_id: self.document_id.clone(), action: MsgAction::Reply(resp), } } fn error(&self, err: MTTError) -> Self { Self { msg_id: self.msg_id.clone(), document_id: self.document_id.clone(), action: MsgAction::Error(err), } } } #[cfg(test)] mod messages { use super::*; #[test] fn can_the_document_be_a_stringi_reference() { let dts = ["one", "two"]; for document in dts.into_iter() { let msg = Message::new(document, MsgAction::Create(DocDef::new())); match msg.get_document_id() { NameID::ID(_) => unreachable!("should have been a string id"), NameID::Name(data) => assert_eq!(data, document), } match msg.get_action() { MsgAction::Create(_) => {} _ => unreachable!("should have been a create document"), } } } #[test] fn can_the_document_be_a_string() { let dts = ["one".to_string(), "two".to_string()]; for document in dts.into_iter() { let msg = Message::new(document.clone(), MsgAction::Query(Query::new())); match msg.get_document_id() { NameID::ID(_) => unreachable!("should have been a string id"), NameID::Name(data) => assert_eq!(data, &document), } match msg.get_action() { MsgAction::Query(_) => {} _ => unreachable!("should have been an access query"), } } } #[test] fn can_the_document_be_an_id() { let document = Uuid::new_v4(); let msg = Message::new(document.clone(), MsgAction::Query(Query::new())); match msg.get_document_id() { NameID::ID(data) => assert_eq!(data, &document), NameID::Name(_) => unreachable!("should have been an id"), } match msg.get_action() { MsgAction::Query(_) => {} _ => unreachable!("should have been an access query"), } } #[test] fn is_the_message_id_random() { let mut ids: Vec = Vec::new(); for _ in 0..5 { let msg = Message::new("tester", MsgAction::Create(DocDef::new())); let id = msg.get_message_id().clone(); assert!(!ids.contains(&id), "{:?} containts {}", ids, id); ids.push(id); } 
} #[test] fn Can_make_reply_message() { let name = "testing"; let msg = Message::new(name, MsgAction::Query(Query::new())); let responce = Reply::new(); let reply = msg.reply(responce); assert_eq!(reply.get_message_id(), msg.get_message_id()); match reply.get_document_id() { NameID::Name(data) => assert_eq!(data, name), _ => unreachable!("should have been a name"), } match reply.get_action() { MsgAction::Reply(_) => {} _ => unreachable!("should have been a reply"), } } #[test] fn Can_make_error_message() { let name = "testing"; let msg = Message::new(name, MsgAction::Query(Query::new())); let err_msg = Uuid::new_v4().to_string(); let result = msg.error(MTTError::DocumentNotFound(err_msg.clone())); assert_eq!(result.get_message_id(), msg.get_message_id()); match result.get_document_id() { NameID::Name(data) => assert_eq!(data, name), _ => unreachable!("should have been a name"), } match result.get_action() { MsgAction::Error(data) => match data { MTTError::DocumentNotFound(txt) => assert_eq!(txt, &err_msg), _ => unreachable!("got {:?}, should have received not found", data), }, _ => unreachable!("should have been a reply"), } } #[test] fn can_make_a_response_message() { let doc_id = Uuid::new_v4(); let msg = Message::new(doc_id.clone(), MsgAction::Query(Query::new())); let data = Uuid::new_v4().to_string(); let result1 = msg.response(MTTError::DocumentNotFound(data.clone())); let result2 = msg.response(Reply::new()); assert_eq!(result1.get_message_id(), msg.get_message_id()); assert_eq!(result2.get_message_id(), msg.get_message_id()); assert_eq!(result1.get_document_id(), msg.get_document_id()); assert_eq!(result2.get_document_id(), msg.get_document_id()); let action1 = result1.get_action(); match action1 { MsgAction::Error(err) => match err { MTTError::DocumentNotFound(output) => assert_eq!(output, &data), _ => unreachable!("got {:?}: should have received document not found", err), }, _ => unreachable!("got {:?}: should have received error", action1), } let action2 
= result2.get_action(); match action2 { MsgAction::Reply(data) => assert_eq!(data.len(), 0), _ => unreachable!("got {:?}: should have received a reply", action2), } } } #[derive(Clone, Debug)] enum Include { All, Some(T), } impl PartialEq for Include { fn eq(&self, other: &Self) -> bool { match self { Include::All => true, Include::Some(data) => match other { Include::All => true, Include::Some(other_data) => data == other_data, }, } } } #[cfg(test)] mod includes { use super::*; #[test] fn does_all_equal_evberything() { let a: Include = Include::All; let b: Include = Include::Some(5); let c: Include = Include::Some(7); assert!(a == a, "all should equal all"); assert!(a == b, "all should equal some"); assert!(b == a, "some should equal all"); assert!(b == b, "same some should equal"); assert!(b != c, "different somes do not equal"); } } #[derive(Clone, Eq, Hash, PartialEq)] struct RouteID { action: Option, doc_type: Option, msg_id: Option, } impl From for RouteID { fn from(value: Route) -> Self { Self { action: match value.action { Include::All => None, Include::Some(action) => Some(action.clone()), }, doc_type: match value.doc_type { Include::All => None, Include::Some(doc) => Some(doc.clone()), }, msg_id: match value.msg_id { Include::All => None, Include::Some(id) => Some(id.clone()), }, } } } #[derive(Clone, Debug, PartialEq)] struct Route { action: Include, doc_type: Include, msg_id: Include, } impl Route { fn new(msg_id: Include, doc: Include, action: Include) -> Self { Self { action: action, doc_type: doc, msg_id: msg_id, } } } impl From for Route { fn from(value: RouteID) -> Self { Self { action: match value.action { Some(data) => Include::Some(data.clone()), None => Include::All, }, doc_type: match value.doc_type { Some(doc) => Include::Some(doc.clone()), None => Include::All, }, msg_id: match value.msg_id { Some(msg) => Include::Some(msg.clone()), None => Include::All, }, } } } impl From<&RouteID> for Route { fn from(value: &RouteID) -> Self { Self { 
action: match &value.action { Some(data) => Include::Some(data.clone()), None => Include::All, }, doc_type: match &value.doc_type { Some(doc) => Include::Some(doc.clone()), None => Include::All, }, msg_id: match &value.msg_id { Some(msg) => Include::Some(msg.clone()), None => Include::All, }, } } } #[cfg(test)] mod routes { use super::*; #[test] fn can_a_route_set_action() { let actions = [Action::Query, Action::Reply]; for action in actions.into_iter() { let route = Route::new(Include::All, Include::All, Include::Some(action.clone())); match route.msg_id { Include::All => {} Include::Some(_) => unreachable!("should have been all"), } match route.doc_type { Include::All => {} Include::Some(_) => unreachable!("should have been all"), } match route.action { Include::All => unreachable!("should be a specific value"), Include::Some(result) => assert_eq!(result, action), } } } #[test] fn can_route_set_document_by_name() { let doc_id = Uuid::new_v4(); let route = Route::new(Include::All, Include::Some(doc_id.clone()), Include::All); match route.msg_id { Include::All => {} Include::Some(_) => unreachable!("should have been all"), } match route.doc_type { Include::All => unreachable!("should be a specific value"), Include::Some(result) => assert_eq!(result, doc_id), } match route.action { Include::All => {} Include::Some(_) => unreachable!("should have been all"), } } #[test] fn can_route_set_document_by_id() { let id = Uuid::new_v4(); let route = Route::new(Include::All, Include::Some(id.clone()), Include::All); match route.msg_id { Include::All => {} Include::Some(_) => unreachable!("should have been all"), } match route.doc_type { Include::All => unreachable!("should be a specific value"), Include::Some(result) => assert_eq!(result, id), } match route.action { Include::All => {} Include::Some(_) => unreachable!("should have been all"), } } #[test] fn can_route_be_set_by_message_id() { let id = Uuid::new_v4(); let route = Route::new(Include::Some(id.clone()), 
Include::All, Include::All); match route.msg_id { Include::All => unreachable!("should be a specific value"), Include::Some(result) => assert_eq!(result, id), } match route.doc_type { Include::All => {} Include::Some(_) => unreachable!("should have been all"), } match route.action { Include::All => {} Include::Some(_) => unreachable!("should have been all"), } } } #[derive(Clone)] struct RouteRequest { msg_id: Include, doc_name: Include, action: Include, } impl RouteRequest { fn new(msg_id: Include, doc_name: Include, action: Include) -> Self { Self { msg_id: msg_id, doc_name: doc_name, action: action, } } } struct QueueData { senders: HashMap>, names: HashMap, routes: HashMap>, } impl QueueData { fn new() -> Self { Self { senders: HashMap::new(), names: HashMap::new(), routes: HashMap::new(), } } fn get_doc_id(&self, nameid: N) -> Result where N: Into, { let sender_id = match nameid.into() { NameID::Name(name) => match self.names.get(&name) { Some(id) => id.clone(), None => return Err(MTTError::DocumentNotFound(name.clone())), }, NameID::ID(id) => id.clone(), }; if self.senders.contains_key(&sender_id) { Ok(sender_id) } else { Err(MTTError::DocumentNotFound(sender_id.to_string())) } } fn register( &mut self, tx: Sender, name: String, routes: Vec, ) -> Result<(), MTTError> { let mut id = Uuid::new_v4(); while self.senders.contains_key(&id) { id = Uuid::new_v4(); } match self.get_doc_id(name.clone()) { Ok(_) => return Err(MTTError::DocumentAlreadyExists(name)), Err(_) => {} } let mut holder: HashMap> = HashMap::new(); for route in routes.iter() { let doc_type = match &route.doc_name { Include::Some(doc_name) => { if doc_name == &name { Include::Some(id.clone()) } else { match self.get_doc_id(doc_name.to_string()) { Ok(doc_id) => Include::Some(doc_id), Err(err) => return Err(err), } } } Include::All => Include::All, }; let route_id: RouteID = Route::new(route.msg_id.clone(), doc_type, route.action.clone()).into(); match self.routes.get(&route_id) { Some(senders) => { 
let mut addition = senders.clone(); addition.push(id.clone()); holder.insert(route_id, addition); } None => { let senders = [id.clone()].to_vec(); holder.insert(route_id, senders); } } } self.senders.insert(id.clone(), tx); self.names.insert(name.clone(), id.clone()); for (route_id, senders) in holder.iter() { self.routes.insert(route_id.clone(), senders.clone()); } Ok(()) } fn send(&self, msg: Message) -> Result<(), MTTError> { let doc_id: Include = match self.get_doc_id(msg.get_document_id()) { Ok(id) => Include::Some(id.clone()), Err(err) => { let action: Action = msg.get_action().into(); if action == Action::Create { Include::Some(Uuid::nil()) } else { return Err(err); } } }; let route = Route::new( Include::Some(msg.get_message_id().clone()), doc_id, Include::Some(msg.get_action().into()), ); for (send_route, send_ids) in self.routes.iter() { if route == send_route.into() { for send_id in send_ids { let tx = self.senders.get(&send_id).unwrap(); tx.send(msg.clone()).unwrap(); } } } Ok(()) } } #[cfg(test)] mod queuedatas { use super::support_test::TIMEOUT; use super::*; use std::sync::mpsc::RecvTimeoutError; #[test] fn can_document_be_registered() { let mut queuedata = QueueData::new(); let (tx, rx) = channel(); let name = Uuid::new_v4().to_string(); let routes = [ RouteRequest::new( Include::All, Include::Some(name.clone()), Include::Some(Action::Query), ), RouteRequest::new( Include::All, Include::Some(name.clone()), Include::Some(Action::Reply), ), ] .to_vec(); queuedata.register(tx, name.clone(), routes).unwrap(); let msg1 = Message::new(name.clone(), MsgAction::Query(Query::new())); let msg2 = Message::new(name.clone(), MsgAction::Reply(Reply::new())); let msg3 = Message::new(name.clone(), MsgAction::Create(DocDef::new())); queuedata.send(msg1.clone()).unwrap(); queuedata.send(msg2.clone()).unwrap(); queuedata.send(msg3.clone()).unwrap(); let result1 = rx.recv_timeout(TIMEOUT).unwrap(); let result2 = rx.recv_timeout(TIMEOUT).unwrap(); match 
rx.recv_timeout(TIMEOUT) { Ok(_) => unreachable!("should have timed out"), Err(err) => match err { RecvTimeoutError::Timeout => {} _ => unreachable!("should have timed out"), }, } assert_eq!(result1.get_message_id(), msg1.get_message_id()); assert_eq!(result2.get_message_id(), msg2.get_message_id()); match result1.get_action() { MsgAction::Query(_) => {} _ => unreachable!("should have been a query"), } match result2.get_action() { MsgAction::Reply(_) => {} _ => unreachable!("should have been a query"), } } #[test] fn does_register_fail_on_duplicate_documents() { let mut queuedata = QueueData::new(); let (tx1, _) = channel(); let (tx2, _) = channel(); let name = Uuid::new_v4().to_string(); queuedata .register(tx1, name.to_string(), Vec::new()) .unwrap(); match queuedata.register(tx2, name.to_string(), Vec::new()) { Ok(_) => unreachable!("duplicates should create an error"), Err(err) => match err { MTTError::DocumentAlreadyExists(result) => assert_eq!(result, name), _ => unreachable!("should have been document does not exists"), }, } } #[test] fn does_bad_route_prevent_registration() { let mut queuedata = QueueData::new(); let (tx, rx) = channel(); let good = "good"; let bad = Uuid::new_v4().to_string(); let routes = [ RouteRequest::new( Include::All, Include::Some(good.to_string()), Include::Some(Action::Query), ), RouteRequest::new( Include::All, Include::Some(bad.clone()), Include::Some(Action::Reply), ), ] .to_vec(); match queuedata.register(tx, good.to_string(), routes) { Ok(_) => unreachable!("should produce an error"), Err(err) => match err { MTTError::DocumentNotFound(result) => assert_eq!(result, bad), _ => unreachable!("Shouuld be document not found"), }, } assert_eq!(queuedata.senders.len(), 0, "should not add to senders"); assert_eq!(queuedata.names.len(), 0, "should not add to names"); assert_eq!(queuedata.routes.len(), 0, "should nor add to routes"); } #[test] fn is_sender_only_added_once_to_routes() { let mut queuedata = QueueData::new(); let (tx, rx) 
= channel(); let name = "project"; let routes = [ RouteRequest::new( Include::All, Include::Some(name.to_string()), Include::Some(Action::Query), ), RouteRequest::new( Include::All, Include::Some(name.to_string()), Include::Some(Action::Query), ), ] .to_vec(); queuedata.register(tx, name.to_string(), routes).unwrap(); for senders in queuedata.routes.values() { assert_eq!(senders.len(), 1, "should be no double entries"); } } #[test] fn does_a_bad_document_name_fail() { let docname = Uuid::new_v4().to_string(); let queuedata = QueueData::new(); let msg = Message::new(docname.clone(), MsgAction::Query(Query::new())); match queuedata.send(msg) { Ok(_) => unreachable!("should have been an error"), Err(data) => match data { MTTError::DocumentNotFound(doc) => assert_eq!(doc, docname), _ => unreachable!("should have been a not found error"), }, } } #[test] fn is_send_okay_if_no_one_is_listening() { let mut queuedata = QueueData::new(); let name = "something"; let (tx, _) = channel(); queuedata .register(tx, name.to_string(), Vec::new()) .unwrap(); let msg = Message::new("something", MsgAction::Create(DocDef::new())); match queuedata.send(msg) { Ok(_) => {} Err(err) => unreachable!("got {:?}: should not error", err), } } #[test] fn can_more_than_one_document_respond() { let mut queuedata = QueueData::new(); let name1 = "task"; let name2 = "work"; let action = MsgAction::Query(Query::new()); let (tx1, rx1) = channel(); let (tx2, rx2) = channel(); let routes = [RouteRequest::new( Include::All, Include::Some(name1.to_string()), Include::All, )] .to_vec(); queuedata .register(tx1, name1.to_string(), routes.clone()) .unwrap(); queuedata .register(tx2, name2.to_string(), routes.clone()) .unwrap(); let msg = Message::new(name1, action.clone()); queuedata.send(msg.clone()).unwrap(); let result1 = rx1.recv_timeout(TIMEOUT).unwrap(); let result2 = rx2.recv_timeout(TIMEOUT).unwrap(); assert_eq!(result1.get_message_id(), msg.get_message_id()); assert_eq!(result1.get_message_id(), 
result2.get_message_id()); } } #[derive(Clone)] struct Queue { queue_data: Arc>, } impl Queue { fn new() -> Self { Self { queue_data: Arc::new(RwLock::new(QueueData::new())), } } fn register( &mut self, tx: Sender, name: String, routes: Vec, ) -> Result<(), MTTError> { let mut queuedata = self.queue_data.write().unwrap(); queuedata.register(tx, name, routes) } fn send(&self, msg: Message) -> Result<(), MTTError> { let queuedata = self.queue_data.read().unwrap(); queuedata.send(msg) } } #[cfg(test)] mod queues { use super::*; #[test] fn create_a_queue() { Queue::new(); } } struct CreateDoc { queue: Queue, rx: Receiver, } impl CreateDoc { fn new(queue: Queue, rx: Receiver) -> Self { Self { queue: queue, rx: rx, } } fn start(mut queue: Queue) { let (tx, rx) = channel(); let routes = [RouteRequest::new( Include::All, Include::All, Include::Some(Action::Create), )] .to_vec(); let id = queue.register(tx, "document".to_string(), routes).unwrap(); let doc = CreateDoc::new(queue, rx); spawn(move || { doc.listen(); }); } fn listen(&self) { loop { let msg = self.rx.recv().unwrap(); DocumentFile::start(self.queue.clone(), msg); } } } #[derive(Clone, Debug, PartialEq)] enum FieldType { StaticString, Uuid, } impl FieldType { fn get_default(&self) -> Field { match self { FieldType::StaticString => "".into(), FieldType::Uuid => Uuid::new_v4().into(), } } } impl From<&Field> for FieldType { fn from(value: &Field) -> Self { match value { Field::StaticString(_) => Self::StaticString, Field::Uuid(_) => Self::Uuid, } } } #[cfg(test)] mod fieldtypes { use super::*; #[test] fn can_get_defaults_for_uuid() { let ftype = FieldType::Uuid; let mut ids: Vec = Vec::new(); for _ in 0..5 { let result = ftype.get_default(); match result { Field::Uuid(data) => { assert!(!ids.contains(&data), "found duplicate id {:?} in {:?}", data, ids); ids.push(data.clone()); }, _ => unreachable!("got {:?}: should have been uuid", result), } } } #[test] fn can_get_defaults_for_static_string() { let ftype = 
FieldType::StaticString; let result = ftype.get_default(); match result { Field::StaticString(data) => assert_eq!(data, ""), _ => unreachable!("got {:?}: should have been static string", result), } } } #[derive(Clone, Debug, PartialEq)] enum Field { StaticString(String), Uuid(Uuid), } impl Field { fn get_type(&self) -> FieldType { self.into() } } impl From for Field { fn from(value: String) -> Self { Self::StaticString(value) } } impl From<&str> for Field { fn from(value: &str) -> Self { Self::from(value.to_string()) } } impl From for Field { fn from(value: Uuid) -> Self { Self::Uuid(value) } } #[cfg(test)] mod fields { use super::*; #[test] fn can_create_static_string() { let data = Uuid::new_v4().to_string(); let result: Field = data.clone().into(); match result.clone() { Field::StaticString(output) => assert_eq!(output, data), _ => unreachable!("got {:?}: should have been static string", result), } assert_eq!(result.get_type(), FieldType::StaticString); } #[test] fn can_create_from_str() { let holder = ["one", "two"]; for data in holder.into_iter() { let result: Field = data.into(); match result.clone() { Field::StaticString(output) => assert_eq!(output, data), _ => unreachable!("got {:?}: should have been static string", result), } assert_eq!(result.get_type(), FieldType::StaticString); } } #[test] fn create_from_uuid() { let data = Uuid::new_v4(); let result: Field = data.clone().into(); match result.clone() { Field::Uuid(output) => assert_eq!(output, data), _ => unreachable!("got {:?}: should have been uuid", result), } assert_eq!(result.get_type(), FieldType::Uuid); } } #[derive(Clone, Debug)] struct FieldSetting { fieldtype: FieldType, } impl FieldSetting { fn new(ftype: FieldType) -> Self { Self { fieldtype: ftype } } fn get_type(&self) -> &FieldType { &self.fieldtype } } #[cfg(test)] mod fieldsettings { use super::*; #[test] fn can_field_type_be_assigned() { let ftypes = [FieldType::StaticString, FieldType::Uuid]; for ftype in ftypes.into_iter() { let 
fieldinfo = FieldSetting::new(ftype.clone()); assert_eq!(fieldinfo.get_type(), &ftype); } } } #[derive(Clone, Debug)] struct Addition { data: Document, } impl Addition { fn new() -> Self { Self { data: Document::new(), } } fn add_field(&mut self, name: String, field: F) where F: Into, { self.data.add_field(name, field); } fn get_field(&self, name: &str) -> Option<&Field> { self.data.get_field(name) } fn get_document(&self) -> Document { self.data.clone() } } #[cfg(test)] mod additions { use super::*; #[test] fn can_add_static_string() { let mut add = Addition::new(); let name = Uuid::new_v4().to_string(); let data = Uuid::new_v4().to_string(); add.add_field(name.clone(), data.clone()); let result = add.get_field(&name).unwrap(); match result { Field::StaticString(result) => assert_eq!(result, &data), _ => unreachable!("got {:?}: should have received static string", result), } } fn can_add_uuid() { let mut add = Addition::new(); let name = Uuid::new_v4().to_string(); let data = Uuid::new_v4(); add.add_field(name.clone(), data.clone()); let result = add.get_field(&name).unwrap(); match result { Field::Uuid(result) => assert_eq!(result, &data), _ => unreachable!("got {:?}: should have received uuid", result), } } fn can_get_document() { let mut add = Addition::new(); let name = Uuid::new_v4().to_string(); let data = Uuid::new_v4(); add.add_field(name.clone(), data.clone()); let doc: Document = add.get_document(); match doc.get_field(&name).unwrap() { Field::Uuid(result) => assert_eq!(result, &data), _ => unreachable!("should have received uuid"), } } } type DocDefMap = HashMap; #[derive(Clone, Debug)] struct DocDef { fields: HashMap, } impl DocDef { fn new() -> Self { Self { fields: HashMap::new(), } } fn add_field(&mut self, name: String, ftype: FieldType) { self.fields.insert(name, FieldSetting::new(ftype)); } fn get_field(&self, name: &str) -> Result<&FieldSetting, MTTError> { match self.fields.get(name) { Some(data) => Ok(data), None => 
Err(MTTError::DocumentFieldNotFound(name.to_string())), } } fn iter(&self) -> impl Iterator { self.fields.iter() } } #[cfg(test)] mod docdefs { use super::*; #[test] fn can_field_be_added() { let mut docdef = DocDef::new(); let name = Uuid::new_v4().to_string(); let field_type = FieldType::Uuid; docdef.add_field(name.clone(), field_type.clone()); let result = docdef.get_field(name.as_str()).unwrap(); assert_eq!(result.get_type(), &field_type); } #[test] fn produces_error_for_bad_fields() { let docdef = DocDef::new(); let name = Uuid::new_v4().to_string(); match docdef.get_field(name.as_str()) { Ok(_) => unreachable!("should return non existant field error"), Err(err) => match err { MTTError::DocumentFieldNotFound(data) => assert_eq!(data, name), _ => unreachable!("got {:?}: should have been document field not found", err), }, } } #[test] fn can_multiple_fields_be_added() { let mut docdef = DocDef::new(); let names = ["one", "two", "three"]; let field_type = FieldType::StaticString; for name in names.iter() { docdef.add_field(name.to_string(), field_type.clone()); } for name in names.iter() { let result = docdef.get_field(name).unwrap(); assert_eq!(result.get_type(), &field_type); } } } #[derive(Clone, Debug)] enum Operand { Equal, } #[derive(Clone, Debug)] struct Specifier { field_name: String, operation: Operand, value: Field, } impl Specifier { fn new(name: String, op: Operand, value: F) -> Self where F: Into, { Self { field_name: name, operation: op, value: value.into(), } } } #[derive(Clone, Debug)] struct Query { specifiers: Vec, } impl Query { fn new() -> Self { Self { specifiers: Vec::new(), } } fn add_specifier(&mut self, name: String, op: Operand, value: F) where F: Into, { let spec = Specifier::new(name, op, value); self.specifiers.push(spec); } fn run(&self, docs: &DocumentFile) -> Result { let mut reply = Reply::new(); let docdef = docs.get_docdef(); for specifier in self.specifiers.iter() { match docdef.get_field(&specifier.field_name) { Ok(spec) => { 
let value_type: FieldType = (&specifier.value).into();
                    let wanted_type = spec.get_type();
                    if &value_type != wanted_type {
                        // Argument order here is (declared type, supplied type).
                        return Err(MTTError::DocumentFieldWrongDataType(
                            wanted_type.clone(),
                            value_type.clone(),
                        ));
                    }
                }
                Err(err) => return Err(err),
            }
        }
        // Matching pass: a document is kept only if no specifier disagrees.
        for doc in docs.get_documents() {
            let mut output = true;
            for specifier in self.specifiers.iter() {
                // The unwrap relies on the validation pass above having
                // confirmed the field name exists in the definition.
                let value = doc.get_field(&specifier.field_name).unwrap();
                if value != &specifier.value {
                    output = false;
                }
            }
            if output {
                reply.add(doc.clone());
            }
        }
        Ok(reply)
    }
}

/// The documents returned in answer to a request, in insertion order.
#[derive(Clone, Debug)]
struct Reply {
    data: Vec,
}

impl Reply {
    /// Creates a reply holding no documents.
    fn new() -> Self {
        Self { data: Vec::new() }
    }

    /// Appends `doc` to the end of the reply.
    fn add(&mut self, doc: Document) {
        self.data.push(doc);
    }

    /// Number of documents currently held.
    fn len(&self) -> usize {
        self.data.len()
    }

    /// Iterates over the held documents in the order they were added.
    fn iter(&self) -> impl Iterator {
        self.data.iter()
    }
}

/// Unit tests for `Reply`.
#[cfg(test)]
mod replies {
    use super::*;

    /// A freshly created reply reports a length of zero.
    #[test]
    fn is_new_empty() {
        let reply = Reply::new();
        assert_eq!(reply.len(), 0, "should have no records");
    }

    /// Each `add` increases the length by one.
    #[test]
    fn can_add_documents() {
        let mut reply = Reply::new();
        let doc = Document::new();
        reply.add(doc.clone());
        assert_eq!(reply.len(), 1);
        // Last use of `doc`, so it is moved rather than cloned again.
        reply.add(doc);
        assert_eq!(reply.len(), 2);
    }

    /// Documents come back from `iter` in insertion order and then run out.
    #[test]
    fn can_retrieve_documents() {
        let fieldname = "field".to_string();
        let mut doc1 = Document::new();
        doc1.add_field(fieldname.clone(), "one");
        let mut doc2 = Document::new();
        doc2.add_field(fieldname.clone(), "two");
        let mut reply = Reply::new();
        reply.add(doc1);
        reply.add(doc2);
        let mut reply_iter = reply.iter();
        // `result1` is only read, so it no longer carries a needless `mut`.
        let result1 = reply_iter.next().unwrap();
        match result1.get_field(&fieldname).unwrap() {
            Field::StaticString(output) => assert_eq!(output, "one"),
            _ => unreachable!("got {:?}: should have been static string", result1),
        }
        let result2 = reply_iter.next().unwrap();
        match result2.get_field(&fieldname).unwrap() {
            Field::StaticString(output) => assert_eq!(output, "two"),
            _ => unreachable!("got {:?}: should have been static string", result2),
        }
        match reply_iter.next() {
            None => {}
            Some(_) => unreachable!("should be out of data"),
        }
    }
}

type
DocumentMap = HashMap; #[derive(Clone, Debug)] struct Document { data: HashMap, } impl Document { fn new() -> Self { Self { data: HashMap::new(), } } fn add_field(&mut self, name: String, field: F) where F: Into, { self.data.insert(name, field.into()); } fn get_field(&self, name: &str) -> Option<&Field> { self.data.get(name) } fn iter(&self) -> impl Iterator { self.data.iter() } } #[cfg(test)] mod documents { use super::*; #[test] fn can_add_static_string() { let mut add = Document::new(); let name = Uuid::new_v4().to_string(); let data = Uuid::new_v4().to_string(); add.add_field(name.clone(), data.clone()); let result = add.get_field(&name).unwrap(); match result { Field::StaticString(result) => assert_eq!(result, &data), _ => unreachable!("got {:?}: should have received static string", result), } } fn can_add_uuid() { let mut add = Document::new(); let name = Uuid::new_v4().to_string(); let data = Uuid::new_v4(); add.add_field(name.clone(), data.clone()); let result = add.get_field(&name).unwrap(); match result { Field::Uuid(result) => assert_eq!(result, &data), _ => unreachable!("got {:?}: should have received uuid", result), } } } struct DocumentFile { docdef: DocDef, docs: Vec, queue: Queue, rx: Receiver, } impl DocumentFile { fn new(queue: Queue, rx: Receiver, docdef: DocDef) -> Self { Self { docdef: docdef, docs: Vec::new(), queue: queue, rx: rx, } } fn start(mut queue: Queue, msg: Message) { let (tx, rx) = channel(); let name = match msg.get_document_id() { NameID::Name(name) => name.clone(), NameID::ID(id) => id.to_string(), }; let routes = [ RouteRequest::new( Include::All, Include::Some(name.clone()), Include::Some(Action::Addition), ), RouteRequest::new( Include::All, Include::Some(name.clone()), Include::Some(Action::Query), ), RouteRequest::new( Include::All, Include::Some(name.clone()), Include::Some(Action::Show), ), ] .to_vec(); match queue.register(tx, name, routes) { Ok(_) => {} Err(err) => { let error = msg.error(err); 
queue.send(error).unwrap();
                return;
            }
        }
        let action = msg.get_action();
        let docdef = match action {
            MsgAction::Create(data) => data.clone(),
            _ => unreachable!("got {:?}: should have been a create message", action),
        };
        let mut doc = DocumentFile::new(queue.clone(), rx, docdef);
        spawn(move || {
            doc.listen();
        });
        let reply = msg.reply(Reply::new());
        queue.send(reply).unwrap();
    }

    /// Serves requests forever: receives a message, handles it, and sends the
    /// outcome back as a response to that message.
    ///
    /// Additions and queries are dispatched to their handlers; any other
    /// action is answered with an empty reply. Panics if receiving or sending
    /// fails.
    fn listen(&mut self) {
        loop {
            let incoming = self.rx.recv().unwrap();
            let outcome = match incoming.get_action() {
                MsgAction::Query(wanted) => self.query(wanted),
                MsgAction::Addition(new_doc) => self.add_document(new_doc),
                _ => Reply::new().into(),
            };
            let outgoing = incoming.response(outcome);
            self.queue.send(outgoing).unwrap();
        }
    }

    /// Borrows the definition every stored document conforms to.
    fn get_docdef(&self) -> &DocDef {
        &self.docdef
    }

    /// Iterates over the stored documents in insertion order.
    fn get_documents<'a>(&self) -> impl Iterator {
        self.docs.iter()
    }

    /// Validates `addition` against the definition and stores it.
    ///
    /// Returns a reply containing the stored document, or an error action
    /// when a supplied field is unknown, has the wrong type, or a defined
    /// field is missing. Nothing is stored on error.
    fn add_document(&mut self, addition: &Addition) -> MsgAction {
        let incoming = addition.get_document();
        let mut accepted = Document::new();

        // Every supplied field must be defined and carry the declared type.
        for (field_name, field_value) in incoming.iter() {
            let definition = match self.docdef.get_field(field_name) {
                Ok(definition) => definition,
                Err(err) => return err.into(),
            };
            if definition.get_type() != &field_value.get_type() {
                // NOTE(review): arguments here are (supplied, declared), the
                // reverse of the order used by `Query::run` — confirm which
                // order the variant is meant to carry.
                return MTTError::DocumentFieldWrongDataType(
                    field_value.get_type(),
                    definition.get_type().clone(),
                )
                .into();
            }
            accepted.add_field(field_name.clone(), field_value.clone());
        }

        // Every defined field must have been supplied.
        for (required, _) in self.docdef.iter() {
            if accepted.get_field(required).is_none() {
                return MTTError::DocumentFieldMissing(required.clone()).into();
            }
        }

        let mut reply = Reply::new();
        reply.add(accepted.clone());
        self.docs.push(accepted);
        reply.into()
    }

    /// Runs `query` against this file and converts the result, reply or
    /// error, into a message action.
    fn query(&self, query: &Query) -> MsgAction {
        query.run(self).map_or_else(MsgAction::from, MsgAction::from)
    }
}

#[cfg(test)]
mod document_files {
    use super::{support_test::TIMEOUT, *};
    use std::sync::mpsc::RecvTimeoutError;

    fn standard_routes() -> Vec {
        [
            RouteRequest::new(Include::All, Include::All, Include::Some(Action::Reply)),
            RouteRequest::new(Include::All, Include::All, Include::Some(Action::Error)),
        ]
        .to_vec()
    }

    fn create_docdef(field_types: Vec) -> DocDef {
        let mut output =
DocDef::new();
        // Fields are named "field0", "field1", … in the order of `field_types`.
        let mut count = 0;
        for field_type in field_types.iter() {
            output.add_field(format!("field{}", count), field_type.clone());
            count += 1;
        }
        output
    }

    /// Starts a `DocumentFile` called `name` on a fresh queue and returns the
    /// queue together with a receiver registered for `routes`.
    fn test_doc(
        name: &str,
        docdef: DocDef,
        routes: Vec,
    ) -> (Queue, Receiver) {
        let (tx, rx) = channel();
        let mut queue = Queue::new();
        let msg = Message::new(name, docdef);
        DocumentFile::start(queue.clone(), msg);
        // The test receiver is registered only after `start` has returned.
        queue
            .register(tx, Uuid::new_v4().to_string(), routes)
            .unwrap();
        (queue, rx)
    }

    /// A create message sent to an already running document file produces no
    /// reply or error within the timeout.
    #[test]
    fn does_not_respond_to_create() {
        let docdef = DocDef::new();
        let name = "quiet";
        let (mut queue, rx) = test_doc(name, docdef, standard_routes());
        let other = "alternate";
        // The receiving half is dropped immediately; only the registration matters.
        let (tx, _) = channel();
        queue.register(tx, other.to_string(), Vec::new()).unwrap();
        let msg = Message::new(name, DocDef::new());
        queue.send(msg).unwrap();
        match rx.recv_timeout(TIMEOUT) {
            Ok(msg) => unreachable!("should not receive: {:?}", msg),
            Err(err) => match err {
                RecvTimeoutError::Timeout => {}
                _ => unreachable!("should have timed out"),
            },
        }
    }

    /// A show request gets some response within the timeout.
    #[test]
    fn can_show_document_details() {
        let docdef = DocDef::new();
        let name = "first";
        let (queue, rx) = test_doc(name, docdef, standard_routes());
        let msg = Message::new(name, MsgAction::Show);
        queue.send(msg.clone()).unwrap();
        // NOTE(review): `show` is bound but never inspected; the test only
        // proves that something arrived.
        let show = rx.recv_timeout(TIMEOUT).unwrap();
    }

    /// Querying a document file with no documents returns an empty reply.
    #[test]
    fn can_query_new_document() {
        let docdef = DocDef::new();
        let name = "second";
        let (queue, rx) = test_doc(name, docdef, standard_routes());
        let query = Message::new(name, Query::new());
        queue.send(query).unwrap();
        let result = rx.recv_timeout(TIMEOUT).unwrap();
        match result.get_action() {
            MsgAction::Reply(data) => assert_eq!(data.len(), 0),
            _ => unreachable!(
                "got {:?}: should have received a reply",
                result.get_action()
            ),
        }
    }

    /// A show request addressed to a different name is not answered.
    #[test]
    fn only_responses_to_its_show_request() {
        let docdef = DocDef::new();
        let name = "quiet";
        let (mut queue, rx) = test_doc(name, docdef, standard_routes());
        let other = "alternate";
        let (tx, _) = channel();
        queue.register(tx, other.to_string(), Vec::new()).unwrap();
        let msg =
Message::new(other, MsgAction::Show);
        queue.send(msg).unwrap();
        // Success here means the wait timed out with nothing received.
        match rx.recv_timeout(TIMEOUT) {
            Ok(msg) => unreachable!("should not receive: {:?}", msg),
            Err(err) => match err {
                RecvTimeoutError::Timeout => {}
                _ => unreachable!("should have timed out"),
            },
        }
    }

    /// A query addressed to a different name is not answered.
    #[test]
    fn only_responses_to_its_query_request() {
        let docdef = DocDef::new();
        let name = "quiet";
        let (mut queue, rx) = test_doc(name, docdef, standard_routes());
        let other = "alternate";
        let (tx, _) = channel();
        queue.register(tx, other.to_string(), Vec::new()).unwrap();
        let msg = Message::new(other, Query::new());
        queue.send(msg).unwrap();
        match rx.recv_timeout(TIMEOUT) {
            Ok(msg) => unreachable!("should not receive: {:?}", msg),
            Err(err) => match err {
                RecvTimeoutError::Timeout => {}
                _ => unreachable!("should have timed out"),
            },
        }
    }

    /// An addition is echoed back in the reply and is then returned by an
    /// unrestricted query.
    #[test]
    fn can_document_be_added() {
        let mut docdef = DocDef::new();
        let name = "field";
        let doc_name = "document";
        let data = Uuid::new_v4();
        docdef.add_field(name.to_string(), FieldType::Uuid);
        let (queue, rx) = test_doc(doc_name, docdef, standard_routes());
        let mut new_doc = Addition::new();
        new_doc.add_field(name.to_string(), data.clone());
        let msg = Message::new(doc_name, new_doc);
        queue.send(msg.clone()).unwrap();
        let result = rx.recv_timeout(TIMEOUT).unwrap();
        // The response must carry the id of the request it answers.
        assert_eq!(result.get_message_id(), msg.get_message_id());
        match result.get_action() {
            MsgAction::Reply(output) => {
                assert_eq!(output.len(), 1);
                let holder = output.iter().next().unwrap();
                match holder.get_field(name) {
                    Some(field) => match field {
                        Field::Uuid(store) => assert_eq!(store, &data),
                        _ => unreachable!(
                            "got {:?}: should have received uuid",
                            holder.get_field(name).unwrap()
                        ),
                    },
                    None => unreachable!("{:?} did not contain field '{}'", holder, name),
                }
            }
            _ => unreachable!("got {:?}: should have been a reply", result),
        }
        // Second half: the stored document must come back from a query.
        let msg = Message::new(doc_name, Query::new());
        queue.send(msg.clone()).unwrap();
        let result = rx.recv_timeout(TIMEOUT).unwrap();
        assert_eq!(result.get_message_id(),
msg.get_message_id());
        match result.get_action() {
            MsgAction::Reply(output) => {
                assert_eq!(output.len(), 1);
                let holder = output.iter().next().unwrap();
                match holder.get_field(name) {
                    Some(field) => match field {
                        Field::Uuid(store) => assert_eq!(store, &data),
                        _ => unreachable!(
                            "got {:?}: should have received uuid",
                            holder.get_field(name).unwrap()
                        ),
                    },
                    None => unreachable!("{:?} did not contain field '{}'", holder, name),
                }
            }
            _ => unreachable!("got {:?}: should have been a reply", result),
        }
    }

    /// An addition addressed to a different name is not answered.
    #[test]
    fn only_responses_to_its_additions() {
        let docdef = DocDef::new();
        let name = "quiet";
        let (mut queue, rx) = test_doc(name, docdef, standard_routes());
        let other = "alternate";
        // The receiving half is dropped immediately; only the registration matters.
        let (tx, _) = channel();
        queue.register(tx, other.to_string(), Vec::new()).unwrap();
        let msg = Message::new(other, Addition::new());
        queue.send(msg).unwrap();
        match rx.recv_timeout(TIMEOUT) {
            Ok(msg) => unreachable!("should not receive: {:?}", msg),
            Err(err) => match err {
                RecvTimeoutError::Timeout => {}
                _ => unreachable!("should have timed out"),
            },
        }
    }

    /// Adding the same document `count` times yields `count` query results.
    #[test]
    fn can_add_multiple_documents() {
        let count = 4;
        let mut docdef = DocDef::new();
        let name = "field";
        let doc_name = "document";
        let data = Uuid::new_v4();
        docdef.add_field(name.to_string(), FieldType::Uuid);
        let (queue, rx) = test_doc(doc_name, docdef, standard_routes());
        let mut new_doc = Addition::new();
        new_doc.add_field(name.to_string(), data.clone());
        for _ in 0..count {
            let msg = Message::new(doc_name, new_doc.clone());
            queue.send(msg.clone()).unwrap();
            rx.recv_timeout(TIMEOUT).unwrap(); // eats the confirmation reply.
}
        // An unrestricted query must now return every added document.
        let msg = Message::new(doc_name, Query::new());
        queue.send(msg).unwrap();
        let result = rx.recv_timeout(TIMEOUT).unwrap();
        match result.get_action() {
            MsgAction::Reply(data) => assert_eq!(data.len(), count),
            _ => unreachable!("got {:?}: should have been a reply", result.get_action()),
        }
    }

    /// Adding a field the definition does not contain yields
    /// `DocumentFieldNotFound` carrying the offending name.
    #[test]
    fn errors_on_wrong_field_name() {
        let doc_name = "mismatch";
        let field_name = Uuid::new_v4().to_string();
        let (queue, rx) = test_doc(
            doc_name,
            create_docdef([FieldType::Uuid].to_vec()),
            standard_routes(),
        );
        let mut addition = Addition::new();
        addition.add_field(field_name.clone(), Uuid::new_v4());
        let msg = Message::new(doc_name, addition);
        queue.send(msg).unwrap();
        let result = rx.recv_timeout(TIMEOUT).unwrap();
        match result.get_action() {
            MsgAction::Error(err) => match err {
                MTTError::DocumentFieldNotFound(data) => assert_eq!(data, &field_name),
                _ => unreachable!("got {:?}: should have been document field not found.", err),
            },
            _ => unreachable!("got {:?}: should have been an error", result.get_action()),
        }
    }

    /// Adding a string to a UUID field yields `DocumentFieldWrongDataType`.
    #[test]
    fn errors_on_wrong_field_type() {
        let doc_name = "mismatch";
        let (queue, rx) = test_doc(
            doc_name,
            create_docdef([FieldType::Uuid].to_vec()),
            standard_routes(),
        );
        let mut addition = Addition::new();
        addition.add_field("field0".to_string(), "astring");
        let msg = Message::new(doc_name, addition);
        queue.send(msg).unwrap();
        let result = rx.recv_timeout(TIMEOUT).unwrap();
        match result.get_action() {
            MsgAction::Error(err) => match err {
                // For additions the error carries (supplied type, declared type).
                MTTError::DocumentFieldWrongDataType(found, expected) => {
                    assert_eq!(found, &FieldType::StaticString);
                    assert_eq!(expected, &FieldType::Uuid);
                }
                _ => unreachable!(
                    "got {:?}: should have been document field data mismatch.",
                    err
                ),
            },
            _ => unreachable!("got {:?}: should have been an error", result.get_action()),
        }
    }

    /// Omitting a defined field yields `DocumentFieldMissing` naming that field.
    #[test]
    fn errors_on_missing_fields() {
        let doc_name = "missing";
        let (queue, rx) = test_doc(
            doc_name,
            create_docdef([FieldType::Uuid, FieldType::Uuid].to_vec()),
            standard_routes(),
        );
        let mut addition =
Addition::new();
        // Only "field0" is supplied; "field1" is defined but left out.
        addition.add_field("field0".to_string(), Uuid::nil());
        let msg = Message::new(doc_name, addition);
        queue.send(msg).unwrap();
        let result = rx.recv_timeout(TIMEOUT).unwrap();
        match result.get_action() {
            MsgAction::Error(err) => match err {
                MTTError::DocumentFieldMissing(field) => assert_eq!(field, "field1"),
                _ => unreachable!("got {:?}: should have been document field missing", err),
            },
            _ => unreachable!("got {:?}: should have been an error", result.get_action()),
        }
    }

    /// A query on one field returns only the single document whose value matches.
    #[test]
    fn does_query_return_related_entries() {
        let doc_name = "query";
        let (queue, rx) = test_doc(
            doc_name,
            create_docdef([FieldType::Uuid, FieldType::Uuid].to_vec()),
            standard_routes(),
        );
        let field0 = Uuid::new_v4();
        let field1 = Uuid::new_v4();
        // Unrelated documents before the one being searched for.
        for _ in 0..3 {
            let mut addition = Addition::new();
            addition.add_field("field0".to_string(), Uuid::new_v4());
            addition.add_field("field1".to_string(), Uuid::new_v4());
            let msg = Message::new(doc_name, addition);
            queue.send(msg).unwrap();
            rx.recv_timeout(TIMEOUT).unwrap();
        }
        // The target document.
        let mut addition = Addition::new();
        addition.add_field("field0".to_string(), field0.clone());
        addition.add_field("field1".to_string(), field1.clone());
        let msg = Message::new(doc_name, addition);
        queue.send(msg).unwrap();
        rx.recv_timeout(TIMEOUT).unwrap();
        // Unrelated documents after it.
        for _ in 0..3 {
            let mut addition = Addition::new();
            addition.add_field("field0".to_string(), Uuid::new_v4());
            addition.add_field("field1".to_string(), Uuid::new_v4());
            let msg = Message::new(doc_name, addition);
            queue.send(msg).unwrap();
            rx.recv_timeout(TIMEOUT).unwrap();
        }
        let mut query = Query::new();
        query.add_specifier("field0".to_string(), Operand::Equal, field0.clone());
        let msg = Message::new(doc_name, query);
        queue.send(msg).unwrap();
        let result = rx.recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        // Shadow the raw UUIDs with `Field` values for comparison with stored fields.
        let field0: Field = field0.into();
        let field1: Field = field1.into();
        match action {
            MsgAction::Reply(data) => {
                assert_eq!(data.len(), 1, "should return one entry");
                for doc in data.iter() {
assert_eq!(doc.get_field("field0").unwrap(), &field0);
                    assert_eq!(doc.get_field("field1").unwrap(), &field1);
                }
            }
            _ => unreachable!("got {:?}: should have been a reply", action),
        }
    }

    /// A query returns every document matching the specifier, not just the first.
    #[test]
    fn gets_all_documents_in_query() {
        let doc_name = "multiple";
        let count = 4;
        let input = Uuid::new_v4();
        let (queue, rx) = test_doc(
            doc_name,
            create_docdef([FieldType::Uuid].to_vec()),
            standard_routes(),
        );
        // Three documents with random values that must not match.
        for _ in 0..3 {
            let mut addition = Addition::new();
            addition.add_field("field0".to_string(), Uuid::new_v4());
            let msg = Message::new(doc_name, addition);
            queue.send(msg).unwrap();
            rx.recv_timeout(TIMEOUT).unwrap();
        }
        // `count` documents sharing the value being searched for.
        for _ in 0..count {
            let mut addition = Addition::new();
            addition.add_field("field0".to_string(), input.clone());
            let msg = Message::new(doc_name, addition);
            queue.send(msg).unwrap();
            rx.recv_timeout(TIMEOUT).unwrap();
        }
        let mut query = Query::new();
        query.add_specifier("field0".to_string(), Operand::Equal, input.clone());
        let msg = Message::new(doc_name, query);
        queue.send(msg).unwrap();
        let result = rx.recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        let input: Field = input.into();
        match action {
            MsgAction::Reply(data) => {
                assert_eq!(data.len(), count, "should return {} entries", count);
                for doc in data.iter() {
                    assert_eq!(doc.get_field("field0").unwrap(), &input);
                }
            }
            _ => unreachable!("got {:?}: should have been a reply", action),
        }
    }

    /// With two specifiers, only the document matching both is returned.
    #[test]
    fn query_should_work_with_multiple_fields() {
        let doc_name = "onlyone";
        let (queue, rx) = test_doc(
            doc_name,
            create_docdef([FieldType::Uuid, FieldType::Uuid].to_vec()),
            standard_routes(),
        );
        let field0 = Uuid::new_v4();
        let field1 = Uuid::new_v4();
        // Only the second row has both wanted values in the wanted positions;
        // the others are swapped or match at most one field.
        let input = [
            [Uuid::new_v4(), Uuid::new_v4()],
            [field0.clone(), field1.clone()],
            [field1.clone(), field0.clone()],
            [field0.clone(), Uuid::new_v4()],
            [Uuid::new_v4(), field1.clone()],
        ];
        for combo in input.iter() {
            let mut addition = Addition::new();
            addition.add_field("field0".to_string(), combo[0].clone());
            addition.add_field("field1".to_string(),
combo[1].clone());
            let msg = Message::new(doc_name, addition);
            queue.send(msg).unwrap();
            rx.recv_timeout(TIMEOUT).unwrap();
        }
        // Both specifiers must hold for a document to be returned.
        let mut query = Query::new();
        query.add_specifier("field0".to_string(), Operand::Equal, field0.clone());
        query.add_specifier("field1".to_string(), Operand::Equal, field1.clone());
        let msg = Message::new(doc_name, query);
        queue.send(msg).unwrap();
        let result = rx.recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        let field0: Field = field0.into();
        let field1: Field = field1.into();
        match action {
            MsgAction::Reply(data) => {
                assert_eq!(data.len(), 1, "should return one entry");
                for doc in data.iter() {
                    assert_eq!(doc.get_field("field0").unwrap(), &field0);
                    assert_eq!(doc.get_field("field1").unwrap(), &field1);
                }
            }
            _ => unreachable!("got {:?}: should have been a reply", action),
        }
    }

    /// Querying on a field the definition lacks yields `DocumentFieldNotFound`.
    #[test]
    fn errors_on_bad_field_name() {
        let doc_name = "testing";
        let field_name = "wrong";
        // The definition has no fields at all, so any specifier name is unknown.
        let (queue, rx) = test_doc(doc_name, create_docdef(Vec::new()), standard_routes());
        let mut query = Query::new();
        query.add_specifier(field_name.to_string(), Operand::Equal, Uuid::new_v4());
        let msg = Message::new(doc_name, query);
        queue.send(msg).unwrap();
        let result = rx.recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Error(data) => match data {
                MTTError::DocumentFieldNotFound(output) => assert_eq!(output, field_name),
                _ => unreachable!("got {:?}: should been field not found", data),
            },
            _ => unreachable!("got {:?}: should have been a error", action),
        }
    }

    /// Querying a UUID field with a string value yields `DocumentFieldWrongDataType`.
    #[test]
    fn errors_on_bad_field_type() {
        let doc_name = "testing";
        let (queue, rx) = test_doc(
            doc_name,
            create_docdef([FieldType::Uuid].to_vec()),
            standard_routes(),
        );
        let mut query = Query::new();
        query.add_specifier("field0".to_string(), Operand::Equal, "wrong");
        let msg = Message::new(doc_name, query);
        queue.send(msg).unwrap();
        let result = rx.recv_timeout(TIMEOUT).unwrap();
        let action = result.get_action();
        match action {
            MsgAction::Error(data) => match data {
MTTError::DocumentFieldWrongDataType(expected, got) => {
                    assert_eq!(expected, &FieldType::Uuid);
                    assert_eq!(got, &FieldType::StaticString);
                }
                _ => unreachable!("got {:?}: should been field not found", data),
            },
            _ => unreachable!("got {:?}: should have been a error", action),
        }
    }
}

/// Tests for creating document files through `CreateDoc`.
#[cfg(test)]
mod createdocs {
    use super::support_test::TIMEOUT;
    use super::*;

    /// Registers a test receiver for `routes` on a fresh queue, starts
    /// `CreateDoc` on it, and returns the queue with the receiver.
    fn setup_create_doc(routes: Vec) -> (Queue, Receiver) {
        let mut queue = Queue::new();
        let (tx, rx) = channel();
        queue
            .register(tx, Uuid::new_v4().to_string(), routes)
            .unwrap();
        CreateDoc::start(queue.clone());
        (queue, rx)
    }

    /// Creating a document is acknowledged with a reply, and the new document
    /// then answers queries addressed to its name.
    #[test]
    fn create_document_creation() {
        let name = "project";
        let routes = [RouteRequest::new(
            Include::All,
            Include::All,
            Include::Some(Action::Reply),
        )]
        .to_vec();
        let (queue, rx) = setup_create_doc(routes);
        let msg1 = Message::new(name, MsgAction::Create(DocDef::new()));
        queue.send(msg1.clone()).unwrap();
        let result1 = rx.recv_timeout(TIMEOUT).unwrap();
        assert_eq!(result1.get_message_id(), msg1.get_message_id());
        assert_eq!(result1.get_document_id(), msg1.get_document_id());
        match result1.get_action() {
            MsgAction::Reply(_) => {}
            _ => unreachable!("got {:?}: should have been a reply.", result1.get_action()),
        }
        let msg2 = Message::new(name, MsgAction::Query(Query::new()));
        queue.send(msg2.clone()).unwrap();
        let result2 = rx.recv_timeout(TIMEOUT).unwrap();
        assert_eq!(result2.get_message_id(), msg2.get_message_id());
        assert_eq!(result2.get_document_id(), msg2.get_document_id());
        match result2.get_action() {
            MsgAction::Reply(_) => {}
            // Report the action of the response under test (`result2`); the
            // original printed `result1` here, hiding the real failure.
            _ => unreachable!("got {:?}: should have been a reply.", result2.get_action()),
        }
    }

    /// Creating the same document name twice yields `DocumentAlreadyExists`
    /// for the second request.
    #[test]
    fn does_duplicates_generate_error() {
        let name = "duplicate";
        let routes = [
            RouteRequest::new(Include::All, Include::All, Include::Some(Action::Reply)),
            RouteRequest::new(Include::All, Include::All, Include::Some(Action::Error)),
        ]
        .to_vec();
        let (queue, rx) = setup_create_doc(routes);
        let msg = Message::new(name, MsgAction::Create(DocDef::new()));
        queue.send(msg.clone()).unwrap();
        // Discard the acknowledgement of the first, successful creation.
        rx.recv_timeout(TIMEOUT).unwrap();
        queue.send(msg.clone()).unwrap();
        let result = rx.recv_timeout(TIMEOUT).unwrap();
        assert_eq!(result.get_message_id(), msg.get_message_id());
        assert_eq!(result.get_document_id(), msg.get_document_id());
        match result.get_action() {
            MsgAction::Error(err) => match err {
                MTTError::DocumentAlreadyExists(data) => assert_eq!(data, name),
                // These messages used to say "should have been a reply",
                // contradicting what the test expects.
                _ => unreachable!("got {:?}: should have been document already exists.", err),
            },
            _ => unreachable!("got {:?}: should have been an error.", result.get_action()),
        }
    }
}