Removed the unnecessary Result on queue send.
Some checks failed
Gitea Actions Demo / Explore-Gitea-Actions (push) Failing after 1s
Some checks failed
Gitea Actions Demo / Explore-Gitea-Actions (push) Failing after 1s
This commit is contained in:
parent
5833d20ea6
commit
02c533b53b
@ -27,7 +27,7 @@ impl Clock {
|
|||||||
let id = queue.add_sender(tx);
|
let id = queue.add_sender(tx);
|
||||||
let reg_msg = Register::new(id, RegMsg::AddDocName([Name::english("clock")].to_vec()));
|
let reg_msg = Register::new(id, RegMsg::AddDocName([Name::english("clock")].to_vec()));
|
||||||
let msg = Message::new(NameType::None, reg_msg.clone());
|
let msg = Message::new(NameType::None, reg_msg.clone());
|
||||||
queue.send(msg).unwrap();
|
queue.send(msg);
|
||||||
rx.recv().unwrap();
|
rx.recv().unwrap();
|
||||||
spawn(move || {
|
spawn(move || {
|
||||||
clock.listen();
|
clock.listen();
|
||||||
@ -36,12 +36,10 @@ impl Clock {
|
|||||||
|
|
||||||
fn listen(&self) {
|
fn listen(&self) {
|
||||||
loop {
|
loop {
|
||||||
self.queue
|
self.queue.send(Message::new(
|
||||||
.send(Message::new(
|
Name::english("clock"),
|
||||||
Name::english("clock"),
|
MsgAction::OnUpdate(Records::new(Names::new())),
|
||||||
MsgAction::OnUpdate(Records::new(Names::new())),
|
));
|
||||||
))
|
|
||||||
.unwrap();
|
|
||||||
sleep(Duration::from_secs(1));
|
sleep(Duration::from_secs(1));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -64,7 +62,7 @@ mod clocks {
|
|||||||
id.clone(),
|
id.clone(),
|
||||||
RegMsg::AddRoute(Path::new(Include::All, Include::All, Include::All)),
|
RegMsg::AddRoute(Path::new(Include::All, Include::All, Include::All)),
|
||||||
);
|
);
|
||||||
queue.send(Message::new(NameType::None, request)).unwrap();
|
queue.send(Message::new(NameType::None, request));
|
||||||
rx.recv_timeout(TIMEOUT).unwrap();
|
rx.recv_timeout(TIMEOUT).unwrap();
|
||||||
let mut holder: Vec<Message> = Vec::new();
|
let mut holder: Vec<Message> = Vec::new();
|
||||||
let start = Utc::now();
|
let start = Utc::now();
|
||||||
@ -76,9 +74,7 @@ mod clocks {
|
|||||||
assert!((end - start) > TimeDelta::seconds(1));
|
assert!((end - start) > TimeDelta::seconds(1));
|
||||||
assert!((end - start) < TimeDelta::seconds(2));
|
assert!((end - start) < TimeDelta::seconds(2));
|
||||||
let reg_request = Register::new(id, RegMsg::GetNameID(Name::english("clock")));
|
let reg_request = Register::new(id, RegMsg::GetNameID(Name::english("clock")));
|
||||||
queue
|
queue.send(Message::new(NameType::None, reg_request));
|
||||||
.send(Message::new(NameType::None, reg_request))
|
|
||||||
.unwrap();
|
|
||||||
rx.recv_timeout(TIMEOUT).unwrap();
|
rx.recv_timeout(TIMEOUT).unwrap();
|
||||||
for msg in holder.iter() {
|
for msg in holder.iter() {
|
||||||
let action = msg.get_action();
|
let action = msg.get_action();
|
||||||
|
|||||||
@ -45,7 +45,7 @@ impl CreateDoc {
|
|||||||
let id = queue.add_sender(tx);
|
let id = queue.add_sender(tx);
|
||||||
for route in routes.iter() {
|
for route in routes.iter() {
|
||||||
let regmsg = Register::new(id.clone(), RegMsg::AddRoute(route.clone()));
|
let regmsg = Register::new(id.clone(), RegMsg::AddRoute(route.clone()));
|
||||||
queue.send(Message::new(NameType::None, regmsg)).unwrap();
|
queue.send(Message::new(NameType::None, regmsg));
|
||||||
rx.recv().unwrap();
|
rx.recv().unwrap();
|
||||||
}
|
}
|
||||||
let doc = CreateDoc::new(queue, rx);
|
let doc = CreateDoc::new(queue, rx);
|
||||||
@ -97,9 +97,7 @@ mod createdocs {
|
|||||||
fn register_paths(&self, paths: Vec<Path>) {
|
fn register_paths(&self, paths: Vec<Path>) {
|
||||||
for path in paths.iter() {
|
for path in paths.iter() {
|
||||||
let regmsg = Register::new(self.rx_id.clone(), RegMsg::AddRoute(path.clone()));
|
let regmsg = Register::new(self.rx_id.clone(), RegMsg::AddRoute(path.clone()));
|
||||||
self.queue
|
self.queue.send(Message::new(NameType::None, regmsg));
|
||||||
.send(Message::new(NameType::None, regmsg))
|
|
||||||
.unwrap();
|
|
||||||
self.rx.recv_timeout(TIMEOUT).unwrap();
|
self.rx.recv_timeout(TIMEOUT).unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -118,7 +116,7 @@ mod createdocs {
|
|||||||
let rx = doc_creator.get_receiver();
|
let rx = doc_creator.get_receiver();
|
||||||
let name = Name::english("project");
|
let name = Name::english("project");
|
||||||
let msg1 = Message::new(NameType::None, MsgAction::Create(DocDef::new(name.clone())));
|
let msg1 = Message::new(NameType::None, MsgAction::Create(DocDef::new(name.clone())));
|
||||||
queue.send(msg1.clone()).unwrap();
|
queue.send(msg1.clone());
|
||||||
let result1 = rx.recv_timeout(TIMEOUT).unwrap();
|
let result1 = rx.recv_timeout(TIMEOUT).unwrap();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
result1.get_message_id(),
|
result1.get_message_id(),
|
||||||
@ -131,7 +129,7 @@ mod createdocs {
|
|||||||
_ => unreachable!("got {:?}: should have been a reply.", result1.get_action()),
|
_ => unreachable!("got {:?}: should have been a reply.", result1.get_action()),
|
||||||
}
|
}
|
||||||
let msg2 = Message::new(name, Query::new());
|
let msg2 = Message::new(name, Query::new());
|
||||||
queue.send(msg2.clone()).unwrap();
|
queue.send(msg2.clone());
|
||||||
let result2 = rx.recv_timeout(TIMEOUT).unwrap();
|
let result2 = rx.recv_timeout(TIMEOUT).unwrap();
|
||||||
assert_eq!(result2.get_message_id(), msg2.get_message_id());
|
assert_eq!(result2.get_message_id(), msg2.get_message_id());
|
||||||
match result2.get_action() {
|
match result2.get_action() {
|
||||||
@ -156,8 +154,8 @@ mod createdocs {
|
|||||||
let name = Name::english("duplicate");
|
let name = Name::english("duplicate");
|
||||||
let msg1 = Message::new(NameType::None, MsgAction::Create(DocDef::new(name.clone())));
|
let msg1 = Message::new(NameType::None, MsgAction::Create(DocDef::new(name.clone())));
|
||||||
let msg2 = Message::new(NameType::None, MsgAction::Create(DocDef::new(name.clone())));
|
let msg2 = Message::new(NameType::None, MsgAction::Create(DocDef::new(name.clone())));
|
||||||
queue.send(msg1.clone()).unwrap();
|
queue.send(msg1.clone());
|
||||||
queue.send(msg2.clone()).unwrap();
|
queue.send(msg2.clone());
|
||||||
let result = rx.recv_timeout(TIMEOUT).unwrap();
|
let result = rx.recv_timeout(TIMEOUT).unwrap();
|
||||||
assert_eq!(result.get_message_id(), msg2.get_message_id());
|
assert_eq!(result.get_message_id(), msg2.get_message_id());
|
||||||
match result.get_action() {
|
match result.get_action() {
|
||||||
@ -497,14 +495,14 @@ impl DocumentFile {
|
|||||||
let id = queue.add_sender(tx);
|
let id = queue.add_sender(tx);
|
||||||
let reg_msg = Register::new(id, RegMsg::AddDocName(names.clone()));
|
let reg_msg = Register::new(id, RegMsg::AddDocName(names.clone()));
|
||||||
let rmsg = msg.response(reg_msg.clone());
|
let rmsg = msg.response(reg_msg.clone());
|
||||||
queue.send(rmsg.clone()).unwrap();
|
queue.send(rmsg.clone());
|
||||||
let name_result = rx.recv().unwrap();
|
let name_result = rx.recv().unwrap();
|
||||||
let name_id = match name_result.get_action() {
|
let name_id = match name_result.get_action() {
|
||||||
MsgAction::Register(data) => match data.get_msg() {
|
MsgAction::Register(data) => match data.get_msg() {
|
||||||
RegMsg::DocumentNameID(data) => data,
|
RegMsg::DocumentNameID(data) => data,
|
||||||
RegMsg::Error(err) => {
|
RegMsg::Error(err) => {
|
||||||
queue.remove_sender(&id);
|
queue.remove_sender(&id);
|
||||||
queue.send(msg.response(err.clone())).unwrap();
|
queue.send(msg.response(err.clone()));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
_ => unreachable!("should only return a name id or an error"),
|
_ => unreachable!("should only return a name id or an error"),
|
||||||
@ -515,14 +513,14 @@ impl DocumentFile {
|
|||||||
for path_action in docdef.iter_routes() {
|
for path_action in docdef.iter_routes() {
|
||||||
let request = reg_msg.response(RegMsg::AddRoute(path_action.path()));
|
let request = reg_msg.response(RegMsg::AddRoute(path_action.path()));
|
||||||
let add_route = rmsg.response(request);
|
let add_route = rmsg.response(request);
|
||||||
queue.send(add_route).unwrap();
|
queue.send(add_route);
|
||||||
let result = rx.recv().unwrap();
|
let result = rx.recv().unwrap();
|
||||||
let route_id = match result.get_action() {
|
let route_id = match result.get_action() {
|
||||||
MsgAction::Register(data) => match data.get_msg() {
|
MsgAction::Register(data) => match data.get_msg() {
|
||||||
RegMsg::RouteID(data) => data,
|
RegMsg::RouteID(data) => data,
|
||||||
RegMsg::Error(err) => {
|
RegMsg::Error(err) => {
|
||||||
queue.remove_sender(&id);
|
queue.remove_sender(&id);
|
||||||
queue.send(msg.response(err.clone())).unwrap();
|
queue.send(msg.response(err.clone()));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
_ => unreachable!("should only return a route id or an error"),
|
_ => unreachable!("should only return a route id or an error"),
|
||||||
@ -536,7 +534,7 @@ impl DocumentFile {
|
|||||||
doc.listen();
|
doc.listen();
|
||||||
});
|
});
|
||||||
let reply = msg.response(Reply::new());
|
let reply = msg.response(Reply::new());
|
||||||
queue.send(reply.clone()).unwrap();
|
queue.send(reply.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
fn listen(&mut self) {
|
fn listen(&mut self) {
|
||||||
@ -549,7 +547,7 @@ impl DocumentFile {
|
|||||||
DocFuncType::Add => self.add_document(&msg),
|
DocFuncType::Add => self.add_document(&msg),
|
||||||
DocFuncType::Delete => self.delete(&msg),
|
DocFuncType::Delete => self.delete(&msg),
|
||||||
DocFuncType::Query => self.query(&msg),
|
DocFuncType::Query => self.query(&msg),
|
||||||
DocFuncType::Show => self.queue.send(msg.response(Reply::new())).unwrap(),
|
DocFuncType::Show => self.queue.send(msg.response(Reply::new())),
|
||||||
DocFuncType::Update => self.update(&msg),
|
DocFuncType::Update => self.update(&msg),
|
||||||
DocFuncType::ExistingQuery(action) => self.existing_query(&msg, action),
|
DocFuncType::ExistingQuery(action) => self.existing_query(&msg, action),
|
||||||
DocFuncType::Trigger(action) => self.trigger(&msg, action),
|
DocFuncType::Trigger(action) => self.trigger(&msg, action),
|
||||||
@ -589,7 +587,7 @@ impl DocumentFile {
|
|||||||
Ok(id) => id,
|
Ok(id) => id,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
let reply = msg.response(err);
|
let reply = msg.response(err);
|
||||||
self.queue.send(reply).unwrap();
|
self.queue.send(reply);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -604,7 +602,7 @@ impl DocumentFile {
|
|||||||
Ok(data) => data,
|
Ok(data) => data,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
let reply = msg.response(err);
|
let reply = msg.response(err);
|
||||||
self.queue.send(reply).unwrap();
|
self.queue.send(reply);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -623,10 +621,9 @@ impl DocumentFile {
|
|||||||
self.docs.insert(oid.clone(), holder.clone());
|
self.docs.insert(oid.clone(), holder.clone());
|
||||||
records.insert(oid, holder);
|
records.insert(oid, holder);
|
||||||
}
|
}
|
||||||
self.queue.send(msg.response(records.clone())).unwrap();
|
self.queue.send(msg.response(records.clone()));
|
||||||
self.queue
|
self.queue
|
||||||
.send(msg.response(MsgAction::OnAddition(records)))
|
.send(msg.response(MsgAction::OnAddition(records)));
|
||||||
.unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn delete(&mut self, msg: &Message) {
|
fn delete(&mut self, msg: &Message) {
|
||||||
@ -638,7 +635,7 @@ impl DocumentFile {
|
|||||||
Ok(data) => data,
|
Ok(data) => data,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
let reply = msg.response(err);
|
let reply = msg.response(err);
|
||||||
self.queue.send(reply).unwrap();
|
self.queue.send(reply);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -649,10 +646,8 @@ impl DocumentFile {
|
|||||||
self.docs.remove(oid);
|
self.docs.remove(oid);
|
||||||
}
|
}
|
||||||
let rec = Records::with_data(self.docdef.get_field_names().clone(), records);
|
let rec = Records::with_data(self.docdef.get_field_names().clone(), records);
|
||||||
self.queue.send(msg.response(rec.clone())).unwrap();
|
self.queue.send(msg.response(rec.clone()));
|
||||||
self.queue
|
self.queue.send(msg.response(MsgAction::OnDelete(rec)));
|
||||||
.send(msg.response(MsgAction::OnDelete(rec)))
|
|
||||||
.unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run_query(&self, query: &Query) -> Result<InternalRecords, MTTError> {
|
fn run_query(&self, query: &Query) -> Result<InternalRecords, MTTError> {
|
||||||
@ -715,15 +710,13 @@ impl DocumentFile {
|
|||||||
Ok(data) => data,
|
Ok(data) => data,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
let reply = msg.response(err);
|
let reply = msg.response(err);
|
||||||
self.queue.send(reply).unwrap();
|
self.queue.send(reply);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let recs = Records::with_data(self.docdef.get_field_names().clone(), records);
|
let recs = Records::with_data(self.docdef.get_field_names().clone(), records);
|
||||||
self.queue.send(msg.response(recs.clone())).unwrap();
|
self.queue.send(msg.response(recs.clone()));
|
||||||
self.queue
|
self.queue.send(msg.response(MsgAction::OnQuery(recs)));
|
||||||
.send(msg.response(MsgAction::OnQuery(recs)))
|
|
||||||
.unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run_update(
|
fn run_update(
|
||||||
@ -770,8 +763,7 @@ impl DocumentFile {
|
|||||||
}
|
}
|
||||||
let recs = Records::with_data(self.docdef.get_field_names().clone(), updates);
|
let recs = Records::with_data(self.docdef.get_field_names().clone(), updates);
|
||||||
self.queue
|
self.queue
|
||||||
.send(msg.response(MsgAction::OnUpdate(recs.clone())))
|
.send(msg.response(MsgAction::OnUpdate(recs.clone())));
|
||||||
.unwrap();
|
|
||||||
Ok(recs)
|
Ok(recs)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -784,7 +776,7 @@ impl DocumentFile {
|
|||||||
Ok(result) => result,
|
Ok(result) => result,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
let reply = msg.response(err);
|
let reply = msg.response(err);
|
||||||
self.queue.send(reply).unwrap();
|
self.queue.send(reply);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -792,11 +784,11 @@ impl DocumentFile {
|
|||||||
Ok(output) => output,
|
Ok(output) => output,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
let reply = msg.response(err);
|
let reply = msg.response(err);
|
||||||
self.queue.send(reply).unwrap();
|
self.queue.send(reply);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
self.queue.send(msg.response(data)).unwrap();
|
self.queue.send(msg.response(data));
|
||||||
}
|
}
|
||||||
|
|
||||||
fn existing_query(&mut self, msg: &Message, action: &MsgAction) {
|
fn existing_query(&mut self, msg: &Message, action: &MsgAction) {
|
||||||
@ -814,8 +806,7 @@ impl DocumentFile {
|
|||||||
|
|
||||||
fn trigger(&self, msg: &Message, action: &MsgAction) {
|
fn trigger(&self, msg: &Message, action: &MsgAction) {
|
||||||
self.queue
|
self.queue
|
||||||
.send(msg.forward(self.name_id.clone(), action.clone()))
|
.send(msg.forward(self.name_id.clone(), action.clone()));
|
||||||
.unwrap();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -890,12 +881,12 @@ mod document_files {
|
|||||||
self.sender_id.clone()
|
self.sender_id.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send<A>(&self, action: A) -> Result<(), MTTError>
|
fn send<A>(&self, action: A)
|
||||||
where
|
where
|
||||||
A: Into<MsgAction>,
|
A: Into<MsgAction>,
|
||||||
{
|
{
|
||||||
let msg = Message::new(self.docdef.get_document_names()[0].clone(), action);
|
let msg = Message::new(self.docdef.get_document_names()[0].clone(), action);
|
||||||
self.queue.send(msg)
|
self.queue.send(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn start(&mut self, routes: Vec<Path>) {
|
fn start(&mut self, routes: Vec<Path>) {
|
||||||
@ -908,7 +899,7 @@ mod document_files {
|
|||||||
let request =
|
let request =
|
||||||
Register::new(self.sender_id.clone(), RegMsg::AddRoute(route.clone()));
|
Register::new(self.sender_id.clone(), RegMsg::AddRoute(route.clone()));
|
||||||
let add_route = Message::new(NameType::None, request);
|
let add_route = Message::new(NameType::None, request);
|
||||||
self.queue.send(add_route).unwrap();
|
self.queue.send(add_route);
|
||||||
self.rx.recv().unwrap();
|
self.rx.recv().unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -923,7 +914,7 @@ mod document_files {
|
|||||||
);
|
);
|
||||||
count += 1;
|
count += 1;
|
||||||
}
|
}
|
||||||
self.send(add).unwrap();
|
self.send(add);
|
||||||
match self.rx.recv_timeout(TIMEOUT) {
|
match self.rx.recv_timeout(TIMEOUT) {
|
||||||
Ok(_) => {} // eats the addition response.
|
Ok(_) => {} // eats the addition response.
|
||||||
Err(err) => match err {
|
Err(err) => match err {
|
||||||
@ -957,7 +948,7 @@ mod document_files {
|
|||||||
test_doc.start(standard_paths());
|
test_doc.start(standard_paths());
|
||||||
let docdef = DocDef::new(alt);
|
let docdef = DocDef::new(alt);
|
||||||
let msg = Message::new(name.clone(), docdef);
|
let msg = Message::new(name.clone(), docdef);
|
||||||
test_doc.get_queue().send(msg).unwrap();
|
test_doc.get_queue().send(msg);
|
||||||
match test_doc.get_receiver().recv_timeout(TIMEOUT) {
|
match test_doc.get_receiver().recv_timeout(TIMEOUT) {
|
||||||
Ok(msg) => unreachable!("should not receive: {:?}", msg),
|
Ok(msg) => unreachable!("should not receive: {:?}", msg),
|
||||||
Err(err) => match err {
|
Err(err) => match err {
|
||||||
@ -983,7 +974,7 @@ mod document_files {
|
|||||||
];
|
];
|
||||||
for msg_action in msg_actions.iter() {
|
for msg_action in msg_actions.iter() {
|
||||||
let msg = Message::new(name.clone(), msg_action.clone());
|
let msg = Message::new(name.clone(), msg_action.clone());
|
||||||
queue.send(msg.clone()).unwrap();
|
queue.send(msg.clone());
|
||||||
let result = match test_doc.get_receiver().recv_timeout(TIMEOUT) {
|
let result = match test_doc.get_receiver().recv_timeout(TIMEOUT) {
|
||||||
Ok(data) => data.clone(),
|
Ok(data) => data.clone(),
|
||||||
Err(err) => unreachable!("for {:?} got {:?}", msg_action, err),
|
Err(err) => unreachable!("for {:?} got {:?}", msg_action, err),
|
||||||
@ -1023,7 +1014,7 @@ mod document_files {
|
|||||||
RegMsg::AddDocName([alt.clone()].to_vec()),
|
RegMsg::AddDocName([alt.clone()].to_vec()),
|
||||||
);
|
);
|
||||||
let setup = Message::new(NameType::None, reg_msg.clone());
|
let setup = Message::new(NameType::None, reg_msg.clone());
|
||||||
queue.send(setup).unwrap();
|
queue.send(setup);
|
||||||
test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let msg_actions = [
|
let msg_actions = [
|
||||||
MsgAction::Addition(Addition::new()),
|
MsgAction::Addition(Addition::new()),
|
||||||
@ -1037,7 +1028,7 @@ mod document_files {
|
|||||||
for msg_action in msg_actions.iter() {
|
for msg_action in msg_actions.iter() {
|
||||||
let msg = Message::new(alt.clone(), msg_action.clone());
|
let msg = Message::new(alt.clone(), msg_action.clone());
|
||||||
msgs.insert(msg.get_message_id().clone(), msg_action.clone());
|
msgs.insert(msg.get_message_id().clone(), msg_action.clone());
|
||||||
queue.send(msg).unwrap();
|
queue.send(msg);
|
||||||
}
|
}
|
||||||
match test_doc.get_receiver().recv_timeout(TIMEOUT) {
|
match test_doc.get_receiver().recv_timeout(TIMEOUT) {
|
||||||
Ok(msg) => unreachable!(
|
Ok(msg) => unreachable!(
|
||||||
@ -1074,7 +1065,7 @@ mod document_files {
|
|||||||
test_doc.populate([item.clone()].to_vec());
|
test_doc.populate([item.clone()].to_vec());
|
||||||
}
|
}
|
||||||
let msg = Message::new(doc_name.clone(), Query::new());
|
let msg = Message::new(doc_name.clone(), Query::new());
|
||||||
queue.send(msg.clone()).unwrap();
|
queue.send(msg.clone());
|
||||||
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
result.get_message_id(),
|
result.get_message_id(),
|
||||||
@ -1113,7 +1104,7 @@ mod document_files {
|
|||||||
let mut add = Addition::new();
|
let mut add = Addition::new();
|
||||||
add.add_field(field_name.clone(), data.clone());
|
add.add_field(field_name.clone(), data.clone());
|
||||||
let msg = Message::new(doc_name.clone(), add);
|
let msg = Message::new(doc_name.clone(), add);
|
||||||
queue.send(msg.clone()).unwrap();
|
queue.send(msg.clone());
|
||||||
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
result.get_message_id(),
|
result.get_message_id(),
|
||||||
@ -1153,7 +1144,7 @@ mod document_files {
|
|||||||
test_doc.populate([item.clone()].to_vec());
|
test_doc.populate([item.clone()].to_vec());
|
||||||
}
|
}
|
||||||
let msg = Message::new(doc_name.clone(), Delete::new(Query::new()));
|
let msg = Message::new(doc_name.clone(), Delete::new(Query::new()));
|
||||||
queue.send(msg.clone()).unwrap();
|
queue.send(msg.clone());
|
||||||
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
result.get_message_id(),
|
result.get_message_id(),
|
||||||
@ -1203,7 +1194,7 @@ mod document_files {
|
|||||||
.get_values_mut()
|
.get_values_mut()
|
||||||
.add_field(field_name.clone(), Uuid::nil());
|
.add_field(field_name.clone(), Uuid::nil());
|
||||||
let msg = Message::new(doc_name.clone(), update);
|
let msg = Message::new(doc_name.clone(), update);
|
||||||
queue.send(msg.clone()).unwrap();
|
queue.send(msg.clone());
|
||||||
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
result.get_message_id(),
|
result.get_message_id(),
|
||||||
@ -1239,7 +1230,7 @@ mod document_files {
|
|||||||
let mut new_doc = Addition::new();
|
let mut new_doc = Addition::new();
|
||||||
new_doc.add_field(name.clone(), data.clone());
|
new_doc.add_field(name.clone(), data.clone());
|
||||||
let testing = |msg: Message| {
|
let testing = |msg: Message| {
|
||||||
queue.send(msg.clone()).unwrap();
|
queue.send(msg.clone());
|
||||||
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
assert_eq!(result.get_message_id(), msg.get_message_id());
|
assert_eq!(result.get_message_id(), msg.get_message_id());
|
||||||
match result.get_action() {
|
match result.get_action() {
|
||||||
@ -1276,10 +1267,10 @@ mod document_files {
|
|||||||
for i in 0..count {
|
for i in 0..count {
|
||||||
let mut new_doc = Addition::new();
|
let mut new_doc = Addition::new();
|
||||||
new_doc.add_field(name.clone(), i);
|
new_doc.add_field(name.clone(), i);
|
||||||
queue.send(Message::new(doc_name.clone(), new_doc)).unwrap();
|
queue.send(Message::new(doc_name.clone(), new_doc));
|
||||||
test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
}
|
}
|
||||||
queue.send(Message::new(doc_name, Query::new())).unwrap();
|
queue.send(Message::new(doc_name, Query::new()));
|
||||||
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
let mut entries: HashSet<i128> = (0..count).collect();
|
let mut entries: HashSet<i128> = (0..count).collect();
|
||||||
@ -1319,12 +1310,10 @@ mod document_files {
|
|||||||
let name = Name::english("bad");
|
let name = Name::english("bad");
|
||||||
let mut addition = Addition::new();
|
let mut addition = Addition::new();
|
||||||
addition.add_field(name.clone(), "doesn't matter");
|
addition.add_field(name.clone(), "doesn't matter");
|
||||||
queue
|
queue.send(Message::new(
|
||||||
.send(Message::new(
|
test_doc.get_docdef().get_document_names()[0].clone(),
|
||||||
test_doc.get_docdef().get_document_names()[0].clone(),
|
addition,
|
||||||
addition,
|
));
|
||||||
))
|
|
||||||
.unwrap();
|
|
||||||
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
match result.get_action() {
|
match result.get_action() {
|
||||||
MsgAction::Error(err) => match err {
|
MsgAction::Error(err) => match err {
|
||||||
@ -1342,12 +1331,10 @@ mod document_files {
|
|||||||
let queue = test_doc.get_queue();
|
let queue = test_doc.get_queue();
|
||||||
let mut addition = Addition::new();
|
let mut addition = Addition::new();
|
||||||
addition.add_field(Name::english("field0"), "string");
|
addition.add_field(Name::english("field0"), "string");
|
||||||
queue
|
queue.send(Message::new(
|
||||||
.send(Message::new(
|
test_doc.get_docdef().get_document_names()[0].clone(),
|
||||||
test_doc.get_docdef().get_document_names()[0].clone(),
|
addition,
|
||||||
addition,
|
));
|
||||||
))
|
|
||||||
.unwrap();
|
|
||||||
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
match result.get_action() {
|
match result.get_action() {
|
||||||
MsgAction::Error(err) => match err {
|
MsgAction::Error(err) => match err {
|
||||||
@ -1371,12 +1358,10 @@ mod document_files {
|
|||||||
let queue = test_doc.get_queue();
|
let queue = test_doc.get_queue();
|
||||||
let mut addition = Addition::new();
|
let mut addition = Addition::new();
|
||||||
addition.add_field(Name::english("field0"), 1);
|
addition.add_field(Name::english("field0"), 1);
|
||||||
queue
|
queue.send(Message::new(
|
||||||
.send(Message::new(
|
test_doc.get_docdef().get_document_names()[0].clone(),
|
||||||
test_doc.get_docdef().get_document_names()[0].clone(),
|
addition,
|
||||||
addition,
|
));
|
||||||
))
|
|
||||||
.unwrap();
|
|
||||||
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
match result.get_action() {
|
match result.get_action() {
|
||||||
MsgAction::Error(err) => match err {
|
MsgAction::Error(err) => match err {
|
||||||
@ -1403,12 +1388,10 @@ mod document_files {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
let mut query = Query::new();
|
let mut query = Query::new();
|
||||||
query.add(Name::english("field0"), calc);
|
query.add(Name::english("field0"), calc);
|
||||||
queue
|
queue.send(Message::new(
|
||||||
.send(Message::new(
|
test_doc.get_docdef().get_document_names()[0].clone(),
|
||||||
test_doc.get_docdef().get_document_names()[0].clone(),
|
query,
|
||||||
query,
|
));
|
||||||
))
|
|
||||||
.unwrap();
|
|
||||||
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -1441,12 +1424,10 @@ mod document_files {
|
|||||||
calc.add_value(1).unwrap();
|
calc.add_value(1).unwrap();
|
||||||
let mut query = Query::new();
|
let mut query = Query::new();
|
||||||
query.add(Name::english("field0"), calc);
|
query.add(Name::english("field0"), calc);
|
||||||
queue
|
queue.send(Message::new(
|
||||||
.send(Message::new(
|
test_doc.get_docdef().get_document_names()[0].clone(),
|
||||||
test_doc.get_docdef().get_document_names()[0].clone(),
|
query,
|
||||||
query,
|
));
|
||||||
))
|
|
||||||
.unwrap();
|
|
||||||
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -1483,12 +1464,10 @@ mod document_files {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
let mut query = Query::new();
|
let mut query = Query::new();
|
||||||
query.add(Name::english("field0"), calc);
|
query.add(Name::english("field0"), calc);
|
||||||
queue
|
queue.send(Message::new(
|
||||||
.send(Message::new(
|
test_doc.get_docdef().get_document_names()[0].clone(),
|
||||||
test_doc.get_docdef().get_document_names()[0].clone(),
|
query,
|
||||||
query,
|
));
|
||||||
))
|
|
||||||
.unwrap();
|
|
||||||
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -1533,7 +1512,7 @@ mod document_files {
|
|||||||
calc.add_value(CalcValue::Existing(FieldType::StaticString))
|
calc.add_value(CalcValue::Existing(FieldType::StaticString))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
query.add(Name::english("field1"), calc);
|
query.add(Name::english("field1"), calc);
|
||||||
doc.send(query).unwrap();
|
doc.send(query);
|
||||||
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -1582,7 +1561,7 @@ mod document_files {
|
|||||||
calc.add_value(CalcValue::Existing(FieldType::StaticString))
|
calc.add_value(CalcValue::Existing(FieldType::StaticString))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
query.add(Name::english("field1"), calc);
|
query.add(Name::english("field1"), calc);
|
||||||
doc.send(query).unwrap();
|
doc.send(query);
|
||||||
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -1628,7 +1607,7 @@ mod document_files {
|
|||||||
calc.add_value(CalcValue::Existing(FieldType::StaticString))
|
calc.add_value(CalcValue::Existing(FieldType::StaticString))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
query.add(Name::english("field1"), calc);
|
query.add(Name::english("field1"), calc);
|
||||||
doc.send(query).unwrap();
|
doc.send(query);
|
||||||
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -1658,7 +1637,7 @@ mod document_files {
|
|||||||
calc.add_value("something").unwrap();
|
calc.add_value("something").unwrap();
|
||||||
query.add(field_name.clone(), calc);
|
query.add(field_name.clone(), calc);
|
||||||
let msg = Message::new(doc_name, query);
|
let msg = Message::new(doc_name, query);
|
||||||
queue.send(msg).unwrap();
|
queue.send(msg);
|
||||||
let result = rx.recv_timeout(TIMEOUT).unwrap();
|
let result = rx.recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -1682,7 +1661,7 @@ mod document_files {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
let mut query = Query::new();
|
let mut query = Query::new();
|
||||||
query.add(Name::english("field0"), calc);
|
query.add(Name::english("field0"), calc);
|
||||||
doc.send(query).unwrap();
|
doc.send(query);
|
||||||
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -1705,7 +1684,7 @@ mod document_files {
|
|||||||
calc.add_value("notUUID").unwrap();
|
calc.add_value("notUUID").unwrap();
|
||||||
let mut query = Query::new();
|
let mut query = Query::new();
|
||||||
query.add(Name::english("field0"), calc);
|
query.add(Name::english("field0"), calc);
|
||||||
doc.send(query).unwrap();
|
doc.send(query);
|
||||||
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -1732,7 +1711,7 @@ mod document_files {
|
|||||||
let rx = test_doc.get_receiver();
|
let rx = test_doc.get_receiver();
|
||||||
let new_doc = Addition::new();
|
let new_doc = Addition::new();
|
||||||
let msg = Message::new(doc_name, new_doc);
|
let msg = Message::new(doc_name, new_doc);
|
||||||
queue.send(msg).unwrap();
|
queue.send(msg);
|
||||||
let result = rx.recv_timeout(TIMEOUT).unwrap();
|
let result = rx.recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -1760,7 +1739,7 @@ mod document_files {
|
|||||||
let rx = test_doc.get_receiver();
|
let rx = test_doc.get_receiver();
|
||||||
let new_doc = Addition::new();
|
let new_doc = Addition::new();
|
||||||
let msg = Message::new(doc_name, new_doc);
|
let msg = Message::new(doc_name, new_doc);
|
||||||
queue.send(msg).unwrap();
|
queue.send(msg);
|
||||||
let result = rx.recv_timeout(TIMEOUT).unwrap();
|
let result = rx.recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -1789,7 +1768,7 @@ mod document_files {
|
|||||||
let mut new_doc = Addition::new();
|
let mut new_doc = Addition::new();
|
||||||
new_doc.add_field(&field_name, Uuid::nil());
|
new_doc.add_field(&field_name, Uuid::nil());
|
||||||
let msg = Message::new(doc_name, new_doc);
|
let msg = Message::new(doc_name, new_doc);
|
||||||
queue.send(msg).unwrap();
|
queue.send(msg);
|
||||||
let result = rx.recv_timeout(TIMEOUT).unwrap();
|
let result = rx.recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -1828,7 +1807,7 @@ mod document_files {
|
|||||||
update
|
update
|
||||||
.get_values_mut()
|
.get_values_mut()
|
||||||
.add_field(Name::english("field0"), Uuid::nil());
|
.add_field(Name::english("field0"), Uuid::nil());
|
||||||
doc.send(update).unwrap();
|
doc.send(update);
|
||||||
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -1857,7 +1836,7 @@ mod document_files {
|
|||||||
.get_values_mut()
|
.get_values_mut()
|
||||||
.add_field(Name::english("field1"), new);
|
.add_field(Name::english("field1"), new);
|
||||||
let mut testing = |msg: Message| {
|
let mut testing = |msg: Message| {
|
||||||
doc.get_queue().send(msg.clone()).unwrap();
|
doc.get_queue().send(msg.clone());
|
||||||
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
assert_eq!(result.get_message_id(), msg.get_message_id());
|
assert_eq!(result.get_message_id(), msg.get_message_id());
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
@ -1899,9 +1878,7 @@ mod document_files {
|
|||||||
update
|
update
|
||||||
.get_values_mut()
|
.get_values_mut()
|
||||||
.add_field(Name::english("field1"), new);
|
.add_field(Name::english("field1"), new);
|
||||||
doc.get_queue()
|
doc.get_queue().send(Message::new(doc_name.clone(), update));
|
||||||
.send(Message::new(doc_name.clone(), update))
|
|
||||||
.unwrap();
|
|
||||||
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -1915,8 +1892,7 @@ mod document_files {
|
|||||||
_ => unreachable!("got {:?}: should have gotten a reply", action),
|
_ => unreachable!("got {:?}: should have gotten a reply", action),
|
||||||
}
|
}
|
||||||
doc.get_queue()
|
doc.get_queue()
|
||||||
.send(Message::new(doc_name.clone(), Query::new()))
|
.send(Message::new(doc_name.clone(), Query::new()));
|
||||||
.unwrap();
|
|
||||||
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -1967,7 +1943,7 @@ mod document_files {
|
|||||||
.get_values_mut()
|
.get_values_mut()
|
||||||
.add_field(Name::english("field1"), new);
|
.add_field(Name::english("field1"), new);
|
||||||
let mut testing = |msg: Message| {
|
let mut testing = |msg: Message| {
|
||||||
doc.get_queue().send(msg).unwrap();
|
doc.get_queue().send(msg);
|
||||||
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -1996,7 +1972,7 @@ mod document_files {
|
|||||||
doc.populate([id.into(), old.into()].to_vec());
|
doc.populate([id.into(), old.into()].to_vec());
|
||||||
let mut update = Update::new(Query::new());
|
let mut update = Update::new(Query::new());
|
||||||
update.get_values_mut().add_field(bad_name.clone(), new);
|
update.get_values_mut().add_field(bad_name.clone(), new);
|
||||||
doc.send(update).unwrap();
|
doc.send(update);
|
||||||
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -2027,7 +2003,7 @@ mod document_files {
|
|||||||
update
|
update
|
||||||
.get_values_mut()
|
.get_values_mut()
|
||||||
.add_field(Name::english("field1"), new);
|
.add_field(Name::english("field1"), new);
|
||||||
doc.send(update).unwrap();
|
doc.send(update);
|
||||||
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -2063,13 +2039,13 @@ mod document_files {
|
|||||||
query.add(Name::english("field0"), calc);
|
query.add(Name::english("field0"), calc);
|
||||||
let mut update = Update::new(query);
|
let mut update = Update::new(query);
|
||||||
update.get_values_mut().add_field(&fname, new);
|
update.get_values_mut().add_field(&fname, new);
|
||||||
test_doc.send(update).unwrap();
|
test_doc.send(update);
|
||||||
test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let mut should_clear = Addition::new();
|
let mut should_clear = Addition::new();
|
||||||
should_clear.add_field(fname.clone(), old);
|
should_clear.add_field(fname.clone(), old);
|
||||||
let mut should_error = Addition::new();
|
let mut should_error = Addition::new();
|
||||||
should_error.add_field(fname.clone(), new);
|
should_error.add_field(fname.clone(), new);
|
||||||
test_doc.send(should_clear).unwrap();
|
test_doc.send(should_clear);
|
||||||
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -2081,7 +2057,7 @@ mod document_files {
|
|||||||
}
|
}
|
||||||
_ => unreachable!("got {:?}: should have gotten records", action),
|
_ => unreachable!("got {:?}: should have gotten records", action),
|
||||||
}
|
}
|
||||||
test_doc.send(should_error).unwrap();
|
test_doc.send(should_error);
|
||||||
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -2112,9 +2088,9 @@ mod document_files {
|
|||||||
let mut good_addition = Addition::new();
|
let mut good_addition = Addition::new();
|
||||||
good_addition.add_field(&f0name, f0data.clone());
|
good_addition.add_field(&f0name, f0data.clone());
|
||||||
good_addition.add_field(&f1name, f1good_data.clone());
|
good_addition.add_field(&f1name, f1good_data.clone());
|
||||||
test_doc.send(bad_addition).unwrap();
|
test_doc.send(bad_addition);
|
||||||
test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
test_doc.send(good_addition).unwrap();
|
test_doc.send(good_addition);
|
||||||
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -2150,11 +2126,11 @@ mod document_files {
|
|||||||
query.add(Name::english("field0"), calc);
|
query.add(Name::english("field0"), calc);
|
||||||
let mut update = Update::new(query);
|
let mut update = Update::new(query);
|
||||||
update.get_values_mut().add_field(fname.clone(), new);
|
update.get_values_mut().add_field(fname.clone(), new);
|
||||||
doc.send(update).unwrap();
|
doc.send(update);
|
||||||
doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let mut old_addition = Addition::new();
|
let mut old_addition = Addition::new();
|
||||||
old_addition.add_field(&fname, old);
|
old_addition.add_field(&fname, old);
|
||||||
doc.send(old_addition).unwrap();
|
doc.send(old_addition);
|
||||||
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -2168,7 +2144,7 @@ mod document_files {
|
|||||||
}
|
}
|
||||||
let mut new_addition = Addition::new();
|
let mut new_addition = Addition::new();
|
||||||
new_addition.add_field(fname.clone(), new);
|
new_addition.add_field(fname.clone(), new);
|
||||||
doc.send(new_addition).unwrap();
|
doc.send(new_addition);
|
||||||
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -2209,7 +2185,7 @@ mod document_files {
|
|||||||
query.add(&f1name, calc);
|
query.add(&f1name, calc);
|
||||||
let mut update = Update::new(query);
|
let mut update = Update::new(query);
|
||||||
update.get_values_mut().add_field(&f0name, holder.clone());
|
update.get_values_mut().add_field(&f0name, holder.clone());
|
||||||
doc.send(update).unwrap();
|
doc.send(update);
|
||||||
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -2220,7 +2196,7 @@ mod document_files {
|
|||||||
_ => unreachable!("got {:?}: should have gotten an error", action),
|
_ => unreachable!("got {:?}: should have gotten an error", action),
|
||||||
}
|
}
|
||||||
let query = Query::new();
|
let query = Query::new();
|
||||||
doc.send(query).unwrap();
|
doc.send(query);
|
||||||
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -2253,7 +2229,7 @@ mod document_files {
|
|||||||
let mut addition = Addition::new();
|
let mut addition = Addition::new();
|
||||||
addition.add_field(&fname, calc);
|
addition.add_field(&fname, calc);
|
||||||
let start = Utc::now() + duration;
|
let start = Utc::now() + duration;
|
||||||
doc.send(addition).unwrap();
|
doc.send(addition);
|
||||||
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let stop = Utc::now() + duration;
|
let stop = Utc::now() + duration;
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
@ -2284,7 +2260,7 @@ mod document_files {
|
|||||||
let mut query = Query::new();
|
let mut query = Query::new();
|
||||||
query.add(&fname, calc);
|
query.add(&fname, calc);
|
||||||
let delete = Delete::new(query.clone());
|
let delete = Delete::new(query.clone());
|
||||||
doc.send(delete).unwrap();
|
doc.send(delete);
|
||||||
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -2299,7 +2275,7 @@ mod document_files {
|
|||||||
}
|
}
|
||||||
_ => unreachable!("got {:?}: should have gotten reply", action),
|
_ => unreachable!("got {:?}: should have gotten reply", action),
|
||||||
}
|
}
|
||||||
doc.send(query).unwrap();
|
doc.send(query);
|
||||||
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -2320,7 +2296,7 @@ mod document_files {
|
|||||||
let mut query = Query::new();
|
let mut query = Query::new();
|
||||||
query.add(field_name.clone(), calc);
|
query.add(field_name.clone(), calc);
|
||||||
let delete = Delete::new(query);
|
let delete = Delete::new(query);
|
||||||
doc.send(delete).unwrap();
|
doc.send(delete);
|
||||||
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -2342,11 +2318,11 @@ mod document_files {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
doc.start(standard_paths());
|
doc.start(standard_paths());
|
||||||
doc.populate([value.into()].to_vec());
|
doc.populate([value.into()].to_vec());
|
||||||
doc.send(Delete::new(Query::new())).unwrap();
|
doc.send(Delete::new(Query::new()));
|
||||||
doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let mut addition = Addition::new();
|
let mut addition = Addition::new();
|
||||||
addition.add_field(&fname, value.clone());
|
addition.add_field(&fname, value.clone());
|
||||||
doc.send(addition).unwrap();
|
doc.send(addition);
|
||||||
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -2384,7 +2360,7 @@ mod document_files {
|
|||||||
doc.populate([0.into()].to_vec());
|
doc.populate([0.into()].to_vec());
|
||||||
for i in 0..5 {
|
for i in 0..5 {
|
||||||
let expected: Field = i.try_into().unwrap();
|
let expected: Field = i.try_into().unwrap();
|
||||||
doc.send(Query::new()).unwrap();
|
doc.send(Query::new());
|
||||||
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
let action = result.get_action();
|
let action = result.get_action();
|
||||||
match action {
|
match action {
|
||||||
@ -2445,10 +2421,10 @@ mod document_files {
|
|||||||
Name::english("clock"),
|
Name::english("clock"),
|
||||||
MsgAction::OnUpdate(Records::new(Names::new())),
|
MsgAction::OnUpdate(Records::new(Names::new())),
|
||||||
);
|
);
|
||||||
queue.send(trigger.clone()).unwrap();
|
queue.send(trigger.clone());
|
||||||
sleep(TIMEOUT);
|
sleep(TIMEOUT);
|
||||||
let msg = Message::new(doc_name, Query::new());
|
let msg = Message::new(doc_name, Query::new());
|
||||||
queue.send(msg.clone()).unwrap();
|
queue.send(msg.clone());
|
||||||
let mut result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
let mut result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
while result.get_message_id() != msg.get_message_id() {
|
while result.get_message_id() != msg.get_message_id() {
|
||||||
result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||||
|
|||||||
@ -76,9 +76,9 @@ impl Session {
|
|||||||
Include::Just(Action::Reply),
|
Include::Just(Action::Reply),
|
||||||
);
|
);
|
||||||
let reg_msg = Register::new(sender_id.clone(), RegMsg::AddRoute(path));
|
let reg_msg = Register::new(sender_id.clone(), RegMsg::AddRoute(path));
|
||||||
queue.send(msg.response(reg_msg)).unwrap();
|
queue.send(msg.response(reg_msg));
|
||||||
rx.recv().unwrap();
|
rx.recv().unwrap();
|
||||||
queue.send(msg).unwrap();
|
queue.send(msg);
|
||||||
rx.recv().unwrap();
|
rx.recv().unwrap();
|
||||||
queue.remove_sender(&sender_id);
|
queue.remove_sender(&sender_id);
|
||||||
}
|
}
|
||||||
@ -131,7 +131,7 @@ mod sessions {
|
|||||||
];
|
];
|
||||||
for path in paths.iter() {
|
for path in paths.iter() {
|
||||||
let reg = Register::new(id.clone(), RegMsg::AddRoute(path.clone()));
|
let reg = Register::new(id.clone(), RegMsg::AddRoute(path.clone()));
|
||||||
queue.send(Message::new(NameType::None, reg)).unwrap();
|
queue.send(Message::new(NameType::None, reg));
|
||||||
rx.recv_timeout(TIMEOUT).unwrap();
|
rx.recv_timeout(TIMEOUT).unwrap();
|
||||||
}
|
}
|
||||||
Self {
|
Self {
|
||||||
@ -153,14 +153,12 @@ mod sessions {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn send(&self, msg: Message) {
|
fn send(&self, msg: Message) {
|
||||||
self.queue.send(msg).unwrap();
|
self.queue.send(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_registry_message(&self, msg: RegMsg) {
|
fn send_registry_message(&self, msg: RegMsg) {
|
||||||
let request = Register::new(self.sender_id.clone(), msg);
|
let request = Register::new(self.sender_id.clone(), msg);
|
||||||
self.queue
|
self.queue.send(Message::new(NameType::None, request));
|
||||||
.send(Message::new(NameType::None, request))
|
|
||||||
.unwrap();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -48,7 +48,7 @@ impl MoreThanText {
|
|||||||
msg: Message,
|
msg: Message,
|
||||||
) -> Uuid {
|
) -> Uuid {
|
||||||
let reply = msg.response(action);
|
let reply = msg.response(action);
|
||||||
self.queue.send(reply).unwrap();
|
self.queue.send(reply);
|
||||||
let result = rx.recv().unwrap();
|
let result = rx.recv().unwrap();
|
||||||
match result.get_action() {
|
match result.get_action() {
|
||||||
MsgAction::Records(data) => {
|
MsgAction::Records(data) => {
|
||||||
@ -95,9 +95,7 @@ impl MoreThanText {
|
|||||||
Include::Just(Action::Records),
|
Include::Just(Action::Records),
|
||||||
);
|
);
|
||||||
let reg_msg = Register::new(sender_id.clone(), RegMsg::AddRoute(path));
|
let reg_msg = Register::new(sender_id.clone(), RegMsg::AddRoute(path));
|
||||||
self.queue
|
self.queue.send(msg.forward(NameType::None, reg_msg));
|
||||||
.send(msg.forward(NameType::None, reg_msg))
|
|
||||||
.unwrap();
|
|
||||||
rx.recv().unwrap(); // Wait for completion.
|
rx.recv().unwrap(); // Wait for completion.
|
||||||
let output = self.recursive_session_request(rx, action, msg);
|
let output = self.recursive_session_request(rx, action, msg);
|
||||||
self.queue.remove_sender(&sender_id);
|
self.queue.remove_sender(&sender_id);
|
||||||
|
|||||||
@ -446,10 +446,7 @@ impl DocRegistry {
|
|||||||
self.queue.forward(sender_id, msg.clone());
|
self.queue.forward(sender_id, msg.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(err) => self
|
Err(err) => self.queue.send(msg.response(MsgAction::Error(err))),
|
||||||
.queue
|
|
||||||
.send(msg.response(MsgAction::Error(err)))
|
|
||||||
.unwrap(),
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,6 +1,5 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
message::Message,
|
message::Message,
|
||||||
mtterror::MTTError,
|
|
||||||
name::NameType,
|
name::NameType,
|
||||||
queue::data_director::{DocRegistry, RegMsg, Register},
|
queue::data_director::{DocRegistry, RegMsg, Register},
|
||||||
};
|
};
|
||||||
@ -88,10 +87,9 @@ impl Queue {
|
|||||||
router.forward(id, msg);
|
router.forward(id, msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn send(&self, msg: Message) -> Result<(), MTTError> {
|
pub fn send(&self, msg: Message) {
|
||||||
let router = self.router.read().unwrap();
|
let router = self.router.read().unwrap();
|
||||||
router.send(msg.clone());
|
router.send(msg.clone());
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -245,6 +243,7 @@ mod queues {
|
|||||||
use super::*;
|
use super::*;
|
||||||
use crate::{
|
use crate::{
|
||||||
message::MsgAction,
|
message::MsgAction,
|
||||||
|
mtterror::MTTError,
|
||||||
name::Name,
|
name::Name,
|
||||||
queue::data_director::{Include, Path},
|
queue::data_director::{Include, Path},
|
||||||
support_tests::TIMEOUT,
|
support_tests::TIMEOUT,
|
||||||
@ -271,9 +270,7 @@ mod queues {
|
|||||||
|
|
||||||
fn send_reg_msg(&self, msg: RegMsg) {
|
fn send_reg_msg(&self, msg: RegMsg) {
|
||||||
let reg_msg = Register::new(self.rx_id.clone(), msg);
|
let reg_msg = Register::new(self.rx_id.clone(), msg);
|
||||||
self.test_mod
|
self.test_mod.send(Message::new(NameType::None, reg_msg));
|
||||||
.send(Message::new(NameType::None, reg_msg))
|
|
||||||
.unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn recv(&self) -> Result<Message, RecvTimeoutError> {
|
fn recv(&self) -> Result<Message, RecvTimeoutError> {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user