use super::{InternalRecord, InternalRecords, Oid}; use crate::{ action::{Action, CalcValue, Calculation, MsgAction, Query, Records, Reply, Update}, document::{ definition::{DocDef, DocFuncType}, field::Field, }, message::{wrapper::Message, MessageAction}, mtterror::{ErrorID, MTTError}, name::{NameID, NameType}, queue::{ data_director::{Include, Path, RegMsg, Register, RouteID}, router::Queue, }, }; use std::{ collections::{HashMap, HashSet}, sync::mpsc::{channel, Receiver}, thread::spawn, }; use uuid::Uuid; pub struct CreateDoc { queue: Queue, rx: Receiver, } impl CreateDoc { fn new(queue: Queue, rx: Receiver) -> Self { Self { queue: queue, rx: rx, } } pub fn start(mut queue: Queue) { let (tx, rx) = channel(); let routes = [Path::new( Include::All, Include::All, Include::Just(Action::Create), )] .to_vec(); let id = queue.add_sender(tx); for route in routes.iter() { let regmsg = Register::new(id.clone(), RegMsg::AddRoute(route.clone())); queue.send(Message::new(regmsg)); rx.recv().unwrap(); } let doc = CreateDoc::new(queue, rx); spawn(move || { doc.listen(); }); } fn listen(&self) { loop { let msg = self.rx.recv().unwrap(); DocumentFile::start(self.queue.clone(), msg); } } } #[derive(Clone, Debug)] pub enum IndexType { Index, Unique, } impl IndexType { fn create_index(&self) -> Index { match self { Self::Index => Index::new(), Self::Unique => Index::new_unique(), } } } #[derive(Debug)] struct Index { data: HashMap>, unique: bool, } impl Index { fn new() -> Self { Self { data: HashMap::new(), unique: false, } } fn new_unique() -> Self { Self { data: HashMap::new(), unique: true, } } fn internal_add(&mut self, field: &Field, oid: Oid) { let storage = match self.data.get_mut(field) { Some(data) => data, None => { let data = HashSet::new(); self.data.insert(field.clone(), data); self.data.get_mut(field).unwrap() } }; storage.insert(oid); } fn add(&mut self, field: Field, oid: Oid) -> Result<(), MTTError> { let oids = match self.data.get_mut(&field) { Some(data) => data, None => { self.data.insert(field.clone(), HashSet::new()); self.data.get_mut(&field).unwrap() } }; if self.unique && oids.len() > 0 { let err = MTTError::new(ErrorID::IndexEntryAlreadyExists(field.clone())); return Err(err); } else { oids.insert(oid); } Ok(()) } fn pull(&self, calc: &Calculation) -> Result, MTTError> { let mut output = HashSet::new(); for (key, value) in self.data.iter() { match calc.calculate(key) { Field::Boolean(data) => { if data { output = output.union(&value).cloned().collect(); } } _ => { let err = MTTError::new(ErrorID::FieldInvalidType); return Err(err); } } } Ok(output) } fn remove(&mut self, field: &Field, oid: &Oid) { match self.data.get_mut(field) { Some(oids) => { oids.remove(oid); if oids.len() == 0 { self.data.remove(field); } } None => {} }; } fn validate(&self, field: &Field) -> Result<(), MTTError> { if self.unique { match self.data.get(field) { Some(_) => { let err = MTTError::new(ErrorID::IndexEntryAlreadyExists(field.clone())); return Err(err); } None => {} } } Ok(()) } } struct Indexes { data: HashMap, } impl Indexes { fn new(settings: &HashMap) -> Self { let mut output = HashMap::new(); for (key, value) in settings.iter() { output.insert(key.clone(), value.create_index()); } Self { data: output } } fn index_ids(&self) -> HashSet<&NameID> { self.data.keys().collect::>() } fn pull(&self, field_id: &NameID, calc: &Calculation) -> Result, MTTError> { self.data.get(field_id).unwrap().pull(calc) } fn add_to_index( &mut self, field_name: &NameID, field: Field, oid: Oid, ) -> Result<(), MTTError> { let index = match self.data.get_mut(field_name) { Some(data) => data, None => return Ok(()), }; index.add(field, oid) } fn validate(&self, field_name: &NameID, value: &Field) -> Result<(), MTTError> { match self.data.get(field_name) { Some(index) => match index.validate(value) { Ok(_) => {} Err(err) => return Err(err), }, None => {} } Ok(()) } fn iter_mut(&mut self) -> impl Iterator { self.data.iter_mut() } } #[cfg(test)] mod indexes { use super::*; use crate::action::{FieldType, Operand}; fn get_fields(count: usize) -> Vec { let mut output = Vec::new(); while output.len() < count { let field: Field = Uuid::new_v4().into(); if !output.contains(&field) { output.push(field); } } output } fn get_oids(count: usize) -> Vec { let mut output = Vec::new(); while output.len() < count { let oid = Oid::new(); if !output.contains(&oid) { output.push(oid); } } output } #[test] fn add_to_index() { let mut index = Index::new(); let count = 3; let fields = get_fields(count); let oids = get_oids(count); for i in 0..count { index.add(fields[i].clone(), oids[i].clone()).unwrap(); } for i in 0..count { let mut calc = Calculation::new(Operand::Equal); calc.add_value(fields[i].clone()).unwrap(); calc.add_value(CalcValue::Existing(FieldType::Uuid)) .unwrap(); let result = index.pull(&calc).unwrap(); assert_eq!(result.len(), 1); assert_eq!(result.iter().last().unwrap(), &oids[i]); } } #[test] fn index_can_handle_multiple_entries() { let mut index = Index::new(); let count = 3; let fields = get_fields(1); let oids = get_oids(count); for i in 0..count { index.add(fields[0].clone(), oids[i].clone()).unwrap(); } let mut calc = Calculation::new(Operand::Equal); calc.add_value(fields[0].clone()).unwrap(); calc.add_value(CalcValue::Existing(FieldType::Uuid)) .unwrap(); let result = index.pull(&calc).unwrap(); assert_eq!(result.len(), 3); for oid in oids { assert!(result.contains(&oid)); } } #[test] fn can_remove_oid() { let mut index = Index::new(); let count = 3; let pos = 1; let fields = get_fields(1); let oids = get_oids(count); for i in 0..count { index.add(fields[0].clone(), oids[i].clone()).unwrap(); } index.remove(&fields[0], &oids[pos]); let mut calc = Calculation::new(Operand::Equal); calc.add_value(fields[0].clone()).unwrap(); calc.add_value(CalcValue::Existing(FieldType::Uuid)) .unwrap(); let result = index.pull(&calc).unwrap(); assert!(!result.contains(&oids[pos]), "should have removed oid"); } #[test] fn are_empty_indexes_removed() { let mut index = Index::new(); let field: Field = Uuid::new_v4().into(); let oid = Oid::new(); index.add(field.clone(), oid.clone()).unwrap(); index.remove(&field, &oid); assert_eq!(index.data.len(), 0); } #[test] fn do_unique_indexes_error_on_duplicates() { let mut index = Index::new_unique(); let field: Field = "fred".into(); let oids = get_oids(2); index.add(field.clone(), oids[0].clone()).unwrap(); match index.add(field.clone(), oids[0].clone()) { Ok(_) => unreachable!("should have been an error"), Err(err) => { let err_id = err.get_error_ids().back().unwrap(); match err_id { ErrorID::IndexEntryAlreadyExists(data) => assert_eq!(data, &field), _ => unreachable!("got {:?}: should have been duplicate field", err), } } } } #[test] fn index_returns_validate() { let mut index = Index::new(); let field: Field = "stuff".into(); let oid = Oid::new(); index.add(field.clone(), oid).unwrap(); match index.validate(&field) { Ok(_) => {} Err(err) => unreachable!("got {:?}: should have returned without issue", err), } } #[test] fn unique_return_duplicate_error() { let mut index = Index::new_unique(); let field: Field = "fred".into(); let oid = Oid::new(); index.add(field.clone(), oid).unwrap(); match index.validate(&field) { Ok(_) => unreachable!("should have gotten a duplication error"), Err(err) => match err.get_error_ids().back().unwrap() { ErrorID::IndexEntryAlreadyExists(data) => assert_eq!(data, &field), _ => unreachable!("got {:?}: should have been duplicate field", err), }, } } } struct DocumentFile { docdef: DocDef, docs: InternalRecords, indexes: Indexes, name_id: NameID, queue: Queue, routes: HashMap, rx: Receiver, } impl DocumentFile { fn new( queue: Queue, rx: Receiver, docdef: DocDef, routes: HashMap, name_id: NameID, ) -> Self { let indexes = Indexes::new(docdef.get_indexes()); Self { docdef: docdef.clone(), docs: InternalRecords::new(), indexes: indexes, name_id: name_id, queue: queue, routes: routes, rx: rx, } } fn start(mut queue: Queue, msg: Message) { let (tx, rx) = channel(); let action = msg.get_action(); let docdef = match action { MsgAction::Create(data) => data.clone(), _ => unreachable!("got {:?}: should have been a create message", action), }; let names = docdef.get_document_names(); let id = queue.add_sender(tx); let reg_msg = Register::new(id, RegMsg::AddDocName(names.clone())); let rmsg = msg.response(reg_msg.clone()); queue.send(rmsg.clone()); let name_result = rx.recv().unwrap(); let name_id = match name_result.get_action() { MsgAction::Register(data) => match data.get_msg() { RegMsg::DocumentNameID(data) => data, RegMsg::Error(err) => { queue.remove_sender(&id); queue.send(msg.response(err.clone())); return; } _ => unreachable!("should only return a name id or an error"), }, _ => unreachable!("should only return a name id or an error"), }; let mut route_action: HashMap = HashMap::new(); for path_action in docdef.iter_routes() { let request = reg_msg.response(RegMsg::AddRoute(path_action.path())); let add_route = rmsg.response(request); queue.send(add_route); let result = rx.recv().unwrap(); let route_id = match result.get_action() { MsgAction::Register(data) => match data.get_msg() { RegMsg::RouteID(data) => data, RegMsg::Error(err) => { queue.remove_sender(&id); queue.send(msg.response(err.clone())); return; } _ => unreachable!("should only return a route id or an error"), }, _ => unreachable!("should only return a route id or an error"), }; route_action.insert(route_id.clone(), path_action.doc_function()); } let mut doc = DocumentFile::new( queue.clone(), rx, docdef.clone(), route_action, name_id.clone(), ); spawn(move || { doc.listen(); }); let reply = msg.response(MsgAction::DocumentCreated); queue.send(reply.clone()); } fn listen(&mut self) { loop { let msg = self.rx.recv().unwrap(); let route = msg.get_route(); for (route_id, doc_func) in self.routes.clone().iter() { if route == route_id.into() { match doc_func { DocFuncType::Add => self.add_document(&msg), DocFuncType::Delete => self.delete(&msg), DocFuncType::Query => self.query(&msg), DocFuncType::Show => self.queue.send( msg.response(Reply::new(self.docdef.get_document_names()[0].clone())), ), DocFuncType::Update => self.update(&msg), DocFuncType::ExistingQuery(action) => self.existing_query(&msg, action), DocFuncType::Trigger(action) => self.trigger(&msg, action), } } } } } fn validate(&self, field_name: NT, value: &Field) -> Result where NT: Into, { let id_holder = field_name.into(); let field_id = match self.docdef.get_field_id(id_holder.clone()) { Ok(data) => data, Err(err) => return Err(err), }; let output = match self.docdef.validate(id_holder.clone(), value) { Ok(data) => data, Err(err) => return Err(err), }; match self.indexes.validate(&field_id, &output) { Ok(_) => {} Err(err) => return Err(err), } Ok(output) } fn add_document(&mut self, msg: &Message) { let addition = match msg.get_action() { MsgAction::Addition(data) => data, _ => return, }; let mut holder = InternalRecord::new(); let mut field_ids = self.docdef.get_field_ids(); for (name, value) in addition.iter() { let field_id = match self.docdef.get_field_id(name) { Ok(id) => id, Err(mut err) => { err.add_parent(ErrorID::Field(name.clone())); err.add_parent(ErrorID::Document(msg.doc_name().clone())); let reply = msg.response(err); self.queue.send(reply); return; } }; let corrected = match self.validate(field_id.clone(), &value.get(&Field::None)) { Ok(data) => data, Err(mut err) => { err.add_parent(ErrorID::Field(name.clone().into())); err.add_parent(ErrorID::Document(msg.doc_name().clone())); let reply = msg.response(err); self.queue.send(reply); return; } }; holder.insert(field_id.clone(), corrected.clone()); field_ids.remove(&field_id); } for field_id in field_ids.iter() { let corrected = match self.validate(field_id, &Field::None) { Ok(data) => data, Err(mut err) => { err.add_parent(ErrorID::Field(field_id.clone().into())); err.add_parent(ErrorID::Document(msg.doc_name().clone())); let reply = msg.response(err); self.queue.send(reply); return; } }; holder.insert(field_id.clone(), corrected.clone()); } let mut records = Records::new( self.docdef.get_document_names().clone(), self.docdef.get_field_names().clone(), ); if !holder.is_empty() { let mut oid = Oid::new(); while self.docs.contains_key(&oid) { oid = Oid::new(); } for (field_id, oids) in self.indexes.iter_mut() { let value = holder.get(field_id).unwrap(); oids.internal_add(value, oid.clone()); } self.docs.insert(oid.clone(), holder.clone()); records.insert(oid, holder); } self.queue.send(msg.response(records.clone())); self.queue .send(msg.response(MsgAction::OnAddition(records))); } fn delete(&mut self, msg: &Message) { let delete = match msg.get_action() { MsgAction::Delete(data) => data, _ => unreachable!("should always be delete action"), }; let records = match self.run_query(delete.get_query()) { Ok(data) => data, Err(mut err) => { err.add_parent(ErrorID::Document(msg.doc_name().into())); let reply = msg.response(err); self.queue.send(reply); return; } }; for (oid, record) in records.iter() { for (field_id, index) in self.indexes.iter_mut() { index.remove(record.get(field_id).unwrap(), oid); } self.docs.remove(oid); } let rec = Records::with_data( self.docdef.get_document_names().clone(), self.docdef.get_field_names().clone(), records, ); self.queue.send(msg.response(rec.clone())); self.queue.send(msg.response(MsgAction::OnDelete(rec))); } fn run_query(&self, query: &Query) -> Result { let indexed_ids = self.indexes.index_ids(); let mut indexed: HashMap = HashMap::new(); let mut unindexed: HashMap = HashMap::new(); for (field, data) in query.iter() { let id = match self.docdef.get_field_id(field.clone()) { Ok(fid) => { let expected_type = self.docdef.get_field_type(field.clone()).unwrap(); if &data.get_type() != expected_type { let mut err = MTTError::new(ErrorID::FieldTypeExpected(expected_type.clone())); err.add_parent(ErrorID::Field(field.clone())); return Err(err); } fid } Err(mut err) => { err.add_parent(ErrorID::Field(field.clone())); return Err(err); } }; if indexed_ids.contains(&id) { indexed.insert(id, data.clone()); } else { unindexed.insert(id, data.clone()); } } let mut oids: HashSet = self.docs.keys().cloned().collect(); for (field_id, calculation) in indexed.iter() { let holder = match self.indexes.pull(field_id, calculation) { Ok(data) => data, Err(err) => return Err(err), }; oids = oids.intersection(&holder).cloned().collect(); } let mut records = InternalRecords::new(); for oid in oids.iter() { records.insert(oid.clone(), self.docs.get(oid).unwrap().clone()); } let holder = oids.clone(); for (oid, record) in records.iter() { for (field_id, calc) in unindexed.iter() { match calc.calculate(record.get(field_id).unwrap()) { Field::Boolean(data) => { if !data { oids.remove(oid); break; } } _ => { oids.remove(oid); break; } } } } let removals = holder.difference(&oids); for oid in removals { records.remove(oid); } Ok(records) } fn query(&self, msg: &Message) { let query = match msg.get_action() { MsgAction::Query(data) => data, _ => unreachable!("should receive a query"), }; let records = match self.run_query(query) { Ok(data) => data, Err(mut err) => { err.add_parent(ErrorID::Document(msg.doc_name().into())); let reply = msg.response(err); self.queue.send(reply); return; } }; let recs = Records::with_data( self.docdef.get_document_names().clone(), self.docdef.get_field_names().clone(), records, ); self.queue.send(msg.response(recs.clone())); self.queue.send(msg.response(MsgAction::OnQuery(recs))); } fn run_update( &mut self, original: &InternalRecords, update: &Update, msg: &Message, ) -> Result { let mut changes: HashMap = HashMap::new(); for (key, value) in update.get_values().iter() { let field_id = match self.docdef.get_field_id(key) { Ok(id) => { let expected_type = self.docdef.get_field_type(key.clone()).unwrap(); if &value.get_type() != expected_type { let mut err = MTTError::new(ErrorID::FieldTypeExpected(expected_type.clone())); err.add_parent(ErrorID::Field(key.clone())); return Err(err); } id } Err(mut err) => { err.add_parent(ErrorID::Field(key.clone())); return Err(err); } }; changes.insert(field_id, value); } let mut indexes = Indexes::new(self.docdef.get_indexes()); let mut updates = InternalRecords::new(); for (oid, record) in original.iter() { let mut holder = record.clone(); for (field_id, value) in changes.iter() { let field = value.get(holder.get(field_id).unwrap()); let correction = match self.validate(field_id, &field) { Ok(data) => data, Err(err) => return Err(err), }; holder.insert(field_id.clone(), correction.clone()); match indexes.add_to_index(&field_id, correction, oid.clone()) { Ok(_) => {} Err(err) => return Err(err), } } updates.insert(oid.clone(), holder); } for (oid, new_rec) in updates.iter() { let old_rec = original.get(oid).unwrap(); for (field_id, index) in self.indexes.iter_mut() { index.remove(old_rec.get(field_id).unwrap(), oid); index .add(new_rec.get(field_id).unwrap().clone(), oid.clone()) .unwrap(); } self.docs.insert(oid.clone(), new_rec.clone()); } let recs = Records::with_data( self.docdef.get_document_names().clone(), self.docdef.get_field_names().clone(), updates, ); self.queue .send(msg.response(MsgAction::OnUpdate(recs.clone()))); Ok(recs) } fn update(&mut self, msg: &Message) { let update = match msg.get_action() { MsgAction::Update(data) => data, _ => unreachable!("should receive a update"), }; let original = match self.run_query(update.get_query()) { Ok(result) => result, Err(mut err) => { err.add_parent(ErrorID::Document(msg.doc_name().into())); let reply = msg.response(err); self.queue.send(reply); return; } }; let data = match self.run_update(&original, update, msg) { Ok(output) => output, Err(mut err) => { err.add_parent(ErrorID::Document(msg.doc_name().into())); let reply = msg.response(err); self.queue.send(reply); return; } }; self.queue.send(msg.response(data)); } fn existing_query(&mut self, msg: &Message, action: &MsgAction) { let recs = match msg.get_action() { MsgAction::OnQuery(data) => data, _ => unreachable!("should only receive on messages"), }; match action { MsgAction::Update(change) => self .run_update(recs.get_internal_records(), change, msg) .unwrap(), _ => panic!("should not get here"), }; } fn trigger(&self, msg: &Message, action: &MsgAction) { self.queue .send(msg.forward(self.name_id.clone(), action.clone())); } }