Added trigger messages.
Some checks failed
Gitea Actions Demo / Explore-Gitea-Actions (push) Failing after 1s
Some checks failed
Gitea Actions Demo / Explore-Gitea-Actions (push) Failing after 1s
This commit is contained in:
parent
4025540b45
commit
18e33f78c2
134
src/message.rs
134
src/message.rs
@ -337,6 +337,18 @@ impl Message {
|
||||
action: action.into(),
|
||||
}
|
||||
}
|
||||
|
||||
fn forward<D, A>(&self, doc_id: D, action: A) -> Self
|
||||
where
|
||||
D: Into<NameType>,
|
||||
A: Into<MsgAction>,
|
||||
{
|
||||
Self {
|
||||
msg_id: self.msg_id.clone(),
|
||||
document_id: doc_id.into(),
|
||||
action: action.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@ -3791,17 +3803,11 @@ impl Query {
|
||||
}
|
||||
}
|
||||
|
||||
fn add<NT>(&mut self, name: NT, operation: Calculation) -> Result<(), MTTError>
|
||||
fn add<NT>(&mut self, name: NT, operation: Calculation)
|
||||
where
|
||||
NT: Into<NameType>,
|
||||
{
|
||||
match operation.operation() {
|
||||
Operand::Equal => {
|
||||
self.data.insert(name.into(), operation);
|
||||
Ok(())
|
||||
}
|
||||
_ => Err(MTTError::QueryCannotChangeData),
|
||||
}
|
||||
self.data.insert(name.into(), operation);
|
||||
}
|
||||
|
||||
fn get<NT>(&self, name: NT) -> Option<Calculation>
|
||||
@ -4606,6 +4612,7 @@ struct DocumentFile {
|
||||
docdef: DocDef,
|
||||
docs: InternalRecords,
|
||||
indexes: Indexes,
|
||||
name_id: Uuid,
|
||||
queue: Queue,
|
||||
routes: HashMap<RouteID, DocFuncType>,
|
||||
rx: Receiver<Message>,
|
||||
@ -4617,11 +4624,13 @@ impl DocumentFile {
|
||||
rx: Receiver<Message>,
|
||||
docdef: DocDef,
|
||||
routes: HashMap<RouteID, DocFuncType>,
|
||||
name_id: Uuid,
|
||||
) -> Self {
|
||||
Self {
|
||||
docdef: docdef.clone(),
|
||||
docs: InternalRecords::new(),
|
||||
indexes: docdef.create_indexes(),
|
||||
name_id: name_id,
|
||||
queue: queue,
|
||||
routes: routes,
|
||||
rx: rx,
|
||||
@ -4673,7 +4682,7 @@ impl DocumentFile {
|
||||
};
|
||||
route_action.insert(route_id.clone(), path_action.doc_function());
|
||||
}
|
||||
let mut doc = DocumentFile::new(queue.clone(), rx, docdef, route_action);
|
||||
let mut doc = DocumentFile::new(queue.clone(), rx, docdef, route_action, name_id.clone());
|
||||
spawn(move || {
|
||||
doc.listen();
|
||||
});
|
||||
@ -4694,6 +4703,7 @@ impl DocumentFile {
|
||||
DocFuncType::Show => self.queue.send(msg.response(Reply::new())).unwrap(),
|
||||
DocFuncType::Update => self.update(&msg),
|
||||
DocFuncType::ExistingQuery(action) => self.existing_query(&msg, action),
|
||||
DocFuncType::Trigger(action) => self.trigger(&msg, action),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
@ -4978,12 +4988,18 @@ impl DocumentFile {
|
||||
_ => panic!("should not get here"),
|
||||
};
|
||||
}
|
||||
|
||||
fn trigger(&self, msg: &Message, action: &MsgAction) {
|
||||
self.queue
|
||||
.send(msg.forward(self.name_id.clone(), action.clone()))
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod document_files {
|
||||
use super::{support_test::TIMEOUT, *};
|
||||
use std::sync::mpsc::RecvTimeoutError;
|
||||
use std::{sync::mpsc::RecvTimeoutError, thread::sleep};
|
||||
|
||||
fn standard_paths() -> Vec<Path> {
|
||||
[
|
||||
@ -5586,7 +5602,6 @@ mod document_files {
|
||||
for i in 0..count {
|
||||
test_doc.populate([i.into()].to_vec());
|
||||
}
|
||||
let mut query = Query::new();
|
||||
let mut calc = Calculation::new(Operand::Equal);
|
||||
calc.add_value(expected.clone());
|
||||
calc.add_value(CalcValue::Existing(FieldType::Integer));
|
||||
@ -5615,6 +5630,41 @@ mod document_files {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn does_query_work_with_greater_than() {
|
||||
let mut test_doc = TestDocument::new([FieldType::Integer].to_vec());
|
||||
test_doc.start(standard_paths());
|
||||
let queue = test_doc.get_queue();
|
||||
test_doc.populate([1.into()].to_vec());
|
||||
test_doc.populate([2.into()].to_vec());
|
||||
let mut query = Query::new();
|
||||
let mut calc = Calculation::new(Operand::GreaterThan);
|
||||
calc.add_value(CalcValue::Existing(FieldType::Integer));
|
||||
calc.add_value(1);
|
||||
let mut query = Query::new();
|
||||
query.add(Name::english("field0"), calc);
|
||||
queue.send(Message::new(
|
||||
test_doc.get_docdef().get_document_names()[0].clone(),
|
||||
query,
|
||||
));
|
||||
let result = test_doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||
let action = result.get_action();
|
||||
match action {
|
||||
MsgAction::Records(data) => {
|
||||
assert_eq!(
|
||||
data.len(),
|
||||
1,
|
||||
"should return one entry containing 2 got:\n{:?}",
|
||||
action
|
||||
);
|
||||
for doc in data.iter() {
|
||||
assert_eq!(doc.get(&Name::english("field0")).unwrap(), 2.into());
|
||||
}
|
||||
}
|
||||
_ => unreachable!("got {:?}: should have been a reply", action),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn gets_all_documents_in_query() {
|
||||
let mut test_doc = TestDocument::new([FieldType::Integer].to_vec());
|
||||
@ -6530,6 +6580,68 @@ mod document_files {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn can_an_action_trigger_an_action() {
|
||||
let mut doc = TestDocument::new([FieldType::Integer].to_vec());
|
||||
let queue = doc.get_queue();
|
||||
let doc_name = doc.get_docdef().get_document_names()[0].clone();
|
||||
Clock::start(queue.clone());
|
||||
let mut calc = Calculation::new(Operand::GreaterThan);
|
||||
calc.add_value(CalcValue::Existing(FieldType::Integer));
|
||||
calc.add_value(1);
|
||||
let mut query = Query::new();
|
||||
query.add(Name::english("field0"), calc);
|
||||
let delete = Delete::new(query.clone());
|
||||
let path = Path::new(
|
||||
Include::All,
|
||||
Include::Some(Name::english("clock").into()),
|
||||
Include::Some(Action::OnUpdate),
|
||||
);
|
||||
let function = DocFuncType::Trigger(delete.into());
|
||||
doc.get_docdef_mut().add_route(path, function);
|
||||
let mut paths = standard_paths();
|
||||
paths.push(Path::new(
|
||||
Include::All,
|
||||
Include::Some(doc_name.clone().into()),
|
||||
Include::Some(Action::Delete),
|
||||
));
|
||||
doc.start(paths);
|
||||
let queue = doc.get_queue();
|
||||
for item in 1..3 {
|
||||
doc.populate([item.into()].to_vec());
|
||||
}
|
||||
let trigger = Message::new(
|
||||
Name::english("clock"),
|
||||
MsgAction::OnUpdate(Records::new(Names::new())),
|
||||
);
|
||||
queue.send(trigger.clone()).unwrap();
|
||||
let trigger_reply = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||
assert_eq!(
|
||||
trigger_reply.get_message_id(),
|
||||
trigger.get_message_id(),
|
||||
"msg {:?} had incorrect message id",
|
||||
trigger_reply
|
||||
);
|
||||
sleep(TIMEOUT);
|
||||
let msg = Message::new(doc_name, Query::new());
|
||||
queue.send(msg.clone()).unwrap();
|
||||
let mut result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||
while result.get_message_id() != msg.get_message_id() {
|
||||
result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
|
||||
}
|
||||
let action = result.get_action();
|
||||
let expected: Field = 1.into();
|
||||
match action {
|
||||
MsgAction::Records(data) => {
|
||||
assert_eq!(data.len(), 1, "should have been one record:\n{:?}", data);
|
||||
for rec in data.iter() {
|
||||
assert_eq!(rec.get(Name::english("field0")).unwrap(), expected);
|
||||
}
|
||||
}
|
||||
_ => unreachable!("should return records"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user