256 lines
8.8 KiB
Rust
256 lines
8.8 KiB
Rust
extern crate toml;
|
|
extern crate crossbeam;
|
|
extern crate discord;
|
|
extern crate rand;
|
|
extern crate pvn;
|
|
extern crate echobox;
|
|
extern crate transformable_channels;
|
|
extern crate stc;
|
|
extern crate regex;
|
|
extern crate multimap;
|
|
extern crate irc;
|
|
extern crate thread_local;
|
|
|
|
#[macro_use]
|
|
extern crate hlua;
|
|
|
|
use std::collections::BTreeMap;
|
|
use toml::value::Table;
|
|
|
|
mod modules;
|
|
use modules::Module;
|
|
use modules::loader::{ModuleLoader, ModuleLoaderError};
|
|
|
|
mod event;
|
|
use event::Event;
|
|
use event::Envelope;
|
|
use event::filter::{EventFilter, AttributeEventFilter};
|
|
|
|
use std::sync::Arc;
|
|
use std::sync::mpsc;
|
|
use std::sync::mpsc::Sender;
|
|
|
|
use transformable_channels::mpsc::TransformableSender;
|
|
use transformable_channels::mpsc::Receiver;
|
|
use multimap::MultiMap;
|
|
|
|
mod helpers;
|
|
|
|
#[macro_use]
|
|
extern crate log;
|
|
|
|
pub struct Tenquestionmarks {
|
|
modules: BTreeMap<String, Module>,
|
|
subscriptions: MultiMap<String, Subscription>
|
|
}
|
|
|
|
impl Tenquestionmarks {
|
|
pub fn with_modules (modules: BTreeMap<String, Module>) -> Tenquestionmarks {
|
|
let mut subscriptions = MultiMap::new();
|
|
for (name, module) in modules.iter() {
|
|
for parent in module.parents() {
|
|
info!("{:?} registered as parent of {:?}", parent, name);
|
|
subscriptions.insert(parent, Subscription::new(name.to_owned(), &module));
|
|
}
|
|
|
|
for child_name in module.children() {
|
|
if let Some(ref child) = modules.get(&child_name) {
|
|
info!("{:?} registered as child of {:?}", child_name, name);
|
|
subscriptions.insert(name.clone(), Subscription::new(child_name.to_owned(), &child));
|
|
}
|
|
}
|
|
}
|
|
|
|
Tenquestionmarks {
|
|
modules: modules,
|
|
subscriptions: subscriptions
|
|
}
|
|
}
|
|
|
|
pub fn from_configuration (configuration: Table) -> Result<Tenquestionmarks, ModuleLoaderError> {
|
|
let loader = ModuleLoader::new();
|
|
let modules = loader.load_from_configuration(configuration)?;
|
|
Result::Ok(Tenquestionmarks::with_modules(modules))
|
|
}
|
|
|
|
/// Looks up a loaded module by name, returning `None` when no module is
/// registered under `name`.
pub fn get_module (&self, name: &str) -> Option<&Module> {
    let loaded = &self.modules;
    loaded.get(name)
}
|
|
|
|
/// Pushes new configuration to the already-loaded modules.
///
/// Each top-level key of `configuration` is treated as a module name.
/// Entries whose value is not a table, or whose key does not match a
/// loaded module, are skipped. Matching modules receive a clone of their
/// table.
pub fn reconfigure (&self, configuration: &Table) {
    for (module_name, value) in configuration {
        // Only table-valued entries can configure a module.
        let table = match value.as_table() {
            Some(table) => table,
            None => continue
        };

        if let Some(module) = self.modules.get(module_name) {
            module.reconfigure(table.clone());
        }
    }
}
|
|
|
|
pub fn run (&self) {
|
|
crossbeam::scope(|scope| {
|
|
let mut dispatchers: BTreeMap<&str, Receiver<Envelope>> = BTreeMap::new();
|
|
|
|
// Event loop threads.
|
|
// Event loop threads consume events passed in by other modules' dispatcher threads,
|
|
// and produce events through their own dispatcher threads.
|
|
let senders: BTreeMap<&str, Sender<Arc<Envelope>>> = self.modules.iter().map(|(key, module)| {
|
|
let from = key.clone();
|
|
let (dispatcher_sender, dispatcher_receiver) = transformable_channels::mpsc::channel();
|
|
dispatchers.insert(key, dispatcher_receiver);
|
|
|
|
let mapped_sender = dispatcher_sender.map(move |event: Event| {
|
|
Envelope {
|
|
from: from.clone(),
|
|
event: event
|
|
}
|
|
});
|
|
|
|
let (module_sender, module_receiver) = mpsc::channel();
|
|
info!("Spawning event loop thread for \"{}\"", key);
|
|
scope.spawn(move || {
|
|
module.run(Box::new(mapped_sender), module_receiver);
|
|
info!("Event loop thread for \"{}\" is exiting", key);
|
|
});
|
|
module.set_sender(&module_sender);
|
|
(&key[..], module_sender)
|
|
}).collect();
|
|
|
|
// Dispatcher threads.
|
|
// Dispatcher threads transmit events produced by parent modules to child modules.
|
|
for (from, receiver) in dispatchers.into_iter() {
|
|
if let Some(subscriptions) = self.subscriptions.get_vec(from) {
|
|
let dispatcher_senders: BTreeMap<&str, (&Subscription, Sender<Arc<Envelope>>)> = senders.iter().filter_map(|(key, value)| {
|
|
subscriptions.iter().find(|subscription| subscription.name == *key)
|
|
.map(|subscription| (*key, (subscription, value.clone())))
|
|
}).collect();
|
|
|
|
info!("Spawning dispatcher thread for \"{}\"", from);
|
|
scope.spawn(move || {
|
|
loop {
|
|
match receiver.recv() {
|
|
Ok(envelope) => {
|
|
let arc_envelope = Arc::new(envelope);
|
|
for (child_name, &(subscription, ref sender)) in dispatcher_senders.iter() {
|
|
if subscription.can_handle_event(&arc_envelope) {
|
|
if let Err(err) = sender.send(arc_envelope.clone()) {
|
|
debug!("Failed to dispatch event to module \"{}\": {:?}", child_name, err);
|
|
}
|
|
}
|
|
}
|
|
},
|
|
Err(err) => {
|
|
error!("Failed to receive event from module: \"{}\": {:?}", from, err);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
info!("Dispatcher thread for \"{}\" is exiting", from);
|
|
});
|
|
}
|
|
}
|
|
});
|
|
}
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub struct Message {
|
|
content: String,
|
|
author: User,
|
|
channel: Option<Channel>
|
|
}
|
|
|
|
impl Message {
|
|
fn reply (&self, message: &str) {
|
|
match self.channel {
|
|
Some(ref channel) => channel.send(message),
|
|
None => self.author.send(message)
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub struct Channel {
|
|
name: String,
|
|
description: String,
|
|
topic: String,
|
|
sender: Box<MessageSender>
|
|
}
|
|
|
|
impl Channel {
|
|
pub fn send (&self, message: &str) {
|
|
self.sender.send_message(message);
|
|
}
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub struct User {
|
|
name: String,
|
|
sender: Box<MessageSender>
|
|
}
|
|
|
|
impl User {
|
|
pub fn send (&self, message: &str) {
|
|
self.sender.send_message(message);
|
|
}
|
|
}
|
|
|
|
/// Something that can deliver a text message to a destination.
///
/// Implementors must be shareable across threads (`Sync + Send`) and
/// printable with `{:?}`.
pub trait MessageSender : Sync + Send + std::fmt::Debug {
    /// Delivers `message`. The default implementation discards it.
    fn send_message (&self, _message: &str) {}
}
|
|
|
|
pub struct NullMessageSender {}
|
|
impl MessageSender for NullMessageSender {}
|
|
impl std::fmt::Debug for NullMessageSender {
|
|
fn fmt (&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
|
|
write!(formatter, "NullMessageSender")
|
|
}
|
|
}
|
|
|
|
struct Subscription {
|
|
pub name: String,
|
|
pub filters: Vec<Box<EventFilter>>
|
|
}
|
|
|
|
impl Subscription {
|
|
pub fn new (name: String, module: &Module) -> Subscription {
|
|
let filters: Vec<Box<EventFilter>> = module.config.get("filters")
|
|
.and_then(|value| value.as_array())
|
|
.map(|value| value.to_vec())
|
|
.unwrap_or(vec![])
|
|
.into_iter()
|
|
.map(|value| {
|
|
match value.as_table() {
|
|
Some(table) => Some(Box::new(AttributeEventFilter::new(table)) as Box<EventFilter>),
|
|
None => None
|
|
}
|
|
})
|
|
.filter(|possible_filter| possible_filter.is_some())
|
|
.map(|possible_filter| possible_filter.unwrap())
|
|
.collect();
|
|
|
|
Subscription {
|
|
name: name,
|
|
filters: filters
|
|
}
|
|
}
|
|
|
|
pub fn can_handle_event (&self, envelope: &Envelope) -> bool {
|
|
if !self.filters.is_empty() {
|
|
for filter in &self.filters {
|
|
if filter.accept(envelope) {
|
|
return true;
|
|
}
|
|
}
|
|
|
|
debug!(
|
|
"Refusing to transmit envelope from {:?} to {:?} since envelope was filtered out",
|
|
envelope.from,
|
|
self.name
|
|
);
|
|
return false;
|
|
}
|
|
|
|
true
|
|
}
|
|
}
|