Compare commits
4 Commits
89715320cb
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 2995710e59 | |||
| d67bed6bbf | |||
| bf1942fac8 | |||
| 8f34bad321 |
@@ -3,7 +3,7 @@ mod create;
|
|||||||
mod definition;
|
mod definition;
|
||||||
mod field;
|
mod field;
|
||||||
mod record;
|
mod record;
|
||||||
mod session;
|
// mod session;
|
||||||
|
|
||||||
use record::{InternalRecord, InternalRecords, Oid};
|
use record::{InternalRecord, InternalRecords, Oid};
|
||||||
|
|
||||||
@@ -12,7 +12,7 @@ pub use create::{CreateDoc, IndexType};
|
|||||||
pub use definition::{DocDef, DocFuncType};
|
pub use definition::{DocDef, DocFuncType};
|
||||||
pub use field::{Field, FieldType, MissingTranslation};
|
pub use field::{Field, FieldType, MissingTranslation};
|
||||||
pub use record::{Record, Records};
|
pub use record::{Record, Records};
|
||||||
pub use session::Session;
|
//pub use session::Session;
|
||||||
|
|
||||||
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
|
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
|
||||||
enum DocFeature {
|
enum DocFeature {
|
||||||
|
|||||||
@@ -1,29 +1,69 @@
|
|||||||
|
use super::InternalRecord;
|
||||||
use crate::{
|
use crate::{
|
||||||
action::{Action, MsgAction, Records},
|
action::{Action, MsgAction, Records},
|
||||||
|
document::record::Oid,
|
||||||
message::Message,
|
message::Message,
|
||||||
name::{Name, Names},
|
name::{Name, Names},
|
||||||
queue::{
|
queue::{
|
||||||
data_director::{Include, Path, RegMsg, Register},
|
data_director::{Include, Path, RegMsg, Register},
|
||||||
router::Queue,
|
router::Queue,
|
||||||
|
SenderID,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
use std::{
|
use std::{
|
||||||
sync::mpsc::channel,
|
sync::mpsc::{channel, Receiver},
|
||||||
thread::{sleep, spawn},
|
thread::{sleep, spawn},
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub struct Clock {
|
struct Pulser {
|
||||||
queue: Queue,
|
queue: Queue,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Clock {
|
impl Pulser {
|
||||||
fn new(queue: Queue) -> Self {
|
fn new(queue: Queue) -> Self {
|
||||||
Self { queue: queue }
|
Self { queue: queue }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn pulse(&self) {
|
||||||
|
loop {
|
||||||
|
let msg = Message::new(MsgAction::OnUpdate(Clock::respnse(self.queue.now())));
|
||||||
|
self.queue.send(msg);
|
||||||
|
sleep(Duration::from_secs(1));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Clock {
|
||||||
|
queue: Queue,
|
||||||
|
rx: Receiver<Message>,
|
||||||
|
sender_id: SenderID,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Clock {
|
||||||
|
fn new(queue: Queue, rx: Receiver<Message>, id: SenderID) -> Self {
|
||||||
|
Self {
|
||||||
|
queue: queue,
|
||||||
|
rx: rx,
|
||||||
|
sender_id: id,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn doc_names() -> Vec<Name> {
|
pub fn doc_names() -> Vec<Name> {
|
||||||
vec![Name::english("clock")]
|
vec![Name::english("system time")]
|
||||||
|
}
|
||||||
|
|
||||||
|
fn respnse(timestamp: DateTime<Utc>) -> Records {
|
||||||
|
let mut names = Names::new();
|
||||||
|
let field_name = Name::english("time");
|
||||||
|
names.add_names(vec![field_name.clone()]).unwrap();
|
||||||
|
let field_id = names.get_id(field_name).unwrap();
|
||||||
|
let mut recs = Records::new(Self::doc_names(), names);
|
||||||
|
let mut rec = InternalRecord::new();
|
||||||
|
rec.insert(field_id, timestamp);
|
||||||
|
recs.insert(Oid::new(), rec);
|
||||||
|
recs
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn gen_message() -> Message {
|
pub fn gen_message() -> Message {
|
||||||
@@ -33,22 +73,22 @@ impl Clock {
|
|||||||
)))
|
)))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_path() -> Path {
|
|
||||||
Path::new(
|
|
||||||
Include::All,
|
|
||||||
Include::Just(Clock::doc_names()[0].clone().into()),
|
|
||||||
Include::Just(Action::OnUpdate),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn start(mut queue: Queue) {
|
pub fn start(mut queue: Queue) {
|
||||||
let clock = Clock::new(queue.clone());
|
let pulser = Pulser::new(queue.clone());
|
||||||
|
spawn(move || {
|
||||||
|
pulser.pulse();
|
||||||
|
});
|
||||||
let (tx, rx) = channel();
|
let (tx, rx) = channel();
|
||||||
let id = queue.add_sender(tx);
|
let id = queue.add_sender(tx);
|
||||||
let reg_msg = Register::new(id, RegMsg::AddDocName(Clock::doc_names()));
|
let path = Path::new(
|
||||||
let msg = Message::new(reg_msg.clone());
|
Include::All,
|
||||||
queue.send(msg);
|
Include::Just(Clock::doc_names()[0].clone().into()),
|
||||||
rx.recv().unwrap();
|
Include::Just(Action::Query),
|
||||||
|
);
|
||||||
|
let reg_msg = Register::new(id.clone(), RegMsg::AddRoute(path.clone()));
|
||||||
|
queue.send(Message::new(reg_msg));
|
||||||
|
rx.recv().unwrap(); // Wait for completion.
|
||||||
|
let clock = Clock::new(queue.clone(), rx, id);
|
||||||
spawn(move || {
|
spawn(move || {
|
||||||
clock.listen();
|
clock.listen();
|
||||||
});
|
});
|
||||||
@@ -56,29 +96,74 @@ impl Clock {
|
|||||||
|
|
||||||
fn listen(&self) {
|
fn listen(&self) {
|
||||||
loop {
|
loop {
|
||||||
self.queue.send(Clock::gen_message());
|
let msg = self.rx.recv().unwrap();
|
||||||
sleep(Duration::from_secs(1));
|
let reply = msg.set_action(Clock::respnse(self.queue.now()));
|
||||||
|
self.queue.send(reply);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
impl Drop for Clock {
|
||||||
pub mod clock_test_support {
|
fn drop(&mut self) {
|
||||||
use super::*;
|
self.queue.remove_sender(&self.sender_id);
|
||||||
|
|
||||||
pub fn gen_clock_message() -> Message {
|
|
||||||
Clock::gen_message()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod clocks {
|
mod clocks {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::queue::data_director::{Include, Path};
|
use crate::{
|
||||||
|
message::MessageAction,
|
||||||
|
queue::{
|
||||||
|
data_director::{Include, Path},
|
||||||
|
TestClock,
|
||||||
|
},
|
||||||
|
Field, Query,
|
||||||
|
};
|
||||||
use chrono::{TimeDelta, Utc};
|
use chrono::{TimeDelta, Utc};
|
||||||
|
|
||||||
static TIMEOUT: Duration = Duration::from_millis(1500);
|
static TIMEOUT: Duration = Duration::from_millis(1500);
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn can_document_names_be_returned() {
|
||||||
|
let doc_names = vec![Name::english("system time")];
|
||||||
|
assert_eq!(Clock::doc_names(), doc_names);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn can_return_time_record() {
|
||||||
|
let now = Utc::now();
|
||||||
|
let recs = Clock::respnse(now.clone());
|
||||||
|
assert_eq!(recs.doc_name(), &Clock::doc_names()[0].clone().into());
|
||||||
|
assert_eq!(recs.len(), 1, "received: {:?}", recs);
|
||||||
|
let rec = recs.iter().last().unwrap();
|
||||||
|
assert_eq!(rec.get(Name::english("time")).unwrap(), now.into());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn does_clock_update_return_correct_time() {
|
||||||
|
let clock = TestClock::new();
|
||||||
|
let mut queue = Queue::with_clock(clock.clone());
|
||||||
|
let expected: Field = queue.now().into();
|
||||||
|
let (tx, rx) = channel();
|
||||||
|
let id = queue.add_sender(tx);
|
||||||
|
let request = Register::new(
|
||||||
|
id.clone(),
|
||||||
|
RegMsg::AddRoute(Path::new(Include::All, Include::All, Include::All)),
|
||||||
|
);
|
||||||
|
queue.send(Message::new(request));
|
||||||
|
rx.recv_timeout(TIMEOUT).unwrap();
|
||||||
|
Clock::start(queue.clone());
|
||||||
|
let result = rx.recv_timeout(TIMEOUT).unwrap();
|
||||||
|
match result.get_action() {
|
||||||
|
MsgAction::OnUpdate(data) => {
|
||||||
|
let rec = data.iter().last().unwrap();
|
||||||
|
assert_eq!(rec.get(Name::english("time")).unwrap(), expected);
|
||||||
|
}
|
||||||
|
_ => unreachable!("should return on_update: {:?}", result.get_action()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn does_clock_send_reply_every_second() {
|
fn does_clock_send_reply_every_second() {
|
||||||
let mut queue = Queue::new();
|
let mut queue = Queue::new();
|
||||||
@@ -99,15 +184,36 @@ mod clocks {
|
|||||||
let end = Utc::now();
|
let end = Utc::now();
|
||||||
assert!((end - start) > TimeDelta::seconds(1));
|
assert!((end - start) > TimeDelta::seconds(1));
|
||||||
assert!((end - start) < TimeDelta::seconds(2));
|
assert!((end - start) < TimeDelta::seconds(2));
|
||||||
let reg_request = Register::new(id, RegMsg::GetNameID(Clock::doc_names()[0].clone()));
|
|
||||||
queue.send(Message::new(reg_request));
|
|
||||||
rx.recv_timeout(TIMEOUT).unwrap();
|
|
||||||
for msg in holder.iter() {
|
|
||||||
let action = msg.get_action();
|
|
||||||
match action {
|
|
||||||
MsgAction::OnUpdate(result) => assert_eq!(result.len(), 0),
|
|
||||||
_ => unreachable!("got {:?}, should have been empty record", action),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn can_time_be_queried() {
|
||||||
|
let clock = TestClock::new();
|
||||||
|
let mut queue = Queue::with_clock(clock.clone());
|
||||||
|
let expected: Field = queue.now().into();
|
||||||
|
let (tx, rx) = channel();
|
||||||
|
let id = queue.add_sender(tx);
|
||||||
|
let request = Register::new(
|
||||||
|
id.clone(),
|
||||||
|
RegMsg::AddRoute(Path::new(
|
||||||
|
Include::All,
|
||||||
|
Include::Just(Clock::doc_names()[0].clone().into()),
|
||||||
|
Include::Just(Action::Records),
|
||||||
|
)),
|
||||||
|
);
|
||||||
|
queue.send(Message::new(request));
|
||||||
|
rx.recv_timeout(TIMEOUT).unwrap();
|
||||||
|
Clock::start(queue.clone());
|
||||||
|
let msg = Message::new(Query::new(Clock::doc_names()[0].clone()));
|
||||||
|
queue.send(msg.clone());
|
||||||
|
let result = rx.recv_timeout(TIMEOUT).unwrap();
|
||||||
|
assert_eq!(result.get_message_id(), msg.get_message_id());
|
||||||
|
match result.get_action() {
|
||||||
|
MsgAction::Records(data) => {
|
||||||
|
let rec = data.iter().last().unwrap();
|
||||||
|
assert_eq!(rec.get(Name::english("time")).unwrap(), expected);
|
||||||
|
}
|
||||||
|
_ => unreachable!("should return on_update: {:?}", result.get_action()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
use super::Session;
|
//use super::Session;
|
||||||
use super::{DocFeature, InternalRecord, InternalRecords, Oid};
|
use super::{DocFeature, InternalRecord, InternalRecords, Oid};
|
||||||
use crate::{
|
use crate::{
|
||||||
action::{Action, CalcValue, Calculation, MsgAction, Query, Records, Reply, Update},
|
action::{Action, CalcValue, Calculation, MsgAction, Query, Records, Reply, Update},
|
||||||
@@ -451,12 +451,15 @@ impl DocumentFile {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn listen(&mut self) {
|
fn listen(&mut self) {
|
||||||
let sess_name = Session::doc_names()[0].clone();
|
//let sess_name = Session::doc_names()[0].clone();
|
||||||
loop {
|
loop {
|
||||||
let msg = self.rx.recv().unwrap();
|
let msg = self.rx.recv().unwrap();
|
||||||
|
/*
|
||||||
|
* references old session.
|
||||||
if !self.docdef.has_feature(&DocFeature::System) {
|
if !self.docdef.has_feature(&DocFeature::System) {
|
||||||
self.queue.send(Message::new(Query::new(sess_name.clone())));
|
self.queue.send(Message::new(Query::new(sess_name.clone())));
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
let route = msg.get_route();
|
let route = msg.get_route();
|
||||||
for (route_id, doc_func) in self.routes.clone().iter() {
|
for (route_id, doc_func) in self.routes.clone().iter() {
|
||||||
if route == route_id.into() {
|
if route == route_id.into() {
|
||||||
@@ -791,6 +794,8 @@ mod internal_features {
|
|||||||
use crate::{Name, TestMoreThanText};
|
use crate::{Name, TestMoreThanText};
|
||||||
use std::sync::mpsc::RecvTimeoutError;
|
use std::sync::mpsc::RecvTimeoutError;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* references old sessions
|
||||||
#[test]
|
#[test]
|
||||||
fn do_system_documents_ignores_session() {
|
fn do_system_documents_ignores_session() {
|
||||||
let sess_name = Session::doc_names()[0].clone();
|
let sess_name = Session::doc_names()[0].clone();
|
||||||
@@ -815,26 +820,5 @@ mod internal_features {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
#[test]
|
|
||||||
fn do_normal_definitions_request_session() {
|
|
||||||
let sess_name = Session::doc_names()[0].clone();
|
|
||||||
let mut test_env = TestMoreThanText::new();
|
|
||||||
let mut mtt = test_env.get_morethantext();
|
|
||||||
let client = mtt.client();
|
|
||||||
let name = Name::english("something");
|
|
||||||
let docdef = DocDef::new(name.clone());
|
|
||||||
client.create_document(docdef);
|
|
||||||
let path = Path::new(
|
|
||||||
Include::All,
|
|
||||||
Include::Just(sess_name.clone().into()),
|
|
||||||
Include::Just(Action::Query),
|
|
||||||
);
|
|
||||||
test_env.register_channel(vec![path]);
|
|
||||||
client.records(Query::new(name)).unwrap();
|
|
||||||
match test_env.recv() {
|
|
||||||
Ok(msg) => {}
|
|
||||||
Err(err) => unreachable!("got {:?}, should have requested session", err),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
36
src/lib.rs
36
src/lib.rs
@@ -5,11 +5,11 @@ mod mtterror;
|
|||||||
pub mod name;
|
pub mod name;
|
||||||
mod queue;
|
mod queue;
|
||||||
|
|
||||||
use document::{Clock, CreateDoc, Session};
|
use document::{Clock, CreateDoc};
|
||||||
use isolang::Language;
|
use isolang::Language;
|
||||||
use message::{Message, MessageAction, MessageID};
|
use message::{Message, MessageAction, MessageID};
|
||||||
use queue::{
|
use queue::{
|
||||||
data_director::{RegMsg, Register},
|
data_director::{RegMsg, Register, Session},
|
||||||
router::{ClockType, Queue, SystemClock, TestClock},
|
router::{ClockType, Queue, SystemClock, TestClock},
|
||||||
SenderID,
|
SenderID,
|
||||||
};
|
};
|
||||||
@@ -43,15 +43,16 @@ pub struct MTTClient {
|
|||||||
queue: Queue,
|
queue: Queue,
|
||||||
rx: Receiver<Message>,
|
rx: Receiver<Message>,
|
||||||
sender_id: SenderID,
|
sender_id: SenderID,
|
||||||
session_id: Uuid,
|
session: Session,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MTTClient {
|
impl MTTClient {
|
||||||
fn new(mut queue: Queue, sess_id: Option<String>, lang: Option<Language>) -> Self {
|
fn new(mut queue: Queue, session: Session) -> Self {
|
||||||
let sess_name = Session::doc_names()[0].clone();
|
//let sess_name = Session::doc_names()[0].clone();
|
||||||
let (tx, rx) = channel();
|
let (tx, rx) = channel();
|
||||||
let sender_id = queue.add_sender(tx);
|
let sender_id = queue.add_sender(tx);
|
||||||
let msg_id = MessageID::new();
|
let msg_id = MessageID::new();
|
||||||
|
/*
|
||||||
let paths = [
|
let paths = [
|
||||||
Path::new(
|
Path::new(
|
||||||
Include::Just(msg_id.clone()),
|
Include::Just(msg_id.clone()),
|
||||||
@@ -115,16 +116,17 @@ impl MTTClient {
|
|||||||
}
|
}
|
||||||
_ => unreachable!("should only receive session records"),
|
_ => unreachable!("should only receive session records"),
|
||||||
};
|
};
|
||||||
|
*/
|
||||||
Self {
|
Self {
|
||||||
queue: queue,
|
queue: queue,
|
||||||
rx: rx,
|
rx: rx,
|
||||||
sender_id: sender_id,
|
sender_id: sender_id,
|
||||||
session_id: session_id,
|
session: session,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn session_id(&self) -> String {
|
pub fn session_id(&self) -> &Field {
|
||||||
self.session_id.to_string()
|
self.session.id()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn create_document(&self, docdef: DocDef) -> Result<(), MTTError> {
|
pub fn create_document(&self, docdef: DocDef) -> Result<(), MTTError> {
|
||||||
@@ -143,7 +145,7 @@ impl MTTClient {
|
|||||||
];
|
];
|
||||||
let msg = Message::default()
|
let msg = Message::default()
|
||||||
.set_id(msg_id)
|
.set_id(msg_id)
|
||||||
.set_session(self.session_id.clone());
|
.set_session(self.session.clone());
|
||||||
for path in paths.iter() {
|
for path in paths.iter() {
|
||||||
let reg_msg = Register::new(self.sender_id.clone(), RegMsg::AddRoute(path.clone()));
|
let reg_msg = Register::new(self.sender_id.clone(), RegMsg::AddRoute(path.clone()));
|
||||||
self.queue.send(msg.set_action(reg_msg));
|
self.queue.send(msg.set_action(reg_msg));
|
||||||
@@ -181,7 +183,7 @@ impl MTTClient {
|
|||||||
];
|
];
|
||||||
let msg = Message::default()
|
let msg = Message::default()
|
||||||
.set_id(msg_id.clone())
|
.set_id(msg_id.clone())
|
||||||
.set_session(self.session_id.clone());
|
.set_session(self.session.clone());
|
||||||
for path in paths.iter() {
|
for path in paths.iter() {
|
||||||
let reg_msg = Register::new(self.sender_id.clone(), RegMsg::AddRoute(path.clone()));
|
let reg_msg = Register::new(self.sender_id.clone(), RegMsg::AddRoute(path.clone()));
|
||||||
self.queue.send(msg.set_action(reg_msg));
|
self.queue.send(msg.set_action(reg_msg));
|
||||||
@@ -234,20 +236,24 @@ impl MoreThanText {
|
|||||||
let queue = Queue::with_clock(clock);
|
let queue = Queue::with_clock(clock);
|
||||||
CreateDoc::start(queue.clone()); // needs to be first.
|
CreateDoc::start(queue.clone()); // needs to be first.
|
||||||
Clock::start(queue.clone());
|
Clock::start(queue.clone());
|
||||||
Session::start(queue.clone());
|
//Session::start(queue.clone());
|
||||||
Self { queue: queue }
|
Self { queue: queue }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn client(&self) -> MTTClient {
|
pub fn client(&self) -> MTTClient {
|
||||||
MTTClient::new(self.queue.clone(), None, None)
|
MTTClient::new(self.queue.clone(), Session::default())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn client_with_language(&self, lang: Language) -> MTTClient {
|
pub fn client_with_language(&self, lang: Language) -> MTTClient {
|
||||||
MTTClient::new(self.queue.clone(), None, Some(lang))
|
MTTClient::new(self.queue.clone(), Session::default())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn client_with_session(&self, id: String, lang: Option<Language>) -> MTTClient {
|
pub fn client_with_session<F>(&self, id: F, lang: Option<Language>) -> MTTClient
|
||||||
MTTClient::new(self.queue.clone(), Some(id), lang)
|
where
|
||||||
|
F: Into<Field>,
|
||||||
|
{
|
||||||
|
let mut session = Session::default().set_id(id.into());
|
||||||
|
MTTClient::new(self.queue.clone(), session)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_document(&self, name: &str, _id: &str) -> Result<String, MTTError> {
|
pub fn get_document(&self, name: &str, _id: &str) -> Result<String, MTTError> {
|
||||||
|
|||||||
13
src/main.rs
13
src/main.rs
@@ -8,7 +8,7 @@ use axum::{
|
|||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
//use morethantext::{ActionType, ErrorType, MoreThanText};
|
//use morethantext::{ActionType, ErrorType, MoreThanText};
|
||||||
//use morethantext::{MoreThanText, MsgAction, Query};
|
//use morethantext::{MoreThanText, MsgAction, Query};
|
||||||
use morethantext::{ErrorID, MTTError, MoreThanText};
|
use morethantext::{ErrorID, Field, MTTError, MoreThanText};
|
||||||
use std::{collections::HashMap, convert::Infallible};
|
use std::{collections::HashMap, convert::Infallible};
|
||||||
use tokio::{spawn, sync::mpsc::channel};
|
use tokio::{spawn, sync::mpsc::channel};
|
||||||
use tower_cookies::{Cookie, CookieManagerLayer, Cookies};
|
use tower_cookies::{Cookie, CookieManagerLayer, Cookies};
|
||||||
@@ -55,7 +55,7 @@ async fn create_app(state: MoreThanText) -> Router {
|
|||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct SessionID(String);
|
struct SessionID(Field);
|
||||||
|
|
||||||
impl<S> FromRequestParts<S> for SessionID
|
impl<S> FromRequestParts<S> for SessionID
|
||||||
where
|
where
|
||||||
@@ -74,14 +74,14 @@ where
|
|||||||
let (tx, mut rx) = channel(1);
|
let (tx, mut rx) = channel(1);
|
||||||
spawn(async move {
|
spawn(async move {
|
||||||
let id = match requested {
|
let id = match requested {
|
||||||
Some(data) => state.client_with_session(data, None).session_id(),
|
Some(data) => state.client_with_session(data, None).session_id().clone(),
|
||||||
None => state.client().session_id(),
|
None => state.client().session_id().clone(),
|
||||||
};
|
};
|
||||||
tx.send(id).await.unwrap();
|
tx.send(id).await.unwrap();
|
||||||
});
|
});
|
||||||
let id = rx.recv().await.unwrap();
|
let id = rx.recv().await.unwrap();
|
||||||
if !req_id.is_some_and(|x| x == id.to_string()) {
|
if !req_id.is_some_and(|x| x == "id".to_string()) {
|
||||||
cookies.add(Cookie::new(SESSION_KEY, id.to_string()));
|
cookies.add(Cookie::new(SESSION_KEY, "id".to_string()));
|
||||||
}
|
}
|
||||||
Ok(SessionID(id))
|
Ok(SessionID(id))
|
||||||
}
|
}
|
||||||
@@ -149,6 +149,7 @@ mod servers {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
#[ignore = "need to complete moving session into the queue"]
|
||||||
async fn session_ids_are_unique() {
|
async fn session_ids_are_unique() {
|
||||||
let app = create_app(MoreThanText::new()).await;
|
let app = create_app(MoreThanText::new()).await;
|
||||||
let mut holder: Vec<String> = Vec::new();
|
let mut holder: Vec<String> = Vec::new();
|
||||||
|
|||||||
@@ -67,12 +67,9 @@ impl Message {
|
|||||||
self.session = session;
|
self.session = session;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_session<F>(&self, session: F) -> Self
|
pub fn set_session(&self, session: Session) -> Self {
|
||||||
where
|
|
||||||
F: Into<Field>,
|
|
||||||
{
|
|
||||||
let mut output = self.clone();
|
let mut output = self.clone();
|
||||||
output.session = Session::new(session, Utc::now());
|
output.session = session;
|
||||||
output
|
output
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -133,6 +130,7 @@ mod messages {
|
|||||||
use crate::{
|
use crate::{
|
||||||
action::{DocDef, FieldType, Query, Reply},
|
action::{DocDef, FieldType, Query, Reply},
|
||||||
name::{name_id_support::test_name_id, Name},
|
name::{name_id_support::test_name_id, Name},
|
||||||
|
queue::data_director::Session,
|
||||||
ErrorID, MTTError,
|
ErrorID, MTTError,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -175,10 +173,12 @@ mod messages {
|
|||||||
let doc_name = Name::english("use field");
|
let doc_name = Name::english("use field");
|
||||||
let qry = Query::new(doc_name.clone());
|
let qry = Query::new(doc_name.clone());
|
||||||
let sess_id: Field = Uuid::new_v4().into();
|
let sess_id: Field = Uuid::new_v4().into();
|
||||||
let msg = Message::new(qry).set_session(sess_id.clone());
|
let session = Session::new(sess_id.clone(), Utc::now());
|
||||||
|
let msg = Message::new(qry).set_session(session);
|
||||||
assert_eq!(msg.session_id(), &sess_id);
|
assert_eq!(msg.session_id(), &sess_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
#[test]
|
#[test]
|
||||||
fn can_session_be_set_by_string() {
|
fn can_session_be_set_by_string() {
|
||||||
let doc_name = Name::english("string");
|
let doc_name = Name::english("string");
|
||||||
@@ -196,6 +196,7 @@ mod messages {
|
|||||||
let msg = Message::new(qry).set_session(sess_id.clone());
|
let msg = Message::new(qry).set_session(sess_id.clone());
|
||||||
assert_eq!(msg.session_id(), &sess_id.into());
|
assert_eq!(msg.session_id(), &sess_id.into());
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn can_action_be_set() {
|
fn can_action_be_set() {
|
||||||
|
|||||||
@@ -1,843 +1,3 @@
|
|||||||
use super::SenderID;
|
mod engine;
|
||||||
use crate::{
|
|
||||||
action::{Action, Field, MsgAction},
|
|
||||||
message::{Message, MessageAction, MessageID},
|
|
||||||
mtterror::MTTError,
|
|
||||||
name::{Name, NameID, NameType, Names},
|
|
||||||
queue::router::Queue,
|
|
||||||
};
|
|
||||||
use chrono::{DateTime, Utc};
|
|
||||||
use std::{
|
|
||||||
collections::{HashMap, HashSet},
|
|
||||||
default,
|
|
||||||
sync::mpsc::Receiver,
|
|
||||||
thread::spawn,
|
|
||||||
time::Duration,
|
|
||||||
};
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, Eq, Hash)]
|
pub use engine::{DocRegistry, Include, Path, RegMsg, Register, Route, RouteID, Session};
|
||||||
pub enum Include<T> {
|
|
||||||
All,
|
|
||||||
Just(T),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: PartialEq> PartialEq for Include<T> {
|
|
||||||
fn eq(&self, other: &Self) -> bool {
|
|
||||||
match self {
|
|
||||||
Include::All => true,
|
|
||||||
Include::Just(data) => match other {
|
|
||||||
Include::All => true,
|
|
||||||
Include::Just(other_data) => data == other_data,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod includes {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn does_all_equal_evberything() {
|
|
||||||
let a: Include<isize> = Include::All;
|
|
||||||
let b: Include<isize> = Include::Just(5);
|
|
||||||
let c: Include<isize> = Include::Just(7);
|
|
||||||
assert!(a == a, "all should equal all");
|
|
||||||
assert!(a == b, "all should equal just");
|
|
||||||
assert!(b == a, "just should equal all");
|
|
||||||
assert!(b == b, "same just should equal");
|
|
||||||
assert!(b != c, "different justs do not equal");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
|
||||||
pub enum RegMsg {
|
|
||||||
AddRoute(Path),
|
|
||||||
AddDocName(Vec<Name>),
|
|
||||||
DocumentNameID(NameID),
|
|
||||||
Error(MTTError),
|
|
||||||
GetNameID(Name),
|
|
||||||
Ok,
|
|
||||||
RemoveRoute(Route),
|
|
||||||
RemoveSender(SenderID),
|
|
||||||
RouteID(RouteID),
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
|
||||||
pub struct Register {
|
|
||||||
msg: RegMsg,
|
|
||||||
sender_id: SenderID,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Register {
|
|
||||||
pub fn new(sender_id: SenderID, reg_msg: RegMsg) -> Self {
|
|
||||||
Self {
|
|
||||||
msg: reg_msg,
|
|
||||||
sender_id: sender_id,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_msg(&self) -> &RegMsg {
|
|
||||||
&self.msg
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_sender_id(&self) -> &SenderID {
|
|
||||||
&self.sender_id
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn response(&self, reg_msg: RegMsg) -> Self {
|
|
||||||
Self {
|
|
||||||
msg: reg_msg,
|
|
||||||
sender_id: self.sender_id.clone(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MessageAction for Register {}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod registries {
|
|
||||||
use super::*;
|
|
||||||
use crate::name::name_id_support::test_name_id;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn does_registry_store_data() {
|
|
||||||
let name_id = test_name_id();
|
|
||||||
let sender_data_id = SenderID::new();
|
|
||||||
let inputs = [
|
|
||||||
RegMsg::DocumentNameID(name_id.clone()),
|
|
||||||
RegMsg::RemoveSender(sender_data_id.clone()),
|
|
||||||
];
|
|
||||||
for regmsg in inputs.iter() {
|
|
||||||
let sender_id = SenderID::new();
|
|
||||||
let reg = Register::new(sender_id.clone(), regmsg.clone());
|
|
||||||
assert_eq!(reg.doc_name(), &NameType::None);
|
|
||||||
assert_eq!(reg.get_sender_id(), &sender_id);
|
|
||||||
match reg.get_msg() {
|
|
||||||
RegMsg::DocumentNameID(data) => assert_eq!(data, &name_id),
|
|
||||||
RegMsg::RemoveSender(data) => assert_eq!(data, &sender_data_id),
|
|
||||||
_ => unreachable!("should have been one of the inputs"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
|
||||||
pub struct Path {
|
|
||||||
pub msg_id: Include<MessageID>,
|
|
||||||
pub doc: Include<NameType>,
|
|
||||||
pub action: Include<Action>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Path {
|
|
||||||
pub fn new(id: Include<MessageID>, doc: Include<NameType>, action: Include<Action>) -> Self {
|
|
||||||
Self {
|
|
||||||
msg_id: id,
|
|
||||||
doc: doc,
|
|
||||||
action: action,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn for_message<NT>(name: NT, action: &MsgAction) -> Self
|
|
||||||
where
|
|
||||||
NT: Into<NameType>,
|
|
||||||
{
|
|
||||||
Self {
|
|
||||||
msg_id: Include::Just(MessageID::new()),
|
|
||||||
doc: Include::Just(name.into()),
|
|
||||||
action: Include::Just(action.into()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod paths {
|
|
||||||
use super::*;
|
|
||||||
use crate::{
|
|
||||||
action::{Records, Show},
|
|
||||||
name::{Name, Names},
|
|
||||||
};
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn can_create_for_message() {
|
|
||||||
let input = [
|
|
||||||
(
|
|
||||||
Name::english("one"),
|
|
||||||
MsgAction::Show(Show::new(Name::english("one"))),
|
|
||||||
),
|
|
||||||
(
|
|
||||||
Name::english("two"),
|
|
||||||
MsgAction::Records(Records::new(vec![Name::english("two")], Names::new())),
|
|
||||||
),
|
|
||||||
];
|
|
||||||
for item in input.iter() {
|
|
||||||
let path = Path::for_message(item.0.clone(), &item.1);
|
|
||||||
match path.doc {
|
|
||||||
Include::Just(name) => assert_eq!(name, item.0.clone().into()),
|
|
||||||
_ => unreachable!("should have returned document name"),
|
|
||||||
}
|
|
||||||
match path.action {
|
|
||||||
Include::Just(action) => assert_eq!(action, item.1.clone().into()),
|
|
||||||
_ => unreachable!("should have returned action type"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn message_ids_are_unique_for_message_paths() {
|
|
||||||
let count = 10;
|
|
||||||
let mut ids: Vec<MessageID> = Vec::new();
|
|
||||||
for _ in 0..count {
|
|
||||||
let path =
|
|
||||||
Path::for_message(NameType::None, &MsgAction::Show(Show::new(NameType::None)));
|
|
||||||
let id = match path.msg_id {
|
|
||||||
Include::Just(data) => data.clone(),
|
|
||||||
Include::All => unreachable!("should have been a message id"),
|
|
||||||
};
|
|
||||||
assert!(!ids.contains(&id), "{:?} is duplicated in {:?}", id, ids);
|
|
||||||
ids.push(id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq)]
|
|
||||||
pub struct Route {
|
|
||||||
pub action: Include<Action>,
|
|
||||||
pub doc_id: Include<NameID>,
|
|
||||||
pub msg_id: Include<MessageID>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Route {
|
|
||||||
pub fn new(msg_id: Include<MessageID>, doc: Include<NameID>, action: Include<Action>) -> Self {
|
|
||||||
Self {
|
|
||||||
action: action,
|
|
||||||
doc_id: doc,
|
|
||||||
msg_id: msg_id,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for Route {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self {
|
|
||||||
action: Include::All,
|
|
||||||
doc_id: Include::All,
|
|
||||||
msg_id: Include::All,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<RouteID> for Route {
|
|
||||||
fn from(value: RouteID) -> Self {
|
|
||||||
Self::from(&value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<&RouteID> for Route {
|
|
||||||
fn from(value: &RouteID) -> Self {
|
|
||||||
Self {
|
|
||||||
action: match &value.action {
|
|
||||||
Some(data) => Include::Just(data.clone()),
|
|
||||||
None => Include::All,
|
|
||||||
},
|
|
||||||
doc_id: match &value.doc_id {
|
|
||||||
Some(doc) => Include::Just(doc.clone()),
|
|
||||||
None => Include::All,
|
|
||||||
},
|
|
||||||
msg_id: match &value.msg_id {
|
|
||||||
Some(msg) => Include::Just(msg.clone()),
|
|
||||||
None => Include::All,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
|
|
||||||
pub struct RouteID {
|
|
||||||
action: Option<Action>,
|
|
||||||
doc_id: Option<NameID>,
|
|
||||||
msg_id: Option<MessageID>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<Route> for RouteID {
|
|
||||||
fn from(value: Route) -> Self {
|
|
||||||
Self {
|
|
||||||
action: match value.action {
|
|
||||||
Include::All => None,
|
|
||||||
Include::Just(action) => Some(action.clone()),
|
|
||||||
},
|
|
||||||
doc_id: match value.doc_id {
|
|
||||||
Include::All => None,
|
|
||||||
Include::Just(doc) => Some(doc.clone()),
|
|
||||||
},
|
|
||||||
msg_id: match value.msg_id {
|
|
||||||
Include::All => None,
|
|
||||||
Include::Just(id) => Some(id.clone()),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct RouteStorage {
|
|
||||||
data: HashMap<RouteID, HashSet<SenderID>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl RouteStorage {
|
|
||||||
fn new() -> Self {
|
|
||||||
Self {
|
|
||||||
data: HashMap::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn add(&mut self, route: Route, sender_id: SenderID) -> RouteID {
|
|
||||||
let route_id: RouteID = route.into();
|
|
||||||
let set = match self.data.get_mut(&route_id) {
|
|
||||||
Some(result) => result,
|
|
||||||
None => {
|
|
||||||
let holder = HashSet::new();
|
|
||||||
self.data.insert(route_id.clone(), holder);
|
|
||||||
self.data.get_mut(&route_id).unwrap()
|
|
||||||
}
|
|
||||||
};
|
|
||||||
set.insert(sender_id);
|
|
||||||
route_id
|
|
||||||
}
|
|
||||||
|
|
||||||
fn remove_sender_id(&mut self, sender_id: &SenderID) {
|
|
||||||
let mut removal: Vec<RouteID> = Vec::new();
|
|
||||||
for (route_id, set) in self.data.iter_mut() {
|
|
||||||
set.remove(sender_id);
|
|
||||||
if set.is_empty() {
|
|
||||||
removal.push(route_id.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for route_id in removal.iter() {
|
|
||||||
self.data.remove(route_id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn remove_route(&mut self, route: Route, sender: SenderID) {
|
|
||||||
let route_id: RouteID = route.into();
|
|
||||||
let mut remove = false;
|
|
||||||
match self.data.get_mut(&route_id) {
|
|
||||||
Some(store) => {
|
|
||||||
store.remove(&sender);
|
|
||||||
if store.len() == 0 {
|
|
||||||
remove = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None => {}
|
|
||||||
}
|
|
||||||
if remove {
|
|
||||||
self.data.remove(&route_id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get(&self, route: Route) -> HashSet<SenderID> {
|
|
||||||
let mut output = HashSet::new();
|
|
||||||
for (route_id, set) in self.data.iter() {
|
|
||||||
if route == route_id.into() {
|
|
||||||
output = output.union(set).cloned().collect();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
output
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod route_storeage {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn can_add_routes() {
|
|
||||||
let mut routes = RouteStorage::new();
|
|
||||||
let id1 = SenderID::new();
|
|
||||||
let id2 = SenderID::new();
|
|
||||||
let route1 = Route::new(Include::Just(MessageID::new()), Include::All, Include::All);
|
|
||||||
let route2 = Route::new(Include::Just(MessageID::new()), Include::All, Include::All);
|
|
||||||
let route_id1 = routes.add(route1.clone(), id1.clone());
|
|
||||||
let route_id2 = routes.add(route2.clone(), id2.clone());
|
|
||||||
let result1 = routes.get(route1.clone());
|
|
||||||
assert_eq!(result1.len(), 1);
|
|
||||||
assert!(
|
|
||||||
result1.contains(&id1),
|
|
||||||
"{:?} not found in {:?}",
|
|
||||||
id1,
|
|
||||||
result1
|
|
||||||
);
|
|
||||||
assert_eq!(route_id1, route1.into());
|
|
||||||
let result2 = routes.get(route2.clone());
|
|
||||||
assert_eq!(result2.len(), 1);
|
|
||||||
assert!(
|
|
||||||
result2.contains(&id2),
|
|
||||||
"{:?} not found in {:?}",
|
|
||||||
id2,
|
|
||||||
result2
|
|
||||||
);
|
|
||||||
assert_eq!(route_id2, route2.into());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn returns_empty_set_when_nothing_is_available() {
|
|
||||||
let routes = RouteStorage::new();
|
|
||||||
let route = Route::new(Include::Just(MessageID::new()), Include::All, Include::All);
|
|
||||||
let result = routes.get(route);
|
|
||||||
assert_eq!(result.len(), 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn returns_all_entries_using_the_same_route() {
|
|
||||||
let count = 5;
|
|
||||||
let mut routes = RouteStorage::new();
|
|
||||||
let mut ids: HashSet<SenderID> = HashSet::new();
|
|
||||||
while ids.len() < count {
|
|
||||||
ids.insert(SenderID::new());
|
|
||||||
}
|
|
||||||
let route = Route::new(Include::Just(MessageID::new()), Include::All, Include::All);
|
|
||||||
for id in ids.iter() {
|
|
||||||
routes.add(route.clone(), id.clone());
|
|
||||||
}
|
|
||||||
let result = routes.get(route);
|
|
||||||
assert_eq!(result, ids);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn routes_are_not_duplicated() {
|
|
||||||
let count = 5;
|
|
||||||
let mut routes = RouteStorage::new();
|
|
||||||
let id = SenderID::new();
|
|
||||||
let route = Route::new(Include::Just(MessageID::new()), Include::All, Include::All);
|
|
||||||
for _ in 0..count {
|
|
||||||
routes.add(route.clone(), id.clone());
|
|
||||||
}
|
|
||||||
let result = routes.get(route);
|
|
||||||
assert_eq!(result.len(), 1);
|
|
||||||
assert!(result.contains(&id), "{:?} not found in {:?}", id, result);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn overlapping_routes_are_combined() {
|
|
||||||
let mut routes = RouteStorage::new();
|
|
||||||
let id1 = SenderID::new();
|
|
||||||
let id2 = SenderID::new();
|
|
||||||
let route1 = Route::new(Include::Just(MessageID::new()), Include::All, Include::All);
|
|
||||||
let route2 = Route::new(Include::Just(MessageID::new()), Include::All, Include::All);
|
|
||||||
routes.add(route1.clone(), id1.clone());
|
|
||||||
routes.add(route2.clone(), id2.clone());
|
|
||||||
let retrieve = Route::new(Include::All, Include::All, Include::All);
|
|
||||||
let result = routes.get(retrieve);
|
|
||||||
assert_eq!(result.len(), 2);
|
|
||||||
assert!(result.contains(&id1), "{:?} not found in {:?}", id1, result);
|
|
||||||
assert!(result.contains(&id2), "{:?} not found in {:?}", id2, result);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn can_remove_sender_id() {
|
|
||||||
let mut routes = RouteStorage::new();
|
|
||||||
let count = 5;
|
|
||||||
let mut ids: HashSet<SenderID> = HashSet::new();
|
|
||||||
while ids.len() < count {
|
|
||||||
ids.insert(SenderID::new());
|
|
||||||
}
|
|
||||||
let route = Route::new(Include::Just(MessageID::new()), Include::All, Include::All);
|
|
||||||
for id in ids.iter() {
|
|
||||||
routes.add(route.clone(), id.clone());
|
|
||||||
}
|
|
||||||
let removed = ids.iter().last().unwrap().clone();
|
|
||||||
ids.remove(&removed);
|
|
||||||
routes.remove_sender_id(&removed);
|
|
||||||
let result = routes.get(route);
|
|
||||||
assert_eq!(result, ids);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn empty_routes_are_release_memory() {
|
|
||||||
let mut routes = RouteStorage::new();
|
|
||||||
let id = SenderID::new();
|
|
||||||
let route = Route::new(Include::Just(MessageID::new()), Include::All, Include::All);
|
|
||||||
routes.add(route.clone(), id.clone());
|
|
||||||
routes.remove_sender_id(&id);
|
|
||||||
assert_eq!(routes.data.len(), 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn can_route_be_removed() {
|
|
||||||
let mut routes = RouteStorage::new();
|
|
||||||
let id = SenderID::new();
|
|
||||||
let route = Route::new(Include::Just(MessageID::new()), Include::All, Include::All);
|
|
||||||
routes.add(route.clone(), id.clone());
|
|
||||||
routes.remove_route(route.clone(), id);
|
|
||||||
assert_eq!(routes.data.len(), 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn can_shared_route_be_removed() {
|
|
||||||
let mut routes = RouteStorage::new();
|
|
||||||
let id1 = SenderID::new();
|
|
||||||
let id2 = SenderID::new();
|
|
||||||
let route = Route::new(Include::Just(MessageID::new()), Include::All, Include::All);
|
|
||||||
routes.add(route.clone(), id1.clone());
|
|
||||||
routes.add(route.clone(), id2.clone());
|
|
||||||
routes.remove_route(route.clone(), id1);
|
|
||||||
assert_eq!(routes.data.len(), 1);
|
|
||||||
let ids = routes.get(route.clone());
|
|
||||||
let id = ids.iter().last().unwrap();
|
|
||||||
assert_eq!(id, &id2);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
|
||||||
pub struct Session {
|
|
||||||
id: Field,
|
|
||||||
expire_time: DateTime<Utc>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Session {
|
|
||||||
const EXPIRE_IN: Duration = Duration::from_hours(1);
|
|
||||||
|
|
||||||
pub fn new<F>(id: F, time: DateTime<Utc>) -> Self
|
|
||||||
where
|
|
||||||
F: Into<Field>,
|
|
||||||
{
|
|
||||||
Self {
|
|
||||||
id: id.into(),
|
|
||||||
expire_time: time + Self::EXPIRE_IN,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn id(&self) -> &Field {
|
|
||||||
&self.id
|
|
||||||
}
|
|
||||||
|
|
||||||
fn extend(&mut self, time: DateTime<Utc>) {
|
|
||||||
self.expire_time = time + Session::EXPIRE_IN;
|
|
||||||
}
|
|
||||||
|
|
||||||
fn is_expired(&self, time: DateTime<Utc>) -> bool {
|
|
||||||
time > self.expire_time
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for Session {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self {
|
|
||||||
id: Field::None,
|
|
||||||
expire_time: Utc::now(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod session_entries {
|
|
||||||
use super::*;
|
|
||||||
use chrono::Utc;
|
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn is_there_a_default() {
|
|
||||||
let session = Session::default();
|
|
||||||
assert_eq!(session.id(), &Field::None);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn does_entry_return_id() {
|
|
||||||
let id: Field = Uuid::new_v4().into();
|
|
||||||
let time = Utc::now();
|
|
||||||
let entry = Session::new(id.clone(), time.clone());
|
|
||||||
assert_eq!(entry.id, id);
|
|
||||||
assert_eq!(entry.expire_time, time + Session::EXPIRE_IN);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn can_determine_if_entry_expired() {
|
|
||||||
let id: Field = Uuid::new_v4().into();
|
|
||||||
let time = Utc::now();
|
|
||||||
let entry = Session::new(id.clone(), time.clone());
|
|
||||||
assert!(
|
|
||||||
!entry.is_expired(time + Session::EXPIRE_IN),
|
|
||||||
"should not be expired"
|
|
||||||
);
|
|
||||||
assert!(
|
|
||||||
entry.is_expired(time + Session::EXPIRE_IN + Duration::from_secs(1)),
|
|
||||||
"should be expired"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn can_expiration_be_reset() {
|
|
||||||
let id: Field = Uuid::new_v4().into();
|
|
||||||
let mut entry = Session::new(id.clone(), Utc::now());
|
|
||||||
let time = Utc::now();
|
|
||||||
entry.extend(time.clone());
|
|
||||||
assert_eq!(entry.expire_time, time + Session::EXPIRE_IN);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct SessionStorage {
|
|
||||||
entries: HashMap<Field, Session>,
|
|
||||||
queue: Queue,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SessionStorage {
|
|
||||||
fn new(queue: Queue) -> Self {
|
|
||||||
Self {
|
|
||||||
entries: HashMap::new(),
|
|
||||||
queue: queue,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get(&mut self, id: &Field) -> Session {
|
|
||||||
let converted = match id {
|
|
||||||
Field::Uuid(data) => Field::Uuid(data.clone()),
|
|
||||||
Field::StaticString(data) => match Uuid::try_from(data.clone()) {
|
|
||||||
Ok(id) => Field::Uuid(id.clone()),
|
|
||||||
Err(_) => Field::None,
|
|
||||||
},
|
|
||||||
_ => Field::None,
|
|
||||||
};
|
|
||||||
match self.entries.get_mut(&converted) {
|
|
||||||
Some(data) => {
|
|
||||||
data.extend(self.queue.now());
|
|
||||||
data.clone()
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
let mut new_id: Field = Uuid::new_v4().into();
|
|
||||||
while self.entries.contains_key(&new_id) {
|
|
||||||
new_id = Uuid::new_v4().into();
|
|
||||||
}
|
|
||||||
let output = Session::new(new_id.clone(), self.queue.now());
|
|
||||||
self.entries.insert(new_id, output.clone());
|
|
||||||
output
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn expire(&mut self) {
|
|
||||||
let mut remove: Vec<Field> = Vec::new();
|
|
||||||
let time = self.queue.now();
|
|
||||||
for (id, session) in self.entries.iter() {
|
|
||||||
if session.is_expired(time) {
|
|
||||||
remove.push(id.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for id in remove.iter() {
|
|
||||||
self.entries.remove(id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod session_storage {
|
|
||||||
use super::*;
|
|
||||||
use crate::{
|
|
||||||
queue::{self, TestClock},
|
|
||||||
FieldType,
|
|
||||||
};
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn are_session_ids_unique() {
|
|
||||||
let count = 10;
|
|
||||||
let mut ids: Vec<Field> = Vec::new();
|
|
||||||
let clock = TestClock::new();
|
|
||||||
let queue = Queue::with_clock(clock.clone());
|
|
||||||
let mut sess = SessionStorage::new(queue.clone());
|
|
||||||
let expire_time = queue.now() + Session::EXPIRE_IN;
|
|
||||||
for _ in 0..count {
|
|
||||||
let result = sess.get(&Field::None);
|
|
||||||
assert_eq!(result.expire_time, expire_time);
|
|
||||||
let id = result.id().clone();
|
|
||||||
let id_type: FieldType = (&id).into();
|
|
||||||
assert_eq!(id_type, FieldType::Uuid);
|
|
||||||
assert!(!ids.contains(&id), "{:?} is not unique in {:?}", id, ids);
|
|
||||||
ids.push(id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn are_valid_uuids_returned() {
|
|
||||||
let clock = TestClock::new();
|
|
||||||
let queue = Queue::with_clock(clock.clone());
|
|
||||||
let mut sess = SessionStorage::new(queue.clone());
|
|
||||||
let data = sess.get(&Field::None);
|
|
||||||
clock.advance(Duration::from_secs(5));
|
|
||||||
let time = queue.now();
|
|
||||||
let entry = sess.get(data.id());
|
|
||||||
assert_eq!(entry.id(), data.id());
|
|
||||||
assert_eq!(entry.expire_time, time + Session::EXPIRE_IN);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn do_bad_ids_generate_new_ids() {
|
|
||||||
let mut sess = SessionStorage::new(Queue::new());
|
|
||||||
let data: Field = Uuid::nil().into();
|
|
||||||
let result = sess.get(&data);
|
|
||||||
assert_ne!(result.id(), &data);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn can_string_ids_be_accepted() {
|
|
||||||
let mut sess = SessionStorage::new(Queue::new());
|
|
||||||
let id = sess.get(&Field::None).id().clone();
|
|
||||||
let text: Field = match id {
|
|
||||||
Field::Uuid(id) => id.to_string().into(),
|
|
||||||
_ => unreachable!("entry id should always return a uuid"),
|
|
||||||
};
|
|
||||||
let result = sess.get(&text);
|
|
||||||
assert_eq!(result.id(), &id);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn does_mismatched_string_produce_new_id() {
|
|
||||||
let mut sess = SessionStorage::new(Queue::new());
|
|
||||||
let input = Uuid::nil();
|
|
||||||
let id: Field = input.to_string().into();
|
|
||||||
let test_data: Field = input.into();
|
|
||||||
let result = sess.get(&id);
|
|
||||||
assert_ne!(result.id(), &test_data);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn does_bad_string_produce_id() {
|
|
||||||
let mut sess = SessionStorage::new(Queue::new());
|
|
||||||
let id: Field = "not a uuid".into();
|
|
||||||
let result = sess.get(&id);
|
|
||||||
match result.id() {
|
|
||||||
Field::Uuid(_) => {}
|
|
||||||
_ => panic!("session id should always return a uuid field"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn do_other_fields_return_uuid() {
|
|
||||||
let mut sess = SessionStorage::new(Queue::new());
|
|
||||||
let id: Field = 2.into();
|
|
||||||
let result = sess.get(&id);
|
|
||||||
match result.id() {
|
|
||||||
Field::Uuid(_) => {}
|
|
||||||
_ => panic!("session id should always return a uuid field"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn are_expired_sessions_removed() {
|
|
||||||
let clock = TestClock::new();
|
|
||||||
let queue = Queue::with_clock(clock.clone());
|
|
||||||
let mut sess = SessionStorage::new(queue.clone());
|
|
||||||
let data = sess.get(&Field::None);
|
|
||||||
assert_eq!(sess.entries.len(), 1, "should have one entry");
|
|
||||||
clock.advance(Session::EXPIRE_IN);
|
|
||||||
sess.expire();
|
|
||||||
assert_eq!(
|
|
||||||
sess.entries.len(),
|
|
||||||
1,
|
|
||||||
"entry should not have expired: expire: {:?}, time: {:?}",
|
|
||||||
data.expire_time,
|
|
||||||
queue.now()
|
|
||||||
);
|
|
||||||
clock.advance(Duration::from_nanos(1));
|
|
||||||
sess.expire();
|
|
||||||
assert_eq!(
|
|
||||||
sess.entries.len(),
|
|
||||||
0,
|
|
||||||
"entry should have expired: expire: {:?}, time: {:?}",
|
|
||||||
data.expire_time,
|
|
||||||
queue.now()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct DocRegistry {
|
|
||||||
doc_names: Names,
|
|
||||||
queue: Queue,
|
|
||||||
receiver: Receiver<Message>,
|
|
||||||
routes: RouteStorage,
|
|
||||||
sessions: SessionStorage,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl DocRegistry {
|
|
||||||
fn new(queue: Queue, rx: Receiver<Message>) -> Self {
|
|
||||||
Self {
|
|
||||||
doc_names: Names::new(),
|
|
||||||
queue: queue.clone(),
|
|
||||||
receiver: rx,
|
|
||||||
routes: RouteStorage::new(),
|
|
||||||
sessions: SessionStorage::new(queue),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn start(queue: Queue, rx: Receiver<Message>) {
|
|
||||||
let mut doc_names = DocRegistry::new(queue, rx);
|
|
||||||
spawn(move || {
|
|
||||||
doc_names.listen();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
fn listen(&mut self) {
|
|
||||||
loop {
|
|
||||||
let mut msg = self.receiver.recv().unwrap();
|
|
||||||
match msg.get_action() {
|
|
||||||
MsgAction::Register(data) => {
|
|
||||||
let id = data.get_sender_id();
|
|
||||||
let reply = msg.set_action(self.register_action(data));
|
|
||||||
self.queue.forward(id, reply);
|
|
||||||
}
|
|
||||||
_ => match self.path_to_route(&msg.get_path()) {
|
|
||||||
Ok(route) => {
|
|
||||||
let session = self.sessions.get(msg.session_id());
|
|
||||||
msg.override_session(session.clone());
|
|
||||||
msg.set_route(route.clone());
|
|
||||||
for sender_id in self.routes.get(route).iter() {
|
|
||||||
self.queue.forward(sender_id, msg.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(err) => self.queue.send(msg.set_action(MsgAction::Error(err))),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn path_to_route(&self, path: &Path) -> Result<Route, MTTError> {
|
|
||||||
let doc_id = match &path.doc {
|
|
||||||
Include::Just(name) => match self.doc_names.get_id(name) {
|
|
||||||
Ok(id) => Include::Just(id),
|
|
||||||
Err(err) => return Err(err),
|
|
||||||
},
|
|
||||||
Include::All => Include::All,
|
|
||||||
};
|
|
||||||
Ok(Route::new(path.msg_id.clone(), doc_id, path.action.clone()))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn register_action(&mut self, reg: &Register) -> Register {
|
|
||||||
match reg.get_msg() {
|
|
||||||
RegMsg::AddDocName(names) => match self.doc_names.add_names(names.clone()) {
|
|
||||||
Ok(id) => reg.response(RegMsg::DocumentNameID(id.clone())),
|
|
||||||
Err(err) => reg.response(RegMsg::Error(err)),
|
|
||||||
},
|
|
||||||
RegMsg::AddRoute(path) => {
|
|
||||||
let response = match self.path_to_route(path) {
|
|
||||||
Ok(route) => {
|
|
||||||
let id = self.routes.add(route, reg.get_sender_id().clone());
|
|
||||||
RegMsg::RouteID(id)
|
|
||||||
}
|
|
||||||
Err(err) => RegMsg::Error(err),
|
|
||||||
};
|
|
||||||
reg.response(response)
|
|
||||||
}
|
|
||||||
RegMsg::GetNameID(name) => match self.doc_names.get_id(name) {
|
|
||||||
Ok(id) => reg.response(RegMsg::DocumentNameID(id.clone())),
|
|
||||||
Err(err) => reg.response(RegMsg::Error(err)),
|
|
||||||
},
|
|
||||||
RegMsg::RemoveSender(sender_id) => {
|
|
||||||
self.routes.remove_sender_id(sender_id);
|
|
||||||
reg.response(RegMsg::Ok)
|
|
||||||
}
|
|
||||||
RegMsg::RemoveRoute(route) => {
|
|
||||||
self.routes
|
|
||||||
.remove_route(route.clone(), reg.get_sender_id().clone());
|
|
||||||
reg.response(RegMsg::Ok)
|
|
||||||
}
|
|
||||||
_ => reg.response(RegMsg::Ok),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
849
src/queue/data_director/engine.rs
Normal file
849
src/queue/data_director/engine.rs
Normal file
@@ -0,0 +1,849 @@
|
|||||||
|
use super::super::SenderID;
|
||||||
|
use crate::{
|
||||||
|
action::{Action, Field, MsgAction},
|
||||||
|
document::Clock,
|
||||||
|
message::{Message, MessageAction, MessageID},
|
||||||
|
mtterror::MTTError,
|
||||||
|
name::{Name, NameID, NameType, Names},
|
||||||
|
queue::router::Queue,
|
||||||
|
};
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use std::{
|
||||||
|
collections::{HashMap, HashSet},
|
||||||
|
default,
|
||||||
|
sync::mpsc::Receiver,
|
||||||
|
thread::spawn,
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Eq, Hash)]
|
||||||
|
pub enum Include<T> {
|
||||||
|
All,
|
||||||
|
Just(T),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: PartialEq> PartialEq for Include<T> {
|
||||||
|
fn eq(&self, other: &Self) -> bool {
|
||||||
|
match self {
|
||||||
|
Include::All => true,
|
||||||
|
Include::Just(data) => match other {
|
||||||
|
Include::All => true,
|
||||||
|
Include::Just(other_data) => data == other_data,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod includes {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn does_all_equal_evberything() {
|
||||||
|
let a: Include<isize> = Include::All;
|
||||||
|
let b: Include<isize> = Include::Just(5);
|
||||||
|
let c: Include<isize> = Include::Just(7);
|
||||||
|
assert!(a == a, "all should equal all");
|
||||||
|
assert!(a == b, "all should equal just");
|
||||||
|
assert!(b == a, "just should equal all");
|
||||||
|
assert!(b == b, "same just should equal");
|
||||||
|
assert!(b != c, "different justs do not equal");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub enum RegMsg {
|
||||||
|
AddRoute(Path),
|
||||||
|
AddDocName(Vec<Name>),
|
||||||
|
DocumentNameID(NameID),
|
||||||
|
Error(MTTError),
|
||||||
|
GetNameID(Name),
|
||||||
|
Ok,
|
||||||
|
RemoveRoute(Route),
|
||||||
|
RemoveSender(SenderID),
|
||||||
|
RouteID(RouteID),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct Register {
|
||||||
|
msg: RegMsg,
|
||||||
|
sender_id: SenderID,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Register {
|
||||||
|
pub fn new(sender_id: SenderID, reg_msg: RegMsg) -> Self {
|
||||||
|
Self {
|
||||||
|
msg: reg_msg,
|
||||||
|
sender_id: sender_id,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_msg(&self) -> &RegMsg {
|
||||||
|
&self.msg
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_sender_id(&self) -> &SenderID {
|
||||||
|
&self.sender_id
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn response(&self, reg_msg: RegMsg) -> Self {
|
||||||
|
Self {
|
||||||
|
msg: reg_msg,
|
||||||
|
sender_id: self.sender_id.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MessageAction for Register {}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod registries {
|
||||||
|
use super::*;
|
||||||
|
use crate::name::name_id_support::test_name_id;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn does_registry_store_data() {
|
||||||
|
let name_id = test_name_id();
|
||||||
|
let sender_data_id = SenderID::new();
|
||||||
|
let inputs = [
|
||||||
|
RegMsg::DocumentNameID(name_id.clone()),
|
||||||
|
RegMsg::RemoveSender(sender_data_id.clone()),
|
||||||
|
];
|
||||||
|
for regmsg in inputs.iter() {
|
||||||
|
let sender_id = SenderID::new();
|
||||||
|
let reg = Register::new(sender_id.clone(), regmsg.clone());
|
||||||
|
assert_eq!(reg.doc_name(), &NameType::None);
|
||||||
|
assert_eq!(reg.get_sender_id(), &sender_id);
|
||||||
|
match reg.get_msg() {
|
||||||
|
RegMsg::DocumentNameID(data) => assert_eq!(data, &name_id),
|
||||||
|
RegMsg::RemoveSender(data) => assert_eq!(data, &sender_data_id),
|
||||||
|
_ => unreachable!("should have been one of the inputs"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct Path {
|
||||||
|
pub msg_id: Include<MessageID>,
|
||||||
|
pub doc: Include<NameType>,
|
||||||
|
pub action: Include<Action>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Path {
|
||||||
|
pub fn new(id: Include<MessageID>, doc: Include<NameType>, action: Include<Action>) -> Self {
|
||||||
|
Self {
|
||||||
|
msg_id: id,
|
||||||
|
doc: doc,
|
||||||
|
action: action,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn for_message<NT>(name: NT, action: &MsgAction) -> Self
|
||||||
|
where
|
||||||
|
NT: Into<NameType>,
|
||||||
|
{
|
||||||
|
Self {
|
||||||
|
msg_id: Include::Just(MessageID::new()),
|
||||||
|
doc: Include::Just(name.into()),
|
||||||
|
action: Include::Just(action.into()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod paths {
|
||||||
|
use super::*;
|
||||||
|
use crate::{
|
||||||
|
action::{Records, Show},
|
||||||
|
name::{Name, Names},
|
||||||
|
};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn can_create_for_message() {
|
||||||
|
let input = [
|
||||||
|
(
|
||||||
|
Name::english("one"),
|
||||||
|
MsgAction::Show(Show::new(Name::english("one"))),
|
||||||
|
),
|
||||||
|
(
|
||||||
|
Name::english("two"),
|
||||||
|
MsgAction::Records(Records::new(vec![Name::english("two")], Names::new())),
|
||||||
|
),
|
||||||
|
];
|
||||||
|
for item in input.iter() {
|
||||||
|
let path = Path::for_message(item.0.clone(), &item.1);
|
||||||
|
match path.doc {
|
||||||
|
Include::Just(name) => assert_eq!(name, item.0.clone().into()),
|
||||||
|
_ => unreachable!("should have returned document name"),
|
||||||
|
}
|
||||||
|
match path.action {
|
||||||
|
Include::Just(action) => assert_eq!(action, item.1.clone().into()),
|
||||||
|
_ => unreachable!("should have returned action type"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn message_ids_are_unique_for_message_paths() {
|
||||||
|
let count = 10;
|
||||||
|
let mut ids: Vec<MessageID> = Vec::new();
|
||||||
|
for _ in 0..count {
|
||||||
|
let path =
|
||||||
|
Path::for_message(NameType::None, &MsgAction::Show(Show::new(NameType::None)));
|
||||||
|
let id = match path.msg_id {
|
||||||
|
Include::Just(data) => data.clone(),
|
||||||
|
Include::All => unreachable!("should have been a message id"),
|
||||||
|
};
|
||||||
|
assert!(!ids.contains(&id), "{:?} is duplicated in {:?}", id, ids);
|
||||||
|
ids.push(id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
|
pub struct Route {
|
||||||
|
pub action: Include<Action>,
|
||||||
|
pub doc_id: Include<NameID>,
|
||||||
|
pub msg_id: Include<MessageID>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Route {
|
||||||
|
pub fn new(msg_id: Include<MessageID>, doc: Include<NameID>, action: Include<Action>) -> Self {
|
||||||
|
Self {
|
||||||
|
action: action,
|
||||||
|
doc_id: doc,
|
||||||
|
msg_id: msg_id,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for Route {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
action: Include::All,
|
||||||
|
doc_id: Include::All,
|
||||||
|
msg_id: Include::All,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<RouteID> for Route {
|
||||||
|
fn from(value: RouteID) -> Self {
|
||||||
|
Self::from(&value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<&RouteID> for Route {
|
||||||
|
fn from(value: &RouteID) -> Self {
|
||||||
|
Self {
|
||||||
|
action: match &value.action {
|
||||||
|
Some(data) => Include::Just(data.clone()),
|
||||||
|
None => Include::All,
|
||||||
|
},
|
||||||
|
doc_id: match &value.doc_id {
|
||||||
|
Some(doc) => Include::Just(doc.clone()),
|
||||||
|
None => Include::All,
|
||||||
|
},
|
||||||
|
msg_id: match &value.msg_id {
|
||||||
|
Some(msg) => Include::Just(msg.clone()),
|
||||||
|
None => Include::All,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
|
||||||
|
pub struct RouteID {
|
||||||
|
action: Option<Action>,
|
||||||
|
doc_id: Option<NameID>,
|
||||||
|
msg_id: Option<MessageID>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<Route> for RouteID {
|
||||||
|
fn from(value: Route) -> Self {
|
||||||
|
Self {
|
||||||
|
action: match value.action {
|
||||||
|
Include::All => None,
|
||||||
|
Include::Just(action) => Some(action.clone()),
|
||||||
|
},
|
||||||
|
doc_id: match value.doc_id {
|
||||||
|
Include::All => None,
|
||||||
|
Include::Just(doc) => Some(doc.clone()),
|
||||||
|
},
|
||||||
|
msg_id: match value.msg_id {
|
||||||
|
Include::All => None,
|
||||||
|
Include::Just(id) => Some(id.clone()),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct RouteStorage {
|
||||||
|
data: HashMap<RouteID, HashSet<SenderID>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RouteStorage {
|
||||||
|
fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
data: HashMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add(&mut self, route: Route, sender_id: SenderID) -> RouteID {
|
||||||
|
let route_id: RouteID = route.into();
|
||||||
|
let set = match self.data.get_mut(&route_id) {
|
||||||
|
Some(result) => result,
|
||||||
|
None => {
|
||||||
|
let holder = HashSet::new();
|
||||||
|
self.data.insert(route_id.clone(), holder);
|
||||||
|
self.data.get_mut(&route_id).unwrap()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
set.insert(sender_id);
|
||||||
|
route_id
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove_sender_id(&mut self, sender_id: &SenderID) {
|
||||||
|
let mut removal: Vec<RouteID> = Vec::new();
|
||||||
|
for (route_id, set) in self.data.iter_mut() {
|
||||||
|
set.remove(sender_id);
|
||||||
|
if set.is_empty() {
|
||||||
|
removal.push(route_id.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for route_id in removal.iter() {
|
||||||
|
self.data.remove(route_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove_route(&mut self, route: Route, sender: SenderID) {
|
||||||
|
let route_id: RouteID = route.into();
|
||||||
|
let mut remove = false;
|
||||||
|
match self.data.get_mut(&route_id) {
|
||||||
|
Some(store) => {
|
||||||
|
store.remove(&sender);
|
||||||
|
if store.len() == 0 {
|
||||||
|
remove = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => {}
|
||||||
|
}
|
||||||
|
if remove {
|
||||||
|
self.data.remove(&route_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get(&self, route: Route) -> HashSet<SenderID> {
|
||||||
|
let mut output = HashSet::new();
|
||||||
|
for (route_id, set) in self.data.iter() {
|
||||||
|
if route == route_id.into() {
|
||||||
|
output = output.union(set).cloned().collect();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
output
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod route_storeage {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn can_add_routes() {
|
||||||
|
let mut routes = RouteStorage::new();
|
||||||
|
let id1 = SenderID::new();
|
||||||
|
let id2 = SenderID::new();
|
||||||
|
let route1 = Route::new(Include::Just(MessageID::new()), Include::All, Include::All);
|
||||||
|
let route2 = Route::new(Include::Just(MessageID::new()), Include::All, Include::All);
|
||||||
|
let route_id1 = routes.add(route1.clone(), id1.clone());
|
||||||
|
let route_id2 = routes.add(route2.clone(), id2.clone());
|
||||||
|
let result1 = routes.get(route1.clone());
|
||||||
|
assert_eq!(result1.len(), 1);
|
||||||
|
assert!(
|
||||||
|
result1.contains(&id1),
|
||||||
|
"{:?} not found in {:?}",
|
||||||
|
id1,
|
||||||
|
result1
|
||||||
|
);
|
||||||
|
assert_eq!(route_id1, route1.into());
|
||||||
|
let result2 = routes.get(route2.clone());
|
||||||
|
assert_eq!(result2.len(), 1);
|
||||||
|
assert!(
|
||||||
|
result2.contains(&id2),
|
||||||
|
"{:?} not found in {:?}",
|
||||||
|
id2,
|
||||||
|
result2
|
||||||
|
);
|
||||||
|
assert_eq!(route_id2, route2.into());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn returns_empty_set_when_nothing_is_available() {
|
||||||
|
let routes = RouteStorage::new();
|
||||||
|
let route = Route::new(Include::Just(MessageID::new()), Include::All, Include::All);
|
||||||
|
let result = routes.get(route);
|
||||||
|
assert_eq!(result.len(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn returns_all_entries_using_the_same_route() {
|
||||||
|
let count = 5;
|
||||||
|
let mut routes = RouteStorage::new();
|
||||||
|
let mut ids: HashSet<SenderID> = HashSet::new();
|
||||||
|
while ids.len() < count {
|
||||||
|
ids.insert(SenderID::new());
|
||||||
|
}
|
||||||
|
let route = Route::new(Include::Just(MessageID::new()), Include::All, Include::All);
|
||||||
|
for id in ids.iter() {
|
||||||
|
routes.add(route.clone(), id.clone());
|
||||||
|
}
|
||||||
|
let result = routes.get(route);
|
||||||
|
assert_eq!(result, ids);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn routes_are_not_duplicated() {
|
||||||
|
let count = 5;
|
||||||
|
let mut routes = RouteStorage::new();
|
||||||
|
let id = SenderID::new();
|
||||||
|
let route = Route::new(Include::Just(MessageID::new()), Include::All, Include::All);
|
||||||
|
for _ in 0..count {
|
||||||
|
routes.add(route.clone(), id.clone());
|
||||||
|
}
|
||||||
|
let result = routes.get(route);
|
||||||
|
assert_eq!(result.len(), 1);
|
||||||
|
assert!(result.contains(&id), "{:?} not found in {:?}", id, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn overlapping_routes_are_combined() {
|
||||||
|
let mut routes = RouteStorage::new();
|
||||||
|
let id1 = SenderID::new();
|
||||||
|
let id2 = SenderID::new();
|
||||||
|
let route1 = Route::new(Include::Just(MessageID::new()), Include::All, Include::All);
|
||||||
|
let route2 = Route::new(Include::Just(MessageID::new()), Include::All, Include::All);
|
||||||
|
routes.add(route1.clone(), id1.clone());
|
||||||
|
routes.add(route2.clone(), id2.clone());
|
||||||
|
let retrieve = Route::new(Include::All, Include::All, Include::All);
|
||||||
|
let result = routes.get(retrieve);
|
||||||
|
assert_eq!(result.len(), 2);
|
||||||
|
assert!(result.contains(&id1), "{:?} not found in {:?}", id1, result);
|
||||||
|
assert!(result.contains(&id2), "{:?} not found in {:?}", id2, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn can_remove_sender_id() {
|
||||||
|
let mut routes = RouteStorage::new();
|
||||||
|
let count = 5;
|
||||||
|
let mut ids: HashSet<SenderID> = HashSet::new();
|
||||||
|
while ids.len() < count {
|
||||||
|
ids.insert(SenderID::new());
|
||||||
|
}
|
||||||
|
let route = Route::new(Include::Just(MessageID::new()), Include::All, Include::All);
|
||||||
|
for id in ids.iter() {
|
||||||
|
routes.add(route.clone(), id.clone());
|
||||||
|
}
|
||||||
|
let removed = ids.iter().last().unwrap().clone();
|
||||||
|
ids.remove(&removed);
|
||||||
|
routes.remove_sender_id(&removed);
|
||||||
|
let result = routes.get(route);
|
||||||
|
assert_eq!(result, ids);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn empty_routes_are_release_memory() {
|
||||||
|
let mut routes = RouteStorage::new();
|
||||||
|
let id = SenderID::new();
|
||||||
|
let route = Route::new(Include::Just(MessageID::new()), Include::All, Include::All);
|
||||||
|
routes.add(route.clone(), id.clone());
|
||||||
|
routes.remove_sender_id(&id);
|
||||||
|
assert_eq!(routes.data.len(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn can_route_be_removed() {
|
||||||
|
let mut routes = RouteStorage::new();
|
||||||
|
let id = SenderID::new();
|
||||||
|
let route = Route::new(Include::Just(MessageID::new()), Include::All, Include::All);
|
||||||
|
routes.add(route.clone(), id.clone());
|
||||||
|
routes.remove_route(route.clone(), id);
|
||||||
|
assert_eq!(routes.data.len(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn can_shared_route_be_removed() {
|
||||||
|
let mut routes = RouteStorage::new();
|
||||||
|
let id1 = SenderID::new();
|
||||||
|
let id2 = SenderID::new();
|
||||||
|
let route = Route::new(Include::Just(MessageID::new()), Include::All, Include::All);
|
||||||
|
routes.add(route.clone(), id1.clone());
|
||||||
|
routes.add(route.clone(), id2.clone());
|
||||||
|
routes.remove_route(route.clone(), id1);
|
||||||
|
assert_eq!(routes.data.len(), 1);
|
||||||
|
let ids = routes.get(route.clone());
|
||||||
|
let id = ids.iter().last().unwrap();
|
||||||
|
assert_eq!(id, &id2);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct Session {
|
||||||
|
id: Field,
|
||||||
|
expire_time: DateTime<Utc>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Session {
|
||||||
|
const EXPIRE_IN: Duration = Duration::from_hours(1);
|
||||||
|
|
||||||
|
pub fn new<F>(id: F, time: DateTime<Utc>) -> Self
|
||||||
|
where
|
||||||
|
F: Into<Field>,
|
||||||
|
{
|
||||||
|
Self {
|
||||||
|
id: id.into(),
|
||||||
|
expire_time: time + Self::EXPIRE_IN,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn id(&self) -> &Field {
|
||||||
|
&self.id
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_id(&mut self, id: Field) -> Self {
|
||||||
|
let mut output = self.clone();
|
||||||
|
output.id = id;
|
||||||
|
output
|
||||||
|
}
|
||||||
|
|
||||||
|
fn extend(&mut self, time: DateTime<Utc>) {
|
||||||
|
self.expire_time = time + Session::EXPIRE_IN;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_expired(&self, time: DateTime<Utc>) -> bool {
|
||||||
|
time > self.expire_time
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for Session {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
id: Field::None,
|
||||||
|
expire_time: Utc::now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod session_entries {
|
||||||
|
use super::*;
|
||||||
|
use chrono::Utc;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn is_there_a_default() {
|
||||||
|
let session = Session::default();
|
||||||
|
assert_eq!(session.id(), &Field::None);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn does_entry_return_id() {
|
||||||
|
let id: Field = Uuid::new_v4().into();
|
||||||
|
let time = Utc::now();
|
||||||
|
let entry = Session::new(id.clone(), time.clone());
|
||||||
|
assert_eq!(entry.id, id);
|
||||||
|
assert_eq!(entry.expire_time, time + Session::EXPIRE_IN);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn can_determine_if_entry_expired() {
|
||||||
|
let id: Field = Uuid::new_v4().into();
|
||||||
|
let time = Utc::now();
|
||||||
|
let entry = Session::new(id.clone(), time.clone());
|
||||||
|
assert!(
|
||||||
|
!entry.is_expired(time + Session::EXPIRE_IN),
|
||||||
|
"should not be expired"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
entry.is_expired(time + Session::EXPIRE_IN + Duration::from_secs(1)),
|
||||||
|
"should be expired"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn can_expiration_be_reset() {
|
||||||
|
let id: Field = Uuid::new_v4().into();
|
||||||
|
let mut entry = Session::new(id.clone(), Utc::now());
|
||||||
|
let time = Utc::now();
|
||||||
|
entry.extend(time.clone());
|
||||||
|
assert_eq!(entry.expire_time, time + Session::EXPIRE_IN);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct SessionStorage {
|
||||||
|
entries: HashMap<Field, Session>,
|
||||||
|
queue: Queue,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SessionStorage {
|
||||||
|
fn new(queue: Queue) -> Self {
|
||||||
|
Self {
|
||||||
|
entries: HashMap::new(),
|
||||||
|
queue: queue,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get(&mut self, id: &Field) -> Session {
|
||||||
|
let converted = match id {
|
||||||
|
Field::Uuid(data) => Field::Uuid(data.clone()),
|
||||||
|
Field::StaticString(data) => match Uuid::try_from(data.clone()) {
|
||||||
|
Ok(id) => Field::Uuid(id.clone()),
|
||||||
|
Err(_) => Field::None,
|
||||||
|
},
|
||||||
|
_ => Field::None,
|
||||||
|
};
|
||||||
|
match self.entries.get_mut(&converted) {
|
||||||
|
Some(data) => {
|
||||||
|
data.extend(self.queue.now());
|
||||||
|
data.clone()
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
let mut new_id: Field = Uuid::new_v4().into();
|
||||||
|
while self.entries.contains_key(&new_id) {
|
||||||
|
new_id = Uuid::new_v4().into();
|
||||||
|
}
|
||||||
|
let output = Session::new(new_id.clone(), self.queue.now());
|
||||||
|
self.entries.insert(new_id, output.clone());
|
||||||
|
output
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn expire(&mut self) {
|
||||||
|
let mut remove: Vec<Field> = Vec::new();
|
||||||
|
let time = self.queue.now();
|
||||||
|
for (id, session) in self.entries.iter() {
|
||||||
|
if session.is_expired(time) {
|
||||||
|
remove.push(id.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for id in remove.iter() {
|
||||||
|
self.entries.remove(id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod session_storage {
|
||||||
|
use super::*;
|
||||||
|
use crate::{queue::TestClock, FieldType};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn are_session_ids_unique() {
|
||||||
|
let count = 10;
|
||||||
|
let mut ids: Vec<Field> = Vec::new();
|
||||||
|
let clock = TestClock::new();
|
||||||
|
let queue = Queue::with_clock(clock.clone());
|
||||||
|
let mut sess = SessionStorage::new(queue.clone());
|
||||||
|
let expire_time = queue.now() + Session::EXPIRE_IN;
|
||||||
|
for _ in 0..count {
|
||||||
|
let result = sess.get(&Field::None);
|
||||||
|
assert_eq!(result.expire_time, expire_time);
|
||||||
|
let id = result.id().clone();
|
||||||
|
let id_type: FieldType = (&id).into();
|
||||||
|
assert_eq!(id_type, FieldType::Uuid);
|
||||||
|
assert!(!ids.contains(&id), "{:?} is not unique in {:?}", id, ids);
|
||||||
|
ids.push(id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn are_valid_uuids_returned() {
|
||||||
|
let clock = TestClock::new();
|
||||||
|
let queue = Queue::with_clock(clock.clone());
|
||||||
|
let mut sess = SessionStorage::new(queue.clone());
|
||||||
|
let data = sess.get(&Field::None);
|
||||||
|
clock.advance(Duration::from_secs(5));
|
||||||
|
let time = queue.now();
|
||||||
|
let entry = sess.get(data.id());
|
||||||
|
assert_eq!(entry.id(), data.id());
|
||||||
|
assert_eq!(entry.expire_time, time + Session::EXPIRE_IN);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn do_bad_ids_generate_new_ids() {
|
||||||
|
let mut sess = SessionStorage::new(Queue::new());
|
||||||
|
let data: Field = Uuid::nil().into();
|
||||||
|
let result = sess.get(&data);
|
||||||
|
assert_ne!(result.id(), &data);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn can_string_ids_be_accepted() {
|
||||||
|
let mut sess = SessionStorage::new(Queue::new());
|
||||||
|
let id = sess.get(&Field::None).id().clone();
|
||||||
|
let text: Field = match id {
|
||||||
|
Field::Uuid(id) => id.to_string().into(),
|
||||||
|
_ => unreachable!("entry id should always return a uuid"),
|
||||||
|
};
|
||||||
|
let result = sess.get(&text);
|
||||||
|
assert_eq!(result.id(), &id);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn does_mismatched_string_produce_new_id() {
|
||||||
|
let mut sess = SessionStorage::new(Queue::new());
|
||||||
|
let input = Uuid::nil();
|
||||||
|
let id: Field = input.to_string().into();
|
||||||
|
let test_data: Field = input.into();
|
||||||
|
let result = sess.get(&id);
|
||||||
|
assert_ne!(result.id(), &test_data);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn does_bad_string_produce_id() {
|
||||||
|
let mut sess = SessionStorage::new(Queue::new());
|
||||||
|
let id: Field = "not a uuid".into();
|
||||||
|
let result = sess.get(&id);
|
||||||
|
match result.id() {
|
||||||
|
Field::Uuid(_) => {}
|
||||||
|
_ => panic!("session id should always return a uuid field"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn do_other_fields_return_uuid() {
|
||||||
|
let mut sess = SessionStorage::new(Queue::new());
|
||||||
|
let id: Field = 2.into();
|
||||||
|
let result = sess.get(&id);
|
||||||
|
match result.id() {
|
||||||
|
Field::Uuid(_) => {}
|
||||||
|
_ => panic!("session id should always return a uuid field"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn are_expired_sessions_removed() {
|
||||||
|
let clock = TestClock::new();
|
||||||
|
let queue = Queue::with_clock(clock.clone());
|
||||||
|
let mut sess = SessionStorage::new(queue.clone());
|
||||||
|
let data = sess.get(&Field::None);
|
||||||
|
assert_eq!(sess.entries.len(), 1, "should have one entry");
|
||||||
|
clock.advance(Session::EXPIRE_IN);
|
||||||
|
sess.expire();
|
||||||
|
assert_eq!(
|
||||||
|
sess.entries.len(),
|
||||||
|
1,
|
||||||
|
"entry should not have expired: expire: {:?}, time: {:?}",
|
||||||
|
data.expire_time,
|
||||||
|
queue.now()
|
||||||
|
);
|
||||||
|
clock.advance(Duration::from_nanos(1));
|
||||||
|
sess.expire();
|
||||||
|
assert_eq!(
|
||||||
|
sess.entries.len(),
|
||||||
|
0,
|
||||||
|
"entry should have expired: expire: {:?}, time: {:?}",
|
||||||
|
data.expire_time,
|
||||||
|
queue.now()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct DocRegistry {
|
||||||
|
doc_names: Names,
|
||||||
|
queue: Queue,
|
||||||
|
receiver: Receiver<Message>,
|
||||||
|
routes: RouteStorage,
|
||||||
|
sessions: SessionStorage,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DocRegistry {
|
||||||
|
fn new(queue: Queue, rx: Receiver<Message>) -> Self {
|
||||||
|
let mut doc_names = Names::new();
|
||||||
|
doc_names.add_names(Clock::doc_names());
|
||||||
|
Self {
|
||||||
|
doc_names: doc_names,
|
||||||
|
queue: queue.clone(),
|
||||||
|
receiver: rx,
|
||||||
|
routes: RouteStorage::new(),
|
||||||
|
sessions: SessionStorage::new(queue),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn start(queue: Queue, rx: Receiver<Message>) {
|
||||||
|
let mut doc_names = DocRegistry::new(queue, rx);
|
||||||
|
spawn(move || {
|
||||||
|
doc_names.listen();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
fn listen(&mut self) {
|
||||||
|
loop {
|
||||||
|
let mut msg = self.receiver.recv().unwrap();
|
||||||
|
match msg.get_action() {
|
||||||
|
MsgAction::Register(data) => {
|
||||||
|
let id = data.get_sender_id();
|
||||||
|
let reply = msg.set_action(self.register_action(data));
|
||||||
|
self.queue.forward(id, reply);
|
||||||
|
}
|
||||||
|
_ => match self.path_to_route(&msg.get_path()) {
|
||||||
|
Ok(route) => {
|
||||||
|
let session = self.sessions.get(msg.session_id());
|
||||||
|
msg.override_session(session.clone());
|
||||||
|
msg.set_route(route.clone());
|
||||||
|
for sender_id in self.routes.get(route).iter() {
|
||||||
|
self.queue.forward(sender_id, msg.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => self.queue.send(msg.set_action(MsgAction::Error(err))),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn path_to_route(&self, path: &Path) -> Result<Route, MTTError> {
|
||||||
|
let doc_id = match &path.doc {
|
||||||
|
Include::Just(name) => match self.doc_names.get_id(name) {
|
||||||
|
Ok(id) => Include::Just(id),
|
||||||
|
Err(err) => return Err(err),
|
||||||
|
},
|
||||||
|
Include::All => Include::All,
|
||||||
|
};
|
||||||
|
Ok(Route::new(path.msg_id.clone(), doc_id, path.action.clone()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn register_action(&mut self, reg: &Register) -> Register {
|
||||||
|
match reg.get_msg() {
|
||||||
|
RegMsg::AddDocName(names) => match self.doc_names.add_names(names.clone()) {
|
||||||
|
Ok(id) => reg.response(RegMsg::DocumentNameID(id.clone())),
|
||||||
|
Err(err) => reg.response(RegMsg::Error(err)),
|
||||||
|
},
|
||||||
|
RegMsg::AddRoute(path) => {
|
||||||
|
let response = match self.path_to_route(path) {
|
||||||
|
Ok(route) => {
|
||||||
|
let id = self.routes.add(route, reg.get_sender_id().clone());
|
||||||
|
RegMsg::RouteID(id)
|
||||||
|
}
|
||||||
|
Err(err) => RegMsg::Error(err),
|
||||||
|
};
|
||||||
|
reg.response(response)
|
||||||
|
}
|
||||||
|
RegMsg::GetNameID(name) => match self.doc_names.get_id(name) {
|
||||||
|
Ok(id) => reg.response(RegMsg::DocumentNameID(id.clone())),
|
||||||
|
Err(err) => reg.response(RegMsg::Error(err)),
|
||||||
|
},
|
||||||
|
RegMsg::RemoveSender(sender_id) => {
|
||||||
|
self.routes.remove_sender_id(sender_id);
|
||||||
|
reg.response(RegMsg::Ok)
|
||||||
|
}
|
||||||
|
RegMsg::RemoveRoute(route) => {
|
||||||
|
self.routes
|
||||||
|
.remove_route(route.clone(), reg.get_sender_id().clone());
|
||||||
|
reg.response(RegMsg::Ok)
|
||||||
|
}
|
||||||
|
_ => reg.response(RegMsg::Ok),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -2,8 +2,8 @@ mod support;
|
|||||||
|
|
||||||
use isolang::Language;
|
use isolang::Language;
|
||||||
use morethantext::{
|
use morethantext::{
|
||||||
Action, Addition, DocDef, ErrorID, FieldType, Include, MTTError, MoreThanText, Name, Path,
|
Action, Addition, DocDef, ErrorID, Field, FieldType, Include, MTTError, MoreThanText, Name,
|
||||||
Query, TestMoreThanText,
|
Path, Query, TestMoreThanText,
|
||||||
};
|
};
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use support::setup_range;
|
use support::setup_range;
|
||||||
@@ -18,13 +18,14 @@ fn lang_name() -> Name {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
#[ignore = "fix after session move complete"]
|
||||||
fn are_session_ids_unique() {
|
fn are_session_ids_unique() {
|
||||||
let count = 10;
|
let count = 10;
|
||||||
let mtt = MoreThanText::new();
|
let mtt = MoreThanText::new();
|
||||||
let mut ids: HashSet<String> = HashSet::new();
|
let mut ids: HashSet<Field> = HashSet::new();
|
||||||
for _ in 0..count {
|
for _ in 0..count {
|
||||||
let client = mtt.client();
|
let client = mtt.client();
|
||||||
ids.insert(client.session_id());
|
ids.insert(client.session_id().clone());
|
||||||
}
|
}
|
||||||
assert_eq!(ids.len(), count, "ids = {:?}", ids);
|
assert_eq!(ids.len(), count, "ids = {:?}", ids);
|
||||||
}
|
}
|
||||||
@@ -33,29 +34,34 @@ fn are_session_ids_unique() {
|
|||||||
fn can_existing_sessions_be_used() {
|
fn can_existing_sessions_be_used() {
|
||||||
let mtt = MoreThanText::new();
|
let mtt = MoreThanText::new();
|
||||||
let client1 = mtt.client();
|
let client1 = mtt.client();
|
||||||
let id = client1.session_id();
|
let id = client1.session_id().clone();
|
||||||
drop(client1);
|
drop(client1);
|
||||||
let client2 = mtt.client_with_session(id.clone(), None);
|
let client2 = mtt.client_with_session(id.clone(), None);
|
||||||
assert_eq!(client2.session_id(), id);
|
assert_eq!(client2.session_id(), &id);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
#[ignore = "fix after session move complete"]
|
||||||
fn does_expired_session_ids_return_new() {
|
fn does_expired_session_ids_return_new() {
|
||||||
let id = Uuid::new_v4().to_string();
|
let id = Uuid::new_v4();
|
||||||
|
let expected: Field = id.into();
|
||||||
let mtt = MoreThanText::new();
|
let mtt = MoreThanText::new();
|
||||||
let client = mtt.client_with_session(id.clone(), None);
|
let client = mtt.client_with_session(id.clone(), None);
|
||||||
assert_ne!(client.session_id(), id);
|
assert_ne!(client.session_id(), &expected);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
#[ignore = "may no longer be valid"]
|
||||||
fn does_bad_id_string_get_new() {
|
fn does_bad_id_string_get_new() {
|
||||||
let id = "Not uuid".to_string();
|
let id = "Not uuid".to_string();
|
||||||
|
let not_expected: Field = id.clone().into();
|
||||||
let mtt = MoreThanText::new();
|
let mtt = MoreThanText::new();
|
||||||
let client = mtt.client_with_session(id.clone(), None);
|
let client = mtt.client_with_session(id.clone(), None);
|
||||||
assert_ne!(client.session_id(), id);
|
assert_ne!(client.session_id(), ¬_expected);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
#[ignore = "wait for language to be added to new session"]
|
||||||
fn can_new_clients_set_langauge() {
|
fn can_new_clients_set_langauge() {
|
||||||
let lang = Language::from_639_1("fr").unwrap();
|
let lang = Language::from_639_1("fr").unwrap();
|
||||||
let mut test_env = TestMoreThanText::new();
|
let mut test_env = TestMoreThanText::new();
|
||||||
@@ -73,6 +79,7 @@ fn can_new_clients_set_langauge() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
#[ignore = "wait till language added to new session"]
|
||||||
fn is_lanaguage_set_for_expired_session() {
|
fn is_lanaguage_set_for_expired_session() {
|
||||||
let lang = Language::from_639_1("fr").unwrap();
|
let lang = Language::from_639_1("fr").unwrap();
|
||||||
let mut test_env = TestMoreThanText::new();
|
let mut test_env = TestMoreThanText::new();
|
||||||
@@ -90,6 +97,7 @@ fn is_lanaguage_set_for_expired_session() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
#[ignore = "wait till language added to new session"]
|
||||||
fn is_lanaguage_set_for_bad_session() {
|
fn is_lanaguage_set_for_bad_session() {
|
||||||
let lang = Language::from_639_1("de").unwrap();
|
let lang = Language::from_639_1("de").unwrap();
|
||||||
let mut test_env = TestMoreThanText::new();
|
let mut test_env = TestMoreThanText::new();
|
||||||
@@ -107,13 +115,14 @@ fn is_lanaguage_set_for_bad_session() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
#[ignore = "wait till language added to session"]
|
||||||
fn do_existing_sessions_keep_language_unchanged() {
|
fn do_existing_sessions_keep_language_unchanged() {
|
||||||
let lang1 = Language::from_639_1("de").unwrap();
|
let lang1 = Language::from_639_1("de").unwrap();
|
||||||
let lang2 = Language::from_639_1("fr").unwrap();
|
let lang2 = Language::from_639_1("fr").unwrap();
|
||||||
let mut test_env = TestMoreThanText::new();
|
let mut test_env = TestMoreThanText::new();
|
||||||
let mtt = test_env.get_morethantext();
|
let mtt = test_env.get_morethantext();
|
||||||
let client = mtt.client_with_language(lang1);
|
let client = mtt.client_with_language(lang1);
|
||||||
let id = client.session_id();
|
let id = client.session_id().clone();
|
||||||
drop(client);
|
drop(client);
|
||||||
let path = Path::new(
|
let path = Path::new(
|
||||||
Include::All,
|
Include::All,
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ fn lang_name() -> Name {
|
|||||||
Name::english("language")
|
Name::english("language")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
fn get_session(mtt: &mut MoreThanText, id: &Uuid) -> Result<Record, MTTError> {
|
fn get_session(mtt: &mut MoreThanText, id: &Uuid) -> Result<Record, MTTError> {
|
||||||
let client = mtt.client();
|
let client = mtt.client();
|
||||||
let mut qry = Query::new(doc_name());
|
let mut qry = Query::new(doc_name());
|
||||||
@@ -234,3 +235,4 @@ fn does_not_change_language() {
|
|||||||
let rec = result.iter().last().unwrap();
|
let rec = result.iter().last().unwrap();
|
||||||
assert_eq!(rec.get(&lang_name).unwrap(), jlang.into());
|
assert_eq!(rec.get(&lang_name).unwrap(), jlang.into());
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|||||||
@@ -129,6 +129,7 @@ fn can_trigger_update_specific_record() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
#[ignore = "finish updating clock"]
|
||||||
fn can_a_trigger_from_another_document_be_used() {
|
fn can_a_trigger_from_another_document_be_used() {
|
||||||
let count = 3;
|
let count = 3;
|
||||||
let selected = 1; // must be greater than or equal to 0 and less than count
|
let selected = 1; // must be greater than or equal to 0 and less than count
|
||||||
|
|||||||
Reference in New Issue
Block a user