Got the system to error on duplicate documents.
This commit is contained in:
parent
60ad95a5e5
commit
0553524ec6
139
src/message.rs
139
src/message.rs
@ -19,12 +19,12 @@ mod support_test {
|
|||||||
enum MTTError {
|
enum MTTError {
|
||||||
DocumentAlreadyExists(String),
|
DocumentAlreadyExists(String),
|
||||||
DocumentNotFound(String),
|
DocumentNotFound(String),
|
||||||
RouteNoListeners,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
|
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
|
||||||
enum Action {
|
enum Action {
|
||||||
Create,
|
Create,
|
||||||
|
Error,
|
||||||
Query,
|
Query,
|
||||||
Reply,
|
Reply,
|
||||||
}
|
}
|
||||||
@ -33,6 +33,7 @@ impl From<MsgAction> for Action {
|
|||||||
fn from(value: MsgAction) -> Self {
|
fn from(value: MsgAction) -> Self {
|
||||||
match value {
|
match value {
|
||||||
MsgAction::Create(_) => Action::Create,
|
MsgAction::Create(_) => Action::Create,
|
||||||
|
MsgAction::Error(_) => Action::Error,
|
||||||
MsgAction::Query(_) => Action::Query,
|
MsgAction::Query(_) => Action::Query,
|
||||||
MsgAction::Reply(_) => Action::Reply,
|
MsgAction::Reply(_) => Action::Reply,
|
||||||
}
|
}
|
||||||
@ -76,9 +77,10 @@ impl From<&NameID> for NameID {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone, Debug)]
|
||||||
enum MsgAction {
|
enum MsgAction {
|
||||||
Create(DocDef),
|
Create(DocDef),
|
||||||
|
Error(MTTError),
|
||||||
Query(Access),
|
Query(Access),
|
||||||
Reply(Response),
|
Reply(Response),
|
||||||
}
|
}
|
||||||
@ -121,6 +123,14 @@ impl Message {
|
|||||||
action: MsgAction::Reply(resp),
|
action: MsgAction::Reply(resp),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn error(&self, err: MTTError) -> Self {
|
||||||
|
Self {
|
||||||
|
msg_id: self.msg_id.clone(),
|
||||||
|
document_id: self.document_id.clone(),
|
||||||
|
action: MsgAction::Error(err),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@ -200,6 +210,27 @@ mod messages {
|
|||||||
_ => unreachable!("should have been a reply"),
|
_ => unreachable!("should have been a reply"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn Can_make_error_message() {
|
||||||
|
let name = "testing";
|
||||||
|
let msg = Message::new(name, MsgAction::Query(Access::new()));
|
||||||
|
let err_msg = Uuid::new_v4().to_string();
|
||||||
|
let result = msg.error(MTTError::DocumentNotFound(err_msg.clone()));
|
||||||
|
|
||||||
|
assert_eq!(result.get_message_id(), msg.get_message_id());
|
||||||
|
match result.get_document_id() {
|
||||||
|
NameID::Name(data) => assert_eq!(data, name),
|
||||||
|
_ => unreachable!("should have been a name"),
|
||||||
|
}
|
||||||
|
match result.get_action() {
|
||||||
|
MsgAction::Error(data) => match data {
|
||||||
|
MTTError::DocumentNotFound(txt) => assert_eq!(txt, &err_msg),
|
||||||
|
_ => unreachable!("got {:?}, should have received not found", data),
|
||||||
|
},
|
||||||
|
_ => unreachable!("should have been a reply"),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
@ -779,16 +810,12 @@ impl CreateDoc {
|
|||||||
fn listen(&self) {
|
fn listen(&self) {
|
||||||
loop {
|
loop {
|
||||||
let msg = self.rx.recv().unwrap();
|
let msg = self.rx.recv().unwrap();
|
||||||
match msg.get_document_id() {
|
Document::start(self.queue.clone(), msg);
|
||||||
NameID::Name(name) => Document::start(self.queue.clone(), name.clone()),
|
|
||||||
NameID::ID(_) => unreachable!("should be a name"),
|
|
||||||
}
|
|
||||||
self.queue.send(msg.reply(Response::new())).unwrap();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone, Debug)]
|
||||||
struct DocDef;
|
struct DocDef;
|
||||||
|
|
||||||
impl DocDef {
|
impl DocDef {
|
||||||
@ -797,7 +824,7 @@ impl DocDef {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone, Debug)]
|
||||||
struct Access;
|
struct Access;
|
||||||
|
|
||||||
impl Access {
|
impl Access {
|
||||||
@ -806,7 +833,7 @@ impl Access {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone, Debug)]
|
||||||
struct Response;
|
struct Response;
|
||||||
|
|
||||||
impl Response {
|
impl Response {
|
||||||
@ -828,18 +855,39 @@ impl Document {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn start(mut queue: Queue, name: String) {
|
fn start(mut queue: Queue, msg: Message) {
|
||||||
let (tx, rx) = channel();
|
let (tx, rx) = channel();
|
||||||
queue.register(tx, name, Vec::new());
|
let name = match msg.get_document_id() {
|
||||||
let doc = Document::new(queue, rx);
|
NameID::Name(name) => name.clone(),
|
||||||
|
NameID::ID(id) => id.to_string(),
|
||||||
|
};
|
||||||
|
let routes = [RouteRequest::new(
|
||||||
|
Include::All,
|
||||||
|
Include::Some(name.clone()),
|
||||||
|
Include::Some(Action::Query),
|
||||||
|
)]
|
||||||
|
.to_vec();
|
||||||
|
match queue.register(tx, name, routes) {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(err) => {
|
||||||
|
let error = msg.error(err);
|
||||||
|
queue.send(error).unwrap();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let doc = Document::new(queue.clone(), rx);
|
||||||
spawn(move || {
|
spawn(move || {
|
||||||
doc.listen();
|
doc.listen();
|
||||||
});
|
});
|
||||||
|
let reply = msg.reply(Response::new());
|
||||||
|
queue.send(reply).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
fn listen(&self) {
|
fn listen(&self) {
|
||||||
loop {
|
loop {
|
||||||
self.rx.recv().unwrap();
|
let msg = self.rx.recv().unwrap();
|
||||||
|
let reply = msg.reply(Response::new());
|
||||||
|
self.queue.send(reply).unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -849,23 +897,68 @@ mod createdocs {
|
|||||||
use super::support_test::TIMEOUT;
|
use super::support_test::TIMEOUT;
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
#[test]
|
fn setup_create_doc(routes: Vec<RouteRequest>) -> (Queue, Receiver<Message>) {
|
||||||
fn create_document_creation() {
|
|
||||||
let mut queue = Queue::new();
|
let mut queue = Queue::new();
|
||||||
let (tx, rx) = channel();
|
let (tx, rx) = channel();
|
||||||
|
queue
|
||||||
|
.register(tx, Uuid::new_v4().to_string(), routes)
|
||||||
|
.unwrap();
|
||||||
|
CreateDoc::start(queue.clone());
|
||||||
|
(queue, rx)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn create_document_creation() {
|
||||||
|
let name = "project";
|
||||||
let routes = [RouteRequest::new(
|
let routes = [RouteRequest::new(
|
||||||
Include::All,
|
Include::All,
|
||||||
Include::All,
|
Include::All,
|
||||||
Include::Some(Action::Reply),
|
Include::Some(Action::Reply),
|
||||||
)]
|
)]
|
||||||
.to_vec();
|
.to_vec();
|
||||||
queue.register(tx, "testing".to_string(), routes).unwrap();
|
let (queue, rx) = setup_create_doc(routes);
|
||||||
CreateDoc::start(queue.clone());
|
let msg1 = Message::new(name, MsgAction::Create(DocDef::new()));
|
||||||
let name = "project";
|
queue.send(msg1.clone()).unwrap();
|
||||||
let msg = Message::new(name, MsgAction::Create(DocDef::new()));
|
let result1 = rx.recv_timeout(TIMEOUT).unwrap();
|
||||||
queue.send(msg).unwrap();
|
assert_eq!(result1.get_message_id(), msg1.get_message_id());
|
||||||
rx.recv_timeout(TIMEOUT).unwrap();
|
assert_eq!(result1.get_document_id(), msg1.get_document_id());
|
||||||
|
match result1.get_action() {
|
||||||
|
MsgAction::Reply(_) => {}
|
||||||
|
_ => unreachable!("got {:?}: should have been a reply.", result1.get_action()),
|
||||||
|
}
|
||||||
let msg2 = Message::new(name, MsgAction::Query(Access::new()));
|
let msg2 = Message::new(name, MsgAction::Query(Access::new()));
|
||||||
queue.send(msg2).unwrap();
|
queue.send(msg2.clone()).unwrap();
|
||||||
|
let result2 = rx.recv_timeout(TIMEOUT).unwrap();
|
||||||
|
assert_eq!(result2.get_message_id(), msg2.get_message_id());
|
||||||
|
assert_eq!(result2.get_document_id(), msg2.get_document_id());
|
||||||
|
match result2.get_action() {
|
||||||
|
MsgAction::Reply(_) => {}
|
||||||
|
_ => unreachable!("got {:?}: should have been a reply.", result1.get_action()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn does_duplicates_generate_error() {
|
||||||
|
let name = "duplicate";
|
||||||
|
let routes = [
|
||||||
|
RouteRequest::new(Include::All, Include::All, Include::Some(Action::Reply)),
|
||||||
|
RouteRequest::new(Include::All, Include::All, Include::Some(Action::Error)),
|
||||||
|
]
|
||||||
|
.to_vec();
|
||||||
|
let (queue, rx) = setup_create_doc(routes);
|
||||||
|
let msg = Message::new(name, MsgAction::Create(DocDef::new()));
|
||||||
|
queue.send(msg.clone()).unwrap();
|
||||||
|
rx.recv_timeout(TIMEOUT).unwrap();
|
||||||
|
queue.send(msg.clone()).unwrap();
|
||||||
|
let result = rx.recv_timeout(TIMEOUT).unwrap();
|
||||||
|
assert_eq!(result.get_message_id(), msg.get_message_id());
|
||||||
|
assert_eq!(result.get_document_id(), msg.get_document_id());
|
||||||
|
match result.get_action() {
|
||||||
|
MsgAction::Error(err) => match err {
|
||||||
|
MTTError::DocumentAlreadyExists(data) => assert_eq!(data, name),
|
||||||
|
_ => unreachable!("got {:?}: should have been a reply.", err),
|
||||||
|
},
|
||||||
|
_ => unreachable!("got {:?}: should have been a reply.", result.get_action()),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user