diff --git a/src/documents.rs b/src/documents.rs new file mode 100644 index 0000000..159730d --- /dev/null +++ b/src/documents.rs @@ -0,0 +1 @@ +pub mod clock; diff --git a/src/documents/clock.rs b/src/documents/clock.rs new file mode 100644 index 0000000..1fcf8ac --- /dev/null +++ b/src/documents/clock.rs @@ -0,0 +1,89 @@ +use crate::{ + data_director::{RegMsg, Register}, + message::{Message, MsgAction, Records}, + name::{Name, NameType, Names}, + router::Queue, +}; +use std::{ + sync::mpsc::channel, + thread::{sleep, spawn}, + time::Duration, +}; + +pub struct Clock { + queue: Queue, +} + +impl Clock { + fn new(queue: Queue) -> Self { + Self { queue: queue } + } + + pub fn start(mut queue: Queue) { + let clock = Clock::new(queue.clone()); + let (tx, rx) = channel(); + let id = queue.add_sender(tx); + let reg_msg = Register::new(id, RegMsg::AddDocName([Name::english("clock")].to_vec())); + let msg = Message::new(NameType::None, reg_msg.clone()); + queue.send(msg).unwrap(); + rx.recv().unwrap(); + spawn(move || { + clock.listen(); + }); + } + + fn listen(&self) { + loop { + self.queue + .send(Message::new( + Name::english("clock"), + MsgAction::OnUpdate(Records::new(Names::new())), + )) + .unwrap(); + sleep(Duration::from_secs(1)); + } + } +} + +#[cfg(test)] +mod clocks { + use super::*; + use crate::data_director::{Include, Path}; + use chrono::{TimeDelta, Utc}; + + static TIMEOUT: Duration = Duration::from_millis(1500); + + #[test] + fn does_clock_send_reply_every_second() { + let mut queue = Queue::new(); + let (tx, rx) = channel(); + let id = queue.add_sender(tx); + let request = Register::new( + id.clone(), + RegMsg::AddRoute(Path::new(Include::All, Include::All, Include::All)), + ); + queue.send(Message::new(NameType::None, request)).unwrap(); + rx.recv_timeout(TIMEOUT).unwrap(); + let mut holder: Vec = Vec::new(); + let start = Utc::now(); + Clock::start(queue.clone()); + while holder.len() < 2 { + holder.push(rx.recv_timeout(TIMEOUT).unwrap()); + } + let end = Utc::now(); + assert!((end - start) > TimeDelta::seconds(1)); + assert!((end - start) < TimeDelta::seconds(2)); + let reg_request = Register::new(id, RegMsg::GetNameID(Name::english("clock"))); + queue + .send(Message::new(NameType::None, reg_request)) + .unwrap(); + rx.recv_timeout(TIMEOUT).unwrap(); + for msg in holder.iter() { + let action = msg.get_action(); + match action { + MsgAction::OnUpdate(result) => assert_eq!(result.len(), 0), + _ => unreachable!("got {:?}, should have been empty record", action), + } + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 7a90edf..aa587cd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ mod data_director; +mod documents; mod field; mod message; mod mtterror; @@ -6,10 +7,9 @@ mod name; mod router; use data_director::{Include, Path, RegMsg, Register}; +use documents::clock::Clock; use field::{Field, FieldType}; -use message::{ - Action, Addition, CalcValue, Calculation, Clock, CreateDoc, Message, Operand, Session, -}; +use message::{Action, Addition, CalcValue, Calculation, CreateDoc, Message, Operand, Session}; pub use message::{MsgAction, Query}; use name::{Name, NameType}; use router::Queue; diff --git a/src/message.rs b/src/message.rs index 4e73053..e8c026a 100644 --- a/src/message.rs +++ b/src/message.rs @@ -9,7 +9,7 @@ use chrono::prelude::*; use std::{ collections::{HashMap, HashSet}, sync::mpsc::{channel, Receiver}, - thread::{sleep, spawn}, + thread::spawn, time::Duration, }; use uuid::Uuid; @@ -4547,7 +4547,7 @@ mod document_files { let mut doc = TestDocument::new([FieldType::Integer].to_vec()); let queue = doc.get_queue(); let doc_name = doc.get_docdef().get_document_names()[0].clone(); - Clock::start(queue.clone()); + crate::documents::clock::Clock::start(queue.clone()); let mut calc = Calculation::new(Operand::GreaterThan); calc.add_value(CalcValue::Existing(FieldType::Integer)) .unwrap(); @@ -4707,83 +4707,6 @@ mod createdocs { } } -pub struct Clock { - queue: Queue, -} - -impl Clock { - fn new(queue: Queue) -> Self { - Self { queue: queue } - } - - pub fn start(mut queue: Queue) { - let clock = Clock::new(queue.clone()); - let (tx, rx) = channel(); - let id = queue.add_sender(tx); - let reg_msg = Register::new(id, RegMsg::AddDocName([Name::english("clock")].to_vec())); - let msg = Message::new(NameType::None, reg_msg.clone()); - queue.send(msg).unwrap(); - rx.recv().unwrap(); - spawn(move || { - clock.listen(); - }); - } - - fn listen(&self) { - loop { - self.queue - .send(Message::new( - Name::english("clock"), - MsgAction::OnUpdate(Records::new(Names::new())), - )) - .unwrap(); - sleep(Duration::from_secs(1)); - } - } -} - -#[cfg(test)] -mod clocks { - use super::*; - use chrono::TimeDelta; - - static TIMEOUT: Duration = Duration::from_millis(1500); - - #[test] - fn does_clock_send_reply_every_second() { - let mut queue = Queue::new(); - let (tx, rx) = channel(); - let id = queue.add_sender(tx); - let request = Register::new( - id.clone(), - RegMsg::AddRoute(Path::new(Include::All, Include::All, Include::All)), - ); - queue.send(Message::new(NameType::None, request)).unwrap(); - rx.recv_timeout(TIMEOUT).unwrap(); - let mut holder: Vec = Vec::new(); - let start = Utc::now(); - Clock::start(queue.clone()); - while holder.len() < 2 { - holder.push(rx.recv_timeout(TIMEOUT).unwrap()); - } - let end = Utc::now(); - assert!((end - start) > TimeDelta::seconds(1)); - assert!((end - start) < TimeDelta::seconds(2)); - let reg_request = Register::new(id, RegMsg::GetNameID(Name::english("clock"))); - queue - .send(Message::new(NameType::None, reg_request)) - .unwrap(); - rx.recv_timeout(TIMEOUT).unwrap(); - for msg in holder.iter() { - let action = msg.get_action(); - match action { - MsgAction::OnUpdate(result) => assert_eq!(result.len(), 0), - _ => unreachable!("got {:?}, should have been empty record", action), - } - } - } -} - #[allow(dead_code)] #[derive(Clone, Debug)] pub struct MsgEntry { @@ -5115,7 +5038,7 @@ mod sessions { let mut queue = Queue::new(); let id = queue.add_sender(tx); CreateDoc::start(queue.clone()); - Clock::start(queue.clone()); + crate::documents::clock::Clock::start(queue.clone()); Self { queue: queue, rx: rx,