From 495e3129d43de22cf6d8625b24b43131413e8048 Mon Sep 17 00:00:00 2001 From: Adrian Malacoda Date: Wed, 10 May 2017 01:13:52 -0500 Subject: [PATCH] split logic for filtering out events into Subscription struct --- src/lib.rs | 96 +++++++++++++++++++++++++++++++--------------- src/modules/mod.rs | 20 +--------- 2 files changed, 66 insertions(+), 50 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 074a544..6aea2af 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,17 +32,21 @@ use transformable_channels::mpsc::TransformableSender; mod helpers; +use std::collections::btree_set::BTreeSet; + #[macro_use] extern crate log; pub struct Tenquestionmarks { - modules: BTreeMap + subscriptions: BTreeMap } impl Tenquestionmarks { pub fn with_modules (modules: BTreeMap) -> Tenquestionmarks { let tqm = Tenquestionmarks { - modules: modules + subscriptions: modules.into_iter().map(|(key, module)| { + (key.clone(), Subscription::new(key.clone(), module)) + }).collect() }; tqm @@ -65,7 +69,7 @@ impl Tenquestionmarks { // into all other Modules. // tenquestionmarks propagates all events to each Module through these // channels. - let module_senders: BTreeMap<&str, Sender>> = self.modules.iter().map(|(key, module)| { + let module_senders: BTreeMap<&str, Sender>> = self.subscriptions.iter().map(|(key, subscription)| { let from = key.clone(); let main_sender_mapped = main_sender.map(move |envelope: Envelope| { Envelope { @@ -79,7 +83,7 @@ impl Tenquestionmarks { let (module_sender, module_receiver) = mpsc::channel(); info!("Spawning thread for \"{}\"", key); scope.spawn(move || { - module.run(Box::new(main_sender_mapped), module_receiver); + subscription.module.run(Box::new(main_sender_mapped), module_receiver); info!("Thread for \"{}\" is exiting", key); }); (&key[..], module_sender) @@ -101,33 +105,13 @@ impl Tenquestionmarks { let arc_envelope = Arc::new(envelope); for (key, sender) in &module_senders { let from_this = String::from(*key); - match self.modules.get::(&from_this) { - Some(module) => { - if Some(from_this.clone()) == arc_envelope.from { - debug!("Refusing to transmit event to its originator ({:?})", from_this); - continue; - } - else if !(arc_envelope.to.is_empty() || !arc_envelope.to.contains(&from_this)) { - debug!( - "Refusing to transmit envelope from {:?} to {:?} since it is not on the list of allowed recipients ({:?})", - arc_envelope.from, - from_this, - arc_envelope.to - ); - continue; - } - else if !module.can_handle_event(&arc_envelope) { - debug!( - "Refusing to transmit envelope from {:?} to {:?} since envelope was filtered out", - arc_envelope.from, - from_this - ); - continue; - } - - match sender.send(arc_envelope.clone()) { - Err(err) => debug!("Failed to dispatch event to module \"{}\": {:?}", key, err), - Ok(_) => {} + match self.subscriptions.get::(&from_this) { + Some(subscription) => { + if subscription.can_handle_event(&arc_envelope) { + match sender.send(arc_envelope.clone()) { + Err(err) => debug!("Failed to dispatch event to module \"{}\": {:?}", key, err), + Ok(_) => {} + } } }, None => debug!("Failed to dispatch event to module \"{}\": No such module found!", key) @@ -186,3 +170,53 @@ impl User { pub trait MessageSender : Sync + Send + std::fmt::Debug { fn send_message (&self, _: &str) {} } + +struct Subscription { + pub module: Module, + pub name: String, + pub filters: BTreeSet +} + +impl Subscription { + pub fn new (name: String, module: Module) -> Subscription { + let filters: BTreeSet = module.config.get("filters") + .and_then(|value| value.as_slice()) + .map(|value| value.to_vec()) + .unwrap_or(vec![]) + .into_iter() + .map(|value| { String::from(value.as_str().unwrap()) }) + .collect(); + + Subscription { + module: module, + name: name, + filters: filters + } + } + + pub fn can_handle_event (&self, envelope: &Envelope) -> bool { + if Some(&self.name) == envelope.from.as_ref() { + debug!("Refusing to transmit event to its originator ({:?})", self.name); + return false; + } + else if !(envelope.to.is_empty() || !envelope.to.contains(&self.name)) { + debug!( + "Refusing to transmit envelope from {:?} to {:?} since it is not on the list of allowed recipients ({:?})", + envelope.from, + self.name, + envelope.to + ); + return false; + } + else if !(self.filters.is_empty() || self.filters.intersection(&envelope.tags).count() > 0) { + debug!( + "Refusing to transmit envelope from {:?} to {:?} since envelope was filtered out", + envelope.from, + self.name + ); + return false; + } + + true + } +} diff --git a/src/modules/mod.rs b/src/modules/mod.rs index 6de6d54..b9bef30 100644 --- a/src/modules/mod.rs +++ b/src/modules/mod.rs @@ -19,34 +19,16 @@ use transformable_channels::mpsc::ExtSender; use toml::Table; -use std::collections::btree_set::BTreeSet; - pub struct Module { event_loop: Box, module_type: String, - config: Table + pub config: Table } impl Module { pub fn run (&self, sender: Box>, receiver: Receiver>) { self.event_loop.run(sender, receiver); } - - pub fn can_handle_event (&self, envelope: &Envelope) -> bool { - let filters: BTreeSet = self.config.get("filters") - .and_then(|value| value.as_slice()) - .map(|value| value.to_vec()) - .unwrap_or(vec![]) - .into_iter() - .map(|value| { String::from(value.as_str().unwrap()) }) - .collect(); - - if filters.is_empty() { - return true; - } - - filters.intersection(&envelope.tags).count() > 0 - } } pub trait EventLoop : Sync {