morethantext/src/message.rs

1123 lines
33 KiB
Rust
Raw Normal View History

2025-07-17 09:12:23 -04:00
use std::{
collections::HashMap,
sync::{
mpsc::{channel, Receiver, Sender},
Arc, RwLock,
},
2025-07-25 11:08:47 -04:00
thread::spawn,
2025-07-17 09:12:23 -04:00
};
use uuid::Uuid;
/// Constants shared by the unit-test modules in this file.
#[cfg(test)]
mod support_test {
    /// Upper bound the tests pass to `recv_timeout` when waiting on a channel.
    pub static TIMEOUT: std::time::Duration = std::time::Duration::from_millis(500);
}
/// Errors produced while registering documents or routing messages.
#[derive(Clone, Debug)]
enum MTTError {
/// A document is already registered under the carried name.
DocumentAlreadyExists(String),
/// No registered document matches the carried name (or id rendered as text).
DocumentNotFound(String),
}
2025-07-17 09:12:23 -04:00
/// Payload-free kind of a [`MsgAction`]; hashable so it can be part of a routing key.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum Action {
    /// Kind of `MsgAction::Create`.
    Create,
    /// Kind of `MsgAction::Error`.
    Error,
    /// Kind of `MsgAction::Query`.
    Query,
    /// Kind of `MsgAction::Reply`.
    Reply,
    /// Kind of `MsgAction::Show`.
    Show,
}
2025-07-28 10:49:34 -04:00
impl From<MsgAction> for Action {
fn from(value: MsgAction) -> Self {
match value {
MsgAction::Create(_) => Action::Create,
MsgAction::Error(_) => Action::Error,
2025-07-28 10:49:34 -04:00
MsgAction::Query(_) => Action::Query,
MsgAction::Reply(_) => Action::Reply,
2025-08-03 12:21:45 -04:00
MsgAction::Show => Action::Show,
2025-07-28 10:49:34 -04:00
}
}
}
impl From<&MsgAction> for Action {
fn from(value: &MsgAction) -> Self {
let action = value.clone();
Self::from(action)
}
}
2025-07-25 11:08:47 -04:00
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
2025-07-17 09:12:23 -04:00
enum NameID {
ID(Uuid),
Name(String),
}
2025-07-17 09:12:23 -04:00
impl From<&str> for NameID {
fn from(value: &str) -> Self {
Self::Name(value.to_string())
}
}
2025-07-17 09:12:23 -04:00
impl From<String> for NameID {
fn from(value: String) -> Self {
Self::Name(value)
}
}
2025-07-17 09:12:23 -04:00
impl From<Uuid> for NameID {
fn from(value: Uuid) -> Self {
Self::ID(value)
}
}
2025-07-20 10:39:15 -04:00
/// Lets a borrowed `NameID` be passed wherever `Into<NameID>` is accepted, by cloning it.
impl From<&NameID> for NameID {
    fn from(value: &NameID) -> Self {
        Clone::clone(value)
    }
}
#[derive(Clone, Debug)]
2025-07-28 10:49:34 -04:00
enum MsgAction {
Create(DocDef),
2025-08-02 08:58:50 -04:00
// Alter
// Remove
Error(MTTError),
2025-07-28 10:49:34 -04:00
Query(Access),
Reply(Response),
2025-08-03 12:21:45 -04:00
Show,
2025-08-02 08:58:50 -04:00
// Add
// Delete
}
impl From<DocDef> for MsgAction {
fn from(value: DocDef) -> Self {
MsgAction::Create(value)
}
}
impl From<MTTError> for MsgAction {
fn from(value: MTTError) -> Self {
MsgAction::Error(value)
}
}
impl From<Access> for MsgAction {
fn from(value: Access) -> Self {
MsgAction::Query(value)
}
}
impl From<Response> for MsgAction {
fn from(value: Response) -> Self {
MsgAction::Reply(value)
}
}
/// Checks that each payload type converts into the matching `MsgAction` variant.
#[cfg(test)]
mod msgactions {
    use super::*;

    #[test]
    fn turn_document_definition_into_action() {
        let value = DocDef::new();
        let result: MsgAction = value.into();
        match result {
            MsgAction::Create(_) => {}
            _ => unreachable!("Got {:?}: should have been create", result),
        }
    }

    #[test]
    fn turn_error_into_action() {
        let data = "data".to_string();

        // Each error variant must survive the conversion with its text intact.
        let value = MTTError::DocumentAlreadyExists(data.clone());
        let result: MsgAction = value.into();
        match result {
            MsgAction::Error(result) => match result {
                MTTError::DocumentAlreadyExists(output) => assert_eq!(output, data),
                _ => unreachable!("Got {:?}: should have been document already exists", result),
            },
            _ => unreachable!("Got {:?}: should have been error", result),
        }

        let value = MTTError::DocumentNotFound(data.clone());
        let result: MsgAction = value.into();
        match result {
            MsgAction::Error(result) => match result {
                MTTError::DocumentNotFound(output) => assert_eq!(output, data),
                _ => unreachable!("Got {:?}: should have been document not found", result),
            },
            _ => unreachable!("Got {:?}: should have been error", result),
        }
    }

    #[test]
    fn turn_query_into_action() {
        let value = Access::new();
        let result: MsgAction = value.into();
        match result {
            MsgAction::Query(_) => {}
            _ => unreachable!("Got {:?}: should have been query", result),
        }
    }

    #[test]
    fn turn_reply_into_action() {
        let value = Response::new();
        let result: MsgAction = value.into();
        match result {
            MsgAction::Reply(_) => {}
            _ => unreachable!("Got {:?}: should have been reply", result),
        }
    }
}
2025-08-03 12:21:45 -04:00
#[derive(Clone, Debug)]
struct Message {
msg_id: Uuid,
2025-07-17 09:12:23 -04:00
document_id: NameID,
2025-07-28 10:49:34 -04:00
action: MsgAction,
}
impl Message {
2025-08-02 08:58:50 -04:00
fn new<D, A>(doc_id: D, action: A) -> Self
2025-07-17 09:12:23 -04:00
where
D: Into<NameID>,
2025-08-02 08:58:50 -04:00
A: Into<MsgAction>,
2025-07-17 09:12:23 -04:00
{
Self {
msg_id: Uuid::new_v4(),
document_id: doc_id.into(),
2025-08-02 08:58:50 -04:00
action: action.into(),
}
}
fn get_message_id(&self) -> &Uuid {
&self.msg_id
}
2025-07-17 09:12:23 -04:00
fn get_document_id(&self) -> &NameID {
&self.document_id
}
2025-07-28 10:49:34 -04:00
fn get_action(&self) -> &MsgAction {
2025-07-17 09:12:23 -04:00
&self.action
}
fn reply(&self, resp: Response) -> Self {
Self {
msg_id: self.msg_id.clone(),
document_id: self.document_id.clone(),
action: MsgAction::Reply(resp),
}
}
fn error(&self, err: MTTError) -> Self {
Self {
msg_id: self.msg_id.clone(),
document_id: self.document_id.clone(),
action: MsgAction::Error(err),
}
}
}
/// Tests for constructing messages and deriving reply/error messages from them.
#[cfg(test)]
mod messages {
    use super::*;

    #[test]
    fn can_the_document_be_a_stringi_reference() {
        let dts = ["one", "two"];
        for document in dts.into_iter() {
            let msg = Message::new(document, MsgAction::Create(DocDef::new()));
            match msg.get_document_id() {
                NameID::ID(_) => unreachable!("should have been a string id"),
                NameID::Name(data) => assert_eq!(data, document),
            }
            match msg.get_action() {
                MsgAction::Create(_) => {}
                _ => unreachable!("should have been a create document"),
            }
        }
    }

    #[test]
    fn can_the_document_be_a_string() {
        let dts = ["one".to_string(), "two".to_string()];
        for document in dts.into_iter() {
            let msg = Message::new(document.clone(), MsgAction::Query(Access::new()));
            match msg.get_document_id() {
                NameID::ID(_) => unreachable!("should have been a string id"),
                NameID::Name(data) => assert_eq!(data, &document),
            }
            match msg.get_action() {
                MsgAction::Query(_) => {}
                _ => unreachable!("should have been an access query"),
            }
        }
    }

    #[test]
    fn can_the_document_be_an_id() {
        let document = Uuid::new_v4();
        let msg = Message::new(document.clone(), MsgAction::Query(Access::new()));
        match msg.get_document_id() {
            NameID::ID(data) => assert_eq!(data, &document),
            NameID::Name(_) => unreachable!("should have been an id"),
        }
        match msg.get_action() {
            MsgAction::Query(_) => {}
            _ => unreachable!("should have been an access query"),
        }
    }

    #[test]
    fn is_the_message_id_random() {
        let mut ids: Vec<Uuid> = Vec::new();
        for _ in 0..5 {
            let msg = Message::new("tester", MsgAction::Create(DocDef::new()));
            let id = msg.get_message_id().clone();
            assert!(!ids.contains(&id), "{:?} contains {}", ids, id);
            ids.push(id);
        }
    }

    #[test]
    fn Can_make_reply_message() {
        let name = "testing";
        let msg = Message::new(name, MsgAction::Query(Access::new()));
        let responce = Response::new();
        let reply = msg.reply(responce);
        // A reply keeps the id and document of the message it answers.
        assert_eq!(reply.get_message_id(), msg.get_message_id());
        match reply.get_document_id() {
            NameID::Name(data) => assert_eq!(data, name),
            _ => unreachable!("should have been a name"),
        }
        match reply.get_action() {
            MsgAction::Reply(_) => {}
            _ => unreachable!("should have been a reply"),
        }
    }

    #[test]
    fn Can_make_error_message() {
        let name = "testing";
        let msg = Message::new(name, MsgAction::Query(Access::new()));
        let err_msg = Uuid::new_v4().to_string();
        let result = msg.error(MTTError::DocumentNotFound(err_msg.clone()));
        // An error message keeps the id and document of the message it answers.
        assert_eq!(result.get_message_id(), msg.get_message_id());
        match result.get_document_id() {
            NameID::Name(data) => assert_eq!(data, name),
            _ => unreachable!("should have been a name"),
        }
        match result.get_action() {
            MsgAction::Error(data) => match data {
                MTTError::DocumentNotFound(txt) => assert_eq!(txt, &err_msg),
                _ => unreachable!("got {:?}, should have received not found", data),
            },
            _ => unreachable!("should have been an error"),
        }
    }
}
2025-07-25 11:08:47 -04:00
/// A routing field that matches either every value or one specific value.
#[derive(Clone, Debug)]
enum Include<T> {
    /// Wildcard: matches any value.
    All,
    /// Matches only the carried value.
    Some(T),
}

/// Wildcard-aware comparison: `All` compares equal to everything, and two
/// `Some` values compare by their contents.
///
/// This is deliberately a *matching* relation rather than a true equality: it
/// is symmetric but not transitive (`Some(5) == All` and `All == Some(7)`,
/// yet `Some(5) != Some(7)`), so it must not be relied on where a strict
/// equivalence is required.
impl<T: PartialEq> PartialEq for Include<T> {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (Include::Some(lhs), Include::Some(rhs)) => lhs == rhs,
            // At least one side is the wildcard.
            _ => true,
        }
    }
}
2025-07-25 11:08:47 -04:00
/// Tests for the wildcard comparison implemented by `Include`.
#[cfg(test)]
mod includes {
    use super::*;

    #[test]
    fn does_all_equal_evberything() {
        let wildcard: Include<isize> = Include::All;
        let five: Include<isize> = Include::Some(5);
        let seven: Include<isize> = Include::Some(7);

        // The wildcard matches on either side of the comparison.
        assert!(wildcard == wildcard, "all should equal all");
        assert!(wildcard == five, "all should equal some");
        assert!(five == wildcard, "some should equal all");

        // Specific values compare by content.
        assert!(five == five, "same some should equal");
        assert!(five != seven, "different somes do not equal");
    }
}
#[derive(Clone, Eq, Hash, PartialEq)]
2025-07-21 13:31:23 -04:00
struct RouteID {
2025-07-25 11:08:47 -04:00
action: Option<Action>,
2025-07-17 09:12:23 -04:00
doc_type: Option<Uuid>,
2025-07-25 11:08:47 -04:00
msg_id: Option<Uuid>,
2025-07-17 09:12:23 -04:00
}
2025-07-21 13:31:23 -04:00
impl From<Route> for RouteID {
2025-07-28 10:49:34 -04:00
fn from(value: Route) -> Self {
Self {
action: match value.action {
Include::All => None,
Include::Some(action) => Some(action.clone()),
},
doc_type: match value.doc_type {
Include::All => None,
Include::Some(doc) => Some(doc.clone()),
},
msg_id: match value.msg_id {
Include::All => None,
Include::Some(id) => Some(id.clone()),
},
}
2025-07-21 13:31:23 -04:00
}
}
2025-07-25 11:08:47 -04:00
#[derive(Clone, Debug, PartialEq)]
2025-07-21 13:31:23 -04:00
struct Route {
2025-07-25 11:08:47 -04:00
action: Include<Action>,
2025-07-21 13:31:23 -04:00
doc_type: Include<Uuid>,
2025-07-25 11:08:47 -04:00
msg_id: Include<Uuid>,
2025-07-21 13:31:23 -04:00
}
2025-07-17 09:12:23 -04:00
impl Route {
2025-07-28 10:49:34 -04:00
fn new(msg_id: Include<Uuid>, doc: Include<Uuid>, action: Include<Action>) -> Self {
2025-07-17 09:12:23 -04:00
Self {
action: action,
2025-07-25 11:08:47 -04:00
doc_type: doc,
msg_id: msg_id,
2025-07-17 09:12:23 -04:00
}
}
}
2025-07-21 13:31:23 -04:00
impl From<RouteID> for Route {
fn from(value: RouteID) -> Self {
Self {
2025-07-25 11:08:47 -04:00
action: match value.action {
Some(data) => Include::Some(data.clone()),
None => Include::All,
},
2025-07-21 13:31:23 -04:00
doc_type: match value.doc_type {
Some(doc) => Include::Some(doc.clone()),
None => Include::All,
},
2025-07-25 11:08:47 -04:00
msg_id: match value.msg_id {
Some(msg) => Include::Some(msg.clone()),
None => Include::All,
},
2025-07-21 13:31:23 -04:00
}
}
}
impl From<&RouteID> for Route {
fn from(value: &RouteID) -> Self {
Self {
2025-07-28 10:49:34 -04:00
action: match &value.action {
2025-07-25 11:08:47 -04:00
Some(data) => Include::Some(data.clone()),
None => Include::All,
},
doc_type: match &value.doc_type {
2025-07-21 13:31:23 -04:00
Some(doc) => Include::Some(doc.clone()),
None => Include::All,
},
2025-07-25 11:08:47 -04:00
msg_id: match &value.msg_id {
Some(msg) => Include::Some(msg.clone()),
None => Include::All,
},
}
}
}
/// Tests that `Route::new` stores each pattern in the intended field.
#[cfg(test)]
mod roiutes {
    use super::*;

    /// Fails unless the field is the `All` wildcard.
    fn expect_all<T>(field: Include<T>) {
        match field {
            Include::Some(_) => unreachable!("should have been all"),
            Include::All => {}
        }
    }

    /// Fails unless the field carries exactly `wanted`.
    fn expect_value<T>(field: Include<T>, wanted: T)
    where
        T: PartialEq + std::fmt::Debug,
    {
        match field {
            Include::Some(result) => assert_eq!(result, wanted),
            Include::All => unreachable!("should be a specific value"),
        }
    }

    #[test]
    fn can_a_route_set_action() {
        let actions = [Action::Query, Action::Reply];
        for action in actions.into_iter() {
            let route = Route::new(Include::All, Include::All, Include::Some(action.clone()));
            expect_all(route.msg_id);
            expect_all(route.doc_type);
            expect_value(route.action, action);
        }
    }

    #[test]
    fn can_route_set_document_by_name() {
        let doc_id = Uuid::new_v4();
        let route = Route::new(Include::All, Include::Some(doc_id.clone()), Include::All);
        expect_all(route.msg_id);
        expect_value(route.doc_type, doc_id);
        expect_all(route.action);
    }

    #[test]
    fn can_route_set_document_by_id() {
        let id = Uuid::new_v4();
        let route = Route::new(Include::All, Include::Some(id.clone()), Include::All);
        expect_all(route.msg_id);
        expect_value(route.doc_type, id);
        expect_all(route.action);
    }

    #[test]
    fn can_route_be_set_by_message_id() {
        let id = Uuid::new_v4();
        let route = Route::new(Include::Some(id.clone()), Include::All, Include::All);
        expect_value(route.msg_id, id);
        expect_all(route.doc_type);
        expect_all(route.action);
    }
}
/// A route subscription as supplied at registration time, naming the document
/// by name; `QueueData::register` resolves the name into a document id.
#[derive(Clone)]
struct RouteRequest {
/// Message id to match, or `Include::All` for any.
msg_id: Include<Uuid>,
/// Document name to match, or `Include::All` for any.
doc_name: Include<String>,
/// Action kind to match, or `Include::All` for any.
action: Include<Action>,
}
impl RouteRequest {
    /// Builds a route request from its message-id, document-name and action patterns.
    fn new(msg_id: Include<Uuid>, doc_name: Include<String>, action: Include<Action>) -> Self {
        Self {
            msg_id,
            doc_name,
            action,
        }
    }
}
struct QueueData {
senders: HashMap<Uuid, Sender<Message>>,
names: HashMap<String, Uuid>,
2025-07-21 13:31:23 -04:00
routes: HashMap<RouteID, Vec<Uuid>>,
}
impl QueueData {
fn new() -> Self {
2025-07-17 09:12:23 -04:00
Self {
senders: HashMap::new(),
names: HashMap::new(),
2025-07-17 09:12:23 -04:00
routes: HashMap::new(),
}
}
2025-07-20 10:39:15 -04:00
fn get_doc_id<N>(&self, nameid: N) -> Result<Uuid, MTTError>
where
N: Into<NameID>,
{
let sender_id = match nameid.into() {
NameID::Name(name) => match self.names.get(&name) {
Some(id) => id.clone(),
None => return Err(MTTError::DocumentNotFound(name.clone())),
},
NameID::ID(id) => id.clone(),
};
if self.senders.contains_key(&sender_id) {
Ok(sender_id)
} else {
Err(MTTError::DocumentNotFound(sender_id.to_string()))
}
}
fn register(
&mut self,
tx: Sender<Message>,
name: String,
routes: Vec<RouteRequest>,
) -> Result<(), MTTError> {
2025-07-17 09:12:23 -04:00
let mut id = Uuid::new_v4();
while self.senders.contains_key(&id) {
id = Uuid::new_v4();
}
match self.get_doc_id(name.clone()) {
Ok(_) => return Err(MTTError::DocumentAlreadyExists(name)),
Err(_) => {}
}
let mut holder: HashMap<RouteID, Vec<Uuid>> = HashMap::new();
for route in routes.iter() {
let doc_type = match &route.doc_name {
Include::Some(doc_name) => {
if doc_name == &name {
Include::Some(id.clone())
} else {
match self.get_doc_id(doc_name.to_string()) {
Ok(doc_id) => Include::Some(doc_id),
Err(err) => return Err(err),
}
}
}
Include::All => Include::All,
};
let route_id: RouteID =
Route::new(route.msg_id.clone(), doc_type, route.action.clone()).into();
match self.routes.get(&route_id) {
Some(senders) => {
let mut addition = senders.clone();
addition.push(id.clone());
holder.insert(route_id, addition);
}
None => {
let senders = [id.clone()].to_vec();
holder.insert(route_id, senders);
}
}
}
self.senders.insert(id.clone(), tx);
self.names.insert(name.clone(), id.clone());
for (route_id, senders) in holder.iter() {
self.routes.insert(route_id.clone(), senders.clone());
}
Ok(())
}
fn send(&self, msg: Message) -> Result<(), MTTError> {
2025-07-21 13:31:23 -04:00
let doc_id: Include<Uuid> = match self.get_doc_id(msg.get_document_id()) {
Ok(id) => Include::Some(id.clone()),
2025-07-25 11:08:47 -04:00
Err(err) => {
2025-07-28 10:49:34 -04:00
let action: Action = msg.get_action().into();
if action == Action::Create {
2025-07-25 11:08:47 -04:00
Include::Some(Uuid::nil())
} else {
2025-07-28 10:49:34 -04:00
return Err(err);
2025-07-25 11:08:47 -04:00
}
2025-07-28 10:49:34 -04:00
}
2025-07-17 09:12:23 -04:00
};
2025-07-28 10:49:34 -04:00
let route = Route::new(
Include::Some(msg.get_message_id().clone()),
doc_id,
Include::Some(msg.get_action().into()),
);
2025-07-21 13:31:23 -04:00
for (send_route, send_ids) in self.routes.iter() {
if route == send_route.into() {
for send_id in send_ids {
let tx = self.senders.get(&send_id).unwrap();
2025-07-20 10:39:15 -04:00
tx.send(msg.clone()).unwrap();
}
}
}
Ok(())
}
}
/// Tests for registration and message routing in `QueueData`.
#[cfg(test)]
mod queuedatas {
    use super::support_test::TIMEOUT;
    use super::*;
    use std::sync::mpsc::RecvTimeoutError;

    #[test]
    fn can_document_be_registered() {
        let mut queuedata = QueueData::new();
        let (tx, rx) = channel();
        let name = Uuid::new_v4().to_string();
        let routes = [
            RouteRequest::new(
                Include::All,
                Include::Some(name.clone()),
                Include::Some(Action::Query),
            ),
            RouteRequest::new(
                Include::All,
                Include::Some(name.clone()),
                Include::Some(Action::Reply),
            ),
        ]
        .to_vec();
        queuedata.register(tx, name.clone(), routes).unwrap();
        let msg1 = Message::new(name.clone(), MsgAction::Query(Access::new()));
        let msg2 = Message::new(name.clone(), MsgAction::Reply(Response::new()));
        let msg3 = Message::new(name.clone(), MsgAction::Create(DocDef::new()));
        queuedata.send(msg1.clone()).unwrap();
        queuedata.send(msg2.clone()).unwrap();
        queuedata.send(msg3.clone()).unwrap();
        let result1 = rx.recv_timeout(TIMEOUT).unwrap();
        let result2 = rx.recv_timeout(TIMEOUT).unwrap();
        // The create message has no matching route, so nothing else may arrive.
        match rx.recv_timeout(TIMEOUT) {
            Ok(_) => unreachable!("should have timed out"),
            Err(err) => match err {
                RecvTimeoutError::Timeout => {}
                _ => unreachable!("should have timed out"),
            },
        }
        assert_eq!(result1.get_message_id(), msg1.get_message_id());
        assert_eq!(result2.get_message_id(), msg2.get_message_id());
        match result1.get_action() {
            MsgAction::Query(_) => {}
            _ => unreachable!("should have been a query"),
        }
        match result2.get_action() {
            MsgAction::Reply(_) => {}
            _ => unreachable!("should have been a reply"),
        }
    }

    #[test]
    fn does_register_fail_on_duplicate_documents() {
        let mut queuedata = QueueData::new();
        let (tx1, _) = channel();
        let (tx2, _) = channel();
        let name = Uuid::new_v4().to_string();
        queuedata
            .register(tx1, name.to_string(), Vec::new())
            .unwrap();
        match queuedata.register(tx2, name.to_string(), Vec::new()) {
            Ok(_) => unreachable!("duplicates should create an error"),
            Err(err) => match err {
                MTTError::DocumentAlreadyExists(result) => assert_eq!(result, name),
                _ => unreachable!("should have been document already exists"),
            },
        }
    }

    #[test]
    fn does_bad_route_prevent_registration() {
        let mut queuedata = QueueData::new();
        // The receiver is never read; it is only kept alive for the test.
        let (tx, _rx) = channel();
        let good = "good";
        let bad = Uuid::new_v4().to_string();
        let routes = [
            RouteRequest::new(
                Include::All,
                Include::Some(good.to_string()),
                Include::Some(Action::Query),
            ),
            RouteRequest::new(
                Include::All,
                Include::Some(bad.clone()),
                Include::Some(Action::Reply),
            ),
        ]
        .to_vec();
        match queuedata.register(tx, good.to_string(), routes) {
            Ok(_) => unreachable!("should produce an error"),
            Err(err) => match err {
                MTTError::DocumentNotFound(result) => assert_eq!(result, bad),
                _ => unreachable!("Should be document not found"),
            },
        }
        assert_eq!(queuedata.senders.len(), 0, "should not add to senders");
        assert_eq!(queuedata.names.len(), 0, "should not add to names");
        assert_eq!(queuedata.routes.len(), 0, "should not add to routes");
    }

    #[test]
    fn is_sender_only_added_once_to_routes() {
        let mut queuedata = QueueData::new();
        // The receiver is never read; it is only kept alive for the test.
        let (tx, _rx) = channel();
        let name = "project";
        let routes = [
            RouteRequest::new(
                Include::All,
                Include::Some(name.to_string()),
                Include::Some(Action::Query),
            ),
            RouteRequest::new(
                Include::All,
                Include::Some(name.to_string()),
                Include::Some(Action::Query),
            ),
        ]
        .to_vec();
        queuedata.register(tx, name.to_string(), routes).unwrap();
        for senders in queuedata.routes.values() {
            assert_eq!(senders.len(), 1, "should be no double entries");
        }
    }

    #[test]
    fn does_a_bad_document_name_fail() {
        let docname = Uuid::new_v4().to_string();
        let queuedata = QueueData::new();
        let msg = Message::new(docname.clone(), MsgAction::Query(Access::new()));
        match queuedata.send(msg) {
            Ok(_) => unreachable!("should have been an error"),
            Err(data) => match data {
                MTTError::DocumentNotFound(doc) => assert_eq!(doc, docname),
                _ => unreachable!("should have been a not found error"),
            },
        }
    }

    #[test]
    fn is_send_okay_if_no_one_is_listening() {
        let mut queuedata = QueueData::new();
        let name = "something";
        let (tx, _) = channel();
        queuedata
            .register(tx, name.to_string(), Vec::new())
            .unwrap();
        let msg = Message::new("something", MsgAction::Create(DocDef::new()));
        match queuedata.send(msg) {
            Ok(_) => {}
            Err(err) => unreachable!("got {:?}: should not error", err),
        }
    }

    #[test]
    fn can_more_than_one_document_respond() {
        let mut queuedata = QueueData::new();
        let name1 = "task";
        let name2 = "work";
        let action = MsgAction::Query(Access::new());
        let (tx1, rx1) = channel();
        let (tx2, rx2) = channel();
        // Both registrants subscribe to messages addressed to the first document.
        let routes = [RouteRequest::new(
            Include::All,
            Include::Some(name1.to_string()),
            Include::All,
        )]
        .to_vec();
        queuedata
            .register(tx1, name1.to_string(), routes.clone())
            .unwrap();
        queuedata
            .register(tx2, name2.to_string(), routes.clone())
            .unwrap();
        let msg = Message::new(name1, action.clone());
        queuedata.send(msg.clone()).unwrap();
        let result1 = rx1.recv_timeout(TIMEOUT).unwrap();
        let result2 = rx2.recv_timeout(TIMEOUT).unwrap();
        assert_eq!(result1.get_message_id(), msg.get_message_id());
        assert_eq!(result1.get_message_id(), result2.get_message_id());
    }
}
/// Cloneable handle to a shared [`QueueData`]; every clone refers to the same
/// data through the `Arc`.
#[derive(Clone)]
struct Queue {
/// The shared registry, guarded by a read/write lock.
queue_data: Arc<RwLock<QueueData>>,
}
impl Queue {
fn new() -> Self {
Self {
queue_data: Arc::new(RwLock::new(QueueData::new())),
}
}
2025-07-25 11:08:47 -04:00
fn register(
&mut self,
tx: Sender<Message>,
name: String,
routes: Vec<RouteRequest>,
) -> Result<(), MTTError> {
2025-07-25 11:08:47 -04:00
let mut queuedata = self.queue_data.write().unwrap();
queuedata.register(tx, name, routes)
2025-07-25 11:08:47 -04:00
}
fn send(&self, msg: Message) -> Result<(), MTTError> {
let queuedata = self.queue_data.read().unwrap();
queuedata.send(msg)
}
}
/// Smoke test for `Queue` construction.
#[cfg(test)]
mod queues {
    use super::*;

    #[test]
    fn create_a_queue() {
        // Only checks that construction does not panic.
        let _ = Queue::new();
    }
}
2025-07-28 10:49:34 -04:00
/// Listener that starts a new [`Document`] for every message it receives.
struct CreateDoc {
/// Queue handle; a clone is handed to each document that is started.
queue: Queue,
/// Receiving end of the channel registered with the queue.
rx: Receiver<Message>,
}
impl CreateDoc {
fn new(queue: Queue, rx: Receiver<Message>) -> Self {
Self {
queue: queue,
rx: rx,
}
}
fn start(mut queue: Queue) {
let (tx, rx) = channel();
let routes = [RouteRequest::new(
Include::All,
Include::All,
Include::Some(Action::Create),
)]
.to_vec();
let id = queue.register(tx, "document".to_string(), routes).unwrap();
2025-07-28 10:49:34 -04:00
let doc = CreateDoc::new(queue, rx);
spawn(move || {
doc.listen();
});
}
fn listen(&self) {
loop {
let msg = self.rx.recv().unwrap();
Document::start(self.queue.clone(), msg);
2025-07-28 10:49:34 -04:00
}
}
}
/// Definition of a document to create; currently carries no data.
#[derive(Clone, Debug)]
struct DocDef;

impl DocDef {
    /// Creates a document definition.
    fn new() -> Self {
        Self
    }
}
/// Payload of a query; currently carries no data.
#[derive(Clone, Debug)]
struct Access;

impl Access {
    /// Creates an access request.
    fn new() -> Self {
        Self
    }
}
/// Payload of a reply; currently carries no data.
#[derive(Clone, Debug)]
struct Response;

impl Response {
    /// Creates a response.
    fn new() -> Self {
        Response
    }
}
2025-07-25 11:08:47 -04:00
/// A registered document that answers every message it receives with a reply.
struct Document {
/// Queue handle used to send the replies.
queue: Queue,
/// Receiving end of the channel registered with the queue.
rx: Receiver<Message>,
}
impl Document {
2025-07-25 11:08:47 -04:00
fn new(queue: Queue, rx: Receiver<Message>) -> Self {
Self {
queue: queue,
rx: rx,
}
}
fn start(mut queue: Queue, msg: Message) {
2025-07-25 11:08:47 -04:00
let (tx, rx) = channel();
let name = match msg.get_document_id() {
NameID::Name(name) => name.clone(),
NameID::ID(id) => id.to_string(),
};
2025-08-03 12:21:45 -04:00
let routes = [
RouteRequest::new(
Include::All,
Include::Some(name.clone()),
Include::Some(Action::Query),
),
RouteRequest::new(
Include::All,
Include::Some(name.clone()),
Include::Some(Action::Show),
),
]
.to_vec();
match queue.register(tx, name, routes) {
Ok(_) => {}
Err(err) => {
let error = msg.error(err);
queue.send(error).unwrap();
return;
}
}
let doc = Document::new(queue.clone(), rx);
2025-07-25 11:08:47 -04:00
spawn(move || {
doc.listen();
});
let reply = msg.reply(Response::new());
queue.send(reply).unwrap();
2025-07-25 11:08:47 -04:00
}
2025-07-25 11:08:47 -04:00
fn listen(&self) {
loop {
let msg = self.rx.recv().unwrap();
let reply = msg.reply(Response::new());
self.queue.send(reply).unwrap();
2025-07-25 11:08:47 -04:00
}
}
}
2025-08-02 08:58:50 -04:00
/// Tests for a running `Document`.
#[cfg(test)]
mod documents {
    use super::{support_test::TIMEOUT, *};
    use std::sync::mpsc::RecvTimeoutError;

    /// Starts a document called `name` on a fresh queue, then registers an
    /// observer (under a random name) subscribed to `routes`.
    ///
    /// Returns the queue and the observer's receiver.
    fn test_doc(
        name: &str,
        docdef: DocDef,
        routes: Vec<RouteRequest>,
    ) -> (Queue, Receiver<Message>) {
        let (tx, rx) = channel();
        let mut queue = Queue::new();
        let msg = Message::new(name, docdef);
        Document::start(queue.clone(), msg);
        queue
            .register(tx, Uuid::new_v4().to_string(), routes)
            .unwrap();
        (queue, rx)
    }

    #[test]
    fn can_show_document_details() {
        let docdef = DocDef::new();
        let name = "first";
        let routes = [RouteRequest::new(
            Include::All,
            Include::All,
            Include::Some(Action::Reply),
        )]
        .to_vec();
        let (queue, rx) = test_doc(name, docdef, routes);
        let msg = Message::new(name, MsgAction::Show);
        queue.send(msg.clone()).unwrap();
        let show = rx.recv_timeout(TIMEOUT).unwrap();
        // A reply carries the id of the message it answers.
        assert_eq!(show.get_message_id(), msg.get_message_id());
    }

    #[test]
    fn only_responses_to_its_show_request() {
        let docdef = DocDef::new();
        let name = "quiet";
        let routes = [RouteRequest::new(
            Include::All,
            Include::All,
            Include::Some(Action::Reply),
        )]
        .to_vec();
        let (mut queue, rx) = test_doc(name, docdef, routes);
        let other = "alternate";
        let (tx, _) = channel();
        queue.register(tx, other.to_string(), Vec::new()).unwrap();
        // A show request addressed to another document must not be answered.
        let msg = Message::new(other, MsgAction::Show);
        queue.send(msg).unwrap();
        match rx.recv_timeout(TIMEOUT) {
            Ok(msg) => unreachable!("should not receive: {:?}", msg),
            Err(err) => match err {
                RecvTimeoutError::Timeout => {}
                _ => unreachable!("should have timed out"),
            },
        }
    }
}
/// Tests for creating documents through the `CreateDoc` listener.
#[cfg(test)]
mod createdocs {
    use super::support_test::TIMEOUT;
    use super::*;

    /// Registers an observer (under a random name) subscribed to `routes` on a
    /// fresh queue, then starts the `CreateDoc` listener.
    ///
    /// Returns the queue and the observer's receiver.
    fn setup_create_doc(routes: Vec<RouteRequest>) -> (Queue, Receiver<Message>) {
        let mut queue = Queue::new();
        let (tx, rx) = channel();
        queue
            .register(tx, Uuid::new_v4().to_string(), routes)
            .unwrap();
        CreateDoc::start(queue.clone());
        (queue, rx)
    }

    #[test]
    fn create_document_creation() {
        let name = "project";
        let routes = [RouteRequest::new(
            Include::All,
            Include::All,
            Include::Some(Action::Reply),
        )]
        .to_vec();
        let (queue, rx) = setup_create_doc(routes);

        // Creating the document is acknowledged with a reply.
        let msg1 = Message::new(name, MsgAction::Create(DocDef::new()));
        queue.send(msg1.clone()).unwrap();
        let result1 = rx.recv_timeout(TIMEOUT).unwrap();
        assert_eq!(result1.get_message_id(), msg1.get_message_id());
        assert_eq!(result1.get_document_id(), msg1.get_document_id());
        match result1.get_action() {
            MsgAction::Reply(_) => {}
            _ => unreachable!("got {:?}: should have been a reply.", result1.get_action()),
        }

        // The new document then answers queries addressed to it.
        let msg2 = Message::new(name, MsgAction::Query(Access::new()));
        queue.send(msg2.clone()).unwrap();
        let result2 = rx.recv_timeout(TIMEOUT).unwrap();
        assert_eq!(result2.get_message_id(), msg2.get_message_id());
        assert_eq!(result2.get_document_id(), msg2.get_document_id());
        match result2.get_action() {
            MsgAction::Reply(_) => {}
            _ => unreachable!("got {:?}: should have been a reply.", result2.get_action()),
        }
    }

    #[test]
    fn does_duplicates_generate_error() {
        let name = "duplicate";
        let routes = [
            RouteRequest::new(Include::All, Include::All, Include::Some(Action::Reply)),
            RouteRequest::new(Include::All, Include::All, Include::Some(Action::Error)),
        ]
        .to_vec();
        let (queue, rx) = setup_create_doc(routes);
        let msg = Message::new(name, MsgAction::Create(DocDef::new()));
        queue.send(msg.clone()).unwrap();
        rx.recv_timeout(TIMEOUT).unwrap();
        // Creating the same document a second time must be reported as an error.
        queue.send(msg.clone()).unwrap();
        let result = rx.recv_timeout(TIMEOUT).unwrap();
        assert_eq!(result.get_message_id(), msg.get_message_id());
        assert_eq!(result.get_document_id(), msg.get_document_id());
        match result.get_action() {
            MsgAction::Error(err) => match err {
                MTTError::DocumentAlreadyExists(data) => assert_eq!(data, name),
                _ => unreachable!("got {:?}: should have been document already exists.", err),
            },
            _ => unreachable!("got {:?}: should have been an error.", result.get_action()),
        }
    }
}