move to a model of one thread per dispatcher, instead of one main thread
This commit is contained in:
parent
fcc86a671e
commit
56ca5ae767
76
src/lib.rs
76
src/lib.rs
@ -28,6 +28,7 @@ use std::sync::mpsc;
|
||||
use std::sync::mpsc::Sender;
|
||||
|
||||
use transformable_channels::mpsc::TransformableSender;
|
||||
use transformable_channels::mpsc::Receiver;
|
||||
use multimap::MultiMap;
|
||||
|
||||
mod helpers;
|
||||
@ -71,18 +72,17 @@ impl Tenquestionmarks {
|
||||
|
||||
pub fn run (&self) {
|
||||
crossbeam::scope(|scope| {
|
||||
// Our main event channel.
|
||||
// Modules push events to tenquestionmarks using this channel.
|
||||
let (ref main_sender, ref main_receiver) = transformable_channels::mpsc::channel();
|
||||
let mut dispatchers: BTreeMap<&str, Receiver<Envelope>> = BTreeMap::new();
|
||||
|
||||
// Module threads.
|
||||
// Each Module will produce events which tenquestionmarks will push
|
||||
// into all other Modules.
|
||||
// tenquestionmarks propagates all events to each Module through these
|
||||
// channels.
|
||||
// Event loop threads.
|
||||
// Event loop threads consume events passed in by dispatcher threads, and
|
||||
// generate events through dispatcher threads.
|
||||
let module_senders: BTreeMap<&str, Sender<Arc<Envelope>>> = self.modules.iter().map(|(key, module)| {
|
||||
let from = key.clone();
|
||||
let main_sender_mapped = main_sender.map(move |envelope: Envelope| {
|
||||
let (dispatcher_sender, dispatcher_receiver) = transformable_channels::mpsc::channel();
|
||||
dispatchers.insert(key, dispatcher_receiver);
|
||||
|
||||
let mapped_sender = dispatcher_sender.map(move |envelope: Envelope| {
|
||||
Envelope {
|
||||
from: Some(from.clone()),
|
||||
event: envelope.event
|
||||
@ -90,43 +90,49 @@ impl Tenquestionmarks {
|
||||
});
|
||||
|
||||
let (module_sender, module_receiver) = mpsc::channel();
|
||||
info!("Spawning thread for \"{}\"", key);
|
||||
info!("Spawning event loop thread for \"{}\"", key);
|
||||
scope.spawn(move || {
|
||||
module.run(Box::new(main_sender_mapped), module_receiver);
|
||||
info!("Thread for \"{}\" is exiting", key);
|
||||
module.run(Box::new(mapped_sender), module_receiver);
|
||||
info!("Event loop thread for \"{}\" is exiting", key);
|
||||
});
|
||||
(&key[..], module_sender)
|
||||
}).collect();
|
||||
|
||||
// tenquestionmarks main event loop.
|
||||
// tenquestionmarks receives events produced by Modules and pushes them
|
||||
// into all other Modules
|
||||
loop {
|
||||
match main_receiver.recv() {
|
||||
Ok(envelope) => {
|
||||
/*
|
||||
* Check if the target module is a valid destination for the envelope.
|
||||
* We want to deliver this envelope if all of the following are true:
|
||||
* 1. The target module is not the originator of the envelope
|
||||
* 2. The envelope's list of allowed recipients is empty, or specifically
|
||||
* names the target module.
|
||||
*/
|
||||
let arc_envelope = Arc::new(envelope);
|
||||
if let Some(ref from) = arc_envelope.from {
|
||||
if let Some(subscriptions) = self.subscriptions.get_vec(&**from) {
|
||||
for subscription in subscriptions {
|
||||
if subscription.can_handle_event(&arc_envelope) {
|
||||
if let Some(sender) = module_senders.get(&*subscription.name) {
|
||||
if let Err(err) = sender.send(arc_envelope.clone()) {
|
||||
debug!("Failed to dispatch event to module \"{}\": {:?}", subscription.name, err);
|
||||
// Dispatcher threads.
|
||||
// Dispatcher threads transmit events produced by parent modules to child modules.
|
||||
for (from, receiver) in dispatchers.into_iter() {
|
||||
if let Some(subscriptions) = self.subscriptions.get_vec(from) {
|
||||
let senders: BTreeMap<&str, Sender<Arc<Envelope>>> = module_senders.iter().filter(|&(key, _)| {
|
||||
subscriptions.iter().any(|subscription| subscription.name == *key)
|
||||
}).map(|(key, value)| {
|
||||
(*key, value.clone())
|
||||
}).collect();
|
||||
|
||||
info!("Spawning dispatcher thread for \"{}\"", from);
|
||||
scope.spawn(move || {
|
||||
loop {
|
||||
match receiver.recv() {
|
||||
Ok(envelope) => {
|
||||
let arc_envelope = Arc::new(envelope);
|
||||
for subscription in subscriptions {
|
||||
if subscription.can_handle_event(&arc_envelope) {
|
||||
if let Some(sender) = senders.get(&*subscription.name) {
|
||||
if let Err(err) = sender.send(arc_envelope.clone()) {
|
||||
debug!("Failed to dispatch event to module \"{}\": {:?}", subscription.name, err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
error!("Failed to receive event from module: \"{}\": {:?}", from, err);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(err) => { error!("Failed to receive event in main event loop: {:?}", err); }
|
||||
|
||||
info!("Dispatcher thread for \"{}\" is exiting", from);
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
|
Loading…
x
Reference in New Issue
Block a user