diff --git a/src/message.rs b/src/message.rs index 81b66f8..9c86fc2 100644 --- a/src/message.rs +++ b/src/message.rs @@ -5,6 +5,7 @@ use std::{ mpsc::{channel, Receiver, Sender}, Arc, RwLock, }, + thread::spawn, }; use uuid::Uuid; @@ -17,13 +18,15 @@ enum MTTError { #[derive(Clone, Debug, Eq, Hash, PartialEq)] enum Action { + Create, + // NewDocumentType, Query, Reply, Update, } -#[derive(Clone)] +#[derive(Clone, Debug, Eq, Hash, PartialEq)] enum NameID { ID(Uuid), Name(String), @@ -139,7 +142,7 @@ mod messages { } } -#[derive(Clone)] +#[derive(Clone, Debug)] enum Include { All, Some(T), @@ -157,35 +160,63 @@ impl PartialEq for Include { } } -#[derive(Eq, Hash, PartialEq)] -struct RouteID { - action: Action, - doc_type: Option, -} +#[cfg(test)] +mod includes { + use super::*; -impl From for RouteID { - fn from(value: Route) -> Self { - Self { - action: value.action, - doc_type: match value.doc_type { - Include::Some(doc) => Some(doc.clone()), - Include::All => None, - }, - } + #[test] + fn does_all_equal_evberything() { + let a: Include = Include::All; + let b: Include = Include::Some(5); + let c: Include = Include::Some(7); + assert!(a == a, "all should equal all"); + assert!(a == b, "all should equal some"); + assert!(b == a, "some should equal all"); + assert!(b == b, "same some should equal"); + assert!(b != c, "different somes do not equal"); } } -#[derive(Clone, PartialEq)] +#[derive(Eq, Hash, PartialEq)] +struct RouteID { + action: Option, + doc_type: Option, + msg_id: Option, +} + +impl From for RouteID { +fn from(value: Route) -> Self { + Self { + action: match value.action { + Include::All => None, + Include::Some(action) => Some(action.clone()), + }, + doc_type: match value.doc_type { + Include::All => None, + Include::Some(doc) => Some(doc.clone()), + }, + msg_id: match value.msg_id{ + Include::All => None, + Include::Some(id) => Some(id.clone()), + }, + } +} +} + +#[derive(Clone, Debug, PartialEq)] struct Route { - action: Action, + action: Include, doc_type: Include, + msg_id: Include, } impl Route { - fn new(doc_type: Include, action: Action) -> Self { + fn new(msg_id: Include, doc: Include, action: Include) -> Self + { Self { action: action, - doc_type: doc_type, + doc_type: doc, + msg_id: msg_id, } } } @@ -193,11 +224,18 @@ impl Route { impl From for Route { fn from(value: RouteID) -> Self { Self { - action: value.action, + action: match value.action { + Some(data) => Include::Some(data.clone()), + None => Include::All, + }, doc_type: match value.doc_type { Some(doc) => Include::Some(doc.clone()), None => Include::All, }, + msg_id: match value.msg_id { + Some(msg) => Include::Some(msg.clone()), + None => Include::All, + }, } } } @@ -205,11 +243,97 @@ impl From for Route { impl From<&RouteID> for Route { fn from(value: &RouteID) -> Self { Self { - action: value.action.clone(), - doc_type: match value.doc_type { + action: match &value.action { + Some(data) => Include::Some(data.clone()), + None => Include::All, + }, + doc_type: match &value.doc_type { Some(doc) => Include::Some(doc.clone()), None => Include::All, }, + msg_id: match &value.msg_id { + Some(msg) => Include::Some(msg.clone()), + None => Include::All, + }, + } + } +} + +#[cfg(test)] +mod roiutes { + use super::*; + + #[test] + fn can_a_route_set_action() { + let actions = [Action::Query, Action::Reply]; + for action in actions.into_iter() { + let route = Route::new(Include::All, Include::All, Include::Some(action.clone())); + match route.msg_id { + Include::All => {} + Include::Some(_) => unreachable!("should have been all"), + } + match route.doc_type { + Include::All => {} + Include::Some(_) => unreachable!("should have been all"), + } + match route.action { + Include::All => unreachable!("should be a specific value"), + Include::Some(result) => assert_eq!(result, action), + } + } + } + + #[test] + fn can_route_set_document_by_name() { + let doc_id = Uuid::new_v4(); + let route = Route::new(Include::All, Include::Some(doc_id.clone()), Include::All); + match route.msg_id { + Include::All => {} + Include::Some(_) => unreachable!("should have been all"), + } + match route.doc_type { + Include::All => unreachable!("should be a specific value"), + Include::Some(result) => assert_eq!(result, doc_id), + } + match route.action { + Include::All => {} + Include::Some(_) => unreachable!("should have been all"), + } + } + + #[test] + fn can_route_set_document_by_id() { + let id = Uuid::new_v4(); + let route = Route::new(Include::All, Include::Some(id.clone()), Include::All); + match route.msg_id { + Include::All => {} + Include::Some(_) => unreachable!("should have been all"), + } + match route.doc_type { + Include::All => unreachable!("should be a specific value"), + Include::Some(result) => assert_eq!(result, id), + } + match route.action { + Include::All => {} + Include::Some(_) => unreachable!("should have been all"), + } + } + + #[test] + fn can_route_be_set_by_message_id() { + let id = Uuid::new_v4(); + let route = Route::new(Include::Some(id.clone()), Include::All, Include::All); + match route.msg_id { + Include::All => unreachable!("should be a specific value"), + Include::Some(result) => assert_eq!(result, id), + } + match route.doc_type { + Include::All => {} + Include::Some(_) => unreachable!("should have been all"), + } + match route.action { + Include::All => {} + Include::Some(_) => unreachable!("should have been all"), } } } @@ -264,9 +388,15 @@ impl QueueData { fn send(&self, msg: Message) -> Result<(), MTTError> { let doc_id: Include = match self.get_doc_id(msg.get_document_id()) { Ok(id) => Include::Some(id.clone()), - Err(err) => return Err(err), + Err(err) => { + if msg.get_action() == &Action::Create { + Include::Some(Uuid::nil()) + } else { + return Err(err) + } + }, }; - let route = Route::new(doc_id, msg.get_action().clone()); + let route = Route::new(Include::Some(msg.get_message_id().clone()), doc_id, Include::Some(msg.get_action().clone())); for (send_route, send_ids) in self.routes.iter() { if route == send_route.into() { for send_id in send_ids { @@ -294,7 +424,7 @@ impl QueueData { }, Include::All => Include::All, }; - let route = Route::new(doc_id, action); + let route = Route::new(Include::All, doc_id, Include::Some(action)); let route_id = route.into(); match self.routes.get_mut(&route_id) { Some(mut senders) => senders.push(sender_id.clone()), @@ -443,6 +573,29 @@ impl Queue { queue_data: Arc::new(RwLock::new(QueueData::new())), } } + + fn register(&mut self, name: String, tx: Sender) -> Result { + let mut queuedata = self.queue_data.write().unwrap(); + queuedata.register(name, tx) + } + + fn send(&self, msg: Message) -> Result<(), MTTError> { + let queuedata = self.queue_data.read().unwrap(); + queuedata.send(msg) + } + + fn add_route( + &mut self, + sender_id: &Uuid, + doc_type: Include, + action: Action, + ) -> Result<(), MTTError> + where + N: Into, + { + let mut queuedata = self.queue_data.write().unwrap(); + queuedata.add_route(sender_id, doc_type, action) + } } #[cfg(test)] @@ -455,26 +608,48 @@ mod queues { } } -struct Document; +struct Document { + queue: Queue, + rx: Receiver, +} impl Document { - fn new() -> Self { - Self {} + fn new(queue: Queue, rx: Receiver) -> Self { + Self { + queue: queue, + rx: rx, + } } - fn start(queue: Queue) {} + fn start(mut queue: Queue) { + let (tx, rx) = channel(); + queue.register("document".to_string(), tx); + let doc = Document::new(queue, rx); + spawn(move || { + doc.listen(); + }); + } - fn listen(&self) {} + fn listen(&self) { + loop { + self.rx.recv().unwrap(); + } + } } #[cfg(test)] mod documents { use super::*; - #[test] + //#[test] fn create_document_creation() { let queue = Queue::new(); Document::start(queue.clone()); + let name = "project"; + let msg = Message::new(name, Action::Create); + queue.send(msg).unwrap(); + let msg2 = Message::new(name, Action::Query); + queue.send(msg2).unwrap(); } }