Able to create new documents.
This commit is contained in:
parent
3b641e7c28
commit
1572e2f86a
177
src/message.rs
177
src/message.rs
@ -1,4 +1,3 @@
|
||||
use crate::field::Field;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{
|
||||
@ -19,13 +18,29 @@ enum MTTError {
|
||||
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
|
||||
enum Action {
|
||||
Create,
|
||||
Query,
|
||||
//
|
||||
NewDocumentType,
|
||||
Query,
|
||||
Reply,
|
||||
Update,
|
||||
}
|
||||
|
||||
impl From<MsgAction> for Action {
|
||||
fn from(value: MsgAction) -> Self {
|
||||
match value {
|
||||
MsgAction::Create(_) => Action::Create,
|
||||
MsgAction::Query(_) => Action::Query,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&MsgAction> for Action {
|
||||
fn from(value: &MsgAction) -> Self {
|
||||
let action = value.clone();
|
||||
Self::from(action)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
|
||||
enum NameID {
|
||||
ID(Uuid),
|
||||
@ -56,16 +71,21 @@ impl From<&NameID> for NameID {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
enum MsgAction {
|
||||
Create(DocDef),
|
||||
Query(Access),
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Message {
|
||||
msg_id: Uuid,
|
||||
document_id: NameID,
|
||||
action: Action,
|
||||
//instructions: ?,
|
||||
action: MsgAction,
|
||||
}
|
||||
|
||||
impl Message {
|
||||
fn new<D>(doc_id: D, action: Action) -> Self
|
||||
fn new<D>(doc_id: D, action: MsgAction) -> Self
|
||||
where
|
||||
D: Into<NameID>,
|
||||
{
|
||||
@ -84,7 +104,7 @@ impl Message {
|
||||
&self.document_id
|
||||
}
|
||||
|
||||
fn get_action(&self) -> &Action {
|
||||
fn get_action(&self) -> &MsgAction {
|
||||
&self.action
|
||||
}
|
||||
}
|
||||
@ -97,12 +117,15 @@ mod messages {
|
||||
fn can_the_document_be_a_stringi_reference() {
|
||||
let dts = ["one", "two"];
|
||||
for document in dts.into_iter() {
|
||||
let msg = Message::new(document, Action::NewDocumentType);
|
||||
let msg = Message::new(document, MsgAction::Create(DocDef::new()));
|
||||
match msg.get_document_id() {
|
||||
NameID::ID(_) => unreachable!("should have been a string id"),
|
||||
NameID::Name(data) => assert_eq!(data, document),
|
||||
}
|
||||
assert_eq!(msg.get_action(), &Action::NewDocumentType);
|
||||
match msg.get_action() {
|
||||
MsgAction::Create(_) => {}
|
||||
_ => unreachable!("should have been a create document"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -110,31 +133,37 @@ mod messages {
|
||||
fn can_the_document_be_a_string() {
|
||||
let dts = ["one".to_string(), "two".to_string()];
|
||||
for document in dts.into_iter() {
|
||||
let msg = Message::new(document.clone(), Action::Update);
|
||||
let msg = Message::new(document.clone(), MsgAction::Query(Access::new()));
|
||||
match msg.get_document_id() {
|
||||
NameID::ID(_) => unreachable!("should have been a string id"),
|
||||
NameID::Name(data) => assert_eq!(data, &document),
|
||||
}
|
||||
assert_eq!(msg.get_action(), &Action::Update);
|
||||
match msg.get_action() {
|
||||
MsgAction::Query(_) => {}
|
||||
_ => unreachable!("should have been an access query"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn can_the_document_be_an_id() {
|
||||
let document = Uuid::new_v4();
|
||||
let msg = Message::new(document.clone(), Action::Query);
|
||||
let msg = Message::new(document.clone(), MsgAction::Query(Access::new()));
|
||||
match msg.get_document_id() {
|
||||
NameID::ID(data) => assert_eq!(data, &document),
|
||||
NameID::Name(_) => unreachable!("should have been an id"),
|
||||
}
|
||||
assert_eq!(msg.action, Action::Query);
|
||||
match msg.get_action() {
|
||||
MsgAction::Query(_) => {}
|
||||
_ => unreachable!("should have been an access query"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn is_the_message_id_random() {
|
||||
let mut ids: Vec<Uuid> = Vec::new();
|
||||
for _ in 0..5 {
|
||||
let msg = Message::new("tester", Action::NewDocumentType);
|
||||
let msg = Message::new("tester", MsgAction::Create(DocDef::new()));
|
||||
let id = msg.get_message_id().clone();
|
||||
assert!(!ids.contains(&id), "{:?} containts {}", ids, id);
|
||||
ids.push(id);
|
||||
@ -211,8 +240,7 @@ struct Route {
|
||||
}
|
||||
|
||||
impl Route {
|
||||
fn new(msg_id: Include<Uuid>, doc: Include<Uuid>, action: Include<Action>) -> Self
|
||||
{
|
||||
fn new(msg_id: Include<Uuid>, doc: Include<Uuid>, action: Include<Action>) -> Self {
|
||||
Self {
|
||||
action: action,
|
||||
doc_type: doc,
|
||||
@ -389,14 +417,19 @@ impl QueueData {
|
||||
let doc_id: Include<Uuid> = match self.get_doc_id(msg.get_document_id()) {
|
||||
Ok(id) => Include::Some(id.clone()),
|
||||
Err(err) => {
|
||||
if msg.get_action() == &Action::Create {
|
||||
let action: Action = msg.get_action().into();
|
||||
if action == Action::Create {
|
||||
Include::Some(Uuid::nil())
|
||||
} else {
|
||||
return Err(err)
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
},
|
||||
};
|
||||
let route = Route::new(Include::Some(msg.get_message_id().clone()), doc_id, Include::Some(msg.get_action().clone()));
|
||||
let route = Route::new(
|
||||
Include::Some(msg.get_message_id().clone()),
|
||||
doc_id,
|
||||
Include::Some(msg.get_action().into()),
|
||||
);
|
||||
for (send_route, send_ids) in self.routes.iter() {
|
||||
if route == send_route.into() {
|
||||
for send_id in send_ids {
|
||||
@ -452,11 +485,11 @@ mod queuedatas {
|
||||
let mut queuedata = QueueData::new();
|
||||
let id = queuedata.register(name.clone(), tx).unwrap();
|
||||
queuedata.add_route(&id, Include::Some(name.clone()), action);
|
||||
let msg = Message::new(name.clone(), Action::Query);
|
||||
let msg = Message::new(name.clone(), MsgAction::Query(Access::new()));
|
||||
queuedata.send(msg.clone()).unwrap();
|
||||
let result = rx.recv_timeout(TIMEOUT).unwrap();
|
||||
assert_eq!(result.get_message_id(), msg.get_message_id());
|
||||
let msg = Message::new(id.clone(), Action::Query);
|
||||
let msg = Message::new(id.clone(), MsgAction::Query(Access::new()));
|
||||
queuedata.send(msg.clone()).unwrap();
|
||||
let result = rx.recv_timeout(TIMEOUT).unwrap();
|
||||
assert_eq!(result.get_message_id(), msg.get_message_id());
|
||||
@ -466,7 +499,7 @@ mod queuedatas {
|
||||
fn does_a_bad_document_name_fail() {
|
||||
let docname = Uuid::new_v4().to_string();
|
||||
let queuedata = QueueData::new();
|
||||
let msg = Message::new(docname.clone(), Action::Query);
|
||||
let msg = Message::new(docname.clone(), MsgAction::Query(Access::new()));
|
||||
match queuedata.send(msg) {
|
||||
Ok(_) => unreachable!("should have been an error"),
|
||||
Err(data) => match data {
|
||||
@ -498,7 +531,7 @@ mod queuedatas {
|
||||
let name = "something";
|
||||
let (tx, _) = channel();
|
||||
queuedata.register(name.to_string(), tx).unwrap();
|
||||
let msg = Message::new("something", Action::NewDocumentType);
|
||||
let msg = Message::new("something", MsgAction::Create(DocDef::new()));
|
||||
match queuedata.send(msg) {
|
||||
Ok(_) => {}
|
||||
Err(err) => unreachable!("got {:?}: should not error", err),
|
||||
@ -512,11 +545,11 @@ mod queuedatas {
|
||||
let (tx, rx) = channel();
|
||||
let id = queuedata.register(doctype.to_string(), tx).unwrap();
|
||||
queuedata.add_route(&id, Include::Some(doctype.to_string()), Action::Query);
|
||||
let msg = Message::new(doctype, Action::Query);
|
||||
let msg = Message::new(doctype, MsgAction::Query(Access::new()));
|
||||
queuedata.send(msg.clone()).unwrap();
|
||||
let result = rx.recv_timeout(TIMEOUT).unwrap();
|
||||
assert_eq!(result.get_message_id(), msg.get_message_id());
|
||||
let msg = Message::new(doctype, Action::Reply);
|
||||
let msg = Message::new(doctype, MsgAction::Query(Access::new()));
|
||||
match rx.recv_timeout(TIMEOUT) {
|
||||
Ok(_) => unreachable!("should timeout"),
|
||||
Err(err) => match err {
|
||||
@ -531,13 +564,21 @@ mod queuedatas {
|
||||
let mut queuedata = QueueData::new();
|
||||
let name1 = "task";
|
||||
let name2 = "work";
|
||||
let action = Action::Query;
|
||||
let action = MsgAction::Query(Access::new());
|
||||
let (tx1, rx1) = channel();
|
||||
let (tx2, rx2) = channel();
|
||||
let id1 = queuedata.register(name1.to_string(), tx1).unwrap();
|
||||
let id2 = queuedata.register(name2.to_string(), tx2).unwrap();
|
||||
queuedata.add_route(&id1, Include::Some(name1.to_string()), action.clone());
|
||||
queuedata.add_route(&id2, Include::Some(name1.to_string()), action.clone());
|
||||
queuedata.add_route(
|
||||
&id1,
|
||||
Include::Some(name1.to_string()),
|
||||
action.clone().into(),
|
||||
);
|
||||
queuedata.add_route(
|
||||
&id2,
|
||||
Include::Some(name1.to_string()),
|
||||
action.clone().into(),
|
||||
);
|
||||
let msg = Message::new(name1, action.clone());
|
||||
queuedata.send(msg.clone()).unwrap();
|
||||
let result1 = rx1.recv_timeout(TIMEOUT).unwrap();
|
||||
@ -550,11 +591,11 @@ mod queuedatas {
|
||||
fn can_a_route_be_generally_set() {
|
||||
let mut queuedata = QueueData::new();
|
||||
let doctype = "something";
|
||||
let action = Action::Query;
|
||||
let action = MsgAction::Query(Access::new());
|
||||
let (tx, rx) = channel();
|
||||
let id = queuedata.register(doctype.to_string(), tx).unwrap();
|
||||
let data: Include<String> = Include::All;
|
||||
queuedata.add_route(&id, data, action.clone());
|
||||
queuedata.add_route(&id, data, action.clone().into());
|
||||
let msg = Message::new(doctype, action);
|
||||
queuedata.send(msg.clone()).unwrap();
|
||||
let result = rx.recv_timeout(TIMEOUT).unwrap();
|
||||
@ -608,6 +649,59 @@ mod queues {
|
||||
}
|
||||
}
|
||||
|
||||
struct CreateDoc {
|
||||
queue: Queue,
|
||||
rx: Receiver<Message>,
|
||||
}
|
||||
|
||||
impl CreateDoc {
|
||||
fn new(queue: Queue, rx: Receiver<Message>) -> Self {
|
||||
Self {
|
||||
queue: queue,
|
||||
rx: rx,
|
||||
}
|
||||
}
|
||||
|
||||
fn start(mut queue: Queue) {
|
||||
let (tx, rx) = channel();
|
||||
let id = queue.register("document".to_string(), tx).unwrap();
|
||||
let nameid: Include<NameID> = Include::All;
|
||||
queue.add_route(&id, nameid, Action::Create);
|
||||
let doc = CreateDoc::new(queue, rx);
|
||||
spawn(move || {
|
||||
doc.listen();
|
||||
});
|
||||
}
|
||||
|
||||
fn listen(&self) {
|
||||
loop {
|
||||
let msg = self.rx.recv().unwrap();
|
||||
match msg.get_document_id() {
|
||||
NameID::Name(name) => Document::start(self.queue.clone(), name.clone()),
|
||||
NameID::ID(_) => unreachable!("should be a name"),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct DocDef;
|
||||
|
||||
impl DocDef {
|
||||
fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Access;
|
||||
|
||||
impl Access {
|
||||
fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
}
|
||||
|
||||
struct Document {
|
||||
queue: Queue,
|
||||
rx: Receiver<Message>,
|
||||
@ -621,9 +715,9 @@ impl Document {
|
||||
}
|
||||
}
|
||||
|
||||
fn start(mut queue: Queue) {
|
||||
fn start(mut queue: Queue, name: String) {
|
||||
let (tx, rx) = channel();
|
||||
queue.register("document".to_string(), tx);
|
||||
queue.register(name, tx);
|
||||
let doc = Document::new(queue, rx);
|
||||
spawn(move || {
|
||||
doc.listen();
|
||||
@ -638,24 +732,19 @@ impl Document {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod documents {
|
||||
mod createdocs {
|
||||
use super::*;
|
||||
use std::{thread::sleep, time::Duration};
|
||||
|
||||
//#[test]
|
||||
#[test]
|
||||
fn create_document_creation() {
|
||||
let queue = Queue::new();
|
||||
Document::start(queue.clone());
|
||||
CreateDoc::start(queue.clone());
|
||||
let name = "project";
|
||||
let msg = Message::new(name, Action::Create);
|
||||
let msg = Message::new(name, MsgAction::Create(DocDef::new()));
|
||||
queue.send(msg).unwrap();
|
||||
let msg2 = Message::new(name, Action::Query);
|
||||
sleep(Duration::from_secs(1));
|
||||
let msg2 = Message::new(name, MsgAction::Query(Access::new()));
|
||||
queue.send(msg2).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
// Create a double hash map. posswible names that leads to an id that is int eh ids
|
||||
// \and the second is the id and the sender to be used.and a third for who wants to
|
||||
// listen to what.
|
||||
//
|
||||
// The queue has a read write lock on the abbove strucutee. A clone of this is given to
|
||||
// every process.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user