diff --git a/src/message.rs b/src/message.rs index 74e4c4f..f3defb4 100644 --- a/src/message.rs +++ b/src/message.rs @@ -22,6 +22,7 @@ mod support_test { #[derive(Clone, Debug)] enum MTTError { AdditionMissingField(Name), + CannotConvertMessageToRouteID, DocumentAlreadyExists(String), DocumentFieldAlreadyExists(String, Field), DocumentFieldMissing(String), @@ -495,6 +496,61 @@ impl From for RouteID { } } +impl TryFrom for RouteID { + type Error = MTTError; + + fn try_from(value: Message) -> Result { + let doc_id = match value.get_document_id() { + NameType::ID(data) => data, + _ => return Err(MTTError::CannotConvertMessageToRouteID), + }; + Ok(RouteID { + action: Some(value.get_action().into()), + doc_type: Some(doc_id.clone()), + msg_id: Some(value.get_message_id().clone()), + }) + } +} + +#[cfg(test)] +mod route_ids { + use super::*; + + #[test] + fn can_get_message_route_id() { + let actions: Vec = vec![Query::new().into(), MsgAction::Show]; + let doc_id = Uuid::new_v4(); + for action in actions.iter() { + let msg = Message::new(doc_id.clone(), action.clone()); + let route_id: RouteID = msg.clone().try_into().unwrap(); + match route_id.msg_id { + Some(data) => assert_eq!(&data, msg.get_message_id()), + None => unreachable!("should have had a message id"), + } + match route_id.doc_type { + Some(data) => assert_eq!(data, doc_id), + None => unreachable!("should have had doc type"), + } + match route_id.action { + Some(data) => assert_eq!(data, action.into()), + None => unreachable!("should have had doc type"), + } + } + } + + #[test] + fn errors_when_doc_id_is_not_id() { + let msg = Message::new(Name::english("nope"), Query::new()); + match TryInto::::try_into(msg) { + Ok(_) => unreachable!("should be an error"), + Err(err) => match err { + MTTError::CannotConvertMessageToRouteID => {} + _ => unreachable!("got {:?}, should have been covert error", err), + }, + } + } +} + #[derive(Clone, Debug, Eq, Hash, PartialEq)] enum NameType { ID(Uuid), @@ -2623,28 +2679,106 @@ impl IndexType { } } +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +enum DocFuncType { + Action, + Add, + Delete, + Query, + Show, + Update, +} + +#[derive(Clone, Debug)] +struct PathAction { + path: Path, + func_type: DocFuncType, +} + +impl PathAction { + fn new(path: Path, func_type: DocFuncType) -> Self { + Self { + path: path, + func_type: func_type, + } + } + + fn path(&self) -> Path { + self.path.clone() + } + + fn doc_function(&self) -> DocFuncType { + self.func_type.clone() + } +} + #[derive(Clone, Debug)] struct DocDef { doc_names: Vec, field_names: Names, fields: HashMap, indexes: HashMap, + routes: Vec, } impl DocDef { fn new(name: Name) -> Self { + let names = vec![name]; + Self::with_names(names) + } + + fn with_names(names: Vec) -> Self { + let routes = vec![ + PathAction::new( + Path::new( + Include::All, + Include::Some(names[0].clone().into()), + Include::Some(Action::Addition), + ), + DocFuncType::Add, + ), + PathAction::new( + Path::new( + Include::All, + Include::Some(names[0].clone().into()), + Include::Some(Action::Delete), + ), + DocFuncType::Delete, + ), + PathAction::new( + Path::new( + Include::All, + Include::Some(names[0].clone().into()), + Include::Some(Action::Query), + ), + DocFuncType::Query, + ), + PathAction::new( + Path::new( + Include::All, + Include::Some(names[0].clone().into()), + Include::Some(Action::Show), + ), + DocFuncType::Show, + ), + PathAction::new( + Path::new( + Include::All, + Include::Some(names[0].clone().into()), + Include::Some(Action::Update), + ), + DocFuncType::Update, + ), + ]; Self { - doc_names: [name].to_vec(), + doc_names: names, field_names: Names::new(), fields: HashMap::new(), indexes: HashMap::new(), + routes: routes, } } - fn with_names(names: Name) -> Self { - todo!("with an existing list of names"); - } - fn get_document_names(&self) -> &Vec { &self.doc_names } @@ -2739,6 +2873,10 @@ impl DocDef { fn iter(&self) -> impl Iterator { self.fields.iter() } + + fn iter_routes(&self) -> impl Iterator { + self.routes.iter() + } } #[cfg(test)] @@ -2835,6 +2973,67 @@ mod docdefs { }, } } + + #[test] + fn does_default_routes_get_set() { + let default_num = 5; + let docname = Name::english(Uuid::new_v4().to_string().as_str()); + let docdef = DocDef::new(docname.clone()); + assert_eq!( + docdef.iter_routes().count(), + default_num, + "routes contained the following:\n{:?}", + docdef.routes + ); + let mut actions: HashSet = HashSet::new(); + let mut doc_funcs: HashSet = HashSet::new(); + for path_action in docdef.iter_routes() { + let path = path_action.path(); + match &path.msg_id { + Include::All => {} + _ => unreachable!("got {:?}, message id should include all", path.msg_id), + } + match &path.doc { + Include::Some(output) => match output { + NameType::Name(data) => assert_eq!(data, &docname), + _ => unreachable!("got {:?}, name type should be {:?}", path.doc, docname), + }, + _ => unreachable!("got {:?}, name type should be {:?}", path.doc, docname), + }; + match &path.action { + Include::Some(output) => match output { + Action::Addition => actions.insert(output.clone()), + Action::Delete => actions.insert(output.clone()), + Action::Query => actions.insert(output.clone()), + Action::Show => actions.insert(output.clone()), + Action::Update => actions.insert(output.clone()), + _ => unreachable!("got {:?} which is not a default action", output), + }, + _ => unreachable!("got {:?}, which is not a default action", path.action), + }; + let file_func = path_action.doc_function(); + match file_func { + DocFuncType::Add => doc_funcs.insert(file_func), + DocFuncType::Delete => doc_funcs.insert(file_func), + DocFuncType::Query => doc_funcs.insert(file_func), + DocFuncType::Show => doc_funcs.insert(file_func), + DocFuncType::Update => doc_funcs.insert(file_func), + _ => unreachable!("got {:?}, which is not a default function", file_func), + }; + } + assert_eq!( + actions.len(), + default_num, + "got {:?}, missing some actions", + actions + ); + assert_eq!( + doc_funcs.len(), + default_num, + "got {:?}, missing some actions", + doc_funcs + ); + } } #[derive(Clone, Debug)] @@ -4199,16 +4398,23 @@ struct DocumentFile { docs: InternalRecords, indexes: Indexes, queue: Queue, + routes: HashMap, rx: Receiver, } impl DocumentFile { - fn new(queue: Queue, rx: Receiver, docdef: DocDef) -> Self { + fn new( + queue: Queue, + rx: Receiver, + docdef: DocDef, + routes: HashMap, + ) -> Self { Self { docdef: docdef.clone(), docs: InternalRecords::new(), indexes: docdef.create_indexes(), queue: queue, + routes: routes, rx: rx, } } @@ -4238,40 +4444,27 @@ impl DocumentFile { }, _ => unreachable!("should only return a name id or an error"), }; - let routes = [ - Path::new( - Include::All, - Include::Some(name_id.into()), - Include::Some(Action::Addition), - ), - Path::new( - Include::All, - Include::Some(name_id.into()), - Include::Some(Action::Delete), - ), - Path::new( - Include::All, - Include::Some(name_id.into()), - Include::Some(Action::Query), - ), - Path::new( - Include::All, - Include::Some(name_id.into()), - Include::Some(Action::Show), - ), - Path::new( - Include::All, - Include::Some(name_id.into()), - Include::Some(Action::Update), - ), - ]; - for route in routes.iter() { - let request = reg_msg.response(RegMsg::AddRoute(route.clone())); + let mut route_action: HashMap = HashMap::new(); + for path_action in docdef.iter_routes() { + let request = reg_msg.response(RegMsg::AddRoute(path_action.path())); let add_route = rmsg.response(request); queue.send(add_route).unwrap(); - rx.recv().unwrap(); + let result = rx.recv().unwrap(); + let route_id = match result.get_action() { + MsgAction::Register(data) => match data.get_msg() { + RegMsg::RouteID(data) => data, + RegMsg::Error(err) => { + queue.remove_sender(&id); + queue.send(msg.response(err.clone())).unwrap(); + return; + } + _ => unreachable!("should only return a route id or an error"), + }, + _ => unreachable!("should only return a route id or an error"), + }; + route_action.insert(route_id.clone(), path_action.doc_function()); } - let mut doc = DocumentFile::new(queue.clone(), rx, docdef); + let mut doc = DocumentFile::new(queue.clone(), rx, docdef, route_action); spawn(move || { doc.listen(); }); @@ -4282,14 +4475,19 @@ impl DocumentFile { fn listen(&mut self) { loop { let msg = self.rx.recv().unwrap(); - let result = match msg.get_action() { - MsgAction::Addition(data) => self.add_document(data), - MsgAction::Delete(delete) => self.delete(delete), - MsgAction::Query(query) => self.query(query), - MsgAction::Update(update) => self.update(update), - _ => Reply::new().into(), - }; - self.queue.send(msg.response(result)).unwrap(); + let route: Route = msg.get_path().try_into().unwrap(); + for (route_id, doc_func) in self.routes.clone().iter() { + if route == route_id.into() { + match doc_func { + DocFuncType::Add => self.add_document(&msg), + DocFuncType::Delete => self.delete(&msg), + DocFuncType::Query => self.query(&msg), + DocFuncType::Show => self.queue.send(msg.response(Reply::new())).unwrap(), + DocFuncType::Update => self.update(&msg), + _ => {} + } + } + } } } @@ -4339,12 +4537,20 @@ impl DocumentFile { self.indexes.remove_from_index(&field_id, field, oid); } - fn add_document(&mut self, addition: &Addition) -> MsgAction { + fn add_document(&mut self, msg: &Message) { + let addition = match msg.get_action() { + MsgAction::Addition(data) => data, + _ => return, + }; let mut holder = InternalRecord::new(); for (name_id, value) in addition.iter() { let field_id = match self.docdef.get_field_id(name_id) { Ok(id) => id, - Err(err) => return MsgAction::Error(err), + Err(err) => { + let reply = msg.response(err); + self.queue.send(reply).unwrap(); + return; + } }; holder.insert(field_id.clone(), value.get(&Field::None)); } @@ -4355,7 +4561,11 @@ impl DocumentFile { }; let corrected = match self.validate(field_id, value) { Ok(data) => data, - Err(err) => return MsgAction::Error(err), + Err(err) => { + let reply = msg.response(err); + self.queue.send(reply).unwrap(); + return; + } }; holder.insert(field_id.clone(), corrected.clone()); } @@ -4372,13 +4582,22 @@ impl DocumentFile { self.docs.insert(oid.clone(), holder.clone()); records.insert(oid, holder); } - records.into() + let reply = msg.response(records); + self.queue.send(reply).unwrap(); } - fn delete(&mut self, delete: &Delete) -> MsgAction { + fn delete(&mut self, msg: &Message) { + let delete = match msg.get_action() { + MsgAction::Delete(data) => data, + _ => unreachable!("should always be delete action"), + }; let records = match self.run_query(delete.get_query()) { Ok(data) => data, - Err(err) => return err.into(), + Err(err) => { + let reply = msg.response(err); + self.queue.send(reply).unwrap(); + return; + } }; for (oid, record) in records.iter() { for (field_id, index) in self.indexes.iter_mut() { @@ -4386,7 +4605,8 @@ impl DocumentFile { } self.docs.remove(oid); } - Records::with_data(self.docdef.get_field_names().clone(), records).into() + let rec = Records::with_data(self.docdef.get_field_names().clone(), records); + self.queue.send(msg.response(rec)).unwrap(); } fn run_query(&self, query: &Query) -> Result { @@ -4440,26 +4660,45 @@ impl DocumentFile { Ok(records) } - fn query(&self, query: &Query) -> MsgAction { + fn query(&self, msg: &Message) { + let query = match msg.get_action() { + MsgAction::Query(data) => data, + _ => unreachable!("should receive a query"), + }; let records = match self.run_query(query) { Ok(data) => data, - Err(err) => return err.into(), + Err(err) => { + let reply = msg.response(err); + self.queue.send(reply).unwrap(); + return; + } }; let recs = Records::with_data(self.docdef.get_field_names().clone(), records); - //self.queue. - recs.into() + self.queue.send(msg.response(recs)).unwrap(); } - fn update(&mut self, update: &Update) -> MsgAction { + fn update(&mut self, msg: &Message) { + let update = match msg.get_action() { + MsgAction::Update(data) => data, + _ => unreachable!("should receive a update"), + }; let original = match self.run_query(update.get_query()) { Ok(result) => result, - Err(err) => return err.into(), + Err(err) => { + let reply = msg.response(err); + self.queue.send(reply).unwrap(); + return; + } }; let mut changes: HashMap = HashMap::new(); for (key, value) in update.get_values().iter() { let field_id = match self.docdef.get_field_id(key) { Ok(data) => data, - Err(err) => return err.into(), + Err(err) => { + let reply = msg.response(err); + self.queue.send(reply).unwrap(); + return; + } }; changes.insert(field_id, value); } @@ -4471,12 +4710,20 @@ impl DocumentFile { let field = value.get(holder.get(field_id).unwrap()); let correction = match self.validate(field_id, &field) { Ok(data) => data, - Err(err) => return err.into(), + Err(err) => { + let reply = msg.response(err); + self.queue.send(reply).unwrap(); + return; + } }; holder.insert(field_id.clone(), correction.clone()); match indexes.add_to_index(&field_id, correction, oid.clone()) { Ok(_) => {} - Err(err) => return err.into(), + Err(err) => { + let reply = msg.response(err); + self.queue.send(reply).unwrap(); + return; + } } } updates.insert(oid.clone(), holder); @@ -4489,7 +4736,8 @@ impl DocumentFile { } self.docs.insert(oid.clone(), new_rec.clone()); } - Records::with_data(self.docdef.get_field_names().clone(), updates).into() + let recs = Records::with_data(self.docdef.get_field_names().clone(), updates); + self.queue.send(msg.response(recs)).unwrap(); } }