From 9de7e75740547f949e956d8f730e60bb973fc14d Mon Sep 17 00:00:00 2001 From: Jeff Baskin Date: Fri, 4 Jul 2025 10:25:37 -0400 Subject: [PATCH] Setting up documents to be creayed at run time. --- src/doctype.rs | 45 ++++++++ src/lib.rs | 2 + src/message.rs | 286 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 333 insertions(+) create mode 100644 src/doctype.rs create mode 100644 src/message.rs diff --git a/src/doctype.rs b/src/doctype.rs new file mode 100644 index 0000000..d0ed682 --- /dev/null +++ b/src/doctype.rs @@ -0,0 +1,45 @@ +use crate::queue::{Message, MsgType, Queue}; +use std::{sync::mpsc::{channel, Receiver}, thread::spawn}; + +const RESPONDS_TO: [MsgType; 0] = []; + +/// Definition of the document type. +struct DocType { + queue: Queue, + rx: Receiver, +} + +impl DocType { + fn new(queue: Queue, rx: Receiver) -> Self { + Self { + queue: queue, + rx: rx, + } + } + + fn start(queue: Queue) { + let (tx, rx) = channel(); + let mut doctype = DocType::new(queue, rx); + doctype.queue.add(tx, RESPONDS_TO.to_vec()); + spawn(move || { + doctype.listen(); + }); + } + + fn listen(&self) { + loop { + self.rx.recv().unwrap(); + } + } +} + +#[cfg(test)] +mod doctypes { + use super::*; + + #[test] + fn create_empty() { + let queue = Queue::new(); + DocType::start(queue.clone()); + } +} diff --git a/src/lib.rs b/src/lib.rs index 052c610..2eff26f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,9 @@ mod client; mod clock; +mod doctype; mod document; mod field; +mod message; mod queue; mod session; diff --git a/src/message.rs b/src/message.rs new file mode 100644 index 0000000..6719d82 --- /dev/null +++ b/src/message.rs @@ -0,0 +1,286 @@ +use std::{collections::HashMap, sync::{Arc, RwLock, mpsc::{Sender, Receiver, channel}}}; +use uuid::Uuid; + +#[derive(Clone, Debug)] +enum MTTError { + DocumentAlreadyExists(String), + DocumentNotFound(String), +} + +#[derive(Clone, Debug, PartialEq)] +enum Action { + New, + Query, + Reply, + Update, +} + +#[derive(Clone)] +enum DocumentID { + ID(Uuid), + Name(String), +} + +impl From<&str> for DocumentID { + fn from(value: &str) -> Self { + Self::Name(value.to_string()) + } +} + +impl From for DocumentID { + fn from(value: String) -> Self { + Self::Name(value) + } +} + +impl From for DocumentID { + fn from(value: Uuid) -> Self { + Self::ID(value) + } +} + +#[derive(Clone)] +struct Message { + msg_id: Uuid, + document_id: DocumentID, + action: Action, + //instructions: ?, +} + +impl Message { + fn new(doc_id: D, action: Action) -> Self where D: Into { + Self { + msg_id: Uuid::new_v4(), + document_id: doc_id.into(), + action: action + } + } + + fn get_message_id(&self) -> &Uuid { + &self.msg_id + } + + fn get_document_id(&self) -> &DocumentID { + &self.document_id + } + + fn reply(&self) -> Self { + Self { + msg_id: self.msg_id.clone(), + document_id: DocumentID::Name("fred".to_string()), + action: Action::Update, + } + } +} + +#[cfg(test)] +mod messages { + use super::*; + + #[test] + fn can_the_document_be_a_stringi_reference() { + let dts = ["one", "two"]; + for document in dts.into_iter() { + let msg = Message::new(document, Action::New); + match msg.get_document_id() { + DocumentID::ID(_) => unreachable!("should have been a string id"), + DocumentID::Name(data) => assert_eq!(data, document), + } + assert_eq!(msg.action, Action::New); + } + } + + #[test] + fn can_the_document_be_a_string() { + let dts = ["one".to_string(), "two".to_string()]; + for document in dts.into_iter() { + let msg = Message::new(document.clone(), Action::Update); + match msg.get_document_id() { + DocumentID::ID(_) => unreachable!("should have been a string id"), + DocumentID::Name(data) => assert_eq!(data, &document), + } + assert_eq!(msg.action, Action::Update); + } + } + + #[test] + fn can_the_document_be_an_id() { + let document = Uuid::new_v4(); + let msg = Message::new(document.clone(), Action::Query); + match msg.get_document_id() { + DocumentID::ID(data) => assert_eq!(data, &document), + DocumentID::Name(_) => unreachable!("should have been an id"), + } + assert_eq!(msg.action, Action::Query); + } + + #[test] + fn is_the_message_id_random() { + let mut ids: Vec = Vec::new(); + for _ in 0..5 { + let msg = Message::new("tester", Action::New); + let id = msg.get_message_id().clone(); + assert!(!ids.contains(&id), "{:?} containts {}", ids, id); + ids.push(id); + } + } + + #[test] + fn does_the_message_reply_have_the_same_message_id() { + let msg = Message::new("tester", Action::New); + let reply = msg.reply(); + assert_eq!(reply.get_message_id(), msg.get_message_id()); + } +} + +struct QueueData { + senders: HashMap>, + names: HashMap, +} + +impl QueueData { + fn new() -> Self { + Self { + senders: HashMap::new(), + names: HashMap::new(), + } + } + + fn register(&mut self, name: String, tx: Sender) -> Result { + match self.names.get(&name) { + Some(_) => return Err(MTTError::DocumentAlreadyExists(name)), + None => {}, + } + let id = Uuid::new_v4(); + self.senders.insert(id.clone(), tx); + self.names.insert(name, id.clone()); + Ok(id) + } + + fn send(&self, msg: Message) -> Result<(), MTTError> { + let tx = match msg.get_document_id() { + DocumentID::ID(id) => self.senders.get(id).unwrap(), + DocumentID::Name(name) => { + let docid = match self.names.get(name) { + Some(id) => id, + None => { + return Err(MTTError::DocumentNotFound(name.to_string())); + &Uuid::new_v4() + } + }; + self.senders.get(docid).unwrap() + }, + }; + tx.send(msg).unwrap(); + Ok(()) + } +} + +#[cfg(test)] +mod queuedatas { + use super::*; + use std::time::Duration; + + static TIMEOUT: Duration = Duration::from_millis(500); + + #[test] + fn can_a_new_document_type_be_rgistered() { + let name = Uuid::new_v4().to_string(); + let (tx, rx) = channel(); + let mut queuedata = QueueData::new(); + let id = queuedata.register(name.clone(), tx).unwrap(); + let msg = Message::new(name.clone(), Action::Query); + queuedata.send(msg.clone()).unwrap(); + let result = rx.recv_timeout(TIMEOUT).unwrap(); + assert_eq!(result.get_message_id(), msg.get_message_id()); + let msg = Message::new(id.clone(), Action::Query); + queuedata.send(msg.clone()).unwrap(); + let result = rx.recv_timeout(TIMEOUT).unwrap(); + assert_eq!(result.get_message_id(), msg.get_message_id()); + } + + #[test] + fn does_a_bad_document_name_fail() { + let docname = Uuid::new_v4().to_string(); + let queuedata = QueueData::new(); + let msg = Message::new(docname.clone(), Action::Query); + match queuedata.send(msg) { + Ok(_) => unreachable!("should have been an error"), + Err(data) => match data { + MTTError::DocumentNotFound(doc) => assert_eq!(doc, docname), + _ => unreachable!("should have been a not found error"), + }, + } + } + + #[test] + fn should_error_on_duplicate_name_registration() { + let name = Uuid::new_v4().to_string(); + let (tx1, _) = channel(); + let (tx2, _) = channel(); + let mut queuedata = QueueData::new(); + queuedata.register(name.clone(), tx1).unwrap(); + match queuedata.register(name.clone(), tx2) { + Ok(_) => unreachable!("should have been an weeoe"), + Err(data) => match data { + MTTError::DocumentAlreadyExists(output) => assert_eq!(output, name), + _ => unreachable!("should have been an already exists errorr"), + }, + } + } +} + +#[derive(Clone)] +struct Queue { + queue_data: Arc>, +} + +impl Queue { + fn new() -> Self { + Self { + queue_data: Arc::new(RwLock::new(QueueData::new())), + } + } +} + +#[cfg(test)] +mod queues { + use super::*; + + #[test] + fn create_a_queue() { + Queue::new(); + } +} + +struct Document; + +impl Document { + fn new() -> Self { + Self {} + } + + fn start(queue: Queue) { + } + + fn listen(&self) { + } +} + +#[cfg(test)] +mod documents { + use super::*; + + #[test] + fn create_document_creation() { + let queue = Queue::new(); + Document::start(queue); + } +} + +// Create a double hash map. posswible names that leads to an id that is int eh ids +// \and the second is the id and the sender to be used.and a third for who wants to +// listen to what. +// +// The queue has a read write lock on the abbove strucutee. A clone of this is given to +// every process.