From 56ca5ae767a9ff26e2027cfc13ce157d7bbcc689 Mon Sep 17 00:00:00 2001 From: Adrian Malacoda Date: Thu, 22 Feb 2018 02:04:09 -0600 Subject: [PATCH] move to a model of one thread per dispatcher, instead of one main thread --- src/lib.rs | 76 +++++++++++++++++++++++++++++------------------------- 1 file changed, 41 insertions(+), 35 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index f623477..e358932 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,6 +28,7 @@ use std::sync::mpsc; use std::sync::mpsc::Sender; use transformable_channels::mpsc::TransformableSender; +use transformable_channels::mpsc::Receiver; use multimap::MultiMap; mod helpers; @@ -71,18 +72,17 @@ impl Tenquestionmarks { pub fn run (&self) { crossbeam::scope(|scope| { - // Our main event channel. - // Modules push events to tenquestionmarks using this channel. - let (ref main_sender, ref main_receiver) = transformable_channels::mpsc::channel(); + let mut dispatchers: BTreeMap<&str, Receiver> = BTreeMap::new(); - // Module threads. - // Each Module will produce events which tenquestionmarks will push - // into all other Modules. - // tenquestionmarks propagates all events to each Module through these - // channels. + // Event loop threads. + // Event loop threads consume events passed in by dispatcher threads, and + // generate events through dispatcher threads. let module_senders: BTreeMap<&str, Sender>> = self.modules.iter().map(|(key, module)| { let from = key.clone(); - let main_sender_mapped = main_sender.map(move |envelope: Envelope| { + let (dispatcher_sender, dispatcher_receiver) = transformable_channels::mpsc::channel(); + dispatchers.insert(key, dispatcher_receiver); + + let mapped_sender = dispatcher_sender.map(move |envelope: Envelope| { Envelope { from: Some(from.clone()), event: envelope.event @@ -90,43 +90,49 @@ impl Tenquestionmarks { }); let (module_sender, module_receiver) = mpsc::channel(); - info!("Spawning thread for \"{}\"", key); + info!("Spawning event loop thread for \"{}\"", key); scope.spawn(move || { - module.run(Box::new(main_sender_mapped), module_receiver); - info!("Thread for \"{}\" is exiting", key); + module.run(Box::new(mapped_sender), module_receiver); + info!("Event loop thread for \"{}\" is exiting", key); }); (&key[..], module_sender) }).collect(); - // tenquestionmarks main event loop. - // tenquestionmarks receives events produced by Modules and pushes them - // into all other Modules - loop { - match main_receiver.recv() { - Ok(envelope) => { - /* - * Check if the target module is a valid destination for the envelope. - * We want to deliver this envelope if all of the following are true: - * 1. The target module is not the originator of the envelope - * 2. The envelope's list of allowed recipients is empty, or specifically - * names the target module. - */ - let arc_envelope = Arc::new(envelope); - if let Some(ref from) = arc_envelope.from { - if let Some(subscriptions) = self.subscriptions.get_vec(&**from) { - for subscription in subscriptions { - if subscription.can_handle_event(&arc_envelope) { - if let Some(sender) = module_senders.get(&*subscription.name) { - if let Err(err) = sender.send(arc_envelope.clone()) { - debug!("Failed to dispatch event to module \"{}\": {:?}", subscription.name, err); + // Dispatcher threads. + // Dispatcher threads transmit events produced by parent modules to child modules. + for (from, receiver) in dispatchers.into_iter() { + if let Some(subscriptions) = self.subscriptions.get_vec(from) { + let senders: BTreeMap<&str, Sender>> = module_senders.iter().filter(|&(key, _)| { + subscriptions.iter().any(|subscription| subscription.name == *key) + }).map(|(key, value)| { + (*key, value.clone()) + }).collect(); + + info!("Spawning dispatcher thread for \"{}\"", from); + scope.spawn(move || { + loop { + match receiver.recv() { + Ok(envelope) => { + let arc_envelope = Arc::new(envelope); + for subscription in subscriptions { + if subscription.can_handle_event(&arc_envelope) { + if let Some(sender) = senders.get(&*subscription.name) { + if let Err(err) = sender.send(arc_envelope.clone()) { + debug!("Failed to dispatch event to module \"{}\": {:?}", subscription.name, err); + } } } } + }, + Err(err) => { + error!("Failed to receive event from module: \"{}\": {:?}", from, err); + break; } } } - }, - Err(err) => { error!("Failed to receive event in main event loop: {:?}", err); } + + info!("Dispatcher thread for \"{}\" is exiting", from); + }); } } });