morethantext/src/message.rs

3266 lines
106 KiB
Rust
Raw Normal View History

2025-07-17 09:12:23 -04:00
use std::{
2025-09-05 23:52:24 -04:00
collections::{HashMap, HashSet},
2025-07-17 09:12:23 -04:00
sync::{
mpsc::{channel, Receiver, Sender},
Arc, RwLock,
},
2025-07-25 11:08:47 -04:00
thread::spawn,
2025-07-17 09:12:23 -04:00
};
use uuid::Uuid;
#[cfg(test)]
mod support_test {
use std::time::Duration;
pub static TIMEOUT: Duration = Duration::from_millis(500);
}
#[derive(Clone, Debug)]
enum MTTError {
DocumentAlreadyExists(String),
2025-09-05 23:52:24 -04:00
DocumentFieldAlreadyExists(String, Field),
2025-08-10 16:20:47 -04:00
DocumentFieldMissing(String),
DocumentFieldNotFound(String),
2025-08-10 16:20:47 -04:00
DocumentFieldWrongDataType(FieldType, FieldType),
DocumentNotFound(String),
2025-09-11 23:49:25 -04:00
FieldDuplicate(String, Field),
}
2025-07-17 09:12:23 -04:00
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum Action {
2025-08-05 09:56:48 -04:00
Addition,
2025-07-25 11:08:47 -04:00
Create,
Error,
2025-07-28 10:49:34 -04:00
Query,
Reply,
2025-08-03 12:21:45 -04:00
Show,
2025-08-26 08:06:22 -04:00
Update,
}
2025-07-28 10:49:34 -04:00
impl From<MsgAction> for Action {
fn from(value: MsgAction) -> Self {
match value {
2025-08-05 09:56:48 -04:00
MsgAction::Addition(_) => Action::Addition,
2025-07-28 10:49:34 -04:00
MsgAction::Create(_) => Action::Create,
MsgAction::Error(_) => Action::Error,
2025-07-28 10:49:34 -04:00
MsgAction::Query(_) => Action::Query,
MsgAction::Reply(_) => Action::Reply,
2025-08-03 12:21:45 -04:00
MsgAction::Show => Action::Show,
2025-08-26 08:06:22 -04:00
MsgAction::Update(_) => Action::Update,
2025-07-28 10:49:34 -04:00
}
}
}
impl From<&MsgAction> for Action {
fn from(value: &MsgAction) -> Self {
let action = value.clone();
Self::from(action)
}
}
2025-07-25 11:08:47 -04:00
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
2025-07-17 09:12:23 -04:00
enum NameID {
ID(Uuid),
Name(String),
}
2025-07-17 09:12:23 -04:00
impl From<&str> for NameID {
fn from(value: &str) -> Self {
Self::Name(value.to_string())
}
}
2025-07-17 09:12:23 -04:00
impl From<String> for NameID {
fn from(value: String) -> Self {
Self::Name(value)
}
}
2025-07-17 09:12:23 -04:00
impl From<Uuid> for NameID {
fn from(value: Uuid) -> Self {
Self::ID(value)
}
}
2025-07-20 10:39:15 -04:00
impl From<&NameID> for NameID {
fn from(value: &NameID) -> Self {
value.clone()
}
}
#[derive(Clone, Debug)]
2025-07-28 10:49:34 -04:00
enum MsgAction {
2025-08-05 09:56:48 -04:00
Addition(Addition),
2025-07-28 10:49:34 -04:00
Create(DocDef),
2025-08-02 08:58:50 -04:00
// Alter
// Remove
Error(MTTError),
2025-08-05 09:56:48 -04:00
Query(Query),
Reply(Reply),
2025-08-03 12:21:45 -04:00
Show,
2025-08-02 08:58:50 -04:00
// Delete
2025-08-26 08:06:22 -04:00
Update(Update),
2025-08-02 08:58:50 -04:00
}
2025-08-05 09:56:48 -04:00
impl From<Addition> for MsgAction {
fn from(value: Addition) -> Self {
MsgAction::Addition(value)
}
}
2025-08-02 08:58:50 -04:00
impl From<DocDef> for MsgAction {
fn from(value: DocDef) -> Self {
MsgAction::Create(value)
}
}
impl From<MTTError> for MsgAction {
fn from(value: MTTError) -> Self {
MsgAction::Error(value)
}
}
2025-08-05 09:56:48 -04:00
impl From<Query> for MsgAction {
fn from(value: Query) -> Self {
2025-08-02 08:58:50 -04:00
MsgAction::Query(value)
}
}
2025-08-05 09:56:48 -04:00
impl From<Reply> for MsgAction {
fn from(value: Reply) -> Self {
2025-08-02 08:58:50 -04:00
MsgAction::Reply(value)
}
}
2025-08-26 08:06:22 -04:00
impl From<Update> for MsgAction {
fn from(value: Update) -> Self {
MsgAction::Update(value)
}
}
2025-08-02 08:58:50 -04:00
#[cfg(test)]
mod msgactions {
use super::*;
#[test]
fn turn_document_definition_into_action() {
let value = DocDef::new();
let result: MsgAction = value.into();
match result {
2025-08-02 09:55:13 -04:00
MsgAction::Create(_) => {}
2025-08-02 08:58:50 -04:00
_ => unreachable!("Got {:?}: dhould have been create", result),
}
}
#[test]
fn turn_error_into_action() {
let data = "data".to_string();
let value = MTTError::DocumentAlreadyExists(data.clone());
let result: MsgAction = value.into();
match result {
MsgAction::Error(result) => match result {
MTTError::DocumentAlreadyExists(output) => assert_eq!(output, data),
_ => unreachable!("Got {:?}: dhould have been create", result),
},
_ => unreachable!("Got {:?}: dhould have been create", result),
}
let value = MTTError::DocumentNotFound(data.clone());
let result: MsgAction = value.into();
match result {
MsgAction::Error(result) => match result {
MTTError::DocumentNotFound(output) => assert_eq!(output, data),
_ => unreachable!("Got {:?}: dhould have been create", result),
},
_ => unreachable!("Got {:?}: dhould have been create", result),
}
}
#[test]
fn turn_query_into_action() {
2025-08-05 09:56:48 -04:00
let value = Query::new();
2025-08-02 08:58:50 -04:00
let result: MsgAction = value.into();
match result {
2025-08-02 09:55:13 -04:00
MsgAction::Query(_) => {}
2025-08-02 08:58:50 -04:00
_ => unreachable!("Got {:?}: dhould have been query", result),
}
}
#[test]
fn turn_reply_into_action() {
2025-08-05 09:56:48 -04:00
let value = Reply::new();
2025-08-02 08:58:50 -04:00
let result: MsgAction = value.into();
match result {
2025-08-02 09:55:13 -04:00
MsgAction::Reply(_) => {}
2025-08-02 08:58:50 -04:00
_ => unreachable!("Got {:?}: dhould have been reply", result),
}
}
2025-07-28 10:49:34 -04:00
}
2025-08-03 12:21:45 -04:00
#[derive(Clone, Debug)]
struct Message {
msg_id: Uuid,
2025-07-17 09:12:23 -04:00
document_id: NameID,
2025-07-28 10:49:34 -04:00
action: MsgAction,
}
impl Message {
2025-08-02 08:58:50 -04:00
fn new<D, A>(doc_id: D, action: A) -> Self
2025-07-17 09:12:23 -04:00
where
D: Into<NameID>,
2025-08-02 08:58:50 -04:00
A: Into<MsgAction>,
2025-07-17 09:12:23 -04:00
{
Self {
msg_id: Uuid::new_v4(),
document_id: doc_id.into(),
2025-08-02 08:58:50 -04:00
action: action.into(),
}
}
fn get_message_id(&self) -> &Uuid {
&self.msg_id
}
2025-07-17 09:12:23 -04:00
fn get_document_id(&self) -> &NameID {
&self.document_id
}
2025-07-28 10:49:34 -04:00
fn get_action(&self) -> &MsgAction {
2025-07-17 09:12:23 -04:00
&self.action
}
2025-08-10 16:20:47 -04:00
fn response<A>(&self, action: A) -> Self
where
A: Into<MsgAction>,
{
Self {
msg_id: self.msg_id.clone(),
document_id: self.document_id.clone(),
action: action.into(),
}
}
}
#[cfg(test)]
mod messages {
use super::*;
#[test]
fn can_the_document_be_a_stringi_reference() {
let dts = ["one", "two"];
for document in dts.into_iter() {
2025-07-28 10:49:34 -04:00
let msg = Message::new(document, MsgAction::Create(DocDef::new()));
match msg.get_document_id() {
2025-07-17 09:12:23 -04:00
NameID::ID(_) => unreachable!("should have been a string id"),
NameID::Name(data) => assert_eq!(data, document),
}
2025-07-28 10:49:34 -04:00
match msg.get_action() {
MsgAction::Create(_) => {}
_ => unreachable!("should have been a create document"),
}
}
}
#[test]
fn can_the_document_be_a_string() {
let dts = ["one".to_string(), "two".to_string()];
for document in dts.into_iter() {
2025-08-05 09:56:48 -04:00
let msg = Message::new(document.clone(), MsgAction::Query(Query::new()));
match msg.get_document_id() {
2025-07-17 09:12:23 -04:00
NameID::ID(_) => unreachable!("should have been a string id"),
NameID::Name(data) => assert_eq!(data, &document),
}
2025-07-28 10:49:34 -04:00
match msg.get_action() {
MsgAction::Query(_) => {}
_ => unreachable!("should have been an access query"),
}
}
}
#[test]
fn can_the_document_be_an_id() {
let document = Uuid::new_v4();
2025-08-05 09:56:48 -04:00
let msg = Message::new(document.clone(), MsgAction::Query(Query::new()));
match msg.get_document_id() {
2025-07-17 09:12:23 -04:00
NameID::ID(data) => assert_eq!(data, &document),
NameID::Name(_) => unreachable!("should have been an id"),
}
2025-07-28 10:49:34 -04:00
match msg.get_action() {
MsgAction::Query(_) => {}
_ => unreachable!("should have been an access query"),
}
}
#[test]
fn is_the_message_id_random() {
let mut ids: Vec<Uuid> = Vec::new();
for _ in 0..5 {
2025-07-28 10:49:34 -04:00
let msg = Message::new("tester", MsgAction::Create(DocDef::new()));
let id = msg.get_message_id().clone();
assert!(!ids.contains(&id), "{:?} containts {}", ids, id);
ids.push(id);
}
}
#[test]
fn Can_make_reply_message() {
let name = "testing";
2025-08-05 09:56:48 -04:00
let msg = Message::new(name, MsgAction::Query(Query::new()));
let responce = Reply::new();
let reply = msg.response(responce);
assert_eq!(reply.get_message_id(), msg.get_message_id());
match reply.get_document_id() {
NameID::Name(data) => assert_eq!(data, name),
_ => unreachable!("should have been a name"),
}
match reply.get_action() {
MsgAction::Reply(_) => {}
_ => unreachable!("should have been a reply"),
}
}
#[test]
fn Can_make_error_message() {
let name = "testing";
2025-08-05 09:56:48 -04:00
let msg = Message::new(name, MsgAction::Query(Query::new()));
let err_msg = Uuid::new_v4().to_string();
let result = msg.response(MTTError::DocumentNotFound(err_msg.clone()));
assert_eq!(result.get_message_id(), msg.get_message_id());
match result.get_document_id() {
NameID::Name(data) => assert_eq!(data, name),
_ => unreachable!("should have been a name"),
}
match result.get_action() {
MsgAction::Error(data) => match data {
MTTError::DocumentNotFound(txt) => assert_eq!(txt, &err_msg),
_ => unreachable!("got {:?}, should have received not found", data),
},
_ => unreachable!("should have been a reply"),
}
}
#[test]
fn can_make_a_response_message() {
let doc_id = Uuid::new_v4();
let msg = Message::new(doc_id.clone(), MsgAction::Query(Query::new()));
let data = Uuid::new_v4().to_string();
let result1 = msg.response(MTTError::DocumentNotFound(data.clone()));
let result2 = msg.response(Reply::new());
assert_eq!(result1.get_message_id(), msg.get_message_id());
assert_eq!(result2.get_message_id(), msg.get_message_id());
assert_eq!(result1.get_document_id(), msg.get_document_id());
assert_eq!(result2.get_document_id(), msg.get_document_id());
let action1 = result1.get_action();
match action1 {
MsgAction::Error(err) => match err {
MTTError::DocumentNotFound(output) => assert_eq!(output, &data),
_ => unreachable!("got {:?}: should have received document not found", err),
},
_ => unreachable!("got {:?}: should have received error", action1),
}
let action2 = result2.get_action();
match action2 {
MsgAction::Reply(data) => assert_eq!(data.len(), 0),
_ => unreachable!("got {:?}: should have received a reply", action2),
}
}
2025-07-17 09:12:23 -04:00
}
2025-07-25 11:08:47 -04:00
#[derive(Clone, Debug)]
2025-07-21 13:31:23 -04:00
enum Include<T> {
All,
Some(T),
}
impl<T: PartialEq> PartialEq for Include<T> {
fn eq(&self, other: &Self) -> bool {
match self {
Include::All => true,
Include::Some(data) => match other {
Include::All => true,
Include::Some(other_data) => data == other_data,
},
}
}
}
2025-07-25 11:08:47 -04:00
#[cfg(test)]
mod includes {
use super::*;
#[test]
fn does_all_equal_evberything() {
let a: Include<isize> = Include::All;
let b: Include<isize> = Include::Some(5);
let c: Include<isize> = Include::Some(7);
assert!(a == a, "all should equal all");
assert!(a == b, "all should equal some");
assert!(b == a, "some should equal all");
assert!(b == b, "same some should equal");
assert!(b != c, "different somes do not equal");
}
}
#[derive(Clone, Eq, Hash, PartialEq)]
2025-07-21 13:31:23 -04:00
struct RouteID {
2025-07-25 11:08:47 -04:00
action: Option<Action>,
2025-07-17 09:12:23 -04:00
doc_type: Option<Uuid>,
2025-07-25 11:08:47 -04:00
msg_id: Option<Uuid>,
2025-07-17 09:12:23 -04:00
}
2025-07-21 13:31:23 -04:00
impl From<Route> for RouteID {
2025-07-28 10:49:34 -04:00
fn from(value: Route) -> Self {
Self {
action: match value.action {
Include::All => None,
Include::Some(action) => Some(action.clone()),
},
doc_type: match value.doc_type {
Include::All => None,
Include::Some(doc) => Some(doc.clone()),
},
msg_id: match value.msg_id {
Include::All => None,
Include::Some(id) => Some(id.clone()),
},
}
2025-07-21 13:31:23 -04:00
}
}
2025-07-25 11:08:47 -04:00
#[derive(Clone, Debug, PartialEq)]
2025-07-21 13:31:23 -04:00
struct Route {
2025-07-25 11:08:47 -04:00
action: Include<Action>,
2025-07-21 13:31:23 -04:00
doc_type: Include<Uuid>,
2025-07-25 11:08:47 -04:00
msg_id: Include<Uuid>,
2025-07-21 13:31:23 -04:00
}
2025-07-17 09:12:23 -04:00
impl Route {
2025-07-28 10:49:34 -04:00
fn new(msg_id: Include<Uuid>, doc: Include<Uuid>, action: Include<Action>) -> Self {
2025-07-17 09:12:23 -04:00
Self {
action: action,
2025-07-25 11:08:47 -04:00
doc_type: doc,
msg_id: msg_id,
2025-07-17 09:12:23 -04:00
}
}
}
2025-07-21 13:31:23 -04:00
impl From<RouteID> for Route {
fn from(value: RouteID) -> Self {
Self {
2025-07-25 11:08:47 -04:00
action: match value.action {
Some(data) => Include::Some(data.clone()),
None => Include::All,
},
2025-07-21 13:31:23 -04:00
doc_type: match value.doc_type {
Some(doc) => Include::Some(doc.clone()),
None => Include::All,
},
2025-07-25 11:08:47 -04:00
msg_id: match value.msg_id {
Some(msg) => Include::Some(msg.clone()),
None => Include::All,
},
2025-07-21 13:31:23 -04:00
}
}
}
impl From<&RouteID> for Route {
fn from(value: &RouteID) -> Self {
Self {
2025-07-28 10:49:34 -04:00
action: match &value.action {
2025-07-25 11:08:47 -04:00
Some(data) => Include::Some(data.clone()),
None => Include::All,
},
doc_type: match &value.doc_type {
2025-07-21 13:31:23 -04:00
Some(doc) => Include::Some(doc.clone()),
None => Include::All,
},
2025-07-25 11:08:47 -04:00
msg_id: match &value.msg_id {
Some(msg) => Include::Some(msg.clone()),
None => Include::All,
},
}
}
}
#[cfg(test)]
mod routes {
2025-07-25 11:08:47 -04:00
use super::*;
#[test]
fn can_a_route_set_action() {
let actions = [Action::Query, Action::Reply];
for action in actions.into_iter() {
let route = Route::new(Include::All, Include::All, Include::Some(action.clone()));
match route.msg_id {
Include::All => {}
Include::Some(_) => unreachable!("should have been all"),
}
match route.doc_type {
Include::All => {}
Include::Some(_) => unreachable!("should have been all"),
}
match route.action {
Include::All => unreachable!("should be a specific value"),
Include::Some(result) => assert_eq!(result, action),
}
}
}
#[test]
fn can_route_set_document_by_name() {
let doc_id = Uuid::new_v4();
2025-07-28 10:49:34 -04:00
let route = Route::new(Include::All, Include::Some(doc_id.clone()), Include::All);
match route.msg_id {
Include::All => {}
Include::Some(_) => unreachable!("should have been all"),
}
match route.doc_type {
Include::All => unreachable!("should be a specific value"),
Include::Some(result) => assert_eq!(result, doc_id),
}
match route.action {
Include::All => {}
Include::Some(_) => unreachable!("should have been all"),
}
2025-07-25 11:08:47 -04:00
}
#[test]
fn can_route_set_document_by_id() {
let id = Uuid::new_v4();
let route = Route::new(Include::All, Include::Some(id.clone()), Include::All);
match route.msg_id {
Include::All => {}
Include::Some(_) => unreachable!("should have been all"),
}
match route.doc_type {
Include::All => unreachable!("should be a specific value"),
Include::Some(result) => assert_eq!(result, id),
}
match route.action {
Include::All => {}
Include::Some(_) => unreachable!("should have been all"),
}
}
#[test]
fn can_route_be_set_by_message_id() {
let id = Uuid::new_v4();
let route = Route::new(Include::Some(id.clone()), Include::All, Include::All);
match route.msg_id {
Include::All => unreachable!("should be a specific value"),
Include::Some(result) => assert_eq!(result, id),
}
match route.doc_type {
Include::All => {}
Include::Some(_) => unreachable!("should have been all"),
}
match route.action {
Include::All => {}
Include::Some(_) => unreachable!("should have been all"),
2025-07-21 13:31:23 -04:00
}
}
}
#[derive(Clone)]
struct RouteRequest {
msg_id: Include<Uuid>,
doc_name: Include<String>,
action: Include<Action>,
}
impl RouteRequest {
fn new(msg_id: Include<Uuid>, doc_name: Include<String>, action: Include<Action>) -> Self {
Self {
msg_id: msg_id,
doc_name: doc_name,
action: action,
}
}
}
struct QueueData {
senders: HashMap<Uuid, Sender<Message>>,
names: HashMap<String, Uuid>,
2025-07-21 13:31:23 -04:00
routes: HashMap<RouteID, Vec<Uuid>>,
}
impl QueueData {
fn new() -> Self {
2025-07-17 09:12:23 -04:00
Self {
senders: HashMap::new(),
names: HashMap::new(),
2025-07-17 09:12:23 -04:00
routes: HashMap::new(),
}
}
2025-07-20 10:39:15 -04:00
fn get_doc_id<N>(&self, nameid: N) -> Result<Uuid, MTTError>
where
N: Into<NameID>,
{
let sender_id = match nameid.into() {
NameID::Name(name) => match self.names.get(&name) {
Some(id) => id.clone(),
None => return Err(MTTError::DocumentNotFound(name.clone())),
},
NameID::ID(id) => id.clone(),
};
if self.senders.contains_key(&sender_id) {
Ok(sender_id)
} else {
Err(MTTError::DocumentNotFound(sender_id.to_string()))
}
}
fn register(
&mut self,
tx: Sender<Message>,
name: String,
routes: Vec<RouteRequest>,
) -> Result<(), MTTError> {
2025-07-17 09:12:23 -04:00
let mut id = Uuid::new_v4();
while self.senders.contains_key(&id) {
id = Uuid::new_v4();
}
match self.get_doc_id(name.clone()) {
Ok(_) => return Err(MTTError::DocumentAlreadyExists(name)),
Err(_) => {}
}
let mut holder: HashMap<RouteID, Vec<Uuid>> = HashMap::new();
for route in routes.iter() {
let doc_type = match &route.doc_name {
Include::Some(doc_name) => {
if doc_name == &name {
Include::Some(id.clone())
} else {
match self.get_doc_id(doc_name.to_string()) {
Ok(doc_id) => Include::Some(doc_id),
Err(err) => return Err(err),
}
}
}
Include::All => Include::All,
};
let route_id: RouteID =
Route::new(route.msg_id.clone(), doc_type, route.action.clone()).into();
match self.routes.get(&route_id) {
Some(senders) => {
let mut addition = senders.clone();
addition.push(id.clone());
holder.insert(route_id, addition);
}
None => {
let senders = [id.clone()].to_vec();
holder.insert(route_id, senders);
}
}
}
self.senders.insert(id.clone(), tx);
self.names.insert(name.clone(), id.clone());
for (route_id, senders) in holder.iter() {
self.routes.insert(route_id.clone(), senders.clone());
}
Ok(())
}
fn send(&self, msg: Message) -> Result<(), MTTError> {
2025-07-21 13:31:23 -04:00
let doc_id: Include<Uuid> = match self.get_doc_id(msg.get_document_id()) {
Ok(id) => Include::Some(id.clone()),
2025-07-25 11:08:47 -04:00
Err(err) => {
2025-07-28 10:49:34 -04:00
let action: Action = msg.get_action().into();
if action == Action::Create {
2025-07-25 11:08:47 -04:00
Include::Some(Uuid::nil())
} else {
2025-07-28 10:49:34 -04:00
return Err(err);
2025-07-25 11:08:47 -04:00
}
2025-07-28 10:49:34 -04:00
}
2025-07-17 09:12:23 -04:00
};
2025-07-28 10:49:34 -04:00
let route = Route::new(
Include::Some(msg.get_message_id().clone()),
doc_id,
Include::Some(msg.get_action().into()),
);
2025-07-21 13:31:23 -04:00
for (send_route, send_ids) in self.routes.iter() {
if route == send_route.into() {
for send_id in send_ids {
let tx = self.senders.get(&send_id).unwrap();
2025-07-20 10:39:15 -04:00
tx.send(msg.clone()).unwrap();
}
}
}
Ok(())
}
}
#[cfg(test)]
mod queuedatas {
use super::support_test::TIMEOUT;
use super::*;
use std::sync::mpsc::RecvTimeoutError;
#[test]
fn can_document_be_registered() {
let mut queuedata = QueueData::new();
let (tx, rx) = channel();
let name = Uuid::new_v4().to_string();
let routes = [
RouteRequest::new(
Include::All,
Include::Some(name.clone()),
Include::Some(Action::Query),
),
RouteRequest::new(
Include::All,
Include::Some(name.clone()),
Include::Some(Action::Reply),
),
]
.to_vec();
queuedata.register(tx, name.clone(), routes).unwrap();
2025-08-05 09:56:48 -04:00
let msg1 = Message::new(name.clone(), MsgAction::Query(Query::new()));
let msg2 = Message::new(name.clone(), MsgAction::Reply(Reply::new()));
let msg3 = Message::new(name.clone(), MsgAction::Create(DocDef::new()));
queuedata.send(msg1.clone()).unwrap();
queuedata.send(msg2.clone()).unwrap();
queuedata.send(msg3.clone()).unwrap();
let result1 = rx.recv_timeout(TIMEOUT).unwrap();
let result2 = rx.recv_timeout(TIMEOUT).unwrap();
match rx.recv_timeout(TIMEOUT) {
Ok(_) => unreachable!("should have timed out"),
Err(err) => match err {
RecvTimeoutError::Timeout => {}
_ => unreachable!("should have timed out"),
},
}
assert_eq!(result1.get_message_id(), msg1.get_message_id());
assert_eq!(result2.get_message_id(), msg2.get_message_id());
match result1.get_action() {
MsgAction::Query(_) => {}
_ => unreachable!("should have been a query"),
}
match result2.get_action() {
MsgAction::Reply(_) => {}
_ => unreachable!("should have been a query"),
}
}
#[test]
fn does_register_fail_on_duplicate_documents() {
let mut queuedata = QueueData::new();
let (tx1, _) = channel();
let (tx2, _) = channel();
let name = Uuid::new_v4().to_string();
queuedata
.register(tx1, name.to_string(), Vec::new())
.unwrap();
match queuedata.register(tx2, name.to_string(), Vec::new()) {
Ok(_) => unreachable!("duplicates should create an error"),
Err(err) => match err {
MTTError::DocumentAlreadyExists(result) => assert_eq!(result, name),
_ => unreachable!("should have been document does not exists"),
},
}
}
#[test]
fn does_bad_route_prevent_registration() {
let mut queuedata = QueueData::new();
let (tx, rx) = channel();
let good = "good";
let bad = Uuid::new_v4().to_string();
let routes = [
RouteRequest::new(
Include::All,
Include::Some(good.to_string()),
Include::Some(Action::Query),
),
RouteRequest::new(
Include::All,
Include::Some(bad.clone()),
Include::Some(Action::Reply),
),
]
.to_vec();
match queuedata.register(tx, good.to_string(), routes) {
Ok(_) => unreachable!("should produce an error"),
Err(err) => match err {
MTTError::DocumentNotFound(result) => assert_eq!(result, bad),
_ => unreachable!("Shouuld be document not found"),
},
}
assert_eq!(queuedata.senders.len(), 0, "should not add to senders");
assert_eq!(queuedata.names.len(), 0, "should not add to names");
assert_eq!(queuedata.routes.len(), 0, "should nor add to routes");
}
#[test]
fn is_sender_only_added_once_to_routes() {
let mut queuedata = QueueData::new();
let (tx, rx) = channel();
let name = "project";
let routes = [
RouteRequest::new(
Include::All,
Include::Some(name.to_string()),
Include::Some(Action::Query),
),
RouteRequest::new(
Include::All,
Include::Some(name.to_string()),
Include::Some(Action::Query),
),
]
.to_vec();
queuedata.register(tx, name.to_string(), routes).unwrap();
for senders in queuedata.routes.values() {
assert_eq!(senders.len(), 1, "should be no double entries");
}
}
#[test]
fn does_a_bad_document_name_fail() {
let docname = Uuid::new_v4().to_string();
let queuedata = QueueData::new();
2025-08-05 09:56:48 -04:00
let msg = Message::new(docname.clone(), MsgAction::Query(Query::new()));
match queuedata.send(msg) {
Ok(_) => unreachable!("should have been an error"),
Err(data) => match data {
MTTError::DocumentNotFound(doc) => assert_eq!(doc, docname),
_ => unreachable!("should have been a not found error"),
},
}
}
2025-07-17 09:12:23 -04:00
#[test]
fn is_send_okay_if_no_one_is_listening() {
let mut queuedata = QueueData::new();
let name = "something";
let (tx, _) = channel();
queuedata
.register(tx, name.to_string(), Vec::new())
.unwrap();
2025-07-28 10:49:34 -04:00
let msg = Message::new("something", MsgAction::Create(DocDef::new()));
2025-07-17 09:12:23 -04:00
match queuedata.send(msg) {
Ok(_) => {}
Err(err) => unreachable!("got {:?}: should not error", err),
}
}
#[test]
2025-07-20 10:39:15 -04:00
fn can_more_than_one_document_respond() {
2025-07-17 09:12:23 -04:00
let mut queuedata = QueueData::new();
2025-07-20 10:39:15 -04:00
let name1 = "task";
let name2 = "work";
2025-08-05 09:56:48 -04:00
let action = MsgAction::Query(Query::new());
2025-07-17 09:12:23 -04:00
let (tx1, rx1) = channel();
let (tx2, rx2) = channel();
let routes = [RouteRequest::new(
Include::All,
Include::Some(name1.to_string()),
Include::All,
)]
.to_vec();
queuedata
.register(tx1, name1.to_string(), routes.clone())
.unwrap();
queuedata
.register(tx2, name2.to_string(), routes.clone())
.unwrap();
2025-07-20 10:39:15 -04:00
let msg = Message::new(name1, action.clone());
2025-07-17 09:12:23 -04:00
queuedata.send(msg.clone()).unwrap();
2025-07-20 10:39:15 -04:00
let result1 = rx1.recv_timeout(TIMEOUT).unwrap();
let result2 = rx2.recv_timeout(TIMEOUT).unwrap();
assert_eq!(result1.get_message_id(), msg.get_message_id());
assert_eq!(result1.get_message_id(), result2.get_message_id());
2025-07-17 09:12:23 -04:00
}
}
#[derive(Clone)]
struct Queue {
queue_data: Arc<RwLock<QueueData>>,
}
impl Queue {
fn new() -> Self {
Self {
queue_data: Arc::new(RwLock::new(QueueData::new())),
}
}
2025-07-25 11:08:47 -04:00
fn register(
&mut self,
tx: Sender<Message>,
name: String,
routes: Vec<RouteRequest>,
) -> Result<(), MTTError> {
2025-07-25 11:08:47 -04:00
let mut queuedata = self.queue_data.write().unwrap();
queuedata.register(tx, name, routes)
2025-07-25 11:08:47 -04:00
}
fn send(&self, msg: Message) -> Result<(), MTTError> {
let queuedata = self.queue_data.read().unwrap();
queuedata.send(msg)
}
}
#[cfg(test)]
mod queues {
use super::*;
#[test]
fn create_a_queue() {
Queue::new();
}
}
2025-07-28 10:49:34 -04:00
struct CreateDoc {
queue: Queue,
rx: Receiver<Message>,
}
impl CreateDoc {
fn new(queue: Queue, rx: Receiver<Message>) -> Self {
Self {
queue: queue,
rx: rx,
}
}
fn start(mut queue: Queue) {
let (tx, rx) = channel();
let routes = [RouteRequest::new(
Include::All,
Include::All,
Include::Some(Action::Create),
)]
.to_vec();
let id = queue.register(tx, "document".to_string(), routes).unwrap();
2025-07-28 10:49:34 -04:00
let doc = CreateDoc::new(queue, rx);
spawn(move || {
doc.listen();
});
}
fn listen(&self) {
loop {
let msg = self.rx.recv().unwrap();
DocumentFile::start(self.queue.clone(), msg);
2025-07-28 10:49:34 -04:00
}
}
}
2025-08-04 23:51:57 -04:00
#[derive(Clone, Debug, PartialEq)]
enum FieldType {
None,
StaticString,
2025-08-05 09:56:48 -04:00
Uuid,
}
2025-08-17 08:56:34 -04:00
impl FieldType {
fn get_default(&self) -> Field {
match self {
FieldType::None => Field::None,
2025-08-17 08:56:34 -04:00
FieldType::StaticString => "".into(),
FieldType::Uuid => Uuid::new_v4().into(),
}
}
}
2025-08-11 12:17:37 -04:00
impl From<&Field> for FieldType {
fn from(value: &Field) -> Self {
match value {
Field::None => Self::None,
2025-08-11 12:17:37 -04:00
Field::StaticString(_) => Self::StaticString,
Field::Uuid(_) => Self::Uuid,
}
}
}
2025-08-17 08:56:34 -04:00
#[cfg(test)]
mod fieldtypes {
use super::*;
#[test]
fn can_get_defaults_for_uuid() {
let ftype = FieldType::Uuid;
let mut ids: Vec<Uuid> = Vec::new();
for _ in 0..5 {
let result = ftype.get_default();
match result {
Field::Uuid(data) => {
2025-08-20 10:26:56 -04:00
assert!(
!ids.contains(&data),
"found duplicate id {:?} in {:?}",
data,
ids
);
2025-08-17 08:56:34 -04:00
ids.push(data.clone());
2025-08-20 10:26:56 -04:00
}
2025-08-17 08:56:34 -04:00
_ => unreachable!("got {:?}: should have been uuid", result),
}
}
}
#[test]
fn can_get_defaults_for_static_string() {
let ftype = FieldType::StaticString;
let result = ftype.get_default();
match result {
Field::StaticString(data) => assert_eq!(data, ""),
_ => unreachable!("got {:?}: should have been static string", result),
}
}
}
2025-09-05 23:52:24 -04:00
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
2025-08-05 09:56:48 -04:00
enum Field {
None,
2025-08-05 09:56:48 -04:00
StaticString(String),
Uuid(Uuid),
}
impl Field {
fn get_type(&self) -> FieldType {
2025-08-11 12:17:37 -04:00
self.into()
2025-08-05 09:56:48 -04:00
}
}
2025-08-05 11:27:18 -04:00
impl From<String> for Field {
fn from(value: String) -> Self {
Self::StaticString(value)
}
}
impl From<&str> for Field {
fn from(value: &str) -> Self {
Self::from(value.to_string())
}
}
2025-08-05 11:27:18 -04:00
impl From<Uuid> for Field {
fn from(value: Uuid) -> Self {
Self::Uuid(value)
}
}
2025-08-05 09:56:48 -04:00
#[cfg(test)]
mod fields {
use super::*;
#[test]
fn can_create_static_string() {
2025-08-05 11:27:18 -04:00
let data = Uuid::new_v4().to_string();
let result: Field = data.clone().into();
match result.clone() {
Field::StaticString(output) => assert_eq!(output, data),
_ => unreachable!("got {:?}: should have been static string", result),
}
assert_eq!(result.get_type(), FieldType::StaticString);
}
#[test]
fn can_create_from_str() {
let holder = ["one", "two"];
for data in holder.into_iter() {
let result: Field = data.into();
match result.clone() {
Field::StaticString(output) => assert_eq!(output, data),
_ => unreachable!("got {:?}: should have been static string", result),
}
assert_eq!(result.get_type(), FieldType::StaticString);
}
}
#[test]
fn create_from_uuid() {
2025-08-05 09:56:48 -04:00
let data = Uuid::new_v4();
2025-08-05 11:27:18 -04:00
let result: Field = data.clone().into();
match result.clone() {
Field::Uuid(output) => assert_eq!(output, data),
_ => unreachable!("got {:?}: should have been uuid", result),
}
assert_eq!(result.get_type(), FieldType::Uuid);
2025-08-05 09:56:48 -04:00
}
}
2025-08-04 23:51:57 -04:00
#[derive(Clone, Debug)]
struct FieldSetting {
fieldtype: FieldType,
use_default: bool,
2025-09-05 23:52:24 -04:00
use_unique: bool,
default_value: Field,
2025-09-05 23:52:24 -04:00
unique: HashSet<Field>,
2025-08-04 23:51:57 -04:00
}
impl FieldSetting {
fn new(ftype: FieldType) -> Self {
2025-08-20 10:26:56 -04:00
Self {
fieldtype: ftype,
use_default: false,
2025-09-05 23:52:24 -04:00
use_unique: false,
default_value: Field::None,
2025-09-05 23:52:24 -04:00
unique: HashSet::new(),
2025-08-20 10:26:56 -04:00
}
2025-08-04 23:51:57 -04:00
}
fn set_default(&mut self, value: Option<Field>) -> Result<(), MTTError> {
match value {
Some(data) => {
match self.validate(Some(data.clone())) {
Ok(_) => {}
Err(err) => return Err(err),
}
self.default_value = data.clone();
}
None => self.default_value = Field::None,
}
self.use_default = true;
Ok(())
2025-08-20 10:26:56 -04:00
}
2025-08-28 13:18:18 -04:00
2025-09-05 23:52:24 -04:00
fn set_unique(&mut self) -> Result<(), MTTError> {
self.use_unique = true;
Ok(())
}
fn use_unique_value(&mut self, field: Field) {
if self.use_unique {
self.unique.insert(field);
}
}
2025-09-11 23:49:25 -04:00
fn remove_unique_value(&mut self, field: &Field) {
if self.use_unique {
self.unique.remove(field);
}
}
fn validate(&self, value: Option<Field>) -> Result<Field, MTTError> {
2025-08-30 10:58:34 -04:00
match value {
Some(data) => {
let vft: FieldType = (&data).into();
if vft != self.fieldtype {
return Err(MTTError::DocumentFieldWrongDataType(
self.fieldtype.clone(),
vft,
));
}
2025-09-05 23:52:24 -04:00
if self.use_unique {
if self.unique.get(&data).is_some() {
2025-09-11 23:49:25 -04:00
return Err(MTTError::FieldDuplicate("".to_string(), data.clone()));
2025-09-05 23:52:24 -04:00
}
}
2025-08-30 10:58:34 -04:00
Ok(data.clone())
}
None => {
if self.use_default {
match self.default_value {
Field::None => Ok(self.fieldtype.get_default()),
_ => Ok(self.default_value.clone()),
}
} else {
Err(MTTError::DocumentFieldMissing("".to_string()))
}
}
2025-08-28 13:18:18 -04:00
}
}
2025-08-04 23:51:57 -04:00
}
#[cfg(test)]
mod fieldsettings {
use super::*;
2025-08-28 13:18:18 -04:00
#[test]
fn validates_field_type() {
2025-08-28 13:18:18 -04:00
let fset = FieldSetting::new(FieldType::Uuid);
let value: Field = Uuid::new_v4().into();
match fset.validate(Some(value.clone())) {
2025-08-28 13:18:18 -04:00
Ok(data) => assert_eq!(data, value),
Err(err) => unreachable!("got {:?}: should have gotten a value", err),
}
}
#[test]
fn validates_for_bad_field_type() {
2025-08-28 13:18:18 -04:00
let fset = FieldSetting::new(FieldType::Uuid);
let value: Field = "text".into();
match fset.validate(Some(value)) {
2025-08-28 13:18:18 -04:00
Ok(data) => unreachable!("got {:?}: should have gotten an error", data),
Err(err) => match err {
MTTError::DocumentFieldWrongDataType(expected, got) => {
assert_eq!(expected, FieldType::Uuid);
assert_eq!(got, FieldType::StaticString);
}
_ => unreachable!("got {:?}: should have gotten a value", err),
},
}
}
2025-08-30 10:58:34 -04:00
#[test]
fn no_default_returns_error() {
let fset = FieldSetting::new(FieldType::Uuid);
match fset.validate(None) {
2025-08-30 10:58:34 -04:00
Ok(data) => unreachable!("got {:?}: should have gotten an error", data),
Err(err) => match err {
MTTError::DocumentFieldMissing(data) => assert_eq!(data, ""),
_ => unreachable!("got {:?}: should have gotten a value", err),
},
}
}
2025-08-04 23:51:57 -04:00
#[test]
fn returns_value_if_default_is_set() {
let mut fset = FieldSetting::new(FieldType::StaticString);
fset.set_default(None);
match fset.validate(None) {
Ok(data) => assert_eq!(data, "".into()),
Err(err) => unreachable!("got {:?}: should have gotten a value", err),
2025-08-20 10:26:56 -04:00
}
}
#[test]
fn returns_default_value() {
let mut fset = FieldSetting::new(FieldType::StaticString);
let input = "fred";
fset.set_default(Some(input.into()));
match fset.validate(None) {
Ok(data) => assert_eq!(data, input.into()),
Err(err) => unreachable!("got {:?}: should have gotten a value", err),
2025-08-04 23:51:57 -04:00
}
}
2025-09-05 23:52:24 -04:00
#[test]
fn validates_for_unique_value() {
2025-09-05 23:52:24 -04:00
let mut fset = FieldSetting::new(FieldType::Uuid);
let field: Field = Uuid::new_v4().into();
fset.set_unique();
fset.use_unique_value(field.clone());
match fset.validate(Some(field.clone())) {
2025-09-05 23:52:24 -04:00
Ok(data) => unreachable!("got {:?}: should have been error", data),
Err(err) => match err {
2025-09-11 23:49:25 -04:00
MTTError::FieldDuplicate(key, result) => {
assert_eq!(key, "");
assert_eq!(result, field);
}
2025-09-05 23:52:24 -04:00
_ => unreachable!("got {:?}: should be a duplicate field", err),
},
}
}
2025-08-04 23:51:57 -04:00
}
2025-08-05 09:56:48 -04:00
#[derive(Clone, Debug)]
2025-08-05 11:27:18 -04:00
struct Addition {
data: Document,
2025-08-05 11:27:18 -04:00
}
2025-08-05 09:56:48 -04:00
impl Addition {
fn new() -> Self {
2025-08-05 11:27:18 -04:00
Self {
data: Document::new(),
2025-08-05 11:27:18 -04:00
}
}
fn add_field<F>(&mut self, name: String, field: F)
where
F: Into<Field>,
{
self.data.add_field(name, field);
2025-08-05 11:27:18 -04:00
}
fn get_field(&self, name: &str) -> Option<&Field> {
self.data.get_field(name)
}
fn get_document(&self) -> Document {
self.data.clone()
2025-08-05 11:27:18 -04:00
}
}
#[cfg(test)]
mod additions {
use super::*;
#[test]
fn can_add_static_string() {
let mut add = Addition::new();
let name = Uuid::new_v4().to_string();
let data = Uuid::new_v4().to_string();
add.add_field(name.clone(), data.clone());
let result = add.get_field(&name).unwrap();
match result {
Field::StaticString(result) => assert_eq!(result, &data),
_ => unreachable!("got {:?}: should have received static string", result),
}
}
fn can_add_uuid() {
let mut add = Addition::new();
let name = Uuid::new_v4().to_string();
let data = Uuid::new_v4();
add.add_field(name.clone(), data.clone());
let result = add.get_field(&name).unwrap();
match result {
Field::Uuid(result) => assert_eq!(result, &data),
_ => unreachable!("got {:?}: should have received uuid", result),
}
2025-08-05 09:56:48 -04:00
}
fn can_get_document() {
let mut add = Addition::new();
let name = Uuid::new_v4().to_string();
let data = Uuid::new_v4();
add.add_field(name.clone(), data.clone());
let doc: Document = add.get_document();
match doc.get_field(&name).unwrap() {
Field::Uuid(result) => assert_eq!(result, &data),
_ => unreachable!("should have received uuid"),
}
}
2025-08-05 09:56:48 -04:00
}
#[derive(Clone, Debug)]
enum IndexType {
Index,
Unique,
}
impl IndexType {
fn create_index(&self) -> Index {
match self {
Self::Index => Index::new(),
Self::Unique => Index::new_unique(),
}
}
}
#[derive(Clone, Debug)]
struct DocDef {
2025-08-04 23:51:57 -04:00
fields: HashMap<String, FieldSetting>,
indexes: HashMap<String, IndexType>,
}
2025-07-28 10:49:34 -04:00
impl DocDef {
fn new() -> Self {
Self {
fields: HashMap::new(),
indexes: HashMap::new(),
}
}
2025-08-04 23:51:57 -04:00
fn add_field(&mut self, name: String, ftype: FieldType) {
self.fields.insert(name, FieldSetting::new(ftype));
}
2025-08-04 23:51:57 -04:00
fn get_field(&self, name: &str) -> Result<&FieldSetting, MTTError> {
match self.fields.get(name) {
Some(data) => Ok(data),
None => Err(MTTError::DocumentFieldNotFound(name.to_string())),
}
}
2025-08-10 16:20:47 -04:00
2025-08-21 11:51:56 -04:00
fn get_field_mut(&mut self, field_name: &str) -> Result<&mut FieldSetting, MTTError> {
match self.fields.get_mut(field_name) {
Some(data) => Ok(data),
None => return Err(MTTError::DocumentFieldNotFound(field_name.to_string())),
}
}
fn validate(&self, field_name: &str, value: Option<Field>) -> Result<Field, MTTError> {
2025-09-11 23:49:25 -04:00
let setting = match self.get_field(field_name) {
Ok(data) => data,
Err(err) => return Err(err),
};
setting.validate(value)
2025-09-11 23:49:25 -04:00
}
fn set_default(&mut self, field_name: &str, value: Option<Field>) -> Result<(), MTTError> {
2025-08-21 11:51:56 -04:00
let setting = match self.get_field_mut(field_name) {
Ok(data) => data,
Err(err) => return Err(err),
};
match setting.set_default(value) {
Ok(_) => Ok(()),
Err(err) => Err(err),
2025-08-21 11:51:56 -04:00
}
2025-08-21 10:38:13 -04:00
}
fn add_index(&mut self, field_name: String, index_type: IndexType) -> Result<(), MTTError> {
let setting = match self.get_field(&field_name) {
Ok(_) => {}
Err(err) => return Err(err),
};
self.indexes.insert(field_name, index_type);
Ok(())
}
2025-09-15 09:32:45 -04:00
fn create_indexes(&self) -> Indexes {
Indexes::new(&self.indexes)
2025-09-11 23:49:25 -04:00
}
2025-08-12 10:30:46 -04:00
fn iter(&self) -> impl Iterator<Item = (&String, &FieldSetting)> {
self.fields.iter()
2025-08-10 16:20:47 -04:00
}
}
#[cfg(test)]
mod docdefs {
use super::*;
#[test]
fn can_field_be_added() {
let mut docdef = DocDef::new();
let name = Uuid::new_v4().to_string();
2025-08-05 09:56:48 -04:00
let field_type = FieldType::Uuid;
2025-08-04 23:51:57 -04:00
docdef.add_field(name.clone(), field_type.clone());
let result = docdef.get_field(name.as_str()).unwrap();
match result.validate(Some(Uuid::new_v4().into())) {
Ok(_) => {}
Err(err) => unreachable!("got {:?}: should have been a value", err),
}
}
#[test]
fn produces_error_for_bad_fields() {
let docdef = DocDef::new();
let name = Uuid::new_v4().to_string();
match docdef.get_field(name.as_str()) {
Ok(_) => unreachable!("should return non existant field error"),
Err(err) => match err {
MTTError::DocumentFieldNotFound(data) => assert_eq!(data, name),
_ => unreachable!("got {:?}: should have been document field not found", err),
},
}
}
#[test]
fn can_multiple_fields_be_added() {
let mut docdef = DocDef::new();
let names = ["one", "two", "three"];
2025-08-04 23:51:57 -04:00
let field_type = FieldType::StaticString;
for name in names.iter() {
2025-08-04 23:51:57 -04:00
docdef.add_field(name.to_string(), field_type.clone());
}
for name in names.iter() {
let result = docdef.get_field(name).unwrap();
match result.validate(Some("".into())) {
Ok(_) => {}
Err(err) => unreachable!("got {:?}: should have been a value", err),
}
}
2025-07-28 10:49:34 -04:00
}
2025-08-20 10:26:56 -04:00
#[test]
fn can_change_field_default_to_function() {
let mut docdef = DocDef::new();
let name = "defaultfunction";
docdef.add_field(name.to_string(), FieldType::StaticString);
docdef.set_default(name, None);
match docdef.get_field(name).unwrap().validate(None) {
Ok(data) => match data {
Field::StaticString(result) => assert_eq!(result, ""),
_ => unreachable!("got {:?}: should return a static string", data),
},
Err(err) => unreachable!("got {:?}: should return a value", err),
2025-08-20 10:26:56 -04:00
}
}
2025-08-21 11:51:56 -04:00
#[test]
fn does_set_default_function_error_on_bad_field_name() {
let mut docdef = DocDef::new();
let field_name = Uuid::new_v4().to_string();
match docdef.set_default(field_name.as_str(), None) {
2025-08-21 11:51:56 -04:00
Ok(_) => unreachable!("should be an error"),
Err(err) => match err {
MTTError::DocumentFieldNotFound(data) => assert_eq!(data, field_name),
_ => unreachable!("got {:?}: should have been field not found", err),
},
}
}
#[test]
fn does_set_default_value_error_on_bad_field_name() {
let mut docdef = DocDef::new();
let field_name = Uuid::new_v4().to_string();
match docdef.set_default(field_name.as_str(), Some(Uuid::nil().into())) {
2025-08-21 11:51:56 -04:00
Ok(_) => unreachable!("should be an error"),
Err(err) => match err {
MTTError::DocumentFieldNotFound(data) => assert_eq!(data, field_name),
_ => unreachable!("got {:?}: should have been field not found", err),
},
}
}
#[test]
fn does_set_default_value_error_on_bad_field_type() {
let mut docdef = DocDef::new();
let name = "defaultvalue";
docdef.add_field(name.to_string(), FieldType::Uuid);
match docdef.set_default(name, Some("".into())) {
Ok(data) => unreachable!("got {:?}, should be an error", data),
2025-08-21 11:51:56 -04:00
Err(err) => match err {
MTTError::DocumentFieldWrongDataType(expected, got) => {
assert_eq!(expected, FieldType::Uuid);
assert_eq!(got, FieldType::StaticString);
}
_ => unreachable!("got {:?}: should have been field not found", err),
},
}
}
2025-07-28 10:49:34 -04:00
}
#[derive(Clone, Debug)]
2025-08-11 12:17:37 -04:00
enum Operand {
Equal,
}
2025-09-13 12:45:20 -04:00
impl Operand {
fn validate(&self, x: &Field, y: &Field) -> bool {
2025-09-13 12:45:20 -04:00
match self {
Self::Equal => x == y,
}
}
}
#[cfg(test)]
mod operands {
use super::*;
#[test]
fn equals_true() {
let data: Field = Uuid::new_v4().into();
assert!(Operand::Equal.validate(&data, &data));
2025-09-13 12:45:20 -04:00
}
#[test]
fn equals_false() {
let x: Field = Uuid::new_v4().into();
let mut y: Field = Uuid::new_v4().into();
while x == y {
y = Uuid::new_v4().into();
}
assert!(!Operand::Equal.validate(&x, &y));
2025-09-13 12:45:20 -04:00
}
}
2025-08-11 12:17:37 -04:00
#[derive(Clone, Debug)]
struct Specifier {
field_name: String,
operation: Operand,
value: Field,
}
impl Specifier {
fn new<F>(name: String, op: Operand, value: F) -> Self
where
F: Into<Field>,
{
Self {
field_name: name,
operation: op,
value: value.into(),
}
}
2025-09-13 12:45:20 -04:00
fn which_field(&self) -> String {
self.field_name.clone()
}
fn validate(&self, field: &Field) -> bool {
self.operation.validate(field, &self.value)
2025-09-13 12:45:20 -04:00
}
2025-08-11 12:17:37 -04:00
}
#[derive(Clone, Debug)]
struct Query {
specifiers: Vec<Specifier>,
}
2025-07-28 10:49:34 -04:00
2025-08-05 09:56:48 -04:00
impl Query {
2025-07-28 10:49:34 -04:00
fn new() -> Self {
2025-08-11 12:17:37 -04:00
Self {
specifiers: Vec::new(),
}
}
fn add_specifier<F>(&mut self, name: String, op: Operand, value: F)
where
F: Into<Field>,
{
let spec = Specifier::new(name, op, value);
self.specifiers.push(spec);
}
2025-08-28 10:15:28 -04:00
fn iter(&self) -> impl Iterator<Item = &Specifier> {
self.specifiers.iter()
2025-07-28 10:49:34 -04:00
}
}
#[derive(Clone, Debug)]
struct Reply {
data: Vec<Document>,
}
2025-08-05 09:56:48 -04:00
impl Reply {
fn new() -> Self {
Self { data: Vec::new() }
}
fn add(&mut self, doc: Document) {
self.data.push(doc);
}
2025-08-05 09:56:48 -04:00
fn len(&self) -> usize {
self.data.len()
}
2025-08-11 12:17:37 -04:00
fn iter(&self) -> impl Iterator<Item = &Document> {
self.data.iter()
2025-08-05 09:56:48 -04:00
}
}
#[cfg(test)]
mod replies {
use super::*;
#[test]
fn is_new_empty() {
let reply = Reply::new();
assert_eq!(reply.len(), 0, "should have no records");
}
#[test]
fn can_add_documents() {
let mut reply = Reply::new();
let doc = Document::new();
reply.add(doc.clone());
assert_eq!(reply.len(), 1);
reply.add(doc.clone());
assert_eq!(reply.len(), 2);
}
#[test]
fn can_retrieve_documents() {
let fieldname = "field".to_string();
let mut doc1 = Document::new();
doc1.add_field(fieldname.clone(), "one");
let mut doc2 = Document::new();
doc2.add_field(fieldname.clone(), "two");
let mut reply = Reply::new();
reply.add(doc1);
reply.add(doc2);
2025-08-11 12:17:37 -04:00
let mut reply_iter = reply.iter();
let mut result1 = reply_iter.next().unwrap();
match result1.get_field(&fieldname).unwrap() {
Field::StaticString(output) => assert_eq!(output, "one"),
_ => unreachable!("got {:?}: should have been static string", result1),
}
let result2 = reply_iter.next().unwrap();
match result2.get_field(&fieldname).unwrap() {
Field::StaticString(output) => assert_eq!(output, "two"),
_ => unreachable!("got {:?}: should have been static string", result2),
}
match reply_iter.next() {
None => {}
Some(_) => unreachable!("should be out of data"),
}
2025-08-05 09:56:48 -04:00
}
}
#[derive(Clone, Debug)]
2025-07-25 11:08:47 -04:00
struct Document {
2025-08-05 09:56:48 -04:00
data: HashMap<String, Field>,
}
impl Document {
fn new() -> Self {
Self {
data: HashMap::new(),
}
}
fn add_field<F>(&mut self, name: String, field: F)
where
F: Into<Field>,
{
self.data.insert(name, field.into());
}
fn get_field(&self, name: &str) -> Option<&Field> {
self.data.get(name)
}
2025-08-12 10:30:46 -04:00
fn iter(&self) -> impl Iterator<Item = (&String, &Field)> {
self.data.iter()
}
}
#[cfg(test)]
mod documents {
use super::*;
#[test]
fn can_add_static_string() {
let mut add = Document::new();
let name = Uuid::new_v4().to_string();
let data = Uuid::new_v4().to_string();
add.add_field(name.clone(), data.clone());
let result = add.get_field(&name).unwrap();
match result {
Field::StaticString(result) => assert_eq!(result, &data),
_ => unreachable!("got {:?}: should have received static string", result),
}
}
fn can_add_uuid() {
let mut add = Document::new();
let name = Uuid::new_v4().to_string();
let data = Uuid::new_v4();
add.add_field(name.clone(), data.clone());
let result = add.get_field(&name).unwrap();
match result {
Field::Uuid(result) => assert_eq!(result, &data),
_ => unreachable!("got {:?}: should have received uuid", result),
}
}
2025-08-05 09:56:48 -04:00
}
2025-08-26 08:06:22 -04:00
#[derive(Clone, Debug)]
struct Update {
query: Query,
values: Document,
}
impl Update {
fn new() -> Self {
Self {
query: Query::new(),
values: Document::new(),
}
}
2025-08-28 10:15:28 -04:00
fn get_query(&self) -> &Query {
&self.query
}
fn get_query_mut(&mut self) -> &mut Query {
&mut self.query
}
2025-08-28 10:15:28 -04:00
fn get_values(&self) -> &Document {
&self.values
}
fn get_values_mut(&mut self) -> &mut Document {
&mut self.values
}
2025-08-26 08:06:22 -04:00
}
2025-09-05 23:52:24 -04:00
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
2025-08-28 10:15:28 -04:00
struct Oid {
oid: Uuid,
}
impl Oid {
fn new() -> Self {
Self {
oid: Uuid::new_v4(),
}
}
}
2025-09-13 12:45:20 -04:00
struct Index {
data: HashMap<Field, HashSet<Oid>>,
unique: bool,
2025-09-13 12:45:20 -04:00
}
impl Index {
fn new() -> Self {
Self {
data: HashMap::new(),
unique: false,
2025-09-13 12:45:20 -04:00
}
}
fn new_unique() -> Self {
Self {
data: HashMap::new(),
unique: true,
}
}
fn add(&mut self, field: Field, oid: Oid) -> Result<(), MTTError> {
2025-09-13 12:45:20 -04:00
let oids = match self.data.get_mut(&field) {
Some(data) => data,
None => {
self.data.insert(field.clone(), HashSet::new());
2025-09-13 12:45:20 -04:00
self.data.get_mut(&field).unwrap()
}
};
if self.unique && oids.len() > 0 {
return Err(MTTError::FieldDuplicate("".to_string(), field));
} else {
oids.insert(oid);
}
Ok(())
2025-09-13 12:45:20 -04:00
}
fn get(&self, spec: &Specifier) -> Vec<Oid> {
let mut output = Vec::new();
for (field, oids) in self.data.iter() {
if spec.validate(field) {
2025-09-13 12:45:20 -04:00
for oid in oids.iter() {
output.push(oid.clone());
}
}
}
output
}
fn remove(&mut self, field: &Field, oid: &Oid) {
match self.data.get_mut(field) {
Some(oids) => {
oids.remove(oid);
if oids.len() == 0 {
self.data.remove(field);
}
}
None => {}
};
}
fn validate(&self, field: &Field) -> Result<(), MTTError> {
if self.unique {
match self.data.get(field) {
Some(_) => return Err(MTTError::FieldDuplicate("".to_string(), field.clone())),
None => {}
}
}
Ok(())
}
2025-09-13 12:45:20 -04:00
}
2025-09-15 09:32:45 -04:00
struct Indexes {
data: HashMap<String, Index>,
}
impl Indexes {
fn new(settings: &HashMap<String, IndexType>) -> Self {
let mut output = HashMap::new();
for (key, value) in settings.iter() {
output.insert(key.clone(), value.create_index());
}
Self { data: output }
}
fn add_to_index(&mut self, field_name: &str, field: Field, oid: Oid) {
let index = match self.data.get_mut(field_name) {
Some(data) => data,
None => return,
};
index.add(field, oid);
}
fn remove_from_index(&mut self, field_name: &str, field: &Field, oid: &Oid) {
let index = match self.data.get_mut(field_name) {
Some(data) => data,
None => return,
};
index.remove(field, oid);
}
fn validate(&self, field_name: &str, value: &Field) -> Result<(), MTTError> {
match self.data.get(field_name) {
Some(index) => match index.validate(value) {
Ok(_) => {}
Err(err) => return Err(err),
},
None => {}
}
Ok(())
}
}
2025-09-13 12:45:20 -04:00
#[cfg(test)]
mod indexes {
use super::*;
fn get_fields(count: usize) -> Vec<Field> {
let mut output = Vec::new();
while output.len() < count {
let field: Field = Uuid::new_v4().into();
if !output.contains(&field) {
output.push(field);
}
}
output
}
fn get_oids(count: usize) -> Vec<Oid> {
let mut output = Vec::new();
while output.len() < count {
let oid = Oid::new();
if !output.contains(&oid) {
output.push(oid);
}
}
output
}
#[test]
fn add_to_index() {
let mut index = Index::new();
let count = 3;
let fields = get_fields(count);
let oids = get_oids(count);
for i in 0..count {
index.add(fields[i].clone(), oids[i].clone());
}
for i in 0..count {
let spec = Specifier::new("stuff".to_string(), Operand::Equal, fields[i].clone());
let result = index.get(&spec);
assert_eq!(result.len(), 1);
assert_eq!(result[0], oids[i]);
}
}
#[test]
fn index_can_handle_multiple_entries() {
let mut index = Index::new();
let count = 3;
let fields = get_fields(1);
let oids = get_oids(count);
for i in 0..count {
index.add(fields[0].clone(), oids[i].clone());
}
let spec = Specifier::new("unimportant".to_string(), Operand::Equal, fields[0].clone());
let result = index.get(&spec);
assert_eq!(result.len(), 3);
for oid in oids {
assert!(result.contains(&oid));
}
}
#[test]
fn can_remove_oid() {
let mut index = Index::new();
let count = 3;
let pos = 1;
let fields = get_fields(1);
let oids = get_oids(count);
for i in 0..count {
index.add(fields[0].clone(), oids[i].clone());
}
index.remove(&fields[0], &oids[pos]);
let spec = Specifier::new("x".to_string(), Operand::Equal, fields[0].clone());
let result = index.get(&spec);
assert!(!result.contains(&oids[pos]), "should have removed oid");
}
#[test]
fn are_empty_indexes_removed() {
let mut index = Index::new();
let field: Field = Uuid::new_v4().into();
let oid = Oid::new();
index.add(field.clone(), oid.clone());
index.remove(&field, &oid);
assert_eq!(index.data.len(), 0);
}
#[test]
fn do_unique_indexes_error_on_duplicates() {
let mut index = Index::new_unique();
let field: Field = "fred".into();
let oids = get_oids(2);
index.add(field.clone(), oids[0].clone()).unwrap();
match index.add(field.clone(), oids[0].clone()) {
Ok(_) => unreachable!("should have been an error"),
Err(err) => match err {
MTTError::FieldDuplicate(field_name, value) => {
assert_eq!(field_name, "");
assert_eq!(value, field);
}
_ => unreachable!("got {:?}: should have been duplicate field", err),
},
}
}
#[test]
fn index_returns_validate() {
let mut index = Index::new();
let field: Field = "stuff".into();
let oid = Oid::new();
index.add(field.clone(), oid).unwrap();
match index.validate(&field) {
Ok(_) => {}
Err(err) => unreachable!("got {:?}: should have returned without issue", err),
}
}
#[test]
fn unique_return_duplicate_error() {
let mut index = Index::new_unique();
let field: Field = "fred".into();
let oid = Oid::new();
index.add(field.clone(), oid).unwrap();
match index.validate(&field) {
Ok(_) => unreachable!("should have gotten a duplication error"),
Err(err) => match err {
MTTError::FieldDuplicate(field_name, value) => {
assert_eq!(field_name, "");
assert_eq!(value, field);
}
_ => unreachable!("got {:?}: should have been duplicate field", err),
},
}
}
2025-09-13 12:45:20 -04:00
}
struct DocumentFile {
docdef: DocDef,
2025-08-28 10:15:28 -04:00
docs: HashMap<Oid, Document>,
2025-09-15 09:32:45 -04:00
indexes: Indexes,
2025-07-25 11:08:47 -04:00
queue: Queue,
rx: Receiver<Message>,
}
impl DocumentFile {
fn new(queue: Queue, rx: Receiver<Message>, docdef: DocDef) -> Self {
2025-07-25 11:08:47 -04:00
Self {
docdef: docdef.clone(),
2025-08-28 10:15:28 -04:00
docs: HashMap::new(),
indexes: docdef.create_indexes(),
2025-07-25 11:08:47 -04:00
queue: queue,
rx: rx,
}
}
fn start(mut queue: Queue, msg: Message) {
2025-07-25 11:08:47 -04:00
let (tx, rx) = channel();
let name = match msg.get_document_id() {
NameID::Name(name) => name.clone(),
NameID::ID(id) => id.to_string(),
};
2025-08-03 12:21:45 -04:00
let routes = [
2025-08-09 08:03:36 -04:00
RouteRequest::new(
Include::All,
Include::Some(name.clone()),
Include::Some(Action::Addition),
),
2025-08-03 12:21:45 -04:00
RouteRequest::new(
Include::All,
Include::Some(name.clone()),
Include::Some(Action::Query),
),
RouteRequest::new(
Include::All,
Include::Some(name.clone()),
Include::Some(Action::Show),
),
RouteRequest::new(
Include::All,
Include::Some(name.clone()),
Include::Some(Action::Update),
),
2025-08-03 12:21:45 -04:00
]
.to_vec();
match queue.register(tx, name, routes) {
Ok(_) => {}
Err(err) => {
let error = msg.response(err);
queue.send(error).unwrap();
return;
}
}
let action = msg.get_action();
let docdef = match action {
MsgAction::Create(data) => data.clone(),
_ => unreachable!("got {:?}: should have been a create message", action),
};
let mut doc = DocumentFile::new(queue.clone(), rx, docdef);
2025-07-25 11:08:47 -04:00
spawn(move || {
doc.listen();
});
let reply = msg.response(Reply::new());
queue.send(reply).unwrap();
2025-07-25 11:08:47 -04:00
}
fn listen(&mut self) {
2025-07-25 11:08:47 -04:00
loop {
let msg = self.rx.recv().unwrap();
let result = match msg.get_action() {
MsgAction::Addition(data) => self.add_document(data),
MsgAction::Query(query) => self.query(query),
2025-08-28 10:15:28 -04:00
MsgAction::Update(update) => self.update(update),
_ => Reply::new().into(),
};
self.queue.send(msg.response(result)).unwrap();
2025-07-25 11:08:47 -04:00
}
}
2025-08-13 11:17:51 -04:00
fn get_docdef(&self) -> &DocDef {
&self.docdef
}
2025-08-28 10:15:28 -04:00
fn get_documents<'a>(&self) -> impl Iterator<Item = (&Oid, &Document)> {
2025-08-11 12:17:37 -04:00
self.docs.iter()
}
2025-09-15 09:32:45 -04:00
fn validate(&self, field_name: &str, value: Option<Field>) -> Result<Field, MTTError> {
let output = match self.docdef.validate(field_name, value) {
Ok(data) => data,
Err(err) => return Err(err),
};
match self.indexes.validate(field_name, &output) {
Ok(_) => {}
Err(err) => return Err(err),
}
Ok(output)
}
2025-09-11 23:49:25 -04:00
fn add_field_to_error(key: String, err: MTTError) -> MTTError {
match err {
MTTError::DocumentFieldMissing(_) => MTTError::DocumentFieldMissing(key),
MTTError::FieldDuplicate(_, field) => MTTError::FieldDuplicate(key, field.clone()),
_ => err.clone(),
}
}
2025-09-15 09:32:45 -04:00
fn add_to_index(&mut self, field_name: &str, field: Field, oid: Oid) {
self.indexes.add_to_index(field_name, field, oid)
}
fn remove_from_index(&mut self, field_name: &str, field: &Field, oid: &Oid) {
self.indexes.remove_from_index(field_name, field, oid);
}
fn add_document(&mut self, addition: &Addition) -> MsgAction {
2025-08-10 16:20:47 -04:00
let mut holder = Document::new();
let doc = addition.get_document();
2025-08-12 10:30:46 -04:00
for (key, value) in doc.iter() {
2025-09-15 09:32:45 -04:00
match self.validate(key, Some(value.clone())) {
Ok(data) => {
holder.add_field(key.clone(), value.clone());
}
2025-09-11 23:49:25 -04:00
Err(err) => return Self::add_field_to_error(key.to_string(), err).into(),
2025-08-10 16:20:47 -04:00
}
}
2025-08-20 10:26:56 -04:00
for (key, value) in self.docdef.iter() {
2025-08-10 16:20:47 -04:00
match holder.get_field(key) {
Some(_) => {}
2025-09-15 09:32:45 -04:00
None => match self.validate(key, None) {
Ok(data) => holder.add_field(key.clone(), data.clone()),
2025-09-11 23:49:25 -04:00
Err(err) => return Self::add_field_to_error(key.to_string(), err).into(),
2025-08-20 10:26:56 -04:00
},
}
}
2025-08-28 10:15:28 -04:00
let mut oid = Oid::new();
while self.docs.contains_key(&oid) {
oid = Oid::new();
}
2025-09-15 09:32:45 -04:00
self.docs.insert(oid.clone(), holder.clone());
2025-09-09 09:05:38 -04:00
for (key, value) in holder.iter() {
2025-09-15 09:32:45 -04:00
self.add_to_index(key, value.clone(), oid.clone());
2025-09-09 09:05:38 -04:00
}
2025-08-10 16:20:47 -04:00
let mut reply = Reply::new();
reply.add(holder);
reply.into()
}
2025-08-28 10:15:28 -04:00
fn run_query(&self, query: &Query) -> Result<Vec<Oid>, MTTError> {
let mut reply = Reply::new();
for specifier in query.iter() {
2025-09-15 09:32:45 -04:00
match self.validate(&specifier.field_name, Some(specifier.value.clone())) {
2025-09-11 23:49:25 -04:00
Ok(_) => {}
Err(err) => match err {
MTTError::FieldDuplicate(_, _) => {}
_ => return Err(err),
},
2025-08-28 10:15:28 -04:00
}
}
let mut result = Vec::new();
for (oid, doc) in self.get_documents() {
let mut output = true;
for specifier in query.iter() {
let value = doc.get_field(&specifier.field_name).unwrap();
if value != &specifier.value {
output = false;
}
}
if output {
result.push(oid.clone());
}
}
Ok(result)
}
fn query(&self, query: &Query) -> MsgAction {
2025-08-28 10:15:28 -04:00
match self.run_query(query) {
Ok(result) => {
let mut reply = Reply::new();
for oid in result.iter() {
reply.add(self.docs.get(oid).unwrap().clone());
}
reply.into()
}
2025-08-13 11:17:51 -04:00
Err(err) => err.into(),
}
}
2025-08-28 10:15:28 -04:00
fn update(&mut self, update: &Update) -> MsgAction {
let oids = match self.run_query(update.get_query()) {
Ok(result) => result,
Err(err) => return err.into(),
};
2025-09-15 09:32:45 -04:00
let mut holder: HashMap<Oid, [Document; 2]> = HashMap::new();
2025-08-28 10:15:28 -04:00
for oid in oids.iter() {
2025-09-15 09:32:45 -04:00
let doc = self.docs.get(oid).unwrap();
let old_new = [doc.clone(), doc.clone()];
holder.insert(oid.clone(), old_new);
}
let mut index_holder = self.docdef.create_indexes();
for (oid, docs) in holder.iter_mut() {
let mut updated = Document::new();
2025-08-28 10:15:28 -04:00
for (key, value) in update.get_values().iter() {
2025-09-15 09:32:45 -04:00
match self.validate(key, Some(value.clone())) {
Ok(field) => match index_holder.validate(key, &field) {
Ok(_) => {
index_holder.add_to_index(key, field.clone(), oid.clone());
docs[1].add_field(key.clone(), field.clone());
}
Err(err) => return Self::add_field_to_error(key.to_string(), err).into(),
},
2025-08-28 13:18:18 -04:00
Err(err) => return err.into(),
}
2025-08-28 10:15:28 -04:00
}
2025-09-15 09:32:45 -04:00
}
let mut reply = Reply::new();
for (oid, docs) in holder.iter() {
self.docs.insert(oid.clone(), docs[1].clone());
reply.add(docs[1].clone());
for (key, value) in docs[0].iter() {
self.remove_from_index(key, value, oid);
}
2025-08-28 10:15:28 -04:00
}
reply.into()
}
}
2025-08-02 08:58:50 -04:00
#[cfg(test)]
mod document_files {
2025-08-02 09:55:13 -04:00
use super::{support_test::TIMEOUT, *};
use std::sync::mpsc::RecvTimeoutError;
2025-08-09 08:03:36 -04:00
fn standard_routes() -> Vec<RouteRequest> {
[
RouteRequest::new(Include::All, Include::All, Include::Some(Action::Reply)),
RouteRequest::new(Include::All, Include::All, Include::Some(Action::Error)),
]
.to_vec()
}
2025-08-26 08:06:22 -04:00
fn create_docdef(field_types: Vec<FieldType>) -> (DocDef, String) {
let mut output = DocDef::new();
2025-08-12 09:01:25 -04:00
let mut count = 0;
for field_type in field_types.iter() {
output.add_field(format!("field{}", count), field_type.clone());
count += 1;
}
2025-08-26 08:06:22 -04:00
(output, format!("name-{}", Uuid::new_v4()))
}
2025-08-03 12:21:45 -04:00
fn test_doc(
name: &str,
docdef: DocDef,
routes: Vec<RouteRequest>,
) -> (Queue, Receiver<Message>) {
let (tx, rx) = channel();
let mut queue = Queue::new();
let msg = Message::new(name, docdef);
DocumentFile::start(queue.clone(), msg);
2025-08-03 12:21:45 -04:00
queue
.register(tx, Uuid::new_v4().to_string(), routes)
.unwrap();
(queue, rx)
2025-08-02 09:55:13 -04:00
}
2025-08-02 08:58:50 -04:00
2025-08-09 08:03:36 -04:00
#[test]
fn does_not_respond_to_create() {
let docdef = DocDef::new();
let name = "quiet";
let (mut queue, rx) = test_doc(name, docdef, standard_routes());
let other = "alternate";
let (tx, _) = channel();
queue.register(tx, other.to_string(), Vec::new()).unwrap();
let msg = Message::new(name, DocDef::new());
queue.send(msg).unwrap();
match rx.recv_timeout(TIMEOUT) {
Ok(msg) => unreachable!("should not receive: {:?}", msg),
Err(err) => match err {
RecvTimeoutError::Timeout => {}
_ => unreachable!("should have timed out"),
},
}
}
2025-08-02 08:58:50 -04:00
#[test]
2025-08-03 12:21:45 -04:00
fn can_show_document_details() {
2025-08-02 09:55:13 -04:00
let docdef = DocDef::new();
let name = "first";
2025-08-09 08:03:36 -04:00
let (queue, rx) = test_doc(name, docdef, standard_routes());
2025-08-03 12:21:45 -04:00
let msg = Message::new(name, MsgAction::Show);
queue.send(msg.clone()).unwrap();
let show = rx.recv_timeout(TIMEOUT).unwrap();
}
2025-08-05 09:56:48 -04:00
#[test]
fn can_query_new_document() {
let docdef = DocDef::new();
let name = "second";
2025-08-09 08:03:36 -04:00
let (queue, rx) = test_doc(name, docdef, standard_routes());
let query = Message::new(name, Query::new());
2025-08-05 09:56:48 -04:00
queue.send(query).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
match result.get_action() {
MsgAction::Reply(data) => assert_eq!(data.len(), 0),
_ => unreachable!(
"got {:?}: should have received a reply",
result.get_action()
),
2025-08-05 09:56:48 -04:00
}
}
2025-08-04 23:51:57 -04:00
#[test]
2025-08-03 12:21:45 -04:00
fn only_responses_to_its_show_request() {
let docdef = DocDef::new();
let name = "quiet";
2025-08-09 08:03:36 -04:00
let (mut queue, rx) = test_doc(name, docdef, standard_routes());
2025-08-03 12:21:45 -04:00
let other = "alternate";
let (tx, _) = channel();
queue.register(tx, other.to_string(), Vec::new()).unwrap();
let msg = Message::new(other, MsgAction::Show);
queue.send(msg).unwrap();
match rx.recv_timeout(TIMEOUT) {
Ok(msg) => unreachable!("should not receive: {:?}", msg),
Err(err) => match err {
RecvTimeoutError::Timeout => {}
_ => unreachable!("should have timed out"),
},
}
2025-08-02 08:58:50 -04:00
}
2025-08-05 09:56:48 -04:00
#[test]
fn only_responses_to_its_query_request() {
let docdef = DocDef::new();
let name = "quiet";
2025-08-09 08:03:36 -04:00
let (mut queue, rx) = test_doc(name, docdef, standard_routes());
2025-08-05 09:56:48 -04:00
let other = "alternate";
let (tx, _) = channel();
queue.register(tx, other.to_string(), Vec::new()).unwrap();
let msg = Message::new(other, Query::new());
queue.send(msg).unwrap();
match rx.recv_timeout(TIMEOUT) {
Ok(msg) => unreachable!("should not receive: {:?}", msg),
Err(err) => match err {
RecvTimeoutError::Timeout => {}
_ => unreachable!("should have timed out"),
},
}
}
#[test]
fn can_document_be_added() {
let mut docdef = DocDef::new();
let name = "field";
let doc_name = "document";
2025-08-05 09:56:48 -04:00
let data = Uuid::new_v4();
docdef.add_field(name.to_string(), FieldType::Uuid);
2025-08-09 08:03:36 -04:00
let (queue, rx) = test_doc(doc_name, docdef, standard_routes());
let mut new_doc = Addition::new();
new_doc.add_field(name.to_string(), data.clone());
let msg = Message::new(doc_name, new_doc);
queue.send(msg.clone()).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
assert_eq!(result.get_message_id(), msg.get_message_id());
match result.get_action() {
MsgAction::Reply(output) => {
assert_eq!(output.len(), 1);
2025-08-11 12:17:37 -04:00
let holder = output.iter().next().unwrap();
match holder.get_field(name) {
Some(field) => match field {
Field::Uuid(store) => assert_eq!(store, &data),
_ => unreachable!(
"got {:?}: should have received uuid",
holder.get_field(name).unwrap()
),
},
None => unreachable!("{:?} did not contain field '{}'", holder, name),
}
}
_ => unreachable!("got {:?}: should have been a reply", result),
}
let msg = Message::new(doc_name, Query::new());
queue.send(msg.clone()).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
assert_eq!(result.get_message_id(), msg.get_message_id());
match result.get_action() {
MsgAction::Reply(output) => {
assert_eq!(output.len(), 1);
2025-08-11 12:17:37 -04:00
let holder = output.iter().next().unwrap();
match holder.get_field(name) {
Some(field) => match field {
Field::Uuid(store) => assert_eq!(store, &data),
_ => unreachable!(
"got {:?}: should have received uuid",
holder.get_field(name).unwrap()
),
},
None => unreachable!("{:?} did not contain field '{}'", holder, name),
}
}
_ => unreachable!("got {:?}: should have been a reply", result),
}
2025-08-05 09:56:48 -04:00
}
2025-08-09 08:03:36 -04:00
#[test]
fn only_responses_to_its_additions() {
let docdef = DocDef::new();
let name = "quiet";
let (mut queue, rx) = test_doc(name, docdef, standard_routes());
let other = "alternate";
let (tx, _) = channel();
queue.register(tx, other.to_string(), Vec::new()).unwrap();
let msg = Message::new(other, Addition::new());
queue.send(msg).unwrap();
match rx.recv_timeout(TIMEOUT) {
Ok(msg) => unreachable!("should not receive: {:?}", msg),
Err(err) => match err {
RecvTimeoutError::Timeout => {}
_ => unreachable!("should have timed out"),
},
}
}
#[test]
fn can_add_multiple_documents() {
let count = 4;
let mut docdef = DocDef::new();
let name = "field";
let doc_name = "document";
let data = Uuid::new_v4();
docdef.add_field(name.to_string(), FieldType::Uuid);
let (queue, rx) = test_doc(doc_name, docdef, standard_routes());
let mut new_doc = Addition::new();
new_doc.add_field(name.to_string(), data.clone());
for _ in 0..count {
let msg = Message::new(doc_name, new_doc.clone());
queue.send(msg.clone()).unwrap();
rx.recv_timeout(TIMEOUT).unwrap(); // eats the confirmation reply.
}
let msg = Message::new(doc_name, Query::new());
queue.send(msg).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
match result.get_action() {
MsgAction::Reply(data) => assert_eq!(data.len(), count),
_ => unreachable!("got {:?}: should have been a reply", result.get_action()),
}
}
#[test]
fn errors_on_wrong_field_name() {
2025-08-26 08:06:22 -04:00
let (docdef, doc_name) = create_docdef([FieldType::Uuid].to_vec());
let field_name = Uuid::new_v4().to_string();
2025-08-26 08:06:22 -04:00
let (queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes());
let mut addition = Addition::new();
addition.add_field(field_name.clone(), Uuid::new_v4());
let msg = Message::new(doc_name, addition);
queue.send(msg).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
match result.get_action() {
MsgAction::Error(err) => match err {
MTTError::DocumentFieldNotFound(data) => assert_eq!(data, &field_name),
_ => unreachable!("got {:?}: should have been document field not found.", err),
2025-08-10 16:20:47 -04:00
},
_ => unreachable!("got {:?}: should have been an error", result.get_action()),
}
}
#[test]
fn errors_on_wrong_field_type() {
2025-08-26 08:06:22 -04:00
let (docdef, doc_name) = create_docdef([FieldType::Uuid].to_vec());
let (queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes());
2025-08-10 16:20:47 -04:00
let mut addition = Addition::new();
addition.add_field("field0".to_string(), "astring");
let msg = Message::new(doc_name, addition);
queue.send(msg).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
match result.get_action() {
MsgAction::Error(err) => match err {
MTTError::DocumentFieldWrongDataType(expected, got) => {
assert_eq!(got, &FieldType::StaticString);
2025-08-10 16:20:47 -04:00
assert_eq!(expected, &FieldType::Uuid);
}
_ => unreachable!(
"got {:?}: should have been document field data mismatch.",
err
),
},
_ => unreachable!("got {:?}: should have been an error", result.get_action()),
}
}
#[test]
fn errors_on_missing_fields() {
2025-08-26 08:06:22 -04:00
let (docdef, doc_name) = create_docdef([FieldType::Uuid, FieldType::Uuid].to_vec());
let (queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes());
2025-08-10 16:20:47 -04:00
let mut addition = Addition::new();
addition.add_field("field0".to_string(), Uuid::nil());
let msg = Message::new(doc_name, addition);
queue.send(msg).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
match result.get_action() {
MsgAction::Error(err) => match err {
MTTError::DocumentFieldMissing(field) => assert_eq!(field, "field1"),
_ => unreachable!("got {:?}: should have been document field missing", err),
},
_ => unreachable!("got {:?}: should have been an error", result.get_action()),
}
}
2025-08-11 12:17:37 -04:00
#[test]
fn does_query_return_related_entries() {
2025-08-26 08:06:22 -04:00
let (docdef, doc_name) = create_docdef([FieldType::Uuid, FieldType::Uuid].to_vec());
let (queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes());
2025-08-11 12:17:37 -04:00
let field0 = Uuid::new_v4();
let field1 = Uuid::new_v4();
for _ in 0..3 {
let mut addition = Addition::new();
addition.add_field("field0".to_string(), Uuid::new_v4());
addition.add_field("field1".to_string(), Uuid::new_v4());
2025-08-26 08:06:22 -04:00
let msg = Message::new(doc_name.clone(), addition);
2025-08-11 12:17:37 -04:00
queue.send(msg).unwrap();
rx.recv_timeout(TIMEOUT).unwrap();
}
let mut addition = Addition::new();
addition.add_field("field0".to_string(), field0.clone());
addition.add_field("field1".to_string(), field1.clone());
2025-08-26 08:06:22 -04:00
let msg = Message::new(doc_name.clone(), addition);
2025-08-11 12:17:37 -04:00
queue.send(msg).unwrap();
rx.recv_timeout(TIMEOUT).unwrap();
for _ in 0..3 {
let mut addition = Addition::new();
addition.add_field("field0".to_string(), Uuid::new_v4());
addition.add_field("field1".to_string(), Uuid::new_v4());
2025-08-26 08:06:22 -04:00
let msg = Message::new(doc_name.clone(), addition);
2025-08-11 12:17:37 -04:00
queue.send(msg).unwrap();
rx.recv_timeout(TIMEOUT).unwrap();
}
let mut query = Query::new();
query.add_specifier("field0".to_string(), Operand::Equal, field0.clone());
let msg = Message::new(doc_name, query);
queue.send(msg).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
let action = result.get_action();
2025-08-12 10:30:46 -04:00
let field0: Field = field0.into();
let field1: Field = field1.into();
2025-08-11 12:17:37 -04:00
match action {
MsgAction::Reply(data) => {
assert_eq!(data.len(), 1, "should return one entry");
2025-08-12 10:30:46 -04:00
for doc in data.iter() {
assert_eq!(doc.get_field("field0").unwrap(), &field0);
assert_eq!(doc.get_field("field1").unwrap(), &field1);
}
}
_ => unreachable!("got {:?}: should have been a reply", action),
}
}
#[test]
fn gets_all_documents_in_query() {
2025-08-26 08:06:22 -04:00
let (docdef, doc_name) = create_docdef([FieldType::Uuid].to_vec());
2025-08-12 10:30:46 -04:00
let count = 4;
let input = Uuid::new_v4();
2025-08-26 08:06:22 -04:00
let (queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes());
2025-08-12 10:30:46 -04:00
for _ in 0..3 {
let mut addition = Addition::new();
addition.add_field("field0".to_string(), Uuid::new_v4());
2025-08-26 08:06:22 -04:00
let msg = Message::new(doc_name.clone(), addition);
2025-08-12 10:30:46 -04:00
queue.send(msg).unwrap();
rx.recv_timeout(TIMEOUT).unwrap();
}
for _ in 0..count {
let mut addition = Addition::new();
addition.add_field("field0".to_string(), input.clone());
2025-08-26 08:06:22 -04:00
let msg = Message::new(doc_name.clone(), addition);
2025-08-12 10:30:46 -04:00
queue.send(msg).unwrap();
rx.recv_timeout(TIMEOUT).unwrap();
}
let mut query = Query::new();
query.add_specifier("field0".to_string(), Operand::Equal, input.clone());
let msg = Message::new(doc_name, query);
queue.send(msg).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
let action = result.get_action();
let input: Field = input.into();
match action {
MsgAction::Reply(data) => {
assert_eq!(data.len(), count, "should return {} entries", count);
for doc in data.iter() {
assert_eq!(doc.get_field("field0").unwrap(), &input);
}
}
_ => unreachable!("got {:?}: should have been a reply", action),
}
}
#[test]
fn query_should_work_with_multiple_fields() {
2025-08-26 08:06:22 -04:00
let (docdef, doc_name) = create_docdef([FieldType::Uuid, FieldType::Uuid].to_vec());
let (queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes());
2025-08-12 10:30:46 -04:00
let field0 = Uuid::new_v4();
let field1 = Uuid::new_v4();
let input = [
[Uuid::new_v4(), Uuid::new_v4()],
[field0.clone(), field1.clone()],
[field1.clone(), field0.clone()],
[field0.clone(), Uuid::new_v4()],
[Uuid::new_v4(), field1.clone()],
];
for combo in input.iter() {
let mut addition = Addition::new();
addition.add_field("field0".to_string(), combo[0].clone());
addition.add_field("field1".to_string(), combo[1].clone());
2025-08-26 08:06:22 -04:00
let msg = Message::new(doc_name.clone(), addition);
2025-08-12 10:30:46 -04:00
queue.send(msg).unwrap();
rx.recv_timeout(TIMEOUT).unwrap();
}
let mut query = Query::new();
query.add_specifier("field0".to_string(), Operand::Equal, field0.clone());
query.add_specifier("field1".to_string(), Operand::Equal, field1.clone());
let msg = Message::new(doc_name, query);
queue.send(msg).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
let action = result.get_action();
let field0: Field = field0.into();
let field1: Field = field1.into();
match action {
MsgAction::Reply(data) => {
assert_eq!(data.len(), 1, "should return one entry");
for doc in data.iter() {
assert_eq!(doc.get_field("field0").unwrap(), &field0);
assert_eq!(doc.get_field("field1").unwrap(), &field1);
}
2025-08-11 12:17:37 -04:00
}
_ => unreachable!("got {:?}: should have been a reply", action),
}
}
2025-08-13 11:17:51 -04:00
#[test]
fn errors_on_bad_field_name() {
2025-08-26 08:06:22 -04:00
let (docdef, doc_name) = create_docdef(Vec::new());
2025-08-13 11:17:51 -04:00
let field_name = "wrong";
2025-08-26 08:06:22 -04:00
let (queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes());
2025-08-13 11:17:51 -04:00
let mut query = Query::new();
query.add_specifier(field_name.to_string(), Operand::Equal, Uuid::new_v4());
let msg = Message::new(doc_name, query);
queue.send(msg).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
let action = result.get_action();
match action {
MsgAction::Error(data) => match data {
MTTError::DocumentFieldNotFound(output) => assert_eq!(output, field_name),
_ => unreachable!("got {:?}: should been field not found", data),
},
_ => unreachable!("got {:?}: should have been a error", action),
}
}
#[test]
fn errors_on_bad_field_type() {
2025-08-26 08:06:22 -04:00
let (docdef, doc_name) = create_docdef([FieldType::Uuid].to_vec());
let (queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes());
2025-08-13 11:17:51 -04:00
let mut query = Query::new();
query.add_specifier("field0".to_string(), Operand::Equal, "wrong");
let msg = Message::new(doc_name, query);
queue.send(msg).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
let action = result.get_action();
match action {
MsgAction::Error(data) => match data {
MTTError::DocumentFieldWrongDataType(expected, got) => {
assert_eq!(expected, &FieldType::Uuid);
assert_eq!(got, &FieldType::StaticString);
}
_ => unreachable!("got {:?}: should been field not found", data),
},
_ => unreachable!("got {:?}: should have been a error", action),
}
}
2025-08-20 10:26:56 -04:00
#[test]
fn can_use_default_values() {
2025-08-26 08:06:22 -04:00
let (mut docdef, doc_name) = create_docdef([FieldType::StaticString].to_vec());
docdef.set_default("field0", None);
2025-08-26 08:06:22 -04:00
let (queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes());
2025-08-20 10:26:56 -04:00
let new_doc = Addition::new();
let msg = Message::new(doc_name, new_doc);
queue.send(msg).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
let action = result.get_action();
match action {
MsgAction::Reply(docs) => {
assert_eq!(docs.len(), 1);
for doc in docs.iter() {
let expected: Field = "".into();
assert_eq!(doc.get_field("field0").unwrap(), &expected);
}
}
_ => unreachable!("got {:?}: should have gotten a reply", action),
}
}
2025-08-21 10:38:13 -04:00
#[test]
fn can_a_default_value_be_set() {
2025-08-26 08:06:22 -04:00
let (mut docdef, doc_name) = create_docdef([FieldType::Uuid].to_vec());
2025-08-21 10:38:13 -04:00
let input = Uuid::nil();
docdef.set_default("field0", Some(input.into()));
2025-08-26 08:06:22 -04:00
let (queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes());
2025-08-21 10:38:13 -04:00
let new_doc = Addition::new();
let msg = Message::new(doc_name, new_doc);
queue.send(msg).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
let action = result.get_action();
match action {
MsgAction::Reply(docs) => {
assert_eq!(docs.len(), 1);
let expected: Field = input.into();
2025-08-21 11:51:56 -04:00
for doc in docs.iter() {
assert_eq!(doc.get_field("field0").unwrap(), &expected);
}
}
_ => unreachable!("got {:?}: should have gotten a reply", action),
}
}
#[test]
fn can_default_values_be_overridden() {
2025-08-26 08:06:22 -04:00
let (mut docdef, doc_name) = create_docdef([FieldType::Uuid].to_vec());
docdef.set_default("field0", None);
2025-08-26 08:06:22 -04:00
let (queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes());
2025-08-21 11:51:56 -04:00
let mut new_doc = Addition::new();
new_doc.add_field("field0".to_string(), Uuid::nil());
let msg = Message::new(doc_name, new_doc);
queue.send(msg).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
let action = result.get_action();
match action {
MsgAction::Reply(docs) => {
assert_eq!(docs.len(), 1);
let expected: Field = Uuid::nil().into();
2025-08-21 10:38:13 -04:00
for doc in docs.iter() {
assert_eq!(doc.get_field("field0").unwrap(), &expected);
}
}
_ => unreachable!("got {:?}: should have gotten a reply", action),
}
}
2025-08-26 08:06:22 -04:00
#[test]
fn empty_update_query_results_in_zero_changes() {
let (mut docdef, doc_name) = create_docdef([FieldType::Uuid].to_vec());
let (queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes());
let mut update = Update::new();
update
.get_query_mut()
.add_specifier("field0".to_string(), Operand::Equal, Uuid::new_v4());
update
.get_values_mut()
.add_field("field0".to_string(), Uuid::nil());
let msg = Message::new(doc_name, update);
queue.send(msg).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
let action = result.get_action();
match action {
MsgAction::Reply(docs) => assert_eq!(docs.len(), 0),
_ => unreachable!("got {:?}: should have gotten a reply", action),
}
}
#[test]
fn only_responses_to_its_update_request() {
let (mut docdef, doc_name) = create_docdef([FieldType::Uuid].to_vec());
let (mut queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes());
let alt_doc_name = "alternate";
let (tx, _) = channel();
queue
.register(tx, alt_doc_name.to_string(), Vec::new())
.unwrap();
let mut update = Update::new();
update
.get_query_mut()
.add_specifier("field0".to_string(), Operand::Equal, Uuid::new_v4());
update
.get_values_mut()
.add_field("field0".to_string(), Uuid::nil());
let msg = Message::new(alt_doc_name, update);
queue.send(msg).unwrap();
match rx.recv_timeout(TIMEOUT) {
Ok(msg) => unreachable!("should not receive: {:?}", msg),
Err(err) => match err {
RecvTimeoutError::Timeout => {}
_ => unreachable!("should have timed out"),
},
}
2025-08-26 08:06:22 -04:00
}
2025-08-28 10:15:28 -04:00
#[test]
fn changes_information_requested() {
let (mut docdef, doc_name) =
create_docdef([FieldType::Uuid, FieldType::StaticString].to_vec());
let (mut queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes());
let id = Uuid::new_v4();
let old = "old";
let new = "new";
let mut addition = Addition::new();
addition.add_field("field0".to_string(), id.clone());
addition.add_field("field1".to_string(), old);
let msg = Message::new(doc_name.clone(), addition);
queue.send(msg).unwrap();
rx.recv_timeout(TIMEOUT).unwrap();
let mut update = Update::new();
update
.get_query_mut()
.add_specifier("field0".to_string(), Operand::Equal, id.clone());
update.get_values_mut().add_field("field1".to_string(), new);
let msg = Message::new(doc_name.clone(), update);
queue.send(msg).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
let action = result.get_action();
match action {
MsgAction::Reply(docs) => {
assert_eq!(docs.len(), 1);
let expected_id: Field = id.into();
let output: Field = new.into();
for doc in docs.iter() {
assert_eq!(doc.get_field("field0").unwrap(), &expected_id);
assert_eq!(doc.get_field("field1").unwrap(), &output);
}
}
_ => unreachable!("got {:?}: should have gotten a reply", action),
}
let mut query = Query::new();
query.add_specifier("field0".to_string(), Operand::Equal, id.clone());
let msg = Message::new(doc_name, query);
queue.send(msg).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
let action = result.get_action();
match action {
MsgAction::Reply(docs) => {
assert_eq!(docs.len(), 1);
let expected_id: Field = id.into();
let output: Field = new.into();
for doc in docs.iter() {
assert_eq!(doc.get_field("field0").unwrap(), &expected_id);
2025-08-28 10:37:42 -04:00
assert_eq!(doc.get_field("field1").unwrap(), &output);
}
}
_ => unreachable!("got {:?}: should have gotten a reply", action),
}
}
#[test]
fn changes_only_the_queried() {
let (mut docdef, doc_name) =
create_docdef([FieldType::Uuid, FieldType::StaticString].to_vec());
let (mut queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes());
let id1 = Uuid::new_v4();
let id2 = Uuid::new_v4();
let old = "old";
let new = "new";
for id in [id1.clone(), id2.clone()].into_iter() {
let mut addition = Addition::new();
addition.add_field("field0".to_string(), id);
addition.add_field("field1".to_string(), old);
let msg = Message::new(doc_name.clone(), addition);
queue.send(msg).unwrap();
rx.recv_timeout(TIMEOUT).unwrap();
}
let mut update = Update::new();
update
.get_query_mut()
.add_specifier("field0".to_string(), Operand::Equal, id1.clone());
update.get_values_mut().add_field("field1".to_string(), new);
let msg = Message::new(doc_name.clone(), update);
queue.send(msg).unwrap();
rx.recv_timeout(TIMEOUT).unwrap();
let mut query = Query::new();
query.add_specifier("field0".to_string(), Operand::Equal, id2.clone());
let msg = Message::new(doc_name, query);
queue.send(msg).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
let action = result.get_action();
match action {
MsgAction::Reply(docs) => {
assert_eq!(docs.len(), 1);
let expected_id: Field = id2.into();
let output: Field = old.into();
for doc in docs.iter() {
assert_eq!(doc.get_field("field0").unwrap(), &expected_id);
2025-08-28 10:49:18 -04:00
assert_eq!(doc.get_field("field1").unwrap(), &output);
}
}
_ => unreachable!("got {:?}: should have gotten a reply", action),
}
}
#[test]
fn can_handle_multiple_updates() {
let (mut docdef, doc_name) =
create_docdef([FieldType::Uuid, FieldType::StaticString].to_vec());
let (mut queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes());
let count = 3;
let id = Uuid::new_v4();
let old = "old";
let new = "new";
let mut addition = Addition::new();
addition.add_field("field0".to_string(), id.clone());
addition.add_field("field1".to_string(), old);
let msg = Message::new(doc_name.clone(), addition);
for _ in 0..count {
queue.send(msg.clone()).unwrap();
rx.recv_timeout(TIMEOUT).unwrap();
}
let mut update = Update::new();
update
.get_query_mut()
.add_specifier("field0".to_string(), Operand::Equal, id.clone());
update.get_values_mut().add_field("field1".to_string(), new);
let msg = Message::new(doc_name.clone(), update);
queue.send(msg).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
let action = result.get_action();
match action {
MsgAction::Reply(docs) => {
assert_eq!(docs.len(), count);
let expected_id: Field = id.into();
let output: Field = new.into();
for doc in docs.iter() {
assert_eq!(doc.get_field("field0").unwrap(), &expected_id);
assert_eq!(doc.get_field("field1").unwrap(), &output);
}
}
_ => unreachable!("got {:?}: should have gotten a reply", action),
}
let mut query = Query::new();
query.add_specifier("field0".to_string(), Operand::Equal, id.clone());
let msg = Message::new(doc_name, query);
queue.send(msg).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
let action = result.get_action();
match action {
MsgAction::Reply(docs) => {
assert_eq!(docs.len(), count);
let expected_id: Field = id.into();
let output: Field = new.into();
for doc in docs.iter() {
assert_eq!(doc.get_field("field0").unwrap(), &expected_id);
2025-08-28 10:15:28 -04:00
assert_eq!(doc.get_field("field1").unwrap(), &output);
}
}
_ => unreachable!("got {:?}: should have gotten a reply", action),
}
}
2025-08-28 11:02:47 -04:00
#[test]
fn update_errors_on_bad_field_name() {
let (mut docdef, doc_name) =
create_docdef([FieldType::Uuid, FieldType::StaticString].to_vec());
let (mut queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes());
let id = Uuid::new_v4();
let old = "old";
let new = "new";
let mut addition = Addition::new();
addition.add_field("field0".to_string(), id.clone());
addition.add_field("field1".to_string(), old);
let msg = Message::new(doc_name.clone(), addition);
queue.send(msg).unwrap();
rx.recv_timeout(TIMEOUT).unwrap();
let mut update = Update::new();
2025-08-28 13:18:18 -04:00
update
.get_query_mut()
.add_specifier("field0".to_string(), Operand::Equal, id.clone());
update.get_values_mut().add_field("wrong".to_string(), new);
let msg = Message::new(doc_name.clone(), update);
queue.send(msg).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
let action = result.get_action();
match action {
MsgAction::Error(err) => match err {
MTTError::DocumentFieldNotFound(data) => assert_eq!(data, "wrong"),
_ => unreachable!("got {:?}: should have gotten an missing field", err),
},
_ => unreachable!("got {:?}: should have gotten an error", action),
}
}
#[test]
fn update_errors_on_bad_field_type() {
2025-09-05 23:52:24 -04:00
let (docdef, doc_name) = create_docdef([FieldType::Uuid, FieldType::StaticString].to_vec());
let (queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes());
2025-08-28 13:18:18 -04:00
let id = Uuid::new_v4();
let old = "old";
let new = Uuid::nil();
let mut addition = Addition::new();
addition.add_field("field0".to_string(), id.clone());
addition.add_field("field1".to_string(), old);
let msg = Message::new(doc_name.clone(), addition);
queue.send(msg).unwrap();
rx.recv_timeout(TIMEOUT).unwrap();
let mut update = Update::new();
2025-08-28 11:02:47 -04:00
update
.get_query_mut()
.add_specifier("field0".to_string(), Operand::Equal, id.clone());
update.get_values_mut().add_field("field1".to_string(), new);
let msg = Message::new(doc_name.clone(), update);
queue.send(msg).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
let action = result.get_action();
2025-08-28 13:18:18 -04:00
match action {
MsgAction::Error(err) => match err {
MTTError::DocumentFieldWrongDataType(expected, got) => {
assert_eq!(expected, &FieldType::StaticString);
assert_eq!(got, &FieldType::Uuid);
}
_ => unreachable!("got {:?}: should have gotten an missing field", err),
},
_ => unreachable!("got {:?}: should have gotten an error", action),
}
2025-08-28 11:02:47 -04:00
}
2025-09-05 23:52:24 -04:00
#[test]
fn can_field_be_marked_unique() {
let (mut docdef, doc_name) = create_docdef([FieldType::Uuid].to_vec());
2025-09-15 09:32:45 -04:00
docdef.add_index("field0".to_string(), IndexType::Unique);
2025-09-05 23:52:24 -04:00
let (queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes());
let field0 = Uuid::new_v4();
let mut addition = Addition::new();
addition.add_field("field0".to_string(), field0.clone());
let msg = Message::new(doc_name.clone(), addition.clone());
queue.send(msg).unwrap();
rx.recv_timeout(TIMEOUT).unwrap();
let msg2 = Message::new(doc_name.clone(), addition);
queue.send(msg2).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
let action = result.get_action();
match action {
MsgAction::Error(err) => match err {
2025-09-11 23:49:25 -04:00
MTTError::FieldDuplicate(key, field) => {
let expected: Field = field0.into();
assert_eq!(key, "field0");
assert_eq!(field, &expected);
}
2025-09-05 23:52:24 -04:00
_ => unreachable!("got {:?}: should have gotten an missing field", err),
},
_ => unreachable!("got {:?}: should have gotten an error", action),
}
}
2025-09-07 17:04:14 -04:00
2025-09-09 09:05:38 -04:00
#[test]
2025-09-07 17:04:14 -04:00
fn unique_value_remains_available_if_failure_occurs() {
let (mut docdef, doc_name) = create_docdef([FieldType::Uuid, FieldType::Uuid].to_vec());
2025-09-15 09:32:45 -04:00
docdef.add_index("field0".to_string(), IndexType::Unique);
2025-09-07 17:04:14 -04:00
let (queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes());
let field0 = Uuid::new_v4();
let mut bad_addition = Addition::new();
bad_addition.add_field("field0".to_string(), field0.clone());
bad_addition.add_field("field1".to_string(), "");
let msg = Message::new(doc_name.clone(), bad_addition.clone());
queue.send(msg).unwrap();
rx.recv_timeout(TIMEOUT).unwrap();
let mut good_addition = Addition::new();
good_addition.add_field("field0".to_string(), field0.clone());
good_addition.add_field("field1".to_string(), field0.clone());
2025-09-09 09:05:38 -04:00
let msg = Message::new(doc_name.clone(), good_addition.clone());
2025-09-07 17:04:14 -04:00
queue.send(msg).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
let action = result.get_action();
match action {
MsgAction::Reply(_) => {}
_ => unreachable!("got {:?}: should have gotten a reply", action),
}
}
2025-09-11 23:49:25 -04:00
#[test]
fn updating_unique_removes_old_entry() {
let (mut docdef, doc_name) = create_docdef([FieldType::Uuid].to_vec());
2025-09-15 09:32:45 -04:00
docdef.add_index("field0".to_string(), IndexType::Unique);
2025-09-11 23:49:25 -04:00
let (queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes());
let old = Uuid::new_v4();
let mut new = Uuid::new_v4();
while old == new {
new = Uuid::new_v4();
}
let fold: Field = old.into();
let fnew: Field = new.into();
let mut addition = Addition::new();
addition.add_field("field0".to_string(), old.clone());
let msg = Message::new(doc_name.clone(), addition.clone());
queue.send(msg).unwrap();
rx.recv_timeout(TIMEOUT).unwrap();
let mut update = Update::new();
let query = update.get_query_mut();
query.add_specifier("field0".to_string(), Operand::Equal, old.clone());
let values = update.get_values_mut();
values.add_field("field0".to_string(), new.clone());
let msg = Message::new(doc_name.clone(), update);
queue.send(msg).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
let action = result.get_action();
match action {
MsgAction::Reply(data) => {
assert_eq!(data.len(), 1);
for doc in data.iter() {
assert_eq!(
doc.get_field("field0").unwrap(),
&fnew,
"got {:?} as a reply",
data
);
}
}
_ => unreachable!("got {:?}: should have gotten a reply", action),
}
let msg = Message::new(doc_name.clone(), addition.clone());
queue.send(msg).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
let action = result.get_action();
match action {
MsgAction::Reply(data) => {
assert_eq!(data.len(), 1);
for doc in data.iter() {
assert_eq!(
doc.get_field("field0").unwrap(),
&fold,
"got {:?} as a reply",
data
);
}
}
_ => unreachable!("got {:?}: should have gotten a reply", action),
}
}
2025-09-13 12:45:20 -04:00
2025-09-15 09:32:45 -04:00
#[test]
2025-09-13 12:45:20 -04:00
fn unique_available_after_bad_change() {
let mut ids: Vec<Uuid> = Vec::new();
while ids.len() < 3 {
let id = Uuid::new_v4();
if !ids.contains(&id) {
ids.push(id);
}
}
let (mut docdef, doc_name) =
create_docdef([FieldType::Uuid, FieldType::StaticString].to_vec());
2025-09-15 09:32:45 -04:00
docdef.add_index("field0".to_string(), IndexType::Unique);
2025-09-13 12:45:20 -04:00
let (queue, rx) = test_doc(doc_name.as_str(), docdef, standard_routes());
let field1 = "fred";
for index in 0..2 {
let mut addition = Addition::new();
addition.add_field("field0".to_string(), ids[index].clone());
addition.add_field("field1".to_string(), field1);
let msg = Message::new(doc_name.clone(), addition.clone());
queue.send(msg).unwrap();
rx.recv_timeout(TIMEOUT).unwrap();
}
let mut update = Update::new();
let query = update.get_query_mut();
query.add_specifier("field1".to_string(), Operand::Equal, field1);
let values = update.get_values_mut();
values.add_field("field0".to_string(), ids[2].clone());
let msg = Message::new(doc_name.clone(), update);
queue.send(msg).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
let action = result.get_action();
match action {
MsgAction::Error(err) => match err {
MTTError::FieldDuplicate(key, field) => {
let expected: Field = ids[2].into();
assert_eq!(key, "field0");
assert_eq!(field, &expected);
}
_ => unreachable!("got {:?}: should have gotten an missing field", err),
},
_ => unreachable!("got {:?}: should have gotten an error", action),
}
}
2025-08-02 08:58:50 -04:00
}
#[cfg(test)]
2025-07-28 10:49:34 -04:00
mod createdocs {
use super::support_test::TIMEOUT;
use super::*;
fn setup_create_doc(routes: Vec<RouteRequest>) -> (Queue, Receiver<Message>) {
let mut queue = Queue::new();
let (tx, rx) = channel();
queue
.register(tx, Uuid::new_v4().to_string(), routes)
.unwrap();
CreateDoc::start(queue.clone());
(queue, rx)
}
#[test]
fn create_document_creation() {
let name = "project";
let routes = [RouteRequest::new(
Include::All,
Include::All,
Include::Some(Action::Reply),
)]
.to_vec();
let (queue, rx) = setup_create_doc(routes);
let msg1 = Message::new(name, MsgAction::Create(DocDef::new()));
queue.send(msg1.clone()).unwrap();
let result1 = rx.recv_timeout(TIMEOUT).unwrap();
assert_eq!(result1.get_message_id(), msg1.get_message_id());
assert_eq!(result1.get_document_id(), msg1.get_document_id());
match result1.get_action() {
MsgAction::Reply(_) => {}
_ => unreachable!("got {:?}: should have been a reply.", result1.get_action()),
}
2025-08-05 09:56:48 -04:00
let msg2 = Message::new(name, MsgAction::Query(Query::new()));
queue.send(msg2.clone()).unwrap();
let result2 = rx.recv_timeout(TIMEOUT).unwrap();
assert_eq!(result2.get_message_id(), msg2.get_message_id());
assert_eq!(result2.get_document_id(), msg2.get_document_id());
match result2.get_action() {
MsgAction::Reply(_) => {}
_ => unreachable!("got {:?}: should have been a reply.", result1.get_action()),
}
}
#[test]
fn does_duplicates_generate_error() {
let name = "duplicate";
let routes = [
RouteRequest::new(Include::All, Include::All, Include::Some(Action::Reply)),
RouteRequest::new(Include::All, Include::All, Include::Some(Action::Error)),
]
.to_vec();
let (queue, rx) = setup_create_doc(routes);
2025-07-28 10:49:34 -04:00
let msg = Message::new(name, MsgAction::Create(DocDef::new()));
queue.send(msg.clone()).unwrap();
rx.recv_timeout(TIMEOUT).unwrap();
queue.send(msg.clone()).unwrap();
let result = rx.recv_timeout(TIMEOUT).unwrap();
assert_eq!(result.get_message_id(), msg.get_message_id());
assert_eq!(result.get_document_id(), msg.get_document_id());
match result.get_action() {
MsgAction::Error(err) => match err {
MTTError::DocumentAlreadyExists(data) => assert_eq!(data, name),
_ => unreachable!("got {:?}: should have been a reply.", err),
},
_ => unreachable!("got {:?}: should have been a reply.", result.get_action()),
}
}
}