Added message logging.
Some checks failed
Gitea Actions Demo / Explore-Gitea-Actions (push) Failing after 3m3s
Some checks failed
Gitea Actions Demo / Explore-Gitea-Actions (push) Failing after 3m3s
This commit is contained in:
parent
247712f10f
commit
1428b8984b
435
src/message.rs
435
src/message.rs
@ -48,6 +48,8 @@ enum Action {
|
|||||||
Create,
|
Create,
|
||||||
Delete,
|
Delete,
|
||||||
Error,
|
Error,
|
||||||
|
GetLog,
|
||||||
|
Log,
|
||||||
OnAddition,
|
OnAddition,
|
||||||
OnDelete,
|
OnDelete,
|
||||||
OnQuery,
|
OnQuery,
|
||||||
@ -67,6 +69,8 @@ impl From<MsgAction> for Action {
|
|||||||
MsgAction::Create(_) => Action::Create,
|
MsgAction::Create(_) => Action::Create,
|
||||||
MsgAction::Delete(_) => Action::Delete,
|
MsgAction::Delete(_) => Action::Delete,
|
||||||
MsgAction::Error(_) => Action::Error,
|
MsgAction::Error(_) => Action::Error,
|
||||||
|
MsgAction::GetLog(_) => Action::GetLog,
|
||||||
|
MsgAction::Log(_) => Action::Log,
|
||||||
MsgAction::OnAddition(_) => Action::OnAddition,
|
MsgAction::OnAddition(_) => Action::OnAddition,
|
||||||
MsgAction::OnDelete(_) => Action::OnDelete,
|
MsgAction::OnDelete(_) => Action::OnDelete,
|
||||||
MsgAction::OnQuery(_) => Action::OnQuery,
|
MsgAction::OnQuery(_) => Action::OnQuery,
|
||||||
@ -135,6 +139,8 @@ enum MsgAction {
|
|||||||
// Alter
|
// Alter
|
||||||
// Remove
|
// Remove
|
||||||
Error(MTTError),
|
Error(MTTError),
|
||||||
|
GetLog(Uuid),
|
||||||
|
Log(Vec<MsgEntry>),
|
||||||
OnAddition(Records),
|
OnAddition(Records),
|
||||||
OnDelete(Records),
|
OnDelete(Records),
|
||||||
OnQuery(Records),
|
OnQuery(Records),
|
||||||
@ -202,6 +208,18 @@ impl From<Update> for MsgAction {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<Uuid> for MsgAction {
|
||||||
|
fn from(value: Uuid) -> Self {
|
||||||
|
MsgAction::GetLog(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<&Uuid> for MsgAction {
|
||||||
|
fn from(value: &Uuid) -> Self {
|
||||||
|
Self::from(value.clone())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod msgactions {
|
mod msgactions {
|
||||||
use super::*;
|
use super::*;
|
||||||
@ -2690,13 +2708,14 @@ impl IndexType {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
|
#[derive(Clone, Debug)]
|
||||||
enum DocFuncType {
|
enum DocFuncType {
|
||||||
Action,
|
|
||||||
Add,
|
Add,
|
||||||
Delete,
|
Delete,
|
||||||
|
ExistingQuery(MsgAction),
|
||||||
Query,
|
Query,
|
||||||
Show,
|
Show,
|
||||||
|
Trigger(MsgAction),
|
||||||
Update,
|
Update,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2888,6 +2907,10 @@ impl DocDef {
|
|||||||
fn iter_routes(&self) -> impl Iterator<Item = &PathAction> {
|
fn iter_routes(&self) -> impl Iterator<Item = &PathAction> {
|
||||||
self.routes.iter()
|
self.routes.iter()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn add_route(&mut self, path: Path, action: DocFuncType) {
|
||||||
|
self.routes.push(PathAction::new(path, action));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@ -2997,7 +3020,7 @@ mod docdefs {
|
|||||||
docdef.routes
|
docdef.routes
|
||||||
);
|
);
|
||||||
let mut actions: HashSet<Action> = HashSet::new();
|
let mut actions: HashSet<Action> = HashSet::new();
|
||||||
let mut doc_funcs: HashSet<DocFuncType> = HashSet::new();
|
let mut doc_funcs: HashSet<String> = HashSet::new();
|
||||||
for path_action in docdef.iter_routes() {
|
for path_action in docdef.iter_routes() {
|
||||||
let path = path_action.path();
|
let path = path_action.path();
|
||||||
match &path.msg_id {
|
match &path.msg_id {
|
||||||
@ -3024,11 +3047,11 @@ mod docdefs {
|
|||||||
};
|
};
|
||||||
let file_func = path_action.doc_function();
|
let file_func = path_action.doc_function();
|
||||||
match file_func {
|
match file_func {
|
||||||
DocFuncType::Add => doc_funcs.insert(file_func),
|
DocFuncType::Add => doc_funcs.insert("Add".to_string()),
|
||||||
DocFuncType::Delete => doc_funcs.insert(file_func),
|
DocFuncType::Delete => doc_funcs.insert("Delete".to_string()),
|
||||||
DocFuncType::Query => doc_funcs.insert(file_func),
|
DocFuncType::Query => doc_funcs.insert("Query".to_string()),
|
||||||
DocFuncType::Show => doc_funcs.insert(file_func),
|
DocFuncType::Show => doc_funcs.insert("Show".to_string()),
|
||||||
DocFuncType::Update => doc_funcs.insert(file_func),
|
DocFuncType::Update => doc_funcs.insert("Update".to_string()),
|
||||||
_ => unreachable!("got {:?}, which is not a default function", file_func),
|
_ => unreachable!("got {:?}, which is not a default function", file_func),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
@ -3045,6 +3068,45 @@ mod docdefs {
|
|||||||
doc_funcs
|
doc_funcs
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn add_route_function() {
|
||||||
|
let docname = Name::english(Uuid::new_v4().to_string().as_str());
|
||||||
|
let mut docdef = DocDef::new(docname.clone());
|
||||||
|
docdef.add_route(
|
||||||
|
Path::new(
|
||||||
|
Include::All,
|
||||||
|
Include::Some(docname.clone().into()),
|
||||||
|
Include::Some(Action::OnQuery),
|
||||||
|
),
|
||||||
|
DocFuncType::Trigger(Update::new(Query::new()).into()),
|
||||||
|
);
|
||||||
|
let path_action = docdef.iter_routes().last().unwrap();
|
||||||
|
let path = path_action.path();
|
||||||
|
match &path.msg_id {
|
||||||
|
Include::All => {}
|
||||||
|
_ => unreachable!("got {:?}, message id should include all", path.msg_id),
|
||||||
|
};
|
||||||
|
match &path.doc {
|
||||||
|
Include::Some(output) => match output {
|
||||||
|
NameType::Name(data) => assert_eq!(data, &docname),
|
||||||
|
_ => unreachable!("got {:?}, name type should be {:?}", path.doc, docname),
|
||||||
|
},
|
||||||
|
_ => unreachable!("got {:?}, name type should be {:?}", path.doc, docname),
|
||||||
|
};
|
||||||
|
match &path.action {
|
||||||
|
Include::Some(output) => match output {
|
||||||
|
Action::OnQuery => {}
|
||||||
|
_ => unreachable!("got {:?} which is not a additional action", output),
|
||||||
|
},
|
||||||
|
_ => unreachable!("got {:?}, which is not on query action", path.action),
|
||||||
|
}
|
||||||
|
let file_func = path_action.doc_function();
|
||||||
|
match file_func {
|
||||||
|
DocFuncType::Trigger(_) => {}
|
||||||
|
_ => unreachable!("got {:?}, which is not a default function", file_func),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
@ -3888,6 +3950,10 @@ impl Records {
|
|||||||
fn iter(&self) -> impl Iterator<Item = Record> {
|
fn iter(&self) -> impl Iterator<Item = Record> {
|
||||||
RecordIter::new(self)
|
RecordIter::new(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_internal_records(&self) -> &InternalRecords {
|
||||||
|
&self.data
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct RecordIter {
|
struct RecordIter {
|
||||||
@ -4495,6 +4561,7 @@ impl DocumentFile {
|
|||||||
DocFuncType::Query => self.query(&msg),
|
DocFuncType::Query => self.query(&msg),
|
||||||
DocFuncType::Show => self.queue.send(msg.response(Reply::new())).unwrap(),
|
DocFuncType::Show => self.queue.send(msg.response(Reply::new())).unwrap(),
|
||||||
DocFuncType::Update => self.update(&msg),
|
DocFuncType::Update => self.update(&msg),
|
||||||
|
DocFuncType::ExistingQuery(action) => self.existing_query(&msg, action),
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -4696,6 +4763,48 @@ impl DocumentFile {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn run_update(
|
||||||
|
&mut self,
|
||||||
|
original: &InternalRecords,
|
||||||
|
update: &Update,
|
||||||
|
) -> Result<InternalRecords, MTTError> {
|
||||||
|
let mut changes: HashMap<Uuid, &CalcValue> = HashMap::new();
|
||||||
|
for (key, value) in update.get_values().iter() {
|
||||||
|
let field_id = match self.docdef.get_field_id(key) {
|
||||||
|
Ok(data) => data,
|
||||||
|
Err(err) => return Err(err),
|
||||||
|
};
|
||||||
|
changes.insert(field_id, value);
|
||||||
|
}
|
||||||
|
let mut indexes = self.docdef.create_indexes();
|
||||||
|
let mut updates = InternalRecords::new();
|
||||||
|
for (oid, record) in original.iter() {
|
||||||
|
let mut holder = record.clone();
|
||||||
|
for (field_id, value) in changes.iter() {
|
||||||
|
let field = value.get(holder.get(field_id).unwrap());
|
||||||
|
let correction = match self.validate(field_id, &field) {
|
||||||
|
Ok(data) => data,
|
||||||
|
Err(err) => return Err(err),
|
||||||
|
};
|
||||||
|
holder.insert(field_id.clone(), correction.clone());
|
||||||
|
match indexes.add_to_index(&field_id, correction, oid.clone()) {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(err) => return Err(err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
updates.insert(oid.clone(), holder);
|
||||||
|
}
|
||||||
|
for (oid, new_rec) in updates.iter() {
|
||||||
|
let old_rec = original.get(oid).unwrap();
|
||||||
|
for (field_id, index) in self.indexes.iter_mut() {
|
||||||
|
index.remove(old_rec.get(field_id).unwrap(), oid);
|
||||||
|
index.add(new_rec.get(field_id).unwrap().clone(), oid.clone());
|
||||||
|
}
|
||||||
|
self.docs.insert(oid.clone(), new_rec.clone());
|
||||||
|
}
|
||||||
|
Ok(updates.clone())
|
||||||
|
}
|
||||||
|
|
||||||
fn update(&mut self, msg: &Message) {
|
fn update(&mut self, msg: &Message) {
|
||||||
let update = match msg.get_action() {
|
let update = match msg.get_action() {
|
||||||
MsgAction::Update(data) => data,
|
MsgAction::Update(data) => data,
|
||||||
@ -4709,58 +4818,33 @@ impl DocumentFile {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let mut changes: HashMap<Uuid, &CalcValue> = HashMap::new();
|
let data = match self.run_update(&original, update) {
|
||||||
for (key, value) in update.get_values().iter() {
|
Ok(output) => output,
|
||||||
let field_id = match self.docdef.get_field_id(key) {
|
Err(err) => {
|
||||||
Ok(data) => data,
|
let reply = msg.response(err);
|
||||||
Err(err) => {
|
self.queue.send(reply).unwrap();
|
||||||
let reply = msg.response(err);
|
return;
|
||||||
self.queue.send(reply).unwrap();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
changes.insert(field_id, value);
|
|
||||||
}
|
|
||||||
let mut indexes = self.docdef.create_indexes();
|
|
||||||
let mut updates = InternalRecords::new();
|
|
||||||
for (oid, record) in original.iter() {
|
|
||||||
let mut holder = record.clone();
|
|
||||||
for (field_id, value) in changes.iter() {
|
|
||||||
let field = value.get(holder.get(field_id).unwrap());
|
|
||||||
let correction = match self.validate(field_id, &field) {
|
|
||||||
Ok(data) => data,
|
|
||||||
Err(err) => {
|
|
||||||
let reply = msg.response(err);
|
|
||||||
self.queue.send(reply).unwrap();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
holder.insert(field_id.clone(), correction.clone());
|
|
||||||
match indexes.add_to_index(&field_id, correction, oid.clone()) {
|
|
||||||
Ok(_) => {}
|
|
||||||
Err(err) => {
|
|
||||||
let reply = msg.response(err);
|
|
||||||
self.queue.send(reply).unwrap();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
updates.insert(oid.clone(), holder);
|
};
|
||||||
}
|
let recs = Records::with_data(self.docdef.get_field_names().clone(), data);
|
||||||
for (oid, new_rec) in updates.iter() {
|
|
||||||
let old_rec = original.get(oid).unwrap();
|
|
||||||
for (field_id, index) in self.indexes.iter_mut() {
|
|
||||||
index.remove(old_rec.get(field_id).unwrap(), oid);
|
|
||||||
index.add(new_rec.get(field_id).unwrap().clone(), oid.clone());
|
|
||||||
}
|
|
||||||
self.docs.insert(oid.clone(), new_rec.clone());
|
|
||||||
}
|
|
||||||
let recs = Records::with_data(self.docdef.get_field_names().clone(), updates);
|
|
||||||
self.queue.send(msg.response(recs.clone())).unwrap();
|
self.queue.send(msg.response(recs.clone())).unwrap();
|
||||||
self.queue
|
self.queue
|
||||||
.send(msg.response(MsgAction::OnUpdate(recs)))
|
.send(msg.response(MsgAction::OnUpdate(recs)))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn existing_query(&mut self, msg: &Message, action: &MsgAction) {
|
||||||
|
let recs = match msg.get_action() {
|
||||||
|
MsgAction::OnQuery(data) => data,
|
||||||
|
_ => unreachable!("should only receive on messages"),
|
||||||
|
};
|
||||||
|
match action {
|
||||||
|
MsgAction::Update(change) => self
|
||||||
|
.run_update(recs.get_internal_records(), change)
|
||||||
|
.unwrap(),
|
||||||
|
_ => panic!("should not get here"),
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@ -6264,6 +6348,20 @@ mod document_files {
|
|||||||
#[ignore]
|
#[ignore]
|
||||||
fn can_query_trigger_reaction() {
|
fn can_query_trigger_reaction() {
|
||||||
let mut doc = TestDocument::new([FieldType::Integer].to_vec());
|
let mut doc = TestDocument::new([FieldType::Integer].to_vec());
|
||||||
|
let doc_name = doc.get_docdef().get_document_names()[0].clone();
|
||||||
|
let path = Path::new(
|
||||||
|
Include::All,
|
||||||
|
Include::Some(doc_name.into()),
|
||||||
|
Include::Some(Action::OnQuery),
|
||||||
|
);
|
||||||
|
let mut update = Update::new(Query::new());
|
||||||
|
let mut calc = Calculation::new(Operand::Add);
|
||||||
|
calc.add_value(CalcValue::Existing(FieldType::Integer));
|
||||||
|
calc.add_value(1);
|
||||||
|
update
|
||||||
|
.get_values_mut()
|
||||||
|
.add_field(Name::english("field0"), calc);
|
||||||
|
let function = DocFuncType::ExistingQuery(update.into());
|
||||||
doc.start(standard_paths());
|
doc.start(standard_paths());
|
||||||
doc.populate([0.into()].to_vec());
|
doc.populate([0.into()].to_vec());
|
||||||
for i in 0..5 {
|
for i in 0..5 {
|
||||||
@ -6499,3 +6597,232 @@ mod clocks {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
struct MsgEntry {
|
||||||
|
timestamp: DateTime<Utc>,
|
||||||
|
message: Message,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MsgEntry {
|
||||||
|
fn new(msg: Message) -> Self {
|
||||||
|
Self {
|
||||||
|
timestamp: Utc::now(),
|
||||||
|
message: msg,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_timestamp(&self) -> &DateTime<Utc> {
|
||||||
|
&self.timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_message(&self) -> &Message {
|
||||||
|
&self.message
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_message_id(&self) -> &Uuid {
|
||||||
|
self.message.get_message_id()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod msg_entries {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn creates_message_entry() {
|
||||||
|
let msg = Message::new(Name::english("holder"), Query::new());
|
||||||
|
let start = Utc::now();
|
||||||
|
let entry = MsgEntry::new(msg.clone());
|
||||||
|
let end = Utc::now();
|
||||||
|
assert!(
|
||||||
|
entry.get_timestamp() > &start,
|
||||||
|
"timestamp should be between start and end times"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
entry.get_timestamp() < &end,
|
||||||
|
"timestamp should be between start and end times"
|
||||||
|
);
|
||||||
|
assert_eq!(entry.get_message_id(), msg.get_message_id());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
struct MsgLogs {
|
||||||
|
data: HashMap<Uuid, Vec<MsgEntry>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MsgLogs {
|
||||||
|
fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
data: HashMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add(&mut self, msg: Message) {
|
||||||
|
let entry = MsgEntry::new(msg);
|
||||||
|
let id = entry.get_message_id();
|
||||||
|
let entries = match self.data.get_mut(id) {
|
||||||
|
Some(data) => data,
|
||||||
|
None => {
|
||||||
|
self.data.insert(id.clone(), Vec::new());
|
||||||
|
self.data.get_mut(id).unwrap()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
entries.push(entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get(&self, msg_id: &Uuid) -> Option<&Vec<MsgEntry>> {
|
||||||
|
self.data.get(msg_id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod msg_logs {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn can_add_message_to_log() {
|
||||||
|
let mut logs = MsgLogs::new();
|
||||||
|
let msg = Message::new(Name::english("something"), Query::new());
|
||||||
|
logs.add(msg.clone());
|
||||||
|
let result = logs.get(msg.get_message_id()).unwrap();
|
||||||
|
assert_eq!(result.len(), 1, "should be one entry");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn returns_none_when_no_logs_found() {
|
||||||
|
let logs = MsgLogs::new();
|
||||||
|
match logs.get(&Uuid::nil()) {
|
||||||
|
Some(data) => unreachable!("got {:?}, should return none", data),
|
||||||
|
None => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn stores_messages_with_responses() {
|
||||||
|
let mut logs = MsgLogs::new();
|
||||||
|
let msg1 = Message::new(Name::english("something"), Query::new());
|
||||||
|
let msg2 = msg1.response(Records::new(Names::new()));
|
||||||
|
logs.add(msg1.clone());
|
||||||
|
logs.add(msg2.clone());
|
||||||
|
let result = logs.get(msg1.get_message_id()).unwrap();
|
||||||
|
assert_eq!(result.len(), 2, "should be two entry");
|
||||||
|
let action1: Action = result[0].get_message().get_action().clone().into();
|
||||||
|
let action2: Action = result[1].get_message().get_action().clone().into();
|
||||||
|
assert_eq!(action1, Action::Query);
|
||||||
|
assert_eq!(action2, Action::Records);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn messages_are_stored_by_ids() {
|
||||||
|
let mut logs = MsgLogs::new();
|
||||||
|
let msg1 = Message::new(Name::english("something"), Query::new());
|
||||||
|
let msg2 = Message::new(Name::english("something"), Query::new());
|
||||||
|
logs.add(msg1.clone());
|
||||||
|
logs.add(msg2.clone());
|
||||||
|
let result1 = logs.get(msg1.get_message_id()).unwrap();
|
||||||
|
let result2 = logs.get(msg2.get_message_id()).unwrap();
|
||||||
|
assert_eq!(result1.len(), 1, "should be one entry");
|
||||||
|
assert_eq!(result2.len(), 1, "should be one entry");
|
||||||
|
assert_eq!(result1[0].get_message_id(), msg1.get_message_id());
|
||||||
|
assert_eq!(result2[0].get_message_id(), msg2.get_message_id());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct MessageLog {
|
||||||
|
data: MsgLogs,
|
||||||
|
queue: Queue,
|
||||||
|
rx: Receiver<Message>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MessageLog {
|
||||||
|
fn new(queue: Queue, rx: Receiver<Message>) -> Self {
|
||||||
|
Self {
|
||||||
|
data: MsgLogs::new(),
|
||||||
|
queue: queue,
|
||||||
|
rx: rx,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn start(mut queue: Queue) {
|
||||||
|
let (tx, rx) = channel();
|
||||||
|
let mut logs = MessageLog::new(queue.clone(), rx);
|
||||||
|
let id = queue.add_sender(tx);
|
||||||
|
let reg_msg = Register::new(
|
||||||
|
id,
|
||||||
|
RegMsg::AddRoute(Path::new(Include::All, Include::All, Include::All)),
|
||||||
|
);
|
||||||
|
let rmsg = Message::new(NameType::None, reg_msg);
|
||||||
|
queue.send(rmsg.clone()).unwrap();
|
||||||
|
spawn(move || {
|
||||||
|
logs.listen();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
fn listen(&mut self) {
|
||||||
|
loop {
|
||||||
|
let msg = self.rx.recv().unwrap();
|
||||||
|
match msg.get_action() {
|
||||||
|
MsgAction::GetLog(id) => match self.data.get(id) {
|
||||||
|
Some(data) => self
|
||||||
|
.queue
|
||||||
|
.send(msg.response(MsgAction::Log(data.clone())))
|
||||||
|
.unwrap(),
|
||||||
|
None => self
|
||||||
|
.queue
|
||||||
|
.send(msg.response(MsgAction::Log(Vec::new())))
|
||||||
|
.unwrap(),
|
||||||
|
},
|
||||||
|
_ => self.data.add(msg),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod message_logs {
|
||||||
|
use super::{support_test::TIMEOUT, *};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn does_log_store_messages() {
|
||||||
|
let doc_name = Name::english("unimportant");
|
||||||
|
let mut queue = Queue::new();
|
||||||
|
MessageLog::start(queue.clone());
|
||||||
|
let (tx, rx) = channel();
|
||||||
|
let id = queue.add_sender(tx);
|
||||||
|
let reg_msg = Register::new(id, RegMsg::AddDocName(vec![doc_name.clone()]));
|
||||||
|
let rmsg = Message::new(NameType::None, reg_msg);
|
||||||
|
queue.send(rmsg.clone()).unwrap();
|
||||||
|
let name_result = rx.recv().unwrap();
|
||||||
|
let name_id = match name_result.get_action() {
|
||||||
|
MsgAction::Register(data) => match data.get_msg() {
|
||||||
|
RegMsg::DocumentNameID(data) => data,
|
||||||
|
RegMsg::Error(err) => unreachable!("got {:?}, should have gotten data", err),
|
||||||
|
_ => unreachable!("should only return a name id or an error"),
|
||||||
|
},
|
||||||
|
_ => unreachable!("should only return a name id or an error"),
|
||||||
|
};
|
||||||
|
let request = Register::new(
|
||||||
|
id.clone(),
|
||||||
|
RegMsg::AddRoute(Path::new(
|
||||||
|
Include::All,
|
||||||
|
Include::All,
|
||||||
|
Include::Some(Action::Log),
|
||||||
|
)),
|
||||||
|
);
|
||||||
|
queue.send(Message::new(NameType::None, request)).unwrap();
|
||||||
|
rx.recv_timeout(TIMEOUT).unwrap();
|
||||||
|
let msg = Message::new(doc_name.clone(), Query::new());
|
||||||
|
let start = Utc::now();
|
||||||
|
queue.send(msg.clone()).unwrap();
|
||||||
|
let log_msg = Message::new(NameType::None, msg.get_message_id());
|
||||||
|
queue.send(log_msg.clone()).unwrap();
|
||||||
|
let result = rx.recv_timeout(TIMEOUT).unwrap();
|
||||||
|
assert_eq!(result.get_message_id(), log_msg.get_message_id());
|
||||||
|
match result.get_action() {
|
||||||
|
MsgAction::Log(output) => assert_eq!(output.len(), 1),
|
||||||
|
_ => unreachable!("got {:?}, should have been log", result.get_action()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user