diff --git a/src/message.rs b/src/message.rs index 10bb9d5..25d9563 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,4 +1,5 @@ use chrono::prelude::*; +use isolang::Language; use std::{ collections::{HashMap, HashSet}, sync::{ @@ -33,8 +34,10 @@ enum MTTError { enum Action { Addition, Create, + Delete, Error, Query, + Register, Reply, Show, Update, @@ -45,8 +48,10 @@ impl From for Action { match value { MsgAction::Addition(_) => Action::Addition, MsgAction::Create(_) => Action::Create, + MsgAction::Delete(_) => Action::Delete, MsgAction::Error(_) => Action::Error, MsgAction::Query(_) => Action::Query, + MsgAction::Register(_) => Action::Register, MsgAction::Reply(_) => Action::Reply, MsgAction::Show => Action::Show, MsgAction::Update(_) => Action::Update, @@ -65,6 +70,16 @@ impl From<&MsgAction> for Action { enum NameID { ID(Uuid), Name(String), + None, +} + +impl NameID { + fn is_none(&self) -> bool { + match self { + Self::None => true, + _ => false + } + } } impl From<&str> for NameID { @@ -99,9 +114,10 @@ enum MsgAction { // Remove Error(MTTError), Query(Query), + Register(Register), Reply(Reply), Show, - // Delete + Delete(Delete), Update(Update), } @@ -111,6 +127,12 @@ impl From for MsgAction { } } +impl From for MsgAction { + fn from(value: Delete) -> Self { + MsgAction::Delete(value) + } +} + impl From for MsgAction { fn from(value: DocDef) -> Self { MsgAction::Create(value) @@ -129,6 +151,12 @@ impl From for MsgAction { } } +impl From for MsgAction { + fn from(value: Register) -> Self { + MsgAction::Register(value) + } +} + impl From for MsgAction { fn from(value: Reply) -> Self { MsgAction::Reply(value) @@ -253,8 +281,8 @@ mod messages { for document in dts.into_iter() { let msg = Message::new(document, MsgAction::Create(DocDef::new())); match msg.get_document_id() { - NameID::ID(_) => unreachable!("should have been a string id"), NameID::Name(data) => assert_eq!(data, document), + _ => unreachable!("should have been a string id"), } match msg.get_action() { MsgAction::Create(_) => {} @@ -269,8 +297,8 @@ mod messages { for document in dts.into_iter() { let msg = Message::new(document.clone(), MsgAction::Query(Query::new())); match msg.get_document_id() { - NameID::ID(_) => unreachable!("should have been a string id"), NameID::Name(data) => assert_eq!(data, &document), + _ => unreachable!("should have been a string id"), } match msg.get_action() { MsgAction::Query(_) => {} @@ -285,7 +313,7 @@ mod messages { let msg = Message::new(document.clone(), MsgAction::Query(Query::new())); match msg.get_document_id() { NameID::ID(data) => assert_eq!(data, &document), - NameID::Name(_) => unreachable!("should have been an id"), + _ => unreachable!("should have been an id"), } match msg.get_action() { MsgAction::Query(_) => {} @@ -430,6 +458,64 @@ impl From for RouteID { } } +#[derive(Clone, Debug, Eq, PartialEq)] +struct Name { + name: String, + lang: Language, +} + +impl Name { + fn english(name: String) -> Self { + Self { + name: name, + lang: Language::from_639_1("en").unwrap(), + } + } +} + +impl ToString for Name { + fn to_string(&self) -> String { + self.name.clone() + } +} + +#[derive(Clone, Debug)] +enum RegMsg { + DocName(Name), + Error(MTTError), + Ok, +} + +#[derive(Clone, Debug)] +struct Register { + msg: RegMsg, + sender_id: Uuid, +} + +impl Register { + fn new(sender_id: Uuid, reg_msg: RegMsg) -> Self { + Self { + msg: reg_msg, + sender_id: sender_id, + } + } + + fn get_msg(&self) -> &RegMsg { + &self.msg + } + + fn get_sender_id(&self) -> &Uuid { + &self.sender_id + } + + fn response(&self, reg_msg: RegMsg) -> Self { + Self { + msg: reg_msg, + sender_id: self.sender_id.clone(), + } + } +} + #[derive(Clone, Debug, PartialEq)] struct Route { action: Include, @@ -606,6 +692,7 @@ impl QueueData { None => return Err(MTTError::DocumentNotFound(name.clone())), }, NameID::ID(id) => id.clone(), + NameID::None => unreachable!("should never be none"), }; if self.senders.contains_key(&sender_id) { Ok(sender_id) @@ -877,18 +964,179 @@ mod queuedatas { } } +struct DocRegistry { + doc_names: Vec, + queue: Queue, + receiver: Receiver, +} + +impl DocRegistry { + fn new(queue: Queue, rx: Receiver) -> Self { + Self { + doc_names: Vec::new(), + queue: queue, + receiver: rx, + } + } + + fn start(queue: Queue, rx: Receiver) { + let mut doc_names = DocRegistry::new(queue, rx); + spawn(move || { + doc_names.listen(); + }); + } + + fn listen(&mut self) { + loop { + let msg = self.receiver.recv().unwrap(); + match msg.get_action() { + MsgAction::Register(data) => { + let id = data.get_sender_id(); + let reply = msg.response(self.register_action(data)); + self.queue.forward(id, reply); + }, + _ => {}, + } + } + } + + fn register_action(&mut self, reg: &Register) -> Register { + match reg.get_msg() { + RegMsg::DocName(name) => { + if self.doc_names.contains(name) { + reg.response(RegMsg::Error(MTTError::DocumentAlreadyExists(name.to_string()))) + } else { + self.doc_names.push(name.clone()); + reg.response(RegMsg::Ok) + } + }, + _ => reg.response(RegMsg::Ok), + } + } +} + +struct Router { + doc_registry: Sender, + senders: HashMap>, +} + +impl Router { + fn new(tx: Sender) -> Self { + Self { + doc_registry: tx, + senders: HashMap::new(), + } + } + + fn add_sender(&mut self, sender: Sender) -> Uuid { + let mut id = Uuid::new_v4(); + while self.senders.contains_key(&id) { + id = Uuid::new_v4(); + } + self.senders.insert(id.clone(), sender); + id + } + + fn forward(&self, id: &Uuid, msg: Message) { + self.senders.get(id).unwrap().send(msg).unwrap(); + } + + fn send(&self, msg: Message) { + self.doc_registry.send(msg).unwrap(); + } +} + +#[cfg(test)] +mod routers { + use super::{support_test::TIMEOUT, *}; + + #[test] + fn can_pass_message() { + let (tx, rx) = channel(); + let router = Router::new(tx); + let msg = Message::new("task", Query::new()); + router.send(msg.clone()); + let result = rx.recv_timeout(TIMEOUT).unwrap(); + assert_eq!(result.get_message_id(), msg.get_message_id()); + } + + #[test] + fn can_forward_message() { + let (tx, _) = channel(); + let mut router = Router::new(tx); + let (sender, receiver) = channel(); + let id = router.add_sender(sender); + let msg = Message::new("wiki", Query::new()); + router.forward(&id, msg.clone()); + let result = receiver.recv_timeout(TIMEOUT).unwrap(); + assert_eq!(result.get_message_id(), msg.get_message_id()); + } + + #[test] + fn sender_ids_are_unique() { + let (tx, _) = channel(); + let mut router = Router::new(tx); + let count = 10; + let mut holder: HashSet = HashSet::new(); + for _ in 0..count { + let (tx, _) = channel(); + holder.insert(router.add_sender(tx)); + } + assert_eq!(holder.len(), count, "had duplicate keys"); + } +} + #[derive(Clone)] struct Queue { + router: Arc>, + // + // + // queue_data: Arc>, } impl Queue { fn new() -> Self { - Self { + let (tx, rx) = channel(); + let output = Self { + router: Arc::new(RwLock::new(Router::new(tx))), + // + // + // queue_data: Arc::new(RwLock::new(QueueData::new())), + }; + DocRegistry::start(output.clone(), rx); + output + } + + fn add_sender(&mut self, sender: Sender) -> Uuid { + let mut router = self.router.write().unwrap(); + router.add_sender(sender) + } + + fn forward(&self, id: &Uuid, msg: Message) { + let router = self.router.read().unwrap(); + router.forward(id, msg); + } + + fn send(&self, msg: Message) -> Result<(), MTTError> { + let router = self.router.read().unwrap(); + router.send(msg.clone()); + // + // + // + if msg.get_document_id().is_none() { + Ok(()) + } else { + let queuedata = self.queue_data.read().unwrap(); + queuedata.send(msg) } } + // + // + // + fn register( &mut self, tx: Sender, @@ -898,20 +1146,141 @@ impl Queue { let mut queuedata = self.queue_data.write().unwrap(); queuedata.register(tx, name, routes) } - - fn send(&self, msg: Message) -> Result<(), MTTError> { - let queuedata = self.queue_data.read().unwrap(); - queuedata.send(msg) - } } #[cfg(test)] mod queues { - use super::*; + use super::{support_test::TIMEOUT, *}; + use std::sync::mpsc::RecvTimeoutError; + + struct TestQueue { + sender_id: Uuid, + queue: Queue, + receiver: Receiver, + } + + impl TestQueue { + fn new() -> Self { + let mut queue = Queue::new(); + let (tx, rx) = channel(); + let id = queue.add_sender(tx); + Self { + sender_id: id, + queue: queue, + receiver: rx, + } + } + + fn get_preset_id(&self) -> &Uuid { + &self.sender_id + } + + fn get_preset_rx(&self) -> &Receiver { + &self.receiver + } + + fn add_sender(&mut self, sender: Sender) -> Uuid { + self.queue.add_sender(sender) + } + + fn forward(&self, id: &Uuid, msg: Message) { + self.queue.forward(id, msg); + } + + fn send(&self, msg: Message) -> Result<(), MTTError> { + self.queue.send(msg) + } + } #[test] - fn create_a_queue() { - Queue::new(); + fn can_forward_message() { + let mut queue = TestQueue::new(); + let msg = Message::new("wiki", Query::new()); + queue.forward(queue.get_preset_id(), msg.clone()); + let result = queue.get_preset_rx().recv_timeout(TIMEOUT).unwrap(); + assert_eq!(result.get_message_id(), msg.get_message_id()); + } + + #[test] + fn sender_ids_are_unique() { + let mut queue = Queue::new(); + let count = 10; + let mut holder: HashSet = HashSet::new(); + for _ in 0..count { + let (tx, _) = channel(); + holder.insert(queue.add_sender(tx)); + } + assert_eq!(holder.len(), count, "had duplicate keys"); + } + + #[test] + fn can_register_document_name() { + let mut queue = TestQueue::new(); + let doc_name = Name::english(Uuid::new_v4().to_string()); + let reg_msg = Register::new(queue.get_preset_id().clone(), RegMsg::DocName(doc_name.clone())); + let msg = Message::new(NameID::None, reg_msg); + queue.send(msg.clone()).unwrap(); + let result = queue.get_preset_rx().recv_timeout(TIMEOUT).unwrap(); + assert_eq!(result.get_message_id(), msg.get_message_id()); + let action = result.get_action(); + match action { + MsgAction::Register(data) => match data.get_msg() { + RegMsg::Ok => {}, + _ => unreachable!("got {:?}, should have been register ok", action), + }, + _ => unreachable!("got {:?}, should have been register ok", action), + } + } + + #[test] + fn errors_on_duplicate_names() { + let mut queue = TestQueue::new(); + //let mut queue = Queue::new(); + //let (sender, receiver) = channel(); + //let id = queue.add_sender(sender); + let receiver = queue.get_preset_rx(); + let doc_name = Name::english(Uuid::new_v4().to_string()); + let reg_msg = Register::new(queue.get_preset_id().clone(), RegMsg::DocName(doc_name.clone())); + let msg = Message::new(NameID::None, reg_msg.clone()); + queue.send(msg.clone()).unwrap(); + receiver.recv_timeout(TIMEOUT).unwrap(); + let msg2 = Message::new(NameID::None, reg_msg.clone()); + queue.send(msg.clone()).unwrap(); + let result = receiver.recv_timeout(TIMEOUT).unwrap(); + assert_eq!(result.get_message_id(), msg.get_message_id()); + let action = result.get_action(); + match action { + MsgAction::Register(data) => match data.get_msg() { + RegMsg::Error(err) => match err { + MTTError::DocumentAlreadyExists(name) => assert_eq!(name.to_string(), doc_name.to_string()), + _ => unreachable!("got {:?}, should have been duplicate error", err), + }, + _ => unreachable!("got {:?}, should have been error", data), + }, + _ => unreachable!("got {:?}, should have been register ok", action), + } + } + + #[test] + fn can_register_routes() { + + } + + #[test] + #[ignore] + fn default_send_does_nothing() { + let mut queue = Queue::new(); + let (sender, receiver) = channel(); + let id = queue.add_sender(sender); + let msg = Message::new("wiki", Query::new()); + queue.send(msg).unwrap(); + match receiver.recv_timeout(TIMEOUT) { + Ok(msg) => unreachable!("should not receive: {:?}", msg), + Err(err) => match err { + RecvTimeoutError::Timeout => {} + _ => unreachable!("should have timed out"), + }, + } } } @@ -2058,16 +2427,12 @@ impl Operation { #[derive(Clone, Debug)] struct Query { data: HashMap, - - specifiers: Vec, } impl Query { fn new() -> Self { Self { data: HashMap::new(), - - specifiers: Vec::new(), } } @@ -2327,6 +2692,27 @@ mod documents { } } +#[derive(Clone, Debug)] +struct Delete { + query: Query, +} + +impl Delete { + fn new() -> Self { + Self { + query: Query::new(), + } + } + + fn get_query(&self) -> &Query { + &self.query + } + + fn get_query_mut(&mut self) -> &mut Query { + &mut self.query + } +} + #[derive(Clone, Debug)] struct Update { query: Query, @@ -2666,6 +3052,7 @@ impl DocumentFile { let name = match msg.get_document_id() { NameID::Name(name) => name.clone(), NameID::ID(id) => id.to_string(), + NameID::None => unreachable!("should never be none"), }; let routes = [ RouteRequest::new( @@ -2673,6 +3060,11 @@ impl DocumentFile { Include::Some(name.clone()), Include::Some(Action::Addition), ), + RouteRequest::new( + Include::All, + Include::Some(name.clone()), + Include::Some(Action::Delete), + ), RouteRequest::new( Include::All, Include::Some(name.clone()), @@ -2716,6 +3108,7 @@ impl DocumentFile { let msg = self.rx.recv().unwrap(); let result = match msg.get_action() { MsgAction::Addition(data) => self.add_document(data), + MsgAction::Delete(delete) => self.delete(delete), MsgAction::Query(query) => self.query(query), MsgAction::Update(update) => self.update(update), _ => Reply::new().into(), @@ -2793,6 +3186,16 @@ impl DocumentFile { reply.into() } + fn delete(&mut self, delete: &Delete) -> MsgAction { + let mut reply = Reply::new(); + let oids = self.run_query(delete.get_query()).unwrap(); + for oid in oids.iter() { + reply.add(self.docs.get(oid).unwrap().clone()); + self.docs.remove(oid); + } + reply.into() + } + fn run_query(&self, query: &Query) -> Result, MTTError> { let query_ids = query.field_ids(); let doc_ids = self.docdef.field_ids(); @@ -2964,6 +3367,10 @@ mod document_files { &self.rx } + fn get_sender(&self) -> Sender { + self.tx.clone() + } + fn send(&self, action: A) -> Result<(), MTTError> where A: Into, @@ -4018,6 +4425,69 @@ mod document_files { _ => unreachable!("got {:?}: should have gotten reply", action), } } + + #[test] + fn can_delete() { + let mut doc = TestDocument::new([FieldType::Integer].to_vec()); + doc.start(); + doc.populate([1.into()].to_vec()); + let mut calc = Calculation::new(Operand::Equal); + calc.add_value(1); + let mut query = Query::new(); + query.add("field0".to_string(), calc); + let mut delete = Delete::new(); + *delete.get_query_mut() = query.clone(); + doc.send(delete).unwrap(); + let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Reply(data) => { + assert_eq!(data.len(), 1); + for doc in data.iter() { + match doc.get_field("field0").unwrap() { + Field::Integer(num) => assert_eq!(num, 1), + _ => unreachable!("did not get uuid"), + } + } + } + _ => unreachable!("got {:?}: should have gotten reply", action), + } + doc.send(query).unwrap(); + let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap(); + let action = result.get_action(); + match action { + MsgAction::Reply(data) => assert_eq!(data.len(), 0), + _ => unreachable!("did not get uuid"), + } + } + + #[test] + #[ignore] + fn delete_should_only_respond_to_its_own() { + let mut doc = TestDocument::new([FieldType::Integer].to_vec()); + doc.start(); + doc.populate([1.into()].to_vec()); + let mut calc = Calculation::new(Operand::Equal); + calc.add_value(1); + let mut query = Query::new(); + query.add("field0".to_string(), calc); + let mut delete = Delete::new(); + *delete.get_query_mut() = query.clone(); + doc.send(delete).unwrap(); + let name = "other"; + let msg = Message::new(name.to_string(), MsgAction::Show); + let (tx, _) = channel(); + let mut queue = doc.get_queue(); + queue.register(tx, name.to_string(), Vec::new()).unwrap(); + queue.send(msg).unwrap(); + match doc.get_receiver().recv_timeout(TIMEOUT) { + Ok(msg) => unreachable!("should not receive: {:?}", msg), + Err(err) => match err { + RecvTimeoutError::Timeout => {} + _ => unreachable!("should have timed out"), + }, + } + } } #[cfg(test)]