use crate::{ action::{Action, Query}, document::{ clock::Clock, create::IndexType, definition::{DocDef, DocFuncType}, field::FieldType, }, message::{ wrapper::{CalcValue, Calculation, Delete, Message, Operand, Update}, }, name::{Name, NameType}, queue::{ data_director::{Include, Path, RegMsg, Register}, router::Queue, }, }; use std::{sync::mpsc::channel, time::Duration}; pub struct Session; impl Session { fn doc_names() -> Vec { let mut names = Vec::new(); names.push(Name::english("session")); names } pub fn start(mut queue: Queue) { let mut docdef = DocDef::with_names(Self::doc_names()); let mut calc = Calculation::new(Operand::Add); calc.add_value(FieldType::DateTime).unwrap(); calc.add_value(Duration::from_hours(1)).unwrap(); let name_id = Name::english("id"); docdef.add_field(name_id.clone(), FieldType::Uuid); docdef.set_default(&name_id, FieldType::Uuid).unwrap(); docdef.add_index(&name_id, IndexType::Unique).unwrap(); let name_expire = Name::english("expire"); docdef.add_field(name_expire.clone(), FieldType::DateTime); docdef.set_default(&name_expire, calc.clone()).unwrap(); docdef.add_index(&name_expire, IndexType::Index).unwrap(); let mut update = Update::new(Query::new()); update .get_values_mut() .add_field(name_expire.clone(), calc.clone()); let path = Path::new( Include::All, Include::Just(Session::doc_names()[0].clone().into()), Include::Just(Action::OnQuery), ); let query_action = DocFuncType::ExistingQuery(update.into()); docdef.add_route(path, query_action); let mut delete_qry = Query::new(); let mut delete_calc = Calculation::new(Operand::LessThan); delete_calc.add_value(FieldType::DateTime).unwrap(); delete_calc .add_value(CalcValue::Existing(FieldType::DateTime)) .unwrap(); delete_qry.add(name_expire.clone(), delete_calc); let delete = Delete::new(delete_qry); let delete_func = DocFuncType::Trigger(delete.into()); docdef.add_route(Clock::get_path(), delete_func); let (tx, rx) = channel(); let sender_id = queue.add_sender(tx); let msg = Message::new(NameType::None, docdef); let msg_id = msg.get_message_id().clone(); let path = Path::new( Include::Just(msg_id), Include::All, Include::Just(Action::Reply), ); let reg_msg = Register::new(sender_id.clone(), RegMsg::AddRoute(path)); queue.send(msg.response(reg_msg)); rx.recv().unwrap(); queue.send(msg); rx.recv().unwrap(); queue.remove_sender(&sender_id); } } #[cfg(test)] mod sessions { use super::*; use crate::{ action::{MsgAction, Query}, document::{ clock::{clock_test_support::gen_clock_message, Clock}, create::CreateDoc, field::Field, }, message::wrapper::{Addition, Records}, mtterror::MTTError, name::{Name, NameType, Names}, queue::data_director::{Include, Path, RegMsg, Register}, support_tests::TIMEOUT, }; use chrono::{DateTime, Utc}; use std::{ collections::HashSet, sync::mpsc::{Receiver, RecvTimeoutError}, thread::sleep, }; use uuid::Uuid; struct Setup { queue: Queue, rx: Receiver, sender_id: Uuid, } impl Setup { fn new() -> Self { let mut queue = Queue::new(); let (tx, rx) = channel(); let id = queue.add_sender(tx); CreateDoc::start(queue.clone()); Clock::start(queue.clone()); Session::start(queue.clone()); let paths = [ Path::new( Include::All, Include::Just(NameType::Name(Session::doc_names()[0].clone())), Include::Just(Action::Records), ), Path::new( Include::All, Include::Just(NameType::Name(Session::doc_names()[0].clone())), Include::Just(Action::Error), ), ]; for path in paths.iter() { let reg = Register::new(id.clone(), RegMsg::AddRoute(path.clone())); queue.send(Message::new(NameType::None, reg)); rx.recv_timeout(TIMEOUT).unwrap(); } Self { queue: queue, rx: rx, sender_id: id, } } fn message(action: A) -> Message where A: Into, { Message::new(Session::doc_names()[0].clone(), action) } fn recv(&self) -> Result { self.rx.recv_timeout(TIMEOUT) } fn send(&self, msg: Message) { self.queue.send(msg); } fn send_registry_message(&self, msg: RegMsg) { let request = Register::new(self.sender_id.clone(), msg); self.queue.send(Message::new(NameType::None, request)); } } #[test] fn is_session_document_created() { let setup = Setup::new(); for name in Session::doc_names().iter() { let path = Path::new(Include::All, Include::Just(name.into()), Include::All); setup.send_registry_message(RegMsg::AddRoute(path)); let result = setup.recv().unwrap(); let action = result.get_action(); match action { MsgAction::Register(data) => match data.get_msg() { RegMsg::RouteID(_) => {} _ => unreachable!("got {:?} should have been route id", data), }, _ => unreachable!("got {:?} should have been data register", action), } } } #[test] fn are_session_ids_unique() { let setup = Setup::new(); let mut ids: HashSet = HashSet::new(); let count = 10; for _ in 0..count { let msg = Setup::message(Addition::new()); setup.send(msg); let result = setup.recv().unwrap(); match result.get_action() { MsgAction::Records(recs) => { assert_eq!(recs.len(), 1); let rec = recs.iter().last().unwrap(); let holder = rec.get(Name::english("id")).unwrap(); let id = match holder { Field::Uuid(data) => data, _ => unreachable!("got {:?} should have been uuid", holder), }; ids.insert(id); } _ => unreachable!("should always return records"), } } assert_eq!(ids.len(), count, "should be {} ids, got {:?}", count, ids); } #[test] fn expire_default_is_an_hour_from_now() { let setup = Setup::new(); let msg = Setup::message(Addition::new()); let start_time = Utc::now() + Duration::from_hours(1); setup.send(msg); let result = setup.recv().unwrap(); let end_time = Utc::now() + Duration::from_hours(1); let action = result.get_action(); match action { MsgAction::Records(recs) => { assert_eq!(recs.len(), 1); let rec = recs.iter().last().unwrap(); let holder = rec.get(Name::english("expire")).unwrap(); match holder { Field::DateTime(data) => { assert!(data > start_time, "expire should be after {:?}", start_time); assert!(data < end_time, "expire should be before {:?}", end_time); } _ => unreachable!("got {:?} should have been date time", holder), }; } _ => unreachable!("got {:?}, should have gotten records", action), } } #[test] fn session_ids_error_when_not_unique() { let setup = Setup::new(); let id = Uuid::new_v4(); let mut addition = Addition::new(); addition.add_field(Name::english("id"), id); setup.send(Setup::message(addition.clone())); setup.recv().unwrap(); setup.send(Setup::message(addition)); let result = setup.recv().unwrap(); let action = result.get_action(); match action { MsgAction::Error(err) => match err { MTTError::FieldDuplicate => {} _ => unreachable!("got {:?}, should have been a field duplicate", err), }, _ => unreachable!("got {:?}, should have been an error", action), } } #[test] fn expire_should_update_on_successful_query() { let setup = Setup::new(); let id = Uuid::new_v4(); let timestamp = Utc::now() + Duration::from_secs(60); let mut addition = Addition::new(); addition.add_field(Name::english("id"), id.clone()); addition.add_field(Name::english("expire"), timestamp); setup.send(Setup::message(addition)); setup.recv().unwrap(); let mut query = Query::new(); let mut calc = Calculation::new(Operand::Equal); calc.add_value(CalcValue::Existing(FieldType::Uuid)) .unwrap(); calc.add_value(id).unwrap(); query.add(Name::english("id"), calc.clone()); let get_expire_datetime = || -> DateTime { setup.send(Setup::message(query.clone())); let result = setup.recv().unwrap(); let action = result.get_action(); match action { MsgAction::Records(recs) => { assert_eq!(recs.len(), 1); let rec = recs.iter().last().unwrap(); let holder = rec.get(Name::english("expire")).unwrap(); match holder { Field::DateTime(data) => data.clone(), _ => unreachable!("got {:?} should have been date time", holder), } } _ => unreachable!("got {:?}, should have gotten records", action), } }; let start_time = Utc::now() + Duration::from_secs(3600); let first_query = get_expire_datetime(); let end_time = Utc::now() + Duration::from_secs(3601); assert_eq!(first_query, timestamp); let second_query = get_expire_datetime(); assert!( second_query > start_time, "{:?} should be after {:?}", second_query, start_time ); assert!( second_query < end_time, "{:?} should be before {:?}", second_query, end_time ); } #[test] fn clock_removes_expired_sessions() { let setup = Setup::new(); let id1 = Uuid::new_v4(); let id2 = Uuid::new_v4(); let duration = Duration::from_secs(60); let expire1 = Utc::now() + duration; let expire2 = Utc::now() - duration; let mut addition1 = Addition::new(); addition1.add_field(Name::english("id"), id1.clone()); addition1.add_field(Name::english("expire"), expire1); let mut addition2 = Addition::new(); addition2.add_field(Name::english("id"), id2); addition2.add_field(Name::english("expire"), expire2); setup.send(Setup::message(addition1)); setup.send(Setup::message(addition2)); setup.recv().unwrap(); // Eat addition result. setup.recv().unwrap(); // Eat addition result. setup.send(gen_clock_message()); sleep(TIMEOUT); // Allow time to react to message. setup.send(Setup::message(Query::new())); let result = setup.recv().unwrap(); let action = result.get_action(); match action { MsgAction::Records(recs) => { assert_eq!(recs.len(), 1, "nothing was deleted"); let rec = recs.iter().last().unwrap(); let id = rec.get(Name::english("id")).unwrap(); let expire = rec.get(Name::english("expire")).unwrap(); assert_eq!(id, id1.into(), "\n\n{:?}\n{:?}", Utc::now(), recs); assert_eq!(expire, expire1.into(), "\n\n{:?}\n{:?}", Utc::now(), recs); } _ => unreachable!("got {:?}, should have gotten records", action), } } }