diff --git a/src/lib.rs b/src/lib.rs index c734d85..fb9c764 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,7 +10,7 @@ use isolang::Language; use message::{Message, MessageAction, MessageID}; use queue::{ data_director::{RegMsg, Register}, - router::Queue, + router::{ClockType, Queue, SystemClock, TestClock}, SenderID, }; use std::{ @@ -224,7 +224,14 @@ pub struct MoreThanText { impl MoreThanText { pub fn new() -> Self { - let queue = Queue::new(); + Self::with_clock(SystemClock::new()) + } + + fn with_clock(clock: C) -> Self + where + C: Into, + { + let queue = Queue::with_clock(clock); CreateDoc::start(queue.clone()); // needs to be first. Clock::start(queue.clone()); Session::start(queue.clone()); @@ -253,6 +260,7 @@ impl MoreThanText { } pub struct TestMoreThanText { + clock: TestClock, mtt: MoreThanText, queue: Queue, channel: Option>, @@ -260,9 +268,11 @@ pub struct TestMoreThanText { impl TestMoreThanText { pub fn new() -> Self { - let mtt = MoreThanText::new(); + let clock = TestClock::new(); + let mtt = MoreThanText::with_clock(clock.clone()); let queue = mtt.queue.clone(); Self { + clock: clock, mtt: mtt, queue: queue, channel: None, @@ -312,4 +322,8 @@ impl TestMoreThanText { panic!("received {:?} instead of {:?} trigger", msg, action); } } + + pub fn advance_time(&self, duration: Duration) { + self.clock.advance(duration); + } } diff --git a/src/queue.rs b/src/queue.rs index 12a91bf..7cce91a 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -1,4 +1,4 @@ pub mod data_director; pub mod router; -pub use router::SenderID; +pub use router::{ClockType, SenderID, SystemClock, TestClock}; diff --git a/src/queue/router.rs b/src/queue/router.rs index 5aa928e..89ca50f 100644 --- a/src/queue/router.rs +++ b/src/queue/router.rs @@ -2,12 +2,14 @@ use crate::{ message::Message, queue::data_director::{DocRegistry, RegMsg, Register}, }; +use chrono::{DateTime, Utc}; use std::{ collections::HashMap, sync::{ mpsc::{channel, Sender}, Arc, RwLock, }, + time::Duration, }; use uuid::Uuid; @@ -89,16 +91,115 @@ impl Router { } } +trait Now { + fn now(&self) -> DateTime { + Utc::now() + } +} + +pub struct SystemClock; + +impl SystemClock { + pub fn new() -> Self { + Self {} + } +} + +impl Now for SystemClock {} + +#[derive(Clone)] +pub struct TestClock { + time: Arc>>, +} + +impl TestClock { + pub fn new() -> Self { + Self { + time: Arc::new(RwLock::new(Utc::now())), + } + } + + pub fn advance(&self, duration: Duration) { + let mut current = self.time.write().unwrap(); + *current += duration; + } +} + +impl Now for TestClock { + fn now(&self) -> DateTime { + let current = self.time.read().unwrap(); + current.clone() + } +} + +#[cfg(test)] +mod test_clocks { + use super::*; + + #[test] + fn is_now_constant() { + let clock = TestClock::new(); + let read1 = clock.now(); + let read2 = clock.now(); + assert_eq!(read1, read2); + } + + #[test] + fn can_time_pass() { + let clock = TestClock::new(); + let current = clock.now(); + let duration = Duration::from_secs(35); + let expected = current + duration; + clock.advance(duration); + assert_eq!(clock.now(), expected); + } +} + +pub enum ClockType { + Clock(SystemClock), + TestClock(TestClock), +} + +impl Now for ClockType { + fn now(&self) -> DateTime { + match self { + Self::Clock(data) => data.now(), + Self::TestClock(data) => data.now(), + } + } +} + +impl From for ClockType { + fn from(value: SystemClock) -> Self { + Self::Clock(value) + } +} + +impl From for ClockType { + fn from(value: TestClock) -> Self { + Self::TestClock(value) + } +} + #[derive(Clone)] pub struct Queue { router: Arc>, + clock: Arc>, } impl Queue { pub fn new() -> Self { + Self::with_clock(SystemClock::new()) + } + + pub fn with_clock(clock: C) -> Self + where + C: Into, + { let (tx, rx) = channel(); let output = Self { router: Arc::new(RwLock::new(Router::new(tx))), + clock: Arc::new(RwLock::new(clock.into())), }; DocRegistry::start(output.clone(), rx); output @@ -123,6 +224,11 @@ impl Queue { let router = self.router.read().unwrap(); router.send(msg.clone()); } + + pub fn now(&self) -> DateTime { + let clock = self.clock.read().unwrap(); + clock.now() + } } #[cfg(test)] @@ -357,4 +463,16 @@ mod queues { _ => unreachable!("got {:?} should have been register", action), } } + + #[test] + fn does_queue_return_time() { + let clock = TestClock::new(); + let queue = Queue::with_clock(clock.clone()); + let mut expected = clock.now(); + assert_eq!(queue.now(), expected); + let forward = Duration::from_secs(22); + clock.advance(forward); + expected += forward; + assert_eq!(queue.now(), expected); + } }