Moved data registry to separate module.
Some checks failed
Gitea Actions Demo / Explore-Gitea-Actions (push) Failing after 1m25s
Some checks failed
Gitea Actions Demo / Explore-Gitea-Actions (push) Failing after 1m25s
This commit is contained in:
parent
9e88f84166
commit
8b95fbd458
@ -1,6 +1,13 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
message::{Action, MsgAction},
|
message::{Action, Message, MsgAction},
|
||||||
name::NameType,
|
mtterror::MTTError,
|
||||||
|
name::{Name, NameType, Names},
|
||||||
|
router::Queue,
|
||||||
|
};
|
||||||
|
use std::{
|
||||||
|
collections::{HashMap, HashSet},
|
||||||
|
sync::mpsc::Receiver,
|
||||||
|
thread::spawn,
|
||||||
};
|
};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
@ -39,6 +46,48 @@ mod includes {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub enum RegMsg {
|
||||||
|
AddRoute(Path),
|
||||||
|
AddDocName(Vec<Name>),
|
||||||
|
DocumentNameID(Uuid),
|
||||||
|
Error(MTTError),
|
||||||
|
GetNameID(Name),
|
||||||
|
Ok,
|
||||||
|
RemoveSender(Uuid),
|
||||||
|
RouteID(RouteID),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct Register {
|
||||||
|
msg: RegMsg,
|
||||||
|
sender_id: Uuid,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Register {
|
||||||
|
pub fn new(sender_id: Uuid, reg_msg: RegMsg) -> Self {
|
||||||
|
Self {
|
||||||
|
msg: reg_msg,
|
||||||
|
sender_id: sender_id,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_msg(&self) -> &RegMsg {
|
||||||
|
&self.msg
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_sender_id(&self) -> &Uuid {
|
||||||
|
&self.sender_id
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn response(&self, reg_msg: RegMsg) -> Self {
|
||||||
|
Self {
|
||||||
|
msg: reg_msg,
|
||||||
|
sender_id: self.sender_id.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct Path {
|
pub struct Path {
|
||||||
pub msg_id: Include<Uuid>,
|
pub msg_id: Include<Uuid>,
|
||||||
@ -112,3 +161,333 @@ mod paths {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
|
pub struct Route {
|
||||||
|
pub action: Include<Action>,
|
||||||
|
pub doc_id: Include<Uuid>,
|
||||||
|
pub msg_id: Include<Uuid>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Route {
|
||||||
|
pub fn new(msg_id: Include<Uuid>, doc: Include<Uuid>, action: Include<Action>) -> Self {
|
||||||
|
Self {
|
||||||
|
action: action,
|
||||||
|
doc_id: doc,
|
||||||
|
msg_id: msg_id,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for Route {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
action: Include::All,
|
||||||
|
doc_id: Include::All,
|
||||||
|
msg_id: Include::All,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<RouteID> for Route {
|
||||||
|
fn from(value: RouteID) -> Self {
|
||||||
|
Self::from(&value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<&RouteID> for Route {
|
||||||
|
fn from(value: &RouteID) -> Self {
|
||||||
|
Self {
|
||||||
|
action: match &value.action {
|
||||||
|
Some(data) => Include::Just(data.clone()),
|
||||||
|
None => Include::All,
|
||||||
|
},
|
||||||
|
doc_id: match &value.doc_id {
|
||||||
|
Some(doc) => Include::Just(doc.clone()),
|
||||||
|
None => Include::All,
|
||||||
|
},
|
||||||
|
msg_id: match &value.msg_id {
|
||||||
|
Some(msg) => Include::Just(msg.clone()),
|
||||||
|
None => Include::All,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
|
||||||
|
pub struct RouteID {
|
||||||
|
action: Option<Action>,
|
||||||
|
doc_id: Option<Uuid>,
|
||||||
|
msg_id: Option<Uuid>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<Route> for RouteID {
|
||||||
|
fn from(value: Route) -> Self {
|
||||||
|
Self {
|
||||||
|
action: match value.action {
|
||||||
|
Include::All => None,
|
||||||
|
Include::Just(action) => Some(action.clone()),
|
||||||
|
},
|
||||||
|
doc_id: match value.doc_id {
|
||||||
|
Include::All => None,
|
||||||
|
Include::Just(doc) => Some(doc.clone()),
|
||||||
|
},
|
||||||
|
msg_id: match value.msg_id {
|
||||||
|
Include::All => None,
|
||||||
|
Include::Just(id) => Some(id.clone()),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct RouteStorage {
|
||||||
|
data: HashMap<RouteID, HashSet<Uuid>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RouteStorage {
|
||||||
|
fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
data: HashMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add(&mut self, route: Route, sender_id: Uuid) -> RouteID {
|
||||||
|
let route_id: RouteID = route.into();
|
||||||
|
let set = match self.data.get_mut(&route_id) {
|
||||||
|
Some(result) => result,
|
||||||
|
None => {
|
||||||
|
let holder = HashSet::new();
|
||||||
|
self.data.insert(route_id.clone(), holder);
|
||||||
|
self.data.get_mut(&route_id).unwrap()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
set.insert(sender_id);
|
||||||
|
route_id
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove_sender_id(&mut self, sender_id: &Uuid) {
|
||||||
|
let mut removal: Vec<RouteID> = Vec::new();
|
||||||
|
for (route_id, set) in self.data.iter_mut() {
|
||||||
|
set.remove(sender_id);
|
||||||
|
if set.is_empty() {
|
||||||
|
removal.push(route_id.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for route_id in removal.iter() {
|
||||||
|
self.data.remove(route_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get(&self, route: Route) -> HashSet<Uuid> {
|
||||||
|
let mut output = HashSet::new();
|
||||||
|
for (route_id, set) in self.data.iter() {
|
||||||
|
if route == route_id.into() {
|
||||||
|
output = output.union(set).cloned().collect();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
output
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod route_storeage {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn can_add_routes() {
|
||||||
|
let mut routes = RouteStorage::new();
|
||||||
|
let id1 = Uuid::new_v4();
|
||||||
|
let id2 = Uuid::new_v4();
|
||||||
|
let route1 = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All);
|
||||||
|
let route2 = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All);
|
||||||
|
let route_id1 = routes.add(route1.clone(), id1.clone());
|
||||||
|
let route_id2 = routes.add(route2.clone(), id2.clone());
|
||||||
|
let result1 = routes.get(route1.clone());
|
||||||
|
assert_eq!(result1.len(), 1);
|
||||||
|
assert!(
|
||||||
|
result1.contains(&id1),
|
||||||
|
"{:?} not found in {:?}",
|
||||||
|
id1,
|
||||||
|
result1
|
||||||
|
);
|
||||||
|
assert_eq!(route_id1, route1.into());
|
||||||
|
let result2 = routes.get(route2.clone());
|
||||||
|
assert_eq!(result2.len(), 1);
|
||||||
|
assert!(
|
||||||
|
result2.contains(&id2),
|
||||||
|
"{:?} not found in {:?}",
|
||||||
|
id2,
|
||||||
|
result2
|
||||||
|
);
|
||||||
|
assert_eq!(route_id2, route2.into());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn returns_empty_set_when_nothing_is_available() {
|
||||||
|
let routes = RouteStorage::new();
|
||||||
|
let route = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All);
|
||||||
|
let result = routes.get(route);
|
||||||
|
assert_eq!(result.len(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn returns_all_entries_using_the_same_route() {
|
||||||
|
let count = 5;
|
||||||
|
let mut routes = RouteStorage::new();
|
||||||
|
let mut ids: HashSet<Uuid> = HashSet::new();
|
||||||
|
while ids.len() < count {
|
||||||
|
ids.insert(Uuid::new_v4());
|
||||||
|
}
|
||||||
|
let route = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All);
|
||||||
|
for id in ids.iter() {
|
||||||
|
routes.add(route.clone(), id.clone());
|
||||||
|
}
|
||||||
|
let result = routes.get(route);
|
||||||
|
assert_eq!(result, ids);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn routes_are_not_duplicated() {
|
||||||
|
let count = 5;
|
||||||
|
let mut routes = RouteStorage::new();
|
||||||
|
let id = Uuid::new_v4();
|
||||||
|
let route = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All);
|
||||||
|
for _ in 0..count {
|
||||||
|
routes.add(route.clone(), id.clone());
|
||||||
|
}
|
||||||
|
let result = routes.get(route);
|
||||||
|
assert_eq!(result.len(), 1);
|
||||||
|
assert!(result.contains(&id), "{:?} not found in {:?}", id, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn overlapping_routes_are_combined() {
|
||||||
|
let mut routes = RouteStorage::new();
|
||||||
|
let id1 = Uuid::new_v4();
|
||||||
|
let id2 = Uuid::new_v4();
|
||||||
|
let route1 = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All);
|
||||||
|
let route2 = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All);
|
||||||
|
routes.add(route1.clone(), id1.clone());
|
||||||
|
routes.add(route2.clone(), id2.clone());
|
||||||
|
let retrieve = Route::new(Include::All, Include::All, Include::All);
|
||||||
|
let result = routes.get(retrieve);
|
||||||
|
assert_eq!(result.len(), 2);
|
||||||
|
assert!(result.contains(&id1), "{:?} not found in {:?}", id1, result);
|
||||||
|
assert!(result.contains(&id2), "{:?} not found in {:?}", id2, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn can_remove_sender_id() {
|
||||||
|
let mut routes = RouteStorage::new();
|
||||||
|
let count = 5;
|
||||||
|
let mut ids: HashSet<Uuid> = HashSet::new();
|
||||||
|
while ids.len() < count {
|
||||||
|
ids.insert(Uuid::new_v4());
|
||||||
|
}
|
||||||
|
let route = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All);
|
||||||
|
for id in ids.iter() {
|
||||||
|
routes.add(route.clone(), id.clone());
|
||||||
|
}
|
||||||
|
let removed = ids.iter().last().unwrap().clone();
|
||||||
|
ids.remove(&removed);
|
||||||
|
routes.remove_sender_id(&removed);
|
||||||
|
let result = routes.get(route);
|
||||||
|
assert_eq!(result, ids);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn empty_routes_are_release_memory() {
|
||||||
|
let mut routes = RouteStorage::new();
|
||||||
|
let id = Uuid::new_v4();
|
||||||
|
let route = Route::new(Include::Just(Uuid::new_v4()), Include::All, Include::All);
|
||||||
|
routes.add(route.clone(), id.clone());
|
||||||
|
routes.remove_sender_id(&id);
|
||||||
|
assert_eq!(routes.data.len(), 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct DocRegistry {
|
||||||
|
doc_names: Names,
|
||||||
|
queue: Queue,
|
||||||
|
receiver: Receiver<Message>,
|
||||||
|
routes: RouteStorage,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DocRegistry {
|
||||||
|
fn new(queue: Queue, rx: Receiver<Message>) -> Self {
|
||||||
|
Self {
|
||||||
|
doc_names: Names::new(),
|
||||||
|
queue: queue,
|
||||||
|
receiver: rx,
|
||||||
|
routes: RouteStorage::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn start(queue: Queue, rx: Receiver<Message>) {
|
||||||
|
let mut doc_names = DocRegistry::new(queue, rx);
|
||||||
|
spawn(move || {
|
||||||
|
doc_names.listen();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
fn listen(&mut self) {
|
||||||
|
loop {
|
||||||
|
let mut msg = self.receiver.recv().unwrap();
|
||||||
|
match msg.get_action() {
|
||||||
|
MsgAction::Register(data) => {
|
||||||
|
let id = data.get_sender_id();
|
||||||
|
let reply = msg.response(self.register_action(data));
|
||||||
|
self.queue.forward(id, reply);
|
||||||
|
}
|
||||||
|
_ => match self.path_to_route(&msg.get_path()) {
|
||||||
|
Ok(route) => {
|
||||||
|
msg.set_route(route.clone());
|
||||||
|
for sender_id in self.routes.get(route).iter() {
|
||||||
|
self.queue.forward(sender_id, msg.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => self
|
||||||
|
.queue
|
||||||
|
.send(msg.response(MsgAction::Error(err)))
|
||||||
|
.unwrap(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn path_to_route(&self, path: &Path) -> Result<Route, MTTError> {
|
||||||
|
let doc_id = match &path.doc {
|
||||||
|
Include::Just(name) => match self.doc_names.get_id(name) {
|
||||||
|
Ok(id) => Include::Just(id),
|
||||||
|
Err(err) => return Err(err),
|
||||||
|
},
|
||||||
|
Include::All => Include::All,
|
||||||
|
};
|
||||||
|
Ok(Route::new(path.msg_id.clone(), doc_id, path.action.clone()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn register_action(&mut self, reg: &Register) -> Register {
|
||||||
|
match reg.get_msg() {
|
||||||
|
RegMsg::AddDocName(names) => match self.doc_names.add_names(names.clone()) {
|
||||||
|
Ok(id) => reg.response(RegMsg::DocumentNameID(id.clone())),
|
||||||
|
Err(err) => reg.response(RegMsg::Error(err)),
|
||||||
|
},
|
||||||
|
RegMsg::AddRoute(path) => {
|
||||||
|
// let route = self.doc_names.path_to_route(path).unwrap();
|
||||||
|
let route = self.path_to_route(path).unwrap();
|
||||||
|
reg.response(RegMsg::RouteID(
|
||||||
|
self.routes.add(route, reg.get_sender_id().clone()),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
RegMsg::GetNameID(name) => match self.doc_names.get_id(name) {
|
||||||
|
Ok(id) => reg.response(RegMsg::DocumentNameID(id.clone())),
|
||||||
|
Err(err) => reg.response(RegMsg::Error(err)),
|
||||||
|
},
|
||||||
|
RegMsg::RemoveSender(sender_id) => {
|
||||||
|
self.routes.remove_sender_id(sender_id);
|
||||||
|
reg.response(RegMsg::Ok)
|
||||||
|
}
|
||||||
|
_ => reg.response(RegMsg::Ok),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -4,10 +4,10 @@ mod mtterror;
|
|||||||
mod name;
|
mod name;
|
||||||
mod router;
|
mod router;
|
||||||
|
|
||||||
use data_director::{Include, Path};
|
use data_director::{Include, Path, RegMsg, Register};
|
||||||
use message::{
|
use message::{
|
||||||
Action, Addition, CalcValue, Calculation, Clock, CreateDoc, Field, FieldType, Message, Operand,
|
Action, Addition, CalcValue, Calculation, Clock, CreateDoc, Field, FieldType, Message, Operand,
|
||||||
RegMsg, Register, Session,
|
Session,
|
||||||
};
|
};
|
||||||
pub use message::{MsgAction, Query};
|
pub use message::{MsgAction, Query};
|
||||||
use mtterror::MTTError;
|
use mtterror::MTTError;
|
||||||
|
|||||||
1467
src/message.rs
1467
src/message.rs
File diff suppressed because it is too large
Load Diff
30
src/name.rs
30
src/name.rs
@ -1,8 +1,4 @@
|
|||||||
use crate::{
|
use crate::mtterror::MTTError;
|
||||||
data_director::{Include, Path},
|
|
||||||
message::Route,
|
|
||||||
mtterror::MTTError,
|
|
||||||
};
|
|
||||||
use isolang::Language;
|
use isolang::Language;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
@ -142,30 +138,6 @@ impl Names {
|
|||||||
NameType::None => Ok(Uuid::nil()),
|
NameType::None => Ok(Uuid::nil()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn path_to_route(&self, path: &Path) -> Result<Route, MTTError> {
|
|
||||||
let doc_id = match &path.doc {
|
|
||||||
Include::Just(id_info) => match id_info {
|
|
||||||
NameType::ID(id) => {
|
|
||||||
if self.ids.contains_key(&id) {
|
|
||||||
Include::Just(id.clone())
|
|
||||||
} else {
|
|
||||||
return Err(MTTError::NameInvalidID(id.clone()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
NameType::Name(name) => {
|
|
||||||
let id = match self.get_id(name) {
|
|
||||||
Ok(data) => data,
|
|
||||||
Err(err) => return Err(err),
|
|
||||||
};
|
|
||||||
Include::Just(id.clone())
|
|
||||||
}
|
|
||||||
NameType::None => Include::Just(Uuid::nil()),
|
|
||||||
},
|
|
||||||
Include::All => Include::All,
|
|
||||||
};
|
|
||||||
Ok(Route::new(path.msg_id.clone(), doc_id, path.action.clone()))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
@ -1,5 +1,6 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
message::{DocRegistry, Message, RegMsg, Register},
|
data_director::{DocRegistry, RegMsg, Register},
|
||||||
|
message::Message,
|
||||||
mtterror::MTTError,
|
mtterror::MTTError,
|
||||||
name::NameType,
|
name::NameType,
|
||||||
};
|
};
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user