tenquestionmarks/tenquestionmarks.py.old

384 lines
16 KiB
Python
Raw Normal View History

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
tenquestionmarks is an extensible, modular IRC bot.
This file is governed by the following license:
Copyright (c) 2011 Adrian Malacoda, Monarch Pass
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 3, as published by
the Free Software Foundation.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
Based on the IRC bot at:
http://kstars.wordpress.com/2009/09/05/a-python-irc-bot-for-keeping-up-with-arxiv-or-any-rss-feed/
Copyright 2009 by Akarsh Simha
"""
import sys
import os
import threading
import time
import optparse
import json
import traceback
import datetime
import inspect
import re
# Put every entry of the bundled "lib" directory (next to this file) on
# sys.path so that libraries shipped with the bot can be imported below.
# The path is made absolute so later chdir() calls cannot invalidate it, and
# a missing "lib" directory is tolerated (the libraries may be installed
# system-wide instead).
_lib_root = os.path.join(os.path.dirname(os.path.abspath(__file__)), "lib")
if os.path.isdir(_lib_root):
    for libdir in os.listdir(_lib_root):
        sys.path.append(os.path.join(_lib_root, libdir))
import irclib
class Tenquestionmarks(irclib.SimpleIRCClient):
    """An IRC client that forwards IRC events and user commands to modules.

    Modules are loaded by name with load_modules(). For every IRC event the
    bot calls an ``on_<eventtype>`` function on each module that defines one,
    and messages that look like commands are resolved to module-level
    functions of the same name (see call_command / resolve_command).
    """

    VERSION_NAME = "Tenquestionmarks"
    VERSION_NUMBER = "0.13.23"

    # Characters that degrade_to_ascii() rewrites before stripping whatever
    # non-ASCII remains. Upper-case keys also cover their lower-case forms.
    # NOTE(review): the punctuation keys were unreadable in the source this
    # was recovered from; curly quotes and en/em dashes are assumed here.
    _ASCII_REPLACEMENTS = {
        u"\u2018": "'",   # left single quotation mark
        u"\u2019": "'",   # right single quotation mark
        u"\u201c": '"',   # left double quotation mark
        u"\u201d": '"',   # right double quotation mark
        u"\u2013": "-",   # en dash
        u"\u2014": "-",   # em dash
        u"Æ": "AE",
        u"À": "A",
        u"Á": "A",
        u"Â": "A",
        u"Ã": "A",
        u"Ä": "A",
        u"Å": "A",
        u"Ç": "C",
        u"È": "E",
        u"É": "E",
        u"Ê": "E",
        u"Ë": "E",
        u"Ì": "I",
        u"Í": "I",
        u"Î": "I",
        u"Ï": "I",
        u"Ð": "D",
        u"Ñ": "N",
        u"Ò": "O",
        u"Ó": "O",
        u"Ô": "O",
        u"Õ": "O",
        u"Ö": "O",
        u"Ø": "O",
        u"Ù": "U",
        u"Ú": "U",
        u"Û": "U",
        u"Ü": "U",
        u"Ý": "Y",
        u"Þ": "Th",
        u"ß": "S",
    }

    # (pattern, template) pairs used by html(). Attribute values are matched
    # with [^"]+ so the colour-only pattern can never swallow a tag that also
    # carries bgcolor. Template keys are the zero-based group indexes.
    _HTML_TAGS = (
        (re.compile(r'<b>(.+?)</b>'), "\x02%(0)s\x02"),
        (re.compile(r'<u>(.+?)</u>'), "\x1f%(0)s\x1f"),
        (re.compile(r'<font color="([^"]+)">(.+?)</font>'),
         "\x03%(0)s%(1)s\x03"),
        (re.compile(r'<font color="([^"]+)" bgcolor="([^"]+)">(.+?)</font>'),
         "\x03%(0)s,%(1)s%(2)s\x03"),
        (re.compile(r'<font bgcolor="([^"]+)" color="([^"]+)">(.+?)</font>'),
         "\x03%(1)s,%(0)s%(2)s\x03"),
    )

    def __init__(self, hostname, channels, nick, command_prefix="!", password=None, port=6667, directory=None):
        """Stores the connection settings and prepares the bot's directory.

        hostname, port  -- IRC server to connect to
        channels        -- iterable of channel names to join
        nick            -- nickname (also used as user name and real name)
        command_prefix  -- prefix marking channel messages as commands;
                           None falls back to "!"
        password        -- NickServ password, or None to skip identifying
        directory       -- where the log and JSON files live; defaults to
                           ~/.tenquestionmarks and is created if missing
        """
        irclib.SimpleIRCClient.__init__(self)
        self.hostname = hostname
        self.port = port
        self.channel_list = channels
        self.nick = nick
        self.password = password
        if directory is None:
            # expanduser also works when $HOME is not set
            directory = os.path.join(os.path.expanduser("~"), ".tenquestionmarks")
        self.directory = directory
        if not os.path.isdir(directory):
            os.makedirs(directory)
        self.msgqueue = []
        # "w": the log is started afresh on every run
        self.logfile = open(os.path.join(self.directory, "log.log"), "w")
        # startswith(None) would raise on every channel message
        self.command_prefix = "!" if command_prefix is None else command_prefix
        self.config = self.get_json("options.json")
        self.modules = {}

    def load_modules(self, modules):
        """Loads a set of modules given by name. The modules reside in the
        modules package and so are imported as "modules.<name>". Any
        previously loaded set of modules is replaced. A module that cannot
        be imported is logged and skipped."""
        self.modules = {}
        for module in modules:
            self.log("module", "Loading module %s" % (module))
            try:
                # __import__ returns the top-level "modules" package; the
                # submodule is then available as an attribute of it.
                package = __import__("modules.%s" % (module))
                self.modules[module] = getattr(package, module)
                self.log("module", "%s loaded" % (module))
            except ImportError:
                traceback.print_exc(file=sys.stdout)
                self.log("module", "No module named %s" % (module))

    def _dispatcher(self, connection, event):
        """Runs the default irclib dispatch, logs the event and then offers
        it to every module as on_<eventtype>(bot, event)."""
        irclib.SimpleIRCClient._dispatcher(self, connection, event)
        self.log(event.eventtype(), "%s from %s to %s" % (event.arguments(), event.source(), event.target()))
        if self.modules:
            self._fire_method_call("on_" + event.eventtype(), self, event)

    def _fire_method_call(self, method, *args):
        """Calls the function named `method` with `args` on every loaded
        module that defines it."""
        for module in self.modules.values():
            if hasattr(module, method):
                getattr(module, method)(*args)

    def version(self):
        """Creates the version string that is returned from a CTCP version
        request. This can be overridden."""
        return "%s %s" % (self.VERSION_NAME, self.VERSION_NUMBER)

    def connect(self):
        """Connects to the IRC network given in the constructor, identifies
        with NickServ if a password was given, and joins the channels.

        When this method completes, the on_connected event is fired for
        modules to handle."""
        irclib.SimpleIRCClient.connect(self, self.hostname, self.port, self.nick, None, self.nick, ircname=self.nick)
        if self.password is not None:
            # Identify with the password (not the nick) over the client's
            # server connection.
            self.connection.privmsg("NickServ", "identify %s" % (self.password,))
        for channel in self.channel_list:
            self.connection.join(channel)
        self._fire_method_call("on_connected", self)

    def loop(self):
        """Starts the IRC bot's loop. Never returns.

        Each pass posts all queued messages to every channel, oldest first,
        and then processes pending IRC traffic once."""
        while True:
            while self.msgqueue:
                # pop(0) keeps the queue first-in, first-out
                msg = self.msgqueue.pop(0)
                for channel in self.channel_list:
                    self.log("post", "Posting queued message %s" % (msg))
                    try:
                        self.connection.privmsg(channel, msg)
                    except Exception as e:
                        traceback.print_exc(file=sys.stdout)
                        self.log("Error", "%s %s" % (e, msg))
                    time.sleep(1)  # TODO: Fix bad code (crude pause between posts)
            self.ircobj.process_once()
            time.sleep(1)  # So that we don't hog the CPU!

    def queue(self, message):
        """Adds a message to the end of the bot's message queue. The message
        is stored UTF-8 encoded."""
        self.msgqueue.append(message.encode("UTF-8"))

    def on_ctcp(self, connection, event):
        """Event handler for CTCP messages. If the CTCP message is a version
        request, a version string is returned."""
        self.log("ctcp", "%s from %s" % (event.arguments(), event.source()))
        ctcptype = event.arguments()[0]
        if ctcptype == "VERSION":
            # the source looks like nick!user@host; reply to the nick
            self.connection.ctcp_reply(event.source().split("!")[0], self.version())

    def on_privmsg(self, connection, event):
        """Event handler for messages sent directly to the bot. These
        are always treated as commands."""
        message = event.arguments()[0]
        nick = event.source().split("!")[0]
        # first word is the command, the rest (possibly empty) its argument
        command, _, arg = message.partition(" ")
        self.log("cmd", "%s called command %s with arg %s" % (nick, command, arg))
        self.call_command(nick, None, command, arg)

    def on_pubmsg(self, connection, event):
        """Event handler for messages sent to a channel in which the
        bot resides. If the message starts with the command prefix, the
        message is treated as a command."""
        message = event.arguments()[0]
        if not message.startswith(self.command_prefix):
            return
        nick = event.source().split("!")[0]
        channel = event.target()
        # strip the whole prefix, which may be longer than one character
        body = message[len(self.command_prefix):]
        command, _, arg = body.partition(" ")
        self.log("cmd", "%s called command %s in %s with arg %s" % (nick, command, channel, arg))
        self.call_command(nick, channel, command, arg)

    def call_command(self, nick, channel, command, arg):
        """Calls a command defined in one or more modules. This method is
        called indirectly through IRC messages sent from a user, who may
        optionally be sending that message publicly in a channel.

        Every command function is called as func(nick, channel, bot, ...).
        The argument is a single string. Depending on how many further
        parameters the command takes, this string is broken up on spaces:
        a function with *args receives every word, a function with N extra
        parameters receives at most N pieces (the last keeps any remaining
        spaces), and a function with no extra parameters receives nothing.

        The return value of the command, if not None, is output either back
        to the user or to the channel in which the command was invoked. Any
        exception raised while preparing or running a command is logged and
        does not propagate."""
        for func in self.resolve_command(command):
            try:
                argspec = inspect.getargspec(func)
                # nick, channel and the bot fill the first three parameters
                numargs = len(argspec.args) - 3
                if argspec.varargs is not None:
                    args = arg.split(" ")
                elif numargs > 0:
                    args = arg.split(" ", numargs - 1)
                else:
                    # split(" ", -1) would hand unexpected arguments to a
                    # command that takes none
                    args = []
                returnvalue = func(nick, channel, self, *args)
                if returnvalue is not None:
                    target = nick if channel is None else channel
                    self.multiline_privmsg(target, returnvalue)
            except Exception as e:
                traceback.print_exc(file=sys.stdout)
                self.log("Error", "Command %s caused an %s" % (command, e))

    def resolve_command(self, cmdname):
        """Given a command name, traverses through the modules loaded into
        the bot and finds functions that match the command name.

        If given a command name by itself, this method will look through
        all modules for the command. If the command name is given in
        the form [module].[command], only the given module is considered.

        Returns a (possibly empty) list of callables. Command names come
        straight from IRC users, so unknown modules, extra dots, names
        starting with an underscore and non-callable attributes all simply
        resolve to nothing instead of raising."""
        if "." in cmdname:
            (module, cmdname) = cmdname.split(".", 1)
            self.log("cmd", "%s is located in module %s" % (cmdname, module))
            candidates = [self.modules[module]] if module in self.modules else []
        else:
            self.log("cmd", "%s is not looking for a specific module" % (cmdname))
            candidates = list(self.modules.values())
        if cmdname.startswith("_"):
            # never expose private or dunder attributes as commands
            return []
        funcs = []
        for modobj in candidates:
            func = getattr(modobj, cmdname, None)
            if callable(func):
                funcs.append(func)
        return funcs

    def log(self, operation, message):
        """Logs a message onto standard out and into a file in the bot's
        directory, one entry per line. Failures while logging are printed
        and otherwise ignored."""
        try:
            logmessage = u"[%s] %s: %s" % (datetime.datetime.now(), operation.upper(), message)
            print(logmessage)
            line = logmessage + u"\n"
            if not isinstance(line, str):
                # Python 2: encode explicitly, the implicit ASCII encoding
                # would reject non-ASCII entries
                line = line.encode("utf-8")
            self.logfile.write(line)
            self.logfile.flush()
        except Exception:
            traceback.print_exc(file=sys.stdout)

    def get_json(self, filename):
        """Retrieves a JSON object (Python dictionary) from a file in this
        bot's directory. If the file cannot be opened, one is created and an
        empty dictionary is returned."""
        try:
            with open(os.path.join(self.directory, filename)) as target:
                return json.loads(target.read())
        except IOError:
            self.put_json(filename, {})
            return {}

    def put_json(self, filename, obj):
        """Saves a JSON object (Python dictionary) as a file in this bot's
        directory."""
        with open(os.path.join(self.directory, filename), "w") as target:
            target.write(json.dumps(obj))

    def degrade_to_ascii(self, string):
        """In order to allow as wide a range of inputs as possible, if
        a Unicode string cannot be decoded, it can instead be run through
        this function, which will change "fancy quotes" to regular quotes
        and remove diacritics and accent marks, among other things.

        To be doubly sure that the string is safe for ASCII, any non-ASCII
        characters are then removed after any conversion is done."""
        for char, plain in self._ASCII_REPLACEMENTS.items():
            string = string.replace(char, plain).replace(char.lower(), plain.lower())
        # Strip away anything non-ascii that remains
        return "".join([char for char in string if ord(char) < 128])

    def html(self, string):
        """Basic parser that converts between a very limited subset of
        HTML and IRC control codes: <b>, <u> and <font> with a color and an
        optional bgcolor.

        Note that this parser is very strict about the font tag. It wants
        the attribute values to be double-quoted and, when both color and
        bgcolor are given, separated by exactly one space."""
        for (regex, replacement) in self._HTML_TAGS:
            regex_match = regex.search(string)
            # repeat until the tag no longer occurs, so nested tags of the
            # same kind are converted as well
            while regex_match is not None:
                groups_dict = dict(
                    (str(i), group) for i, group in enumerate(regex_match.groups())
                )
                string = string.replace(regex_match.group(0), replacement % groups_dict)
                regex_match = regex.search(string)
        return string

    def multiline_privmsg(self, target, message):
        """Sends a message to a nick or channel, one PRIVMSG per line."""
        for line in message.split("\n"):
            self.connection.privmsg(target, line)
# Command-line entry point: options given on the command line take
# precedence over the values stored in <directory>/options.json.
if __name__ == "__main__":
    optparser = optparse.OptionParser()
    optparser.add_option("-n", "--nick", action="store", type="string", dest="nick", help="IRC bot nick")
    optparser.add_option("-s", "--host", action="store", type="string", dest="host", help="Hostname of the IRC server")
    optparser.add_option("-d", "--directory", action="store", type="string", dest="directory", default=os.path.join(os.path.expanduser("~"), ".tenquestionmarks"),
                         help="Directory where the bot stores things (default: ~/.tenquestionmarks)")
    optparser.add_option("-p", "--password", action="store", type="string", dest="password", help="Nickserv password")
    optparser.add_option("-c", "--channels", action="store", type="string", dest="channels", help="Comma-separated list of channels to join")
    optparser.add_option("-m", "--modules", action="store", type="string", dest="modules", help="Comma-separated names of modules to load")
    optparser.add_option("-r", "--command-prefix", action="store", type="string", dest="commandprefix", help="Prefix put in front of user commands (default: !)")
    (options, args) = optparser.parse_args()

    # A missing options file just means "no stored settings"; everything
    # can still be supplied on the command line.
    try:
        with open(os.path.join(options.directory, "options.json")) as conf_file:
            conf = json.loads(conf_file.read())
    except IOError:
        conf = {}

    if options.nick is None and "nick" in conf:
        options.nick = conf["nick"]
    if options.password is None and "password" in conf:
        options.password = conf["password"]
    if options.host is None and "host" in conf:
        options.host = conf["host"]

    # Lists arrive as comma-separated strings on the command line; the
    # values from the options file are passed on as stored.
    if options.channels is not None:
        options.channels = options.channels.split(",")
    elif "channels" in conf:
        options.channels = conf["channels"]
    else:
        options.channels = []
    if options.modules is not None:
        options.modules = options.modules.split(",")
    elif "modules" in conf:
        options.modules = conf["modules"]

    if options.commandprefix is None:
        # fall back to the documented default when nothing is configured
        options.commandprefix = conf.get("commandprefix", "!")

    if options.host is None or options.nick is None:
        optparser.error("a host and a nick are required (on the command line or in options.json)")

    tqm = Tenquestionmarks(options.host, options.channels, options.nick, options.commandprefix, options.password,
                           directory=options.directory)
    if options.modules is not None:
        tqm.load_modules(options.modules)
    tqm.connect()
    tqm.loop()