Contining to correct queue.
Some checks failed
Gitea Actions Demo / Explore-Gitea-Actions (push) Failing after 2s

This commit is contained in:
Jeff Baskin 2025-09-25 09:41:17 -04:00
parent 1326fad904
commit 7ba0740ce5

View File

@ -1,4 +1,5 @@
use chrono::prelude::*;
use isolang::Language;
use std::{
collections::{HashMap, HashSet},
sync::{
@ -33,8 +34,10 @@ enum MTTError {
enum Action {
Addition,
Create,
Delete,
Error,
Query,
Register,
Reply,
Show,
Update,
@ -45,8 +48,10 @@ impl From<MsgAction> for Action {
match value {
MsgAction::Addition(_) => Action::Addition,
MsgAction::Create(_) => Action::Create,
MsgAction::Delete(_) => Action::Delete,
MsgAction::Error(_) => Action::Error,
MsgAction::Query(_) => Action::Query,
MsgAction::Register(_) => Action::Register,
MsgAction::Reply(_) => Action::Reply,
MsgAction::Show => Action::Show,
MsgAction::Update(_) => Action::Update,
@ -65,6 +70,16 @@ impl From<&MsgAction> for Action {
enum NameID {
ID(Uuid),
Name(String),
None,
}
impl NameID {
fn is_none(&self) -> bool {
match self {
Self::None => true,
_ => false
}
}
}
impl From<&str> for NameID {
@ -99,9 +114,10 @@ enum MsgAction {
// Remove
Error(MTTError),
Query(Query),
Register(Register),
Reply(Reply),
Show,
// Delete
Delete(Delete),
Update(Update),
}
@ -111,6 +127,12 @@ impl From<Addition> for MsgAction {
}
}
impl From<Delete> for MsgAction {
fn from(value: Delete) -> Self {
MsgAction::Delete(value)
}
}
impl From<DocDef> for MsgAction {
fn from(value: DocDef) -> Self {
MsgAction::Create(value)
@ -129,6 +151,12 @@ impl From<Query> for MsgAction {
}
}
impl From<Register> for MsgAction {
fn from(value: Register) -> Self {
MsgAction::Register(value)
}
}
impl From<Reply> for MsgAction {
fn from(value: Reply) -> Self {
MsgAction::Reply(value)
@ -253,8 +281,8 @@ mod messages {
for document in dts.into_iter() {
let msg = Message::new(document, MsgAction::Create(DocDef::new()));
match msg.get_document_id() {
NameID::ID(_) => unreachable!("should have been a string id"),
NameID::Name(data) => assert_eq!(data, document),
_ => unreachable!("should have been a string id"),
}
match msg.get_action() {
MsgAction::Create(_) => {}
@ -269,8 +297,8 @@ mod messages {
for document in dts.into_iter() {
let msg = Message::new(document.clone(), MsgAction::Query(Query::new()));
match msg.get_document_id() {
NameID::ID(_) => unreachable!("should have been a string id"),
NameID::Name(data) => assert_eq!(data, &document),
_ => unreachable!("should have been a string id"),
}
match msg.get_action() {
MsgAction::Query(_) => {}
@ -285,7 +313,7 @@ mod messages {
let msg = Message::new(document.clone(), MsgAction::Query(Query::new()));
match msg.get_document_id() {
NameID::ID(data) => assert_eq!(data, &document),
NameID::Name(_) => unreachable!("should have been an id"),
_ => unreachable!("should have been an id"),
}
match msg.get_action() {
MsgAction::Query(_) => {}
@ -430,6 +458,64 @@ impl From<Route> for RouteID {
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
struct Name {
name: String,
lang: Language,
}
impl Name {
fn english(name: String) -> Self {
Self {
name: name,
lang: Language::from_639_1("en").unwrap(),
}
}
}
impl ToString for Name {
fn to_string(&self) -> String {
self.name.clone()
}
}
#[derive(Clone, Debug)]
enum RegMsg {
DocName(Name),
Error(MTTError),
Ok,
}
#[derive(Clone, Debug)]
struct Register {
msg: RegMsg,
sender_id: Uuid,
}
impl Register {
fn new(sender_id: Uuid, reg_msg: RegMsg) -> Self {
Self {
msg: reg_msg,
sender_id: sender_id,
}
}
fn get_msg(&self) -> &RegMsg {
&self.msg
}
fn get_sender_id(&self) -> &Uuid {
&self.sender_id
}
fn response(&self, reg_msg: RegMsg) -> Self {
Self {
msg: reg_msg,
sender_id: self.sender_id.clone(),
}
}
}
#[derive(Clone, Debug, PartialEq)]
struct Route {
action: Include<Action>,
@ -606,6 +692,7 @@ impl QueueData {
None => return Err(MTTError::DocumentNotFound(name.clone())),
},
NameID::ID(id) => id.clone(),
NameID::None => unreachable!("should never be none"),
};
if self.senders.contains_key(&sender_id) {
Ok(sender_id)
@ -877,18 +964,179 @@ mod queuedatas {
}
}
struct DocRegistry {
doc_names: Vec<Name>,
queue: Queue,
receiver: Receiver<Message>,
}
impl DocRegistry {
fn new(queue: Queue, rx: Receiver<Message>) -> Self {
Self {
doc_names: Vec::new(),
queue: queue,
receiver: rx,
}
}
fn start(queue: Queue, rx: Receiver<Message>) {
let mut doc_names = DocRegistry::new(queue, rx);
spawn(move || {
doc_names.listen();
});
}
fn listen(&mut self) {
loop {
let msg = self.receiver.recv().unwrap();
match msg.get_action() {
MsgAction::Register(data) => {
let id = data.get_sender_id();
let reply = msg.response(self.register_action(data));
self.queue.forward(id, reply);
},
_ => {},
}
}
}
fn register_action(&mut self, reg: &Register) -> Register {
match reg.get_msg() {
RegMsg::DocName(name) => {
if self.doc_names.contains(name) {
reg.response(RegMsg::Error(MTTError::DocumentAlreadyExists(name.to_string())))
} else {
self.doc_names.push(name.clone());
reg.response(RegMsg::Ok)
}
},
_ => reg.response(RegMsg::Ok),
}
}
}
struct Router {
doc_registry: Sender<Message>,
senders: HashMap<Uuid, Sender<Message>>,
}
impl Router {
fn new(tx: Sender<Message>) -> Self {
Self {
doc_registry: tx,
senders: HashMap::new(),
}
}
fn add_sender(&mut self, sender: Sender<Message>) -> Uuid {
let mut id = Uuid::new_v4();
while self.senders.contains_key(&id) {
id = Uuid::new_v4();
}
self.senders.insert(id.clone(), sender);
id
}
fn forward(&self, id: &Uuid, msg: Message) {
self.senders.get(id).unwrap().send(msg).unwrap();
}
fn send(&self, msg: Message) {
self.doc_registry.send(msg).unwrap();
}
}
#[cfg(test)]
mod routers {
use super::{support_test::TIMEOUT, *};
#[test]
fn can_pass_message() {
let (tx, rx) = channel();
let router = Router::new(tx);
let msg = Message::new("task", Query::new());
router.send(msg.clone());
let result = rx.recv_timeout(TIMEOUT).unwrap();
assert_eq!(result.get_message_id(), msg.get_message_id());
}
#[test]
fn can_forward_message() {
let (tx, _) = channel();
let mut router = Router::new(tx);
let (sender, receiver) = channel();
let id = router.add_sender(sender);
let msg = Message::new("wiki", Query::new());
router.forward(&id, msg.clone());
let result = receiver.recv_timeout(TIMEOUT).unwrap();
assert_eq!(result.get_message_id(), msg.get_message_id());
}
#[test]
fn sender_ids_are_unique() {
let (tx, _) = channel();
let mut router = Router::new(tx);
let count = 10;
let mut holder: HashSet<Uuid> = HashSet::new();
for _ in 0..count {
let (tx, _) = channel();
holder.insert(router.add_sender(tx));
}
assert_eq!(holder.len(), count, "had duplicate keys");
}
}
#[derive(Clone)]
struct Queue {
router: Arc<RwLock<Router>>,
//
//
//
queue_data: Arc<RwLock<QueueData>>,
}
impl Queue {
fn new() -> Self {
Self {
let (tx, rx) = channel();
let output = Self {
router: Arc::new(RwLock::new(Router::new(tx))),
//
//
//
queue_data: Arc::new(RwLock::new(QueueData::new())),
};
DocRegistry::start(output.clone(), rx);
output
}
fn add_sender(&mut self, sender: Sender<Message>) -> Uuid {
let mut router = self.router.write().unwrap();
router.add_sender(sender)
}
fn forward(&self, id: &Uuid, msg: Message) {
let router = self.router.read().unwrap();
router.forward(id, msg);
}
fn send(&self, msg: Message) -> Result<(), MTTError> {
let router = self.router.read().unwrap();
router.send(msg.clone());
//
//
//
if msg.get_document_id().is_none() {
Ok(())
} else {
let queuedata = self.queue_data.read().unwrap();
queuedata.send(msg)
}
}
//
//
//
fn register(
&mut self,
tx: Sender<Message>,
@ -898,20 +1146,141 @@ impl Queue {
let mut queuedata = self.queue_data.write().unwrap();
queuedata.register(tx, name, routes)
}
fn send(&self, msg: Message) -> Result<(), MTTError> {
let queuedata = self.queue_data.read().unwrap();
queuedata.send(msg)
}
}
#[cfg(test)]
mod queues {
use super::*;
use super::{support_test::TIMEOUT, *};
use std::sync::mpsc::RecvTimeoutError;
struct TestQueue {
sender_id: Uuid,
queue: Queue,
receiver: Receiver<Message>,
}
impl TestQueue {
fn new() -> Self {
let mut queue = Queue::new();
let (tx, rx) = channel();
let id = queue.add_sender(tx);
Self {
sender_id: id,
queue: queue,
receiver: rx,
}
}
fn get_preset_id(&self) -> &Uuid {
&self.sender_id
}
fn get_preset_rx(&self) -> &Receiver<Message> {
&self.receiver
}
fn add_sender(&mut self, sender: Sender<Message>) -> Uuid {
self.queue.add_sender(sender)
}
fn forward(&self, id: &Uuid, msg: Message) {
self.queue.forward(id, msg);
}
fn send(&self, msg: Message) -> Result<(), MTTError> {
self.queue.send(msg)
}
}
#[test]
fn create_a_queue() {
Queue::new();
fn can_forward_message() {
let mut queue = TestQueue::new();
let msg = Message::new("wiki", Query::new());
queue.forward(queue.get_preset_id(), msg.clone());
let result = queue.get_preset_rx().recv_timeout(TIMEOUT).unwrap();
assert_eq!(result.get_message_id(), msg.get_message_id());
}
#[test]
fn sender_ids_are_unique() {
let mut queue = Queue::new();
let count = 10;
let mut holder: HashSet<Uuid> = HashSet::new();
for _ in 0..count {
let (tx, _) = channel();
holder.insert(queue.add_sender(tx));
}
assert_eq!(holder.len(), count, "had duplicate keys");
}
#[test]
fn can_register_document_name() {
let mut queue = TestQueue::new();
let doc_name = Name::english(Uuid::new_v4().to_string());
let reg_msg = Register::new(queue.get_preset_id().clone(), RegMsg::DocName(doc_name.clone()));
let msg = Message::new(NameID::None, reg_msg);
queue.send(msg.clone()).unwrap();
let result = queue.get_preset_rx().recv_timeout(TIMEOUT).unwrap();
assert_eq!(result.get_message_id(), msg.get_message_id());
let action = result.get_action();
match action {
MsgAction::Register(data) => match data.get_msg() {
RegMsg::Ok => {},
_ => unreachable!("got {:?}, should have been register ok", action),
},
_ => unreachable!("got {:?}, should have been register ok", action),
}
}
#[test]
fn errors_on_duplicate_names() {
let mut queue = TestQueue::new();
//let mut queue = Queue::new();
//let (sender, receiver) = channel();
//let id = queue.add_sender(sender);
let receiver = queue.get_preset_rx();
let doc_name = Name::english(Uuid::new_v4().to_string());
let reg_msg = Register::new(queue.get_preset_id().clone(), RegMsg::DocName(doc_name.clone()));
let msg = Message::new(NameID::None, reg_msg.clone());
queue.send(msg.clone()).unwrap();
receiver.recv_timeout(TIMEOUT).unwrap();
let msg2 = Message::new(NameID::None, reg_msg.clone());
queue.send(msg.clone()).unwrap();
let result = receiver.recv_timeout(TIMEOUT).unwrap();
assert_eq!(result.get_message_id(), msg.get_message_id());
let action = result.get_action();
match action {
MsgAction::Register(data) => match data.get_msg() {
RegMsg::Error(err) => match err {
MTTError::DocumentAlreadyExists(name) => assert_eq!(name.to_string(), doc_name.to_string()),
_ => unreachable!("got {:?}, should have been duplicate error", err),
},
_ => unreachable!("got {:?}, should have been error", data),
},
_ => unreachable!("got {:?}, should have been register ok", action),
}
}
#[test]
fn can_register_routes() {
}
#[test]
#[ignore]
fn default_send_does_nothing() {
let mut queue = Queue::new();
let (sender, receiver) = channel();
let id = queue.add_sender(sender);
let msg = Message::new("wiki", Query::new());
queue.send(msg).unwrap();
match receiver.recv_timeout(TIMEOUT) {
Ok(msg) => unreachable!("should not receive: {:?}", msg),
Err(err) => match err {
RecvTimeoutError::Timeout => {}
_ => unreachable!("should have timed out"),
},
}
}
}
@ -2058,16 +2427,12 @@ impl Operation {
#[derive(Clone, Debug)]
struct Query {
data: HashMap<String, Calculation>,
specifiers: Vec<Operation>,
}
impl Query {
fn new() -> Self {
Self {
data: HashMap::new(),
specifiers: Vec::new(),
}
}
@ -2327,6 +2692,27 @@ mod documents {
}
}
#[derive(Clone, Debug)]
struct Delete {
query: Query,
}
impl Delete {
fn new() -> Self {
Self {
query: Query::new(),
}
}
fn get_query(&self) -> &Query {
&self.query
}
fn get_query_mut(&mut self) -> &mut Query {
&mut self.query
}
}
#[derive(Clone, Debug)]
struct Update {
query: Query,
@ -2666,6 +3052,7 @@ impl DocumentFile {
let name = match msg.get_document_id() {
NameID::Name(name) => name.clone(),
NameID::ID(id) => id.to_string(),
NameID::None => unreachable!("should never be none"),
};
let routes = [
RouteRequest::new(
@ -2673,6 +3060,11 @@ impl DocumentFile {
Include::Some(name.clone()),
Include::Some(Action::Addition),
),
RouteRequest::new(
Include::All,
Include::Some(name.clone()),
Include::Some(Action::Delete),
),
RouteRequest::new(
Include::All,
Include::Some(name.clone()),
@ -2716,6 +3108,7 @@ impl DocumentFile {
let msg = self.rx.recv().unwrap();
let result = match msg.get_action() {
MsgAction::Addition(data) => self.add_document(data),
MsgAction::Delete(delete) => self.delete(delete),
MsgAction::Query(query) => self.query(query),
MsgAction::Update(update) => self.update(update),
_ => Reply::new().into(),
@ -2793,6 +3186,16 @@ impl DocumentFile {
reply.into()
}
fn delete(&mut self, delete: &Delete) -> MsgAction {
let mut reply = Reply::new();
let oids = self.run_query(delete.get_query()).unwrap();
for oid in oids.iter() {
reply.add(self.docs.get(oid).unwrap().clone());
self.docs.remove(oid);
}
reply.into()
}
fn run_query(&self, query: &Query) -> Result<HashSet<Oid>, MTTError> {
let query_ids = query.field_ids();
let doc_ids = self.docdef.field_ids();
@ -2964,6 +3367,10 @@ mod document_files {
&self.rx
}
fn get_sender(&self) -> Sender<Message> {
self.tx.clone()
}
fn send<A>(&self, action: A) -> Result<(), MTTError>
where
A: Into<MsgAction>,
@ -4018,6 +4425,69 @@ mod document_files {
_ => unreachable!("got {:?}: should have gotten reply", action),
}
}
#[test]
fn can_delete() {
let mut doc = TestDocument::new([FieldType::Integer].to_vec());
doc.start();
doc.populate([1.into()].to_vec());
let mut calc = Calculation::new(Operand::Equal);
calc.add_value(1);
let mut query = Query::new();
query.add("field0".to_string(), calc);
let mut delete = Delete::new();
*delete.get_query_mut() = query.clone();
doc.send(delete).unwrap();
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
let action = result.get_action();
match action {
MsgAction::Reply(data) => {
assert_eq!(data.len(), 1);
for doc in data.iter() {
match doc.get_field("field0").unwrap() {
Field::Integer(num) => assert_eq!(num, 1),
_ => unreachable!("did not get uuid"),
}
}
}
_ => unreachable!("got {:?}: should have gotten reply", action),
}
doc.send(query).unwrap();
let result = doc.get_receiver().recv_timeout(TIMEOUT).unwrap();
let action = result.get_action();
match action {
MsgAction::Reply(data) => assert_eq!(data.len(), 0),
_ => unreachable!("did not get uuid"),
}
}
#[test]
#[ignore]
fn delete_should_only_respond_to_its_own() {
let mut doc = TestDocument::new([FieldType::Integer].to_vec());
doc.start();
doc.populate([1.into()].to_vec());
let mut calc = Calculation::new(Operand::Equal);
calc.add_value(1);
let mut query = Query::new();
query.add("field0".to_string(), calc);
let mut delete = Delete::new();
*delete.get_query_mut() = query.clone();
doc.send(delete).unwrap();
let name = "other";
let msg = Message::new(name.to_string(), MsgAction::Show);
let (tx, _) = channel();
let mut queue = doc.get_queue();
queue.register(tx, name.to_string(), Vec::new()).unwrap();
queue.send(msg).unwrap();
match doc.get_receiver().recv_timeout(TIMEOUT) {
Ok(msg) => unreachable!("should not receive: {:?}", msg),
Err(err) => match err {
RecvTimeoutError::Timeout => {}
_ => unreachable!("should have timed out"),
},
}
}
}
#[cfg(test)]