Reworked queuedata to not leave artifacts on failure.

This commit is contained in:
Jeff Baskin 2025-07-30 08:37:58 -04:00
parent 1572e2f86a
commit 466cc7db64

View File

@ -8,6 +8,13 @@ use std::{
}; };
use uuid::Uuid; use uuid::Uuid;
#[cfg(test)]
mod support_test {
use std::time::Duration;
pub static TIMEOUT: Duration = Duration::from_millis(500);
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
enum MTTError { enum MTTError {
DocumentAlreadyExists(String), DocumentAlreadyExists(String),
@ -19,10 +26,7 @@ enum MTTError {
enum Action { enum Action {
Create, Create,
Query, Query,
//
NewDocumentType,
Reply, Reply,
Update,
} }
impl From<MsgAction> for Action { impl From<MsgAction> for Action {
@ -30,6 +34,7 @@ impl From<MsgAction> for Action {
match value { match value {
MsgAction::Create(_) => Action::Create, MsgAction::Create(_) => Action::Create,
MsgAction::Query(_) => Action::Query, MsgAction::Query(_) => Action::Query,
MsgAction::Reply(_) => Action::Reply,
} }
} }
} }
@ -75,6 +80,7 @@ impl From<&NameID> for NameID {
enum MsgAction { enum MsgAction {
Create(DocDef), Create(DocDef),
Query(Access), Query(Access),
Reply(Response),
} }
#[derive(Clone)] #[derive(Clone)]
@ -206,7 +212,7 @@ mod includes {
} }
} }
#[derive(Eq, Hash, PartialEq)] #[derive(Clone, Eq, Hash, PartialEq)]
struct RouteID { struct RouteID {
action: Option<Action>, action: Option<Action>,
doc_type: Option<Uuid>, doc_type: Option<Uuid>,
@ -366,6 +372,23 @@ mod roiutes {
} }
} }
#[derive(Clone)]
struct RouteRequest {
msg_id: Include<Uuid>,
doc_name: Include<String>,
action: Include<Action>,
}
impl RouteRequest {
fn new(msg_id: Include<Uuid>, doc_name: Include<String>, action: Include<Action>) -> Self {
Self {
msg_id: msg_id,
doc_name: doc_name,
action: action,
}
}
}
struct QueueData { struct QueueData {
senders: HashMap<Uuid, Sender<Message>>, senders: HashMap<Uuid, Sender<Message>>,
names: HashMap<String, Uuid>, names: HashMap<String, Uuid>,
@ -399,18 +422,55 @@ impl QueueData {
} }
} }
fn register(&mut self, name: String, tx: Sender<Message>) -> Result<Uuid, MTTError> { fn register(
match self.get_doc_id(name.as_str()) { &mut self,
Ok(_) => return Err(MTTError::DocumentAlreadyExists(name)), tx: Sender<Message>,
Err(_) => (), name: String,
} routes: Vec<RouteRequest>,
) -> Result<(), MTTError> {
let mut id = Uuid::new_v4(); let mut id = Uuid::new_v4();
while self.senders.contains_key(&id) { while self.senders.contains_key(&id) {
id = Uuid::new_v4(); id = Uuid::new_v4();
} }
match self.get_doc_id(name.clone()) {
Ok(_) => return Err(MTTError::DocumentAlreadyExists(name)),
Err(_) => {}
}
let mut holder: HashMap<RouteID, Vec<Uuid>> = HashMap::new();
for route in routes.iter() {
let doc_type = match &route.doc_name {
Include::Some(doc_name) => {
if doc_name == &name {
Include::Some(id.clone())
} else {
match self.get_doc_id(doc_name.to_string()) {
Ok(doc_id) => Include::Some(doc_id),
Err(err) => return Err(err),
}
}
}
Include::All => Include::All,
};
let route_id: RouteID =
Route::new(route.msg_id.clone(), doc_type, route.action.clone()).into();
match self.routes.get(&route_id) {
Some(senders) => {
let mut addition = senders.clone();
addition.push(id.clone());
holder.insert(route_id, addition);
}
None => {
let senders = [id.clone()].to_vec();
holder.insert(route_id, senders);
}
}
}
self.senders.insert(id.clone(), tx); self.senders.insert(id.clone(), tx);
self.names.insert(name, id.clone()); self.names.insert(name.clone(), id.clone());
Ok(id) for (route_id, senders) in holder.iter() {
self.routes.insert(route_id.clone(), senders.clone());
}
Ok(())
} }
fn send(&self, msg: Message) -> Result<(), MTTError> { fn send(&self, msg: Message) -> Result<(), MTTError> {
@ -440,59 +500,133 @@ impl QueueData {
} }
Ok(()) Ok(())
} }
fn add_route<N>(
&mut self,
sender_id: &Uuid,
doc_type: Include<N>,
action: Action,
) -> Result<(), MTTError>
where
N: Into<NameID>,
{
let doc_id = match doc_type {
Include::Some(data) => match self.get_doc_id(data) {
Ok(id) => Include::Some(id.clone()),
Err(err) => return Err(err),
},
Include::All => Include::All,
};
let route = Route::new(Include::All, doc_id, Include::Some(action));
let route_id = route.into();
match self.routes.get_mut(&route_id) {
Some(mut senders) => senders.push(sender_id.clone()),
None => {
self.routes
.insert(route_id.into(), [sender_id.clone()].to_vec());
}
}
Ok(())
}
} }
#[cfg(test)] #[cfg(test)]
mod queuedatas { mod queuedatas {
use super::support_test::TIMEOUT;
use super::*; use super::*;
use std::{sync::mpsc::RecvTimeoutError, time::Duration}; use std::sync::mpsc::RecvTimeoutError;
static TIMEOUT: Duration = Duration::from_millis(500);
#[test] #[test]
fn can_a_new_document_type_be_rgistered() { fn can_document_be_registered() {
let name = Uuid::new_v4().to_string();
let action = Action::Query;
let (tx, rx) = channel();
let mut queuedata = QueueData::new(); let mut queuedata = QueueData::new();
let id = queuedata.register(name.clone(), tx).unwrap(); let (tx, rx) = channel();
queuedata.add_route(&id, Include::Some(name.clone()), action); let name = Uuid::new_v4().to_string();
let msg = Message::new(name.clone(), MsgAction::Query(Access::new())); let routes = [
queuedata.send(msg.clone()).unwrap(); RouteRequest::new(
let result = rx.recv_timeout(TIMEOUT).unwrap(); Include::All,
assert_eq!(result.get_message_id(), msg.get_message_id()); Include::Some(name.clone()),
let msg = Message::new(id.clone(), MsgAction::Query(Access::new())); Include::Some(Action::Query),
queuedata.send(msg.clone()).unwrap(); ),
let result = rx.recv_timeout(TIMEOUT).unwrap(); RouteRequest::new(
assert_eq!(result.get_message_id(), msg.get_message_id()); Include::All,
Include::Some(name.clone()),
Include::Some(Action::Reply),
),
]
.to_vec();
queuedata.register(tx, name.clone(), routes).unwrap();
let msg1 = Message::new(name.clone(), MsgAction::Query(Access::new()));
let msg2 = Message::new(name.clone(), MsgAction::Reply(Response::new()));
let msg3 = Message::new(name.clone(), MsgAction::Create(DocDef::new()));
queuedata.send(msg1.clone()).unwrap();
queuedata.send(msg2.clone()).unwrap();
queuedata.send(msg3.clone()).unwrap();
let result1 = rx.recv_timeout(TIMEOUT).unwrap();
let result2 = rx.recv_timeout(TIMEOUT).unwrap();
match rx.recv_timeout(TIMEOUT) {
Ok(_) => unreachable!("should have timed out"),
Err(err) => match err {
RecvTimeoutError::Timeout => {},
_ => unreachable!("should have timed out"),
},
}
assert_eq!(result1.get_message_id(), msg1.get_message_id());
assert_eq!(result2.get_message_id(), msg2.get_message_id());
match result1.get_action() {
MsgAction::Query(_) => {}
_ => unreachable!("should have been a query"),
}
match result2.get_action() {
MsgAction::Reply(_) => {}
_ => unreachable!("should have been a query"),
}
}
#[test]
fn does_register_fail_on_duplicate_documents() {
let mut queuedata = QueueData::new();
let (tx1, _) = channel();
let (tx2, _) = channel();
let name = Uuid::new_v4().to_string();
queuedata
.register(tx1, name.to_string(), Vec::new())
.unwrap();
match queuedata.register(tx2, name.to_string(), Vec::new()) {
Ok(_) => unreachable!("duplicates should create an error"),
Err(err) => match err {
MTTError::DocumentAlreadyExists(result) => assert_eq!(result, name),
_ => unreachable!("should have been document does not exists"),
},
}
}
#[test]
fn does_bad_route_prevent_registration() {
let mut queuedata = QueueData::new();
let (tx, rx) = channel();
let good = "good";
let bad = Uuid::new_v4().to_string();
let routes = [
RouteRequest::new(
Include::All,
Include::Some(good.to_string()),
Include::Some(Action::Query),
),
RouteRequest::new(
Include::All,
Include::Some(bad.clone()),
Include::Some(Action::Reply),
),
]
.to_vec();
match queuedata.register(tx, good.to_string(), routes) {
Ok(_) => unreachable!("should produce an error"),
Err(err) => match err {
MTTError::DocumentNotFound(result) => assert_eq!(result, bad),
_ => unreachable!("Shouuld be document not found"),
},
}
assert_eq!(queuedata.senders.len(), 0, "should not add to senders");
assert_eq!(queuedata.names.len(), 0, "should not add to names");
assert_eq!(queuedata.routes.len(), 0, "should nor add to routes");
}
#[test]
fn is_sender_only_added_once_to_routes() {
let mut queuedata = QueueData::new();
let (tx, rx) = channel();
let name = "project";
let routes = [
RouteRequest::new(
Include::All,
Include::Some(name.to_string()),
Include::Some(Action::Query),
),
RouteRequest::new(
Include::All,
Include::Some(name.to_string()),
Include::Some(Action::Query),
),
]
.to_vec();
queuedata
.register(tx, name.to_string(), routes)
.unwrap();
for senders in queuedata.routes.values() {
assert_eq!(senders.len(), 1, "should be no double entries");
}
} }
#[test] #[test]
@ -509,28 +643,12 @@ mod queuedatas {
} }
} }
#[test]
fn should_error_on_duplicate_name_registration() {
let name = Uuid::new_v4().to_string();
let (tx1, _) = channel();
let (tx2, _) = channel();
let mut queuedata = QueueData::new();
queuedata.register(name.clone(), tx1).unwrap();
match queuedata.register(name.clone(), tx2) {
Ok(_) => unreachable!("should have been an weeoe"),
Err(data) => match data {
MTTError::DocumentAlreadyExists(output) => assert_eq!(output, name),
_ => unreachable!("should have been an already exists errorr"),
},
}
}
#[test] #[test]
fn is_send_okay_if_no_one_is_listening() { fn is_send_okay_if_no_one_is_listening() {
let mut queuedata = QueueData::new(); let mut queuedata = QueueData::new();
let name = "something"; let name = "something";
let (tx, _) = channel(); let (tx, _) = channel();
queuedata.register(name.to_string(), tx).unwrap(); queuedata.register(tx, name.to_string(), Vec::new()).unwrap();
let msg = Message::new("something", MsgAction::Create(DocDef::new())); let msg = Message::new("something", MsgAction::Create(DocDef::new()));
match queuedata.send(msg) { match queuedata.send(msg) {
Ok(_) => {} Ok(_) => {}
@ -538,27 +656,6 @@ mod queuedatas {
} }
} }
#[test]
fn can_certain_messages_be_ignored() {
let mut queuedata = QueueData::new();
let doctype = "test";
let (tx, rx) = channel();
let id = queuedata.register(doctype.to_string(), tx).unwrap();
queuedata.add_route(&id, Include::Some(doctype.to_string()), Action::Query);
let msg = Message::new(doctype, MsgAction::Query(Access::new()));
queuedata.send(msg.clone()).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
assert_eq!(result.get_message_id(), msg.get_message_id());
let msg = Message::new(doctype, MsgAction::Query(Access::new()));
match rx.recv_timeout(TIMEOUT) {
Ok(_) => unreachable!("should timeout"),
Err(err) => match err {
RecvTimeoutError::Timeout => {}
_ => unreachable!("should timeout"),
},
}
}
#[test] #[test]
fn can_more_than_one_document_respond() { fn can_more_than_one_document_respond() {
let mut queuedata = QueueData::new(); let mut queuedata = QueueData::new();
@ -567,18 +664,11 @@ mod queuedatas {
let action = MsgAction::Query(Access::new()); let action = MsgAction::Query(Access::new());
let (tx1, rx1) = channel(); let (tx1, rx1) = channel();
let (tx2, rx2) = channel(); let (tx2, rx2) = channel();
let id1 = queuedata.register(name1.to_string(), tx1).unwrap(); let routes = [
let id2 = queuedata.register(name2.to_string(), tx2).unwrap(); RouteRequest::new(Include::All, Include::Some(name1.to_string()), Include::All)
queuedata.add_route( ].to_vec();
&id1, queuedata.register(tx1, name1.to_string(), routes.clone()).unwrap();
Include::Some(name1.to_string()), queuedata.register(tx2, name2.to_string(), routes.clone()).unwrap();
action.clone().into(),
);
queuedata.add_route(
&id2,
Include::Some(name1.to_string()),
action.clone().into(),
);
let msg = Message::new(name1, action.clone()); let msg = Message::new(name1, action.clone());
queuedata.send(msg.clone()).unwrap(); queuedata.send(msg.clone()).unwrap();
let result1 = rx1.recv_timeout(TIMEOUT).unwrap(); let result1 = rx1.recv_timeout(TIMEOUT).unwrap();
@ -586,21 +676,6 @@ mod queuedatas {
assert_eq!(result1.get_message_id(), msg.get_message_id()); assert_eq!(result1.get_message_id(), msg.get_message_id());
assert_eq!(result1.get_message_id(), result2.get_message_id()); assert_eq!(result1.get_message_id(), result2.get_message_id());
} }
#[test]
fn can_a_route_be_generally_set() {
let mut queuedata = QueueData::new();
let doctype = "something";
let action = MsgAction::Query(Access::new());
let (tx, rx) = channel();
let id = queuedata.register(doctype.to_string(), tx).unwrap();
let data: Include<String> = Include::All;
queuedata.add_route(&id, data, action.clone().into());
let msg = Message::new(doctype, action);
queuedata.send(msg.clone()).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
assert_eq!(result.get_message_id(), msg.get_message_id());
}
} }
#[derive(Clone)] #[derive(Clone)]
@ -615,28 +690,20 @@ impl Queue {
} }
} }
fn register(&mut self, name: String, tx: Sender<Message>) -> Result<Uuid, MTTError> { fn register(
&mut self,
tx: Sender<Message>,
name: String,
routes: Vec<RouteRequest>,
) -> Result<(), MTTError> {
let mut queuedata = self.queue_data.write().unwrap(); let mut queuedata = self.queue_data.write().unwrap();
queuedata.register(name, tx) queuedata.register(tx, name, routes)
} }
fn send(&self, msg: Message) -> Result<(), MTTError> { fn send(&self, msg: Message) -> Result<(), MTTError> {
let queuedata = self.queue_data.read().unwrap(); let queuedata = self.queue_data.read().unwrap();
queuedata.send(msg) queuedata.send(msg)
} }
fn add_route<N>(
&mut self,
sender_id: &Uuid,
doc_type: Include<N>,
action: Action,
) -> Result<(), MTTError>
where
N: Into<NameID>,
{
let mut queuedata = self.queue_data.write().unwrap();
queuedata.add_route(sender_id, doc_type, action)
}
} }
#[cfg(test)] #[cfg(test)]
@ -664,9 +731,8 @@ impl CreateDoc {
fn start(mut queue: Queue) { fn start(mut queue: Queue) {
let (tx, rx) = channel(); let (tx, rx) = channel();
let id = queue.register("document".to_string(), tx).unwrap(); let routes = [RouteRequest::new(Include::All, Include::All, Include::Some(Action::Create))].to_vec();
let nameid: Include<NameID> = Include::All; let id = queue.register(tx, "document".to_string(), routes).unwrap();
queue.add_route(&id, nameid, Action::Create);
let doc = CreateDoc::new(queue, rx); let doc = CreateDoc::new(queue, rx);
spawn(move || { spawn(move || {
doc.listen(); doc.listen();
@ -702,6 +768,15 @@ impl Access {
} }
} }
#[derive(Clone)]
struct Response;
impl Response {
fn new() -> Self {
Self {}
}
}
struct Document { struct Document {
queue: Queue, queue: Queue,
rx: Receiver<Message>, rx: Receiver<Message>,
@ -717,7 +792,7 @@ impl Document {
fn start(mut queue: Queue, name: String) { fn start(mut queue: Queue, name: String) {
let (tx, rx) = channel(); let (tx, rx) = channel();
queue.register(name, tx); queue.register(tx, name, Vec::new());
let doc = Document::new(queue, rx); let doc = Document::new(queue, rx);
spawn(move || { spawn(move || {
doc.listen(); doc.listen();
@ -733,17 +808,18 @@ impl Document {
#[cfg(test)] #[cfg(test)]
mod createdocs { mod createdocs {
use super::support_test::TIMEOUT;
use super::*; use super::*;
use std::{thread::sleep, time::Duration};
#[test] //#[test]
fn create_document_creation() { fn create_document_creation() {
let queue = Queue::new(); let queue = Queue::new();
//let (tx, rx) = channel();
//queue.register2
CreateDoc::start(queue.clone()); CreateDoc::start(queue.clone());
let name = "project"; let name = "project";
let msg = Message::new(name, MsgAction::Create(DocDef::new())); let msg = Message::new(name, MsgAction::Create(DocDef::new()));
queue.send(msg).unwrap(); queue.send(msg).unwrap();
sleep(Duration::from_secs(1));
let msg2 = Message::new(name, MsgAction::Query(Access::new())); let msg2 = Message::new(name, MsgAction::Query(Access::new()));
queue.send(msg2).unwrap(); queue.send(msg2).unwrap();
} }