use super::InternalRecord; use crate::{ action::{Action, MsgAction, Records}, document::record::Oid, message::{Message, MessageAction}, name::{Name, Names}, queue::{ self, data_director::{Include, Path, RegMsg, Register}, router::Queue, }, Query, }; use chrono::{DateTime, Utc}; use std::{ sync::mpsc::{channel, Receiver}, thread::{sleep, spawn}, time::Duration, }; struct Pulser { queue: Queue, } impl Pulser { fn new(queue: Queue) -> Self { Self { queue: queue } } fn pulse(&self) { loop { let msg = Message::new(MsgAction::OnUpdate(Clock::respnse(self.queue.now()))); self.queue.send(msg); sleep(Duration::from_secs(1)); } } } pub struct Clock { queue: Queue, rx: Receiver, } impl Clock { fn new(queue: Queue, rx: Receiver) -> Self { Self { queue: queue, rx: rx, } } pub fn doc_names() -> Vec { vec![Name::english("system time")] } fn respnse(timestamp: DateTime) -> Records { let mut names = Names::new(); let field_name = Name::english("time"); names.add_names(vec![field_name.clone()]); let field_id = names.get_id(field_name).unwrap(); let mut recs = Records::new(Self::doc_names(), names); let mut rec = InternalRecord::new(); rec.insert(field_id, timestamp); recs.insert(Oid::new(), rec); recs } pub fn gen_message() -> Message { Message::new(MsgAction::OnUpdate(Records::new( Clock::doc_names(), Names::new(), ))) } pub fn start(mut queue: Queue) { let pulser = Pulser::new(queue.clone()); spawn(move || { pulser.pulse(); }); let (tx, rx) = channel(); let id = queue.add_sender(tx); let path = Path::new( Include::All, Include::Just(Clock::doc_names()[0].clone().into()), Include::Just(Action::Query), ); let reg_msg = Register::new(id.clone(), RegMsg::AddRoute(path.clone())); queue.send(Message::new(reg_msg)); rx.recv().unwrap(); // Wait for completion. let clock = Clock::new(queue.clone(), rx); spawn(move || { clock.listen(); }); } fn listen(&self) { loop { let msg = self.rx.recv().unwrap(); let reply = msg.set_action(Clock::respnse(self.queue.now())); self.queue.send(reply); } } } #[cfg(test)] mod clocks { use super::*; use crate::{ queue::{ self, data_director::{Include, Path}, TestClock, }, Field, NameType, }; use chrono::{TimeDelta, Utc}; static TIMEOUT: Duration = Duration::from_millis(1500); #[test] fn can_document_names_be_returned() { let doc_names = vec![Name::english("system time")]; assert_eq!(Clock::doc_names(), doc_names); } #[test] fn can_return_time_record() { let now = Utc::now(); let recs = Clock::respnse(now.clone()); assert_eq!(recs.doc_name(), &Clock::doc_names()[0].clone().into()); assert_eq!(recs.len(), 1, "received: {:?}", recs); let rec = recs.iter().last().unwrap(); assert_eq!(rec.get(Name::english("time")).unwrap(), now.into()); } #[test] fn does_clock_update_return_correct_time() { let clock = TestClock::new(); let mut queue = Queue::with_clock(clock.clone()); let expected: Field = queue.now().into(); let (tx, rx) = channel(); let id = queue.add_sender(tx); let request = Register::new( id.clone(), RegMsg::AddRoute(Path::new(Include::All, Include::All, Include::All)), ); queue.send(Message::new(request)); rx.recv_timeout(TIMEOUT).unwrap(); Clock::start(queue.clone()); let result = rx.recv_timeout(TIMEOUT).unwrap(); match result.get_action() { MsgAction::OnUpdate(data) => { let rec = data.iter().last().unwrap(); assert_eq!(rec.get(Name::english("time")).unwrap(), expected); } _ => unreachable!("should return on_update: {:?}", result.get_action()), } } #[test] fn does_clock_send_reply_every_second() { let mut queue = Queue::new(); let (tx, rx) = channel(); let id = queue.add_sender(tx); let request = Register::new( id.clone(), RegMsg::AddRoute(Path::new(Include::All, Include::All, Include::All)), ); queue.send(Message::new(request)); rx.recv_timeout(TIMEOUT).unwrap(); let mut holder: Vec = Vec::new(); let start = Utc::now(); Clock::start(queue.clone()); while holder.len() < 2 { holder.push(rx.recv_timeout(TIMEOUT).unwrap()); } let end = Utc::now(); assert!((end - start) > TimeDelta::seconds(1)); assert!((end - start) < TimeDelta::seconds(2)); } #[test] fn can_time_be_queried() { let clock = TestClock::new(); let mut queue = Queue::with_clock(clock.clone()); let expected: Field = queue.now().into(); let (tx, rx) = channel(); let id = queue.add_sender(tx); let request = Register::new( id.clone(), RegMsg::AddRoute(Path::new( Include::All, Include::Just(Clock::doc_names()[0].clone().into()), Include::Just(Action::Records), )), ); queue.send(Message::new(request)); rx.recv_timeout(TIMEOUT).unwrap(); Clock::start(queue.clone()); let msg = Message::new(Query::new(Clock::doc_names()[0].clone())); queue.send(msg.clone()); let result = rx.recv_timeout(TIMEOUT).unwrap(); assert_eq!(result.get_message_id(), msg.get_message_id()); match result.get_action() { MsgAction::Records(data) => { let rec = data.iter().last().unwrap(); assert_eq!(rec.get(Name::english("time")).unwrap(), expected); } _ => unreachable!("should return on_update: {:?}", result.get_action()), } } }