split logic for filtering out events into Subscription struct

This commit is contained in:
Adrian Malacoda 2017-05-10 01:13:52 -05:00
parent 7c26e0294a
commit 495e3129d4
2 changed files with 66 additions and 50 deletions

View File

@ -32,17 +32,21 @@ use transformable_channels::mpsc::TransformableSender;
mod helpers;
use std::collections::btree_set::BTreeSet;
#[macro_use]
extern crate log;
pub struct Tenquestionmarks {
modules: BTreeMap<String, Module>
subscriptions: BTreeMap<String, Subscription>
}
impl Tenquestionmarks {
pub fn with_modules (modules: BTreeMap<String, Module>) -> Tenquestionmarks {
let tqm = Tenquestionmarks {
modules: modules
subscriptions: modules.into_iter().map(|(key, module)| {
(key.clone(), Subscription::new(key.clone(), module))
}).collect()
};
tqm
@ -65,7 +69,7 @@ impl Tenquestionmarks {
// into all other Modules.
// tenquestionmarks propagates all events to each Module through these
// channels.
let module_senders: BTreeMap<&str, Sender<Arc<Envelope>>> = self.modules.iter().map(|(key, module)| {
let module_senders: BTreeMap<&str, Sender<Arc<Envelope>>> = self.subscriptions.iter().map(|(key, subscription)| {
let from = key.clone();
let main_sender_mapped = main_sender.map(move |envelope: Envelope| {
Envelope {
@ -79,7 +83,7 @@ impl Tenquestionmarks {
let (module_sender, module_receiver) = mpsc::channel();
info!("Spawning thread for \"{}\"", key);
scope.spawn(move || {
module.run(Box::new(main_sender_mapped), module_receiver);
subscription.module.run(Box::new(main_sender_mapped), module_receiver);
info!("Thread for \"{}\" is exiting", key);
});
(&key[..], module_sender)
@ -101,33 +105,13 @@ impl Tenquestionmarks {
let arc_envelope = Arc::new(envelope);
for (key, sender) in &module_senders {
let from_this = String::from(*key);
match self.modules.get::<String>(&from_this) {
Some(module) => {
if Some(from_this.clone()) == arc_envelope.from {
debug!("Refusing to transmit event to its originator ({:?})", from_this);
continue;
}
else if !(arc_envelope.to.is_empty() || !arc_envelope.to.contains(&from_this)) {
debug!(
"Refusing to transmit envelope from {:?} to {:?} since it is not on the list of allowed recipients ({:?})",
arc_envelope.from,
from_this,
arc_envelope.to
);
continue;
}
else if !module.can_handle_event(&arc_envelope) {
debug!(
"Refusing to transmit envelope from {:?} to {:?} since envelope was filtered out",
arc_envelope.from,
from_this
);
continue;
}
match sender.send(arc_envelope.clone()) {
Err(err) => debug!("Failed to dispatch event to module \"{}\": {:?}", key, err),
Ok(_) => {}
match self.subscriptions.get::<String>(&from_this) {
Some(subscription) => {
if subscription.can_handle_event(&arc_envelope) {
match sender.send(arc_envelope.clone()) {
Err(err) => debug!("Failed to dispatch event to module \"{}\": {:?}", key, err),
Ok(_) => {}
}
}
},
None => debug!("Failed to dispatch event to module \"{}\": No such module found!", key)
@ -186,3 +170,53 @@ impl User {
pub trait MessageSender : Sync + Send + std::fmt::Debug {
fn send_message (&self, _: &str) {}
}
struct Subscription {
pub module: Module,
pub name: String,
pub filters: BTreeSet<String>
}
impl Subscription {
pub fn new (name: String, module: Module) -> Subscription {
let filters: BTreeSet<String> = module.config.get("filters")
.and_then(|value| value.as_slice())
.map(|value| value.to_vec())
.unwrap_or(vec![])
.into_iter()
.map(|value| { String::from(value.as_str().unwrap()) })
.collect();
Subscription {
module: module,
name: name,
filters: filters
}
}
pub fn can_handle_event (&self, envelope: &Envelope) -> bool {
if Some(&self.name) == envelope.from.as_ref() {
debug!("Refusing to transmit event to its originator ({:?})", self.name);
return false;
}
else if !(envelope.to.is_empty() || !envelope.to.contains(&self.name)) {
debug!(
"Refusing to transmit envelope from {:?} to {:?} since it is not on the list of allowed recipients ({:?})",
envelope.from,
self.name,
envelope.to
);
return false;
}
else if !(self.filters.is_empty() || self.filters.intersection(&envelope.tags).count() > 0) {
debug!(
"Refusing to transmit envelope from {:?} to {:?} since envelope was filtered out",
envelope.from,
self.name
);
return false;
}
true
}
}

View File

@ -19,34 +19,16 @@ use transformable_channels::mpsc::ExtSender;
use toml::Table;
use std::collections::btree_set::BTreeSet;
pub struct Module {
event_loop: Box<EventLoop>,
module_type: String,
config: Table
pub config: Table
}
impl Module {
pub fn run (&self, sender: Box<ExtSender<Envelope>>, receiver: Receiver<Arc<Envelope>>) {
self.event_loop.run(sender, receiver);
}
pub fn can_handle_event (&self, envelope: &Envelope) -> bool {
let filters: BTreeSet<String> = self.config.get("filters")
.and_then(|value| value.as_slice())
.map(|value| value.to_vec())
.unwrap_or(vec![])
.into_iter()
.map(|value| { String::from(value.as_str().unwrap()) })
.collect();
if filters.is_empty() {
return true;
}
filters.intersection(&envelope.tags).count() > 0
}
}
pub trait EventLoop : Sync {