)
.+?', self.getImagePageContents()).group()
lineR = re.compile('
\(.+?\) \(.+?\) (?P.+?) . . (?P.+?) \(.+?\) . . (?P\d+.+?\d+) \((?P\d+) .+?\) ')
for match in lineR.finditer(history):
datetime = match.group('datetime')
username = match.group('username')
resolution = match.group('resolution')
size = match.group('size')
comment = match.group('comment')
result.append((datetime, username, resolution, size, comment))
return result
# Build a wikitext table from the tuples returned by
# self.getFileVersionHistory(): one row per (datetime, username, resolution,
# size, comment) tuple, under the header row
# "date/time || username || resolution || size || edit summary".
# Returns the complete table, from '{| border="1"' to '|}', as one string.
def getFileVersionHistoryTable(self):
lines = []
for (datetime, username, resolution, size, comment) in self.getFileVersionHistory():
# NOTE(review): the row format string below is split across two physical
# lines, which is not a valid single-quoted literal. Markup around the last
# %s appears to have been lost -- TODO restore it from version control.
lines.append('%s || %s || %s || %s ||
%s' % (datetime, username, resolution, size, comment))
# Rows are joined with the '|----' row separator.
return u'{| border="1"\n! date/time || username || resolution || size || edit summary\n|----\n| ' + u'\n|----\n'.join(lines) + '\n|}'
class XmlPage(Page):
    # In my opinion, this should be deleted. --Daniel Herding
    '''Page variant that is filled from an XmlEntry object (see xmlreader.py).

    Title, text and the edit/move restrictions are copied from the entry when
    the object is constructed.

    Sample usage:

    >>> source = xmlreader.XmlDump(some_file_name)
    >>> for entry in source.parse():
    ...     page = XmlPage(getSite(), entry)
    ...     # do something with page...
    '''

    def __init__(self, site, xmlentry):
        """Create the page for 'site' from the dump entry 'xmlentry'.

        Raises TypeError if 'xmlentry' is not an xmlreader.XmlEntry.
        """
        if not isinstance(xmlentry, xmlreader.XmlEntry):
            raise TypeError("Invalid argument to XmlPage constructor.")
        Page.__init__(self, site, xmlentry.title)
        # keep the XML source around in case we need it later
        self._xml = xmlentry
        self._contents = xmlentry.text
        self.moveRestriction = xmlentry.moveRestriction
        self.editRestriction = xmlentry.editRestriction
        redirect = self.site().redirectRegex().match(self._contents)
        if redirect is None:
            return
        # The text is a redirect: remember its target and flag the page.
        self._getexception = IsRedirectPage
        self._redirarg = redirect.group(1)
class GetAll(object):
    """Load the text of many pages with a single request to the export address.

    The response XML is parsed with xmlreader.MediaWikiXmlHandler and the
    results are stored on the Page objects themselves (_contents, _editTime,
    _getexception, ...).
    """

    def __init__(self, site, pages, throttle, force):
        """Remember which of 'pages' still have to be fetched.

        site     - the Site object the pages belong to
        pages    - iterable of Page objects
        throttle - stored as self.throttle; not consulted inside this class
        force    - if true, also fetch pages that already carry '_contents'
                   or '_getexception'
        """
        self.site = site
        self.pages = []
        self.throttle = throttle
        for pl in pages:
            already_done = hasattr(pl, '_contents') or hasattr(pl, '_getexception')
            if force or not already_done:
                self.pages.append(pl)
            else:
                output(u"BUGWARNING: %s already done!" % pl.aslink())

    def run(self):
        """Fetch the data, parse it and mark pages that were not returned.

        Pages still without '_contents'/'_getexception' after parsing get
        '_getexception = NoPage'. A SAXParseException is re-raised after the
        offending data has been dumped to sax_parse_bug.dat.
        """
        dt = 15
        while True:
            try:
                data = self.getData()
            except (socket.error, httplib.BadStatusLine, ServerError):
                # Print the traceback of the caught exception
                print(''.join(traceback.format_exception(*sys.exc_info())))
                # NOTE(review): the back-off sleep is commented out, so a
                # failing request is retried immediately and without limit;
                # dt is still advanced but currently has no effect.
                # DQM output(u'DBG> got network error in GetAll.run. Sleeping for %d seconds'%dt)
                # DQM time.sleep(dt)
                if dt <= 60:
                    dt += 15
                elif dt < 360:
                    dt += 60
            else:
                break
        if not data:
            return
        # They're doing strange things with their XML on Lovetoknow...
        R = re.compile(r"\s*<\?xml([^>]*)\?>(.*)", re.DOTALL)
        M = R.match(data)
        if M:
            data = M.group(2)
        handler = xmlreader.MediaWikiXmlHandler()
        handler.setCallback(self.oneDone)
        handler.setHeaderCallback(self.headerDone)
        try:
            xml.sax.parseString(data, handler)
        except xml.sax._exceptions.SAXParseException:
            err = sys.exc_info()[1]
            f = open('sax_parse_bug.dat', 'w')
            try:
                f.write('Error reported: ' + str(err))
                f.write('\n')
                f.write(data)
            finally:
                # close the dump even if writing fails
                f.close()
            sys.stderr.write("Dumped invalid XML to sax_parse_bug.dat\n")
            raise
        except PageNotFound:
            return
        # All of the ones that have not been found apparently do not exist
        for pl in self.pages:
            if not hasattr(pl, '_contents') and not hasattr(pl, '_getexception'):
                pl._getexception = NoPage

    def oneDone(self, entry):
        """Handler callback: store one parsed entry on its Page object.

        Raises PageNotFound if no still-unfilled page in self.pages matches
        the title of 'entry'.
        """
        title = entry.title
        timestamp = entry.timestamp
        text = entry.text
        editRestriction = entry.editRestriction
        pl = Page(self.site, title)
        for pl2 in self.pages:
            if Page(self.site, pl2.sectionFreeTitle()) == pl:
                if not hasattr(pl2, '_contents') and not hasattr(pl2, '_getexception'):
                    break
        else:
            print("BUG: page not found in list")
            print('Title: %s' % repr(title))
            print('Page: %s' % repr(pl))
            print('Expected one of: %s' % repr(self.pages))
            raise PageNotFound
        pl2.editRestriction = entry.editRestriction
        pl2.moveRestriction = entry.moveRestriction
        if editRestriction == 'autoconfirmed':
            output(u'Page %s is semi-protected. Getting edit page to find out if we are allowed to edit.' % pl2.title())
            try:
                pl2.get()
            except KeyboardInterrupt:
                # never swallow a user interrupt
                raise
            except Exception:
                # best effort: get() records the failure on the page itself
                pass
        else:
            pl2._permalink = entry.revisionid
            m = self.site.redirectRegex().match(text)
            if m:
                # Was assigned to the temporary 'pl', which is discarded.
                pl2._editTime = timestamp
                redirectto = m.group(1)
                pl2._getexception = IsRedirectPage
                pl2._redirarg = redirectto
            # There's no possibility to read the wpStarttime argument from the XML.
            # This argument makes sure an edit conflict is raised if the page is
            # deleted between retrieval and saving of the page. It contains the
            # UTC timestamp (server time) of the moment we first retrieved the edit
            # page. As we can't use server time, we simply use client time. Please
            # make sure your system clock is correct. If it's too slow, the bot might
            # recreate pages someone deleted. If it's too fast, the bot will raise
            # EditConflict exceptions although there's no conflict.
            pl2._startTime = time.strftime('%Y%m%d%H%M%S', time.gmtime(time.time()))
            section = pl2.section()
            if section:
                # Escape the title: it is literal text, not a pattern.
                m = re.search("== *%s *==" % re.escape(section), text)
                if not m:
                    output(u"WARNING: Section not found: %s" % pl2.title())
                else:
                    # Store the content
                    pl2._contents = text
                    # Store the time stamp
                    pl2._editTime = timestamp
            else:
                # Store the content
                pl2._contents = text
                # Store the time stamp
                pl2._editTime = timestamp

    def headerDone(self, header):
        """Handler callback: compare the header's namespaces with the family.

        Differences are reported through output(); an outdated name is
        corrected in self.site.family.namespaces for this language.
        """
        # Verify our family data
        lang = self.site.lang
        family = self.site.family
        for ns_id in sorted(header.namespaces.keys()):
            nshdr = header.namespaces[ns_id]
            if ns_id not in family.namespaces:
                output(u"WARNING: Missing namespace in family file %s: namespace['%s'][%i] (it is set to '%s')" % (family.name, lang, ns_id, nshdr))
                continue
            ns = self.site.namespace(ns_id)
            if ns is None:
                ns = u''
            if ns == nshdr:
                continue
            dflt = family.namespaces[ns_id]['_default']
            if dflt == ns:
                flag = u"is set to default ('%s'), but should be '%s'" % (ns, nshdr)
            elif dflt == nshdr:
                flag = u"is '%s', but should be removed (default value '%s')" % (ns, nshdr)
            else:
                flag = u"is '%s', but should be '%s'" % (ns, nshdr)
            output(u"WARNING: Outdated family file %s: namespace['%s'][%i] %s" % (family.name, lang, ns_id, flag))
            family.namespaces[ns_id][lang] = nshdr

    def getData(self):
        """POST the page titles to the export address and return the reply.

        Returns None when there is nothing to fetch. Raises ServerError when
        the plain HTTP connection answers with a status of 300 or above.
        """
        if not self.pages:
            return
        address = self.site.export_address()
        pagenames = [page.sectionFreeTitle() for page in self.pages]
        # We need to use X convention for requested page titles.
        if self.site.lang == 'eo':
            pagenames = [doubleXForEsperanto(pagetitle) for pagetitle in pagenames]
        pagenames = u'\r\n'.join(pagenames)
        if not isinstance(pagenames, type(u'')):
            print('Warning: xmlreader.WikipediaXMLHandler.getData() got non-unicode page names. Please report this.')
            print(pagenames)
        # convert Unicode string to the encoding used on that wiki
        pagenames = pagenames.encode(self.site.encoding())
        body = urlencode((
            ('action', 'submit'),
            ('pages', pagenames),
            ('curonly', 'True'),
        ))
        headers = {
            "Content-type": "application/x-www-form-urlencoded",
            "User-agent": "PythonWikipediaBot/1.0",
        }
        # Slow ourselves down
        get_throttle(requestsize = len(self.pages))
        # Now make the actual request to the server
        now = time.time()
        if self.site.hostname() in config.authenticate.keys():
            # The header names used to be appended to the form fields and
            # ended up in the POST body; send them as real headers instead.
            request = urllib2.Request('http://' + self.site.hostname() + address, body, headers)
            response = urllib2.urlopen(request)
            try:
                data = response.read()
            finally:
                response.close()
        else:
            conn = httplib.HTTPConnection(self.site.hostname())
            try:
                conn.putrequest("POST", address)
                conn.putheader('Content-Length', str(len(body)))
                conn.putheader("Content-type", headers["Content-type"])
                conn.putheader("User-agent", headers["User-agent"])
                if self.site.cookies():
                    conn.putheader('Cookie', self.site.cookies())
                conn.endheaders()
                conn.send(body)
                response = conn.getresponse()
                if (response.status >= 300):
                    raise ServerError(response.status, response.reason)
                data = response.read()
            finally:
                # also release the connection when ServerError is raised
                conn.close()
        get_throttle.setDelay(time.time() - now)
        return data
def getall(site, pages, throttle = True, force = False):
    """Announce the bulk fetch and run a GetAll over 'pages' on 'site'.

    Returns whatever GetAll.run() returns.
    """
    output(u'Getting %d pages from %s...' % (len(pages), site))
    fetcher = GetAll(site, pages, throttle, force)
    return fetcher.run()
# Library functions
def unescape(s):
    """Replace escaped HTML-special characters by their originals.

    Handles &lt; &gt; &apos; &quot; and &amp;. Strings without any '&' are
    returned unchanged.
    """
    if '&' not in s:
        return s
    s = s.replace("&lt;", "<")
    s = s.replace("&gt;", ">")
    s = s.replace("&apos;", "'")
    s = s.replace("&quot;", '"')
    # Must be last, otherwise "&amp;lt;" would be unescaped twice.
    s = s.replace("&amp;", "&")
    return s
def setAction(s):
    """Store 's' in the module-level variable 'action'.

    'action' is the summary to use for changed page submissions.
    """
    global action
    action = s

# Install the default summary when the module is loaded.
setAction('Wikipedia python library')
def urlencode(query):
    """Encode 'query', an iterable of (key, value) pairs, for an HTTP POST.

    Each key and value is passed through urllib.quote; pairs are written as
    key=value and joined with '&'.
    """
    pairs = [urllib.quote(key) + '=' + urllib.quote(value)
             for key, value in query]
    return '&'.join(pairs)
# Mechanics to slow down page download rate.
class Throttle(object):
    """Computes how long to wait between requests to a wiki.

    When created with multiplydelay true, the instance registers itself in
    the file 'throttle.log' and multiplies its delay by the number of
    processes recently seen there.
    """

    def __init__(self, mindelay = config.minthrottle, maxdelay = config.maxthrottle, multiplydelay = True):
        """Set up a throttle with the given minimum and maximum delay.

        multiplydelay - if true, check throttle.log for other running
                        processes (see checkMultiplicity).
        """
        self.mindelay = mindelay
        self.maxdelay = maxdelay
        self.pid = False # If self.pid remains False, we're not checking for multiple processes
        self.now = 0
        self.next_multiplicity = 1.0
        self.checkdelay = 240 # Check the file with processes again after this many seconds
        self.dropdelay = 360 # Drop processes from the list that have not made a check in this many seconds
        self.releasepid = 100000 # Free the process id
        self.lastwait = 0.0
        self.delay = 0
        if multiplydelay:
            self.checkMultiplicity()
        self.setDelay(mindelay)

    def _readLog(self):
        """Return the (pid, time) pairs stored in throttle.log.

        Raises IOError if the file cannot be read.
        """
        f = open('throttle.log', 'r')
        try:
            lines = f.readlines()
        finally:
            # the read handle used to be left open
            f.close()
        entries = []
        for line in lines:
            fields = line.split(' ')
            pid = int(fields[0])
            ptime = int(fields[1].split('.')[0])
            entries.append((pid, ptime))
        return entries

    def _writeLog(self, processes):
        """Overwrite throttle.log with the {pid: time} mapping 'processes'."""
        f = open('throttle.log', 'w')
        try:
            for p in processes:
                f.write(str(p) + ' ' + str(processes[p]) + '\n')
        finally:
            f.close()

    def checkMultiplicity(self):
        """Count the processes listed in throttle.log and register ourselves.

        Sets self.process_multiplicity and self.checktime, assigns self.pid on
        the first call, and rewrites throttle.log without entries older than
        self.releasepid seconds.
        """
        processes = {}
        my_pid = 1
        count = 1
        try:
            entries = self._readLog()
        except IOError:
            # A missing log is only acceptable before we have registered.
            if self.pid:
                raise
            entries = []
        now = time.time()
        for pid, ptime in entries:
            if now - ptime <= self.releasepid:
                if now - ptime <= self.dropdelay and pid != self.pid:
                    count += 1
                processes[pid] = ptime
                if pid >= my_pid:
                    my_pid = pid + 1
        if not self.pid:
            self.pid = my_pid
        self.checktime = time.time()
        processes[self.pid] = self.checktime
        self._writeLog(processes)
        self.process_multiplicity = count
        #DQM print("Checked for running processes. %s processes currently running, "%count +
        #DQM "including the current process.")

    def setDelay(self, delay = config.minthrottle, absolute = False):
        """Set the current delay and restart the waiting period.

        With absolute true, mindelay and maxdelay are set to 'delay' as well.
        """
        if absolute:
            self.maxdelay = delay
            self.mindelay = delay
        self.delay = delay
        # Don't count the time we already waited as part of our waiting time :-0
        self.now = time.time()

    def getDelay(self):
        """Return the delay in seconds that currently applies."""
        thisdelay = self.delay
        if self.pid: # If self.pid, we're checking for multiple processes
            if time.time() > self.checktime + self.checkdelay:
                self.checkMultiplicity()
            if thisdelay < (self.mindelay * self.next_multiplicity):
                thisdelay = self.mindelay * self.next_multiplicity
            elif thisdelay > self.maxdelay:
                thisdelay = self.maxdelay
            thisdelay *= self.process_multiplicity
        return thisdelay

    def waittime(self):
        """Calculate the time in seconds we will have to wait if a query
        would be made right now"""
        # Take the previous requestsize in account calculating the desired
        # delay this time
        thisdelay = self.getDelay()
        ago = time.time() - self.now
        if ago < thisdelay:
            return thisdelay - ago
        return 0.0

    def drop(self):
        """Remove me from the list of running bots processes."""
        self.checktime = 0
        try:
            entries = self._readLog()
        except IOError:
            return
        now = time.time()
        processes = {}
        for pid, ptime in entries:
            if now - ptime <= self.releasepid and pid != self.pid:
                processes[pid] = ptime
        self._writeLog(processes)

    def __call__(self, requestsize = 1):
        """Record a request of 'requestsize' pages being made now.

        NOTE(review): the sleep itself is commented out below, so calling the
        throttle currently only updates its bookkeeping.
        """
        waittime = self.waittime()
        # Calculate the multiplicity of the next delay based on how
        # big the request is that is being posted now.
        # We want to add "one delay" for each factor of two in the
        # size of the request. Getting 64 pages at once allows 6 times
        # the delay time for the server.
        self.next_multiplicity = math.log(1+requestsize)/math.log(2.0)
        # Announce the delay if it exceeds a preset limit
        #if waittime > config.noisysleep:
        #DQM print "Sleeping for %.1f seconds" % waittime
        #DQM time.sleep(waittime)
        self.now = time.time()

get_throttle = Throttle(config.minthrottle,config.maxthrottle)
put_throttle = Throttle(config.put_throttle,config.put_throttle,False)
class MyURLopener(urllib.FancyURLopener):
    """FancyURLopener whose 'version' string is PythonWikipediaBot/1.0."""
    version = "PythonWikipediaBot/1.0"

# Special opener in case we are using a site with authentication:
# it combines the cookie jar (loaded from COOKIEFILE when that file exists)
# with HTTP basic auth for every site listed in config.authenticate, and is
# installed as urllib2's default opener.
if config.authenticate:
    import urllib2
    import cookielib
    COOKIEFILE = 'login-data/cookies.lwp'
    cj = cookielib.LWPCookieJar()
    if os.path.isfile(COOKIEFILE):
        cj.load(COOKIEFILE)
    passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
    for site in config.authenticate.keys():
        passman.add_password(None,
                             site,
                             config.authenticate[site][0],
                             config.authenticate[site][1])
    authhandler = urllib2.HTTPBasicAuthHandler(passman)
    authenticateURLopener = urllib2.build_opener(
        urllib2.HTTPCookieProcessor(cj),
        authhandler)
    urllib2.install_opener(authenticateURLopener)
def replaceExceptNowikiAndComments(text, old, new):
    """Deprecated alias for replaceExceptMathNowikiAndComments."""
    return replaceExceptMathNowikiAndComments(text=text, old=old, new=new)
def replaceExceptMathNowikiAndComments(text, old, new):
    """
    Replaces old by new in text, skipping occurences of old within nowiki
    tags, math tags and HTML comments.

    Parameters:
    text - a string
    old - a compiled regular expression, or a pattern string
    new - the replacement: a string (which may contain group references
          such as \\1) or a callable taking the match object

    Returns the text with all replacements applied.
    """
    if isinstance(old, (str, type(u''))):
        old = re.compile(old)
    # Regions in which nothing may be replaced.
    nowikiOrHtmlCommentR = re.compile(r'<nowiki>.*?</nowiki>|<!--.*?-->|<math>.*?</math>', re.IGNORECASE | re.DOTALL)
    # How much of the text we have looked at so far
    index = 0
    while True:
        match = old.search(text, index)
        if not match:
            break
        noTouchMatch = nowikiOrHtmlCommentR.search(text, index)
        if noTouchMatch and noTouchMatch.start() < match.start():
            # an HTML comment or text in nowiki tags stands before the next valid match. Skip.
            index = noTouchMatch.end()
        else:
            # We found a valid match. Expand the replacement on the match
            # itself, so look-around context and group references work.
            if callable(new):
                replacement = new(match)
            else:
                replacement = match.expand(new)
            text = text[:match.start()] + replacement + text[match.end():]
            # Continue behind the inserted text. Using len(new) here skipped
            # or rescanned text when the replacement had another length.
            index = match.start() + len(replacement)
            if match.start() == match.end():
                # An empty match would be found again at the same place.
                index += 1
    return text
# Part of library dealing with interwiki links
def getLanguageLinks(text, insite = None, pageLink = "[[]]"):
    """
    Returns a dictionary with Site objects as keys and Page objects as values
    for each interwiki link found in the text. Do not call this routine
    directly, use Page objects instead.

    Parameters:
    text     - the wikitext to scan
    insite   - the site the text comes from (defaults to getSite())
    pageLink - only used in the message printed for a link without title
    """
    if insite is None:
        insite = getSite()
    result = {}
    # Ignore interwiki links within nowiki tags and HTML comments
    nowikiOrHtmlCommentR = re.compile(r'<nowiki>.*?</nowiki>|<!--.*?-->', re.IGNORECASE | re.DOTALL)
    match = nowikiOrHtmlCommentR.search(text)
    while match:
        text = text[:match.start()] + text[match.end():]
        match = nowikiOrHtmlCommentR.search(text)
    # This regular expression will find every link that is possibly an
    # interwiki link.
    # NOTE: language codes are case-insensitive and only consist of basic latin
    # letters and hyphens.
    interwikiR = re.compile(r'\[\[([a-zA-Z\-]+)\s?:([^\[\]\n]*)\]\]')
    for lang, pagetitle in interwikiR.findall(text):
        lang = lang.lower()
        # Check if it really is in fact an interwiki link to a known
        # language, or if it's e.g. a category tag or an internal link
        if lang in insite.family.obsolete:
            lang = insite.family.obsolete[lang]
        if lang not in insite.family.langs:
            continue
        if '|' in pagetitle:
            # ignore text after the pipe
            pagetitle = pagetitle[:pagetitle.index('|')]
        if not pagetitle:
            output(u"ERROR: %s - ignoring impossible link to %s:%s" % (pageLink, lang, pagetitle))
        else:
            # we want the actual page objects rather than the titles
            site = insite.getSite(code = lang)
            result[site] = Page(site, pagetitle, insite=insite)
    return result
def removeLanguageLinks(text, site = None):
    """Return the wikitext 'text' with all interwiki links removed.

    A link counts as interwiki link when its prefix is one of the language
    codes in site.family.langs ('site' defaults to getSite()). Leading and
    trailing whitespace of the result is normalised with normalWhitespace.
    """
    if site == None:
        site = getSite()
    # Matches every interwiki link, plus trailing whitespace.
    interwikiR = re.compile(
        r'\[\[(%s)\s?:[^\]]*\]\][\s]*' % '|'.join(site.family.langs),
        re.IGNORECASE)
    stripped = replaceExceptMathNowikiAndComments(text, interwikiR, '')
    return normalWhitespace(stripped)
def replaceLanguageLinks(oldtext, new, site = None):
    """Return 'oldtext' with its interwiki links replaced by those in 'new'.

    'new' is passed to interwikiFormat, so it should be the dictionary of
    Page objects that function expects. Where the links are placed depends on
    the family settings interwiki_attop and categories_last.
    """
    if site == None:
        site = getSite()
    links = interwikiFormat(new, insite = site)
    body = removeLanguageLinks(oldtext, site = site)
    if not links:
        return body
    if site.language() in site.family.interwiki_attop:
        return links + site.family.interwiki_text_separator + body
    if site.language() in site.family.categories_last:
        # Put the interwiki links in first, then re-add the categories
        # so that they end up last.
        cats = getCategoryLinks(body, site = site)
        body = removeCategoryLinks(body, site) + site.family.interwiki_text_separator + links
        return replaceCategoryLinks(body, cats, site=site)
    return body + site.family.interwiki_text_separator + links
def interwikiFormat(links, insite = None):
    """Return the wikitext for all interwiki links in 'links'.

    'links' is a dictionary whose values are Page objects; its keys are
    sorted and compared with Site objects below.
    The string is formatted for inclusion in insite (defaulting to your own).
    Raises ValueError if one of the pages belongs to insite itself.
    """
    if insite is None:
        insite = getSite()
    if not links:
        return ''
    # Security check: site may not refer to itself.
    for page in links.values():
        if page.site() == insite:
            raise ValueError("Trying to add interwiki link to self")
    order = sorted(links.keys())
    putfirst = insite.interwiki_putfirst()
    if putfirst:
        # Move the sites named in putfirst to the front, in that order.
        front = []
        for code in putfirst:
            # The code may not exist in this family?
            if code not in getSite().family.langs:
                continue
            site = insite.getSite(code = code)
            if site in order:
                order.remove(site)
                front.append(site)
        order = front + order
    if insite.interwiki_putfirst_doubled(order):
        order = insite.interwiki_putfirst_doubled(order) + order
    formatted = []
    for site in order:
        try:
            link = links[site].aslink(forceInterwiki = True)
            formatted.append(link)
        except AttributeError:
            formatted.append(site.linkto(links[site],othersite=insite))
    sep = '\r\n'
    if insite.lang in insite.family.interwiki_on_one_line:
        sep = ' '
    return sep.join(formatted) + '\r\n'
def normalWhitespace(text):
    """Strip leading '\\r\\n' pairs and spaces, and all trailing whitespace.

    At the start only CR-LF pairs and single spaces are removed; at the end
    any run of CR, LF, space and tab characters is removed.
    """
    if not text:
        return text
    # Remove white space at the beginning
    # (this assumes that the first line NEVER starts with a space!)
    start = 0
    while True:
        if text.startswith('\r\n', start):
            start += 2
        elif text.startswith(' ', start):
            start += 1
        else:
            break
    # Remove white space at the end
    return text[start:].rstrip('\r\n \t')
# Categories
def getCategoryLinks(text, site):
    """Returns a list of catlib.Category objects, one for each category link
    found in the text. Do not call this routine directly, use Page objects
    instead.

    Links inside nowiki tags and HTML comments are ignored. The text after a
    pipe, if any, is passed to the Category as its sortKey.
    """
    import catlib
    result = []
    # Ignore category links within nowiki tags and HTML comments
    nowikiOrHtmlCommentR = re.compile(r'<nowiki>.*?</nowiki>|<!--.*?-->', re.IGNORECASE | re.DOTALL)
    match = nowikiOrHtmlCommentR.search(text)
    while match:
        text = text[:match.start()] + text[match.end():]
        match = nowikiOrHtmlCommentR.search(text)
    catNamespace = '|'.join(site.category_namespaces())
    R = re.compile(r'\[\[\s*(?P<namespace>%s)\s*:(?P<catName>.+?)(?:\|(?P<sortKey>.+?))?\]\]' % catNamespace)
    for match in R.finditer(text):
        cat = catlib.Category(site, '%s:%s' % (match.group('namespace'), match.group('catName')), sortKey = match.group('sortKey'))
        result.append(cat)
    return result
def removeCategoryLinks(text, site):
    """Return the wikitext 'text' with all category links removed.

    A link counts as category link when its prefix is one of
    site.category_namespaces(). Leading and trailing whitespace of the result
    is normalised with normalWhitespace.
    """
    # Matches every category link, plus trailing whitespace. The namespace
    # name is grouped.
    categoryR = re.compile(
        r'\[\[\s*(%s)\s*:.*?\]\][\s]*' % '|'.join(site.category_namespaces()))
    stripped = replaceExceptMathNowikiAndComments(text, categoryR, '')
    return normalWhitespace(stripped)
def replaceCategoryLinks(oldtext, new, site = None):
    """Return 'oldtext' with its category links replaced by those in 'new'.

    'new' should be a list of Category objects. Whether the categories go
    above or below the text depends on site.family.category_attop.
    """
    if site is None:
        site = getSite()
    # Unless categories go last on this site, take the interwiki links out
    # first and add them again at the end, so that interwiki tags appear
    # below category tags if both are set to appear at the bottom.
    interwiki_below = site.lang not in site.family.categories_last
    if interwiki_below:
        interwiki_links = getLanguageLinks(oldtext, insite = site)
        oldtext = removeLanguageLinks(oldtext, site = site)
    cats = categoryFormat(new, insite = site)
    body = removeCategoryLinks(oldtext, site = site)
    if not cats:
        newtext = body
    elif site.language() in site.family.category_attop:
        newtext = cats + site.family.category_text_separator + body
    else:
        newtext = body + site.family.category_text_separator + cats
    if interwiki_below:
        # now re-add interwiki links
        newtext = replaceLanguageLinks(newtext, interwiki_links, site = site)
    return newtext
def categoryFormat(categories, insite = None):
    """Return the wikitext for all category links in 'categories'.

    'categories' should be a list of Category objects; their aslink() texts
    are joined with a space or with CR-LF, depending on
    insite.category_on_one_line(), and terminated with CR-LF. An empty list
    gives an empty string.
    """
    if not categories:
        return ''
    if insite is None:
        insite = getSite()
    # Some people don't like the categories sorted, so keep the given order.
    links = []
    for category in categories:
        links.append(category.aslink())
    sep = '\r\n'
    if insite.category_on_one_line():
        sep = ' '
    return sep.join(links) + '\r\n'
# end of category specific code
def myencoding():
    """Return the character encoding of the site returned by getSite()."""
    home = getSite()
    return home.encoding()
def url2link(percentname, insite, site):
    """Convert a url-name of a page into a proper name for an interwiki link.

    Underscores become spaces, the name is decoded with url2unicode for
    'site', and the result is passed through unicode2html with the encoding
    of 'insite', the target wiki.
    """
    spaced = percentname.replace('_', ' ')
    return unicode2html(url2unicode(spaced, site = site), insite.encoding())
def resolveEsperantoXConvention(text):
    """
    Resolves the x convention used to encode Esperanto special characters,
    e.g. Cxefpagxo and CXefpagXo will both be converted to Ĉefpaĝo.
    Note that to encode non-Esperanto words like Bordeaux, one uses a
    double x, i.e. Bordeauxx or BordeauxX.

    Returns the text with every x-convention sequence resolved.
    """
    chars = {
        u'c': u'ĉ',
        u'C': u'Ĉ',
        u'g': u'ĝ',
        u'G': u'Ĝ',
        u'h': u'ĥ',
        u'H': u'Ĥ',
        u'j': u'ĵ',
        u'J': u'Ĵ',
        u's': u'ŝ',
        u'S': u'Ŝ',
        u'u': u'ŭ',
        u'U': u'Ŭ',
    }

    def resolve(match, latin, esperanto):
        # Decode one run of a letter followed by x's.
        old = match.group()
        # Floor division: the result is used as a range() bound.
        half = len(old) // 2
        if len(old) % 2 == 0:
            # The first two chars represent an Esperanto letter.
            # Following x's are doubled.
            return esperanto + ''.join([old[2 * i] for i in range(1, half)])
        # The first character stays latin; only the x's are doubled.
        return latin + ''.join([old[2 * i + 1] for i in range(0, half)])

    for latin, esperanto in chars.items():
        # A regular expression that matches a letter combination which IS
        # encoded using x-convention.
        xConvR = re.compile(latin + '[xX]+')
        # Each matching substring is regarded exactly once.
        text = xConvR.sub(lambda m, l=latin, e=esperanto: resolve(m, l, e), text)
    return text
def doubleXForEsperanto(text):
    """
    Doubles X-es where necessary so that we can submit a page to an Esperanto
    wiki: in every run of x/X that follows one of the letters c, g, h, j, s, u
    (either case), an extra 'x' is inserted after each x or X. Again, we have
    to keep stupid stuff like cXxXxxX in mind. Maybe someone wants to write
    about the Sony Cyber-shot DSC-Uxx camera series on eo: ;)
    """
    # Matches a letter that takes part in the x-convention, followed by
    # one or more x's.
    notXConvR = re.compile('[cghjsuCGHJSU][xX]+')

    def double(match):
        old = match.group()
        # the first letter stays; add an x after each X or x.
        return old[0] + ''.join([char + 'x' for char in old[1:]])

    return notXConvR.sub(double, text)
######## Unicode library functions ########
def UnicodeToAsciiHtml(s):
    """Return 's' with every character above 127 written as a decimal
    numeric HTML entity (&#NNN;); characters below 128 are kept as they are.
    """
    html = []
    for c in s:
        cord = ord(c)
        if cord < 128:
            html.append(c)
        else:
            html.append('&#%d;' % cord)
    return ''.join(html)
def url2unicode(title, site):
    """Undo the percent-encoding of 'title' and return it as unicode.

    The encoding of 'site' is tried first; if that raises UnicodeError, every
    encoding in site.encodings() is tried in turn. If none works, the error
    is re-raised.
    """
    def unquoted(enc):
        # Encode to bytes, resolve the %XX escapes, decode again.
        raw = urllib.unquote(title.encode(enc))
        return unicode(raw, enc)

    try:
        return unquoted(site.encoding())
    except UnicodeError:
        # try to handle all encodings (will probably retry utf-8)
        for enc in site.encodings():
            try:
                return unquoted(enc)
            except UnicodeError:
                continue
        # Couldn't convert, raise the original exception
        raise
def unicode2html(x, encoding):
    """
    Return the unicode string 'x' in a form that fits into 'encoding'.

    If x can be encoded in the desired encoding it is returned unchanged;
    otherwise it is converted with UnicodeToAsciiHtml.
    """
    try:
        x.encode(encoding)
    except UnicodeError:
        return UnicodeToAsciiHtml(x)
    else:
        return x
def html2unicode(text, ignore = []):
    """
    Given a string, replaces all HTML entities by the equivalent unicode
    characters.

    Parameters:
    text   - the string to convert
    ignore - code points that must be left as entities (only read here)

    Unknown named entities, the code point 0, ignored code points and code
    points unichr() cannot represent are left unchanged. Returns unicode.
    """
    # This regular expression will match any decimal and hexadecimal entity and
    # also entities that might be named entities.
    entityR = re.compile(r'&(#(?P<decimal>\d+)|#x(?P<hex>[0-9a-fA-F]+)|(?P<name>[A-Za-z]+));')
    parts = []
    pos = 0
    for match in entityR.finditer(text):
        parts.append(text[pos:match.start()])
        pos = match.end()
        unicodeCodepoint = None
        if match.group('decimal'):
            unicodeCodepoint = int(match.group('decimal'))
        elif match.group('hex'):
            unicodeCodepoint = int(match.group('hex'), 16)
        elif match.group('name'):
            # None for names that are not known HTML entities
            unicodeCodepoint = htmlentitydefs.name2codepoint.get(match.group('name'))
        # Leave the entity unchanged unless it can be converted.
        replacement = match.group()
        if unicodeCodepoint and unicodeCodepoint not in ignore:
            try:
                replacement = unichr(unicodeCodepoint)
            except (ValueError, OverflowError):
                # code point outside the range unichr() supports
                pass
        parts.append(replacement)
    parts.append(text[pos:])
    return u''.join(parts)
def Family(fam = None, fatal = True):
    """
    Import the named family and return an instance of its Family class.

    Parameters:
    fam   - name of the family; the module '<fam>_family' is imported.
            Defaults to config.family.
    fatal - if true, a missing family module ends the program with
            sys.exit(1); otherwise ValueError is raised.
    """
    if fam is None:
        fam = config.family
    # search for family module in the 'families' subdirectory
    # (add the directory only once, not on every call)
    if 'families' not in sys.path:
        sys.path.append('families')
    try:
        # __import__ takes the module name as data; building an import
        # statement from the name and exec'ing it would run whatever code
        # the name contains.
        myfamily = __import__('%s_family' % fam)
    except ImportError:
        if fatal:
            print("Error importing the %s family. This probably means the family" % fam)
            print("does not exist. Also check your configuration file")
            sys.exit(1)
        else:
            raise ValueError("Family does not exist")
    return myfamily.Family()
class Site(object):
def __init__(self, code, fam=None, user=None):
    """Constructor takes three arguments:

    code    language code for Site (stored in lower case)
    fam     Wikimedia family: a name, a family object, or None for the
            configured family (optional)
    user    User to use (optional: defaults to configured)

    Raises KeyError if the language is not part of the family.
    """
    self.lang = code.lower()
    by_name = fam is None or isinstance(fam, basestring)
    if by_name:
        fam = Family(fam)
    self.family = fam
    if self.lang not in self.family.langs:
        raise KeyError("Language %s does not exist in family %s"%(self.lang,self.family.name))
    self.nocapitalize = self.lang in self.family.nocapitalize
    self.user = user
    # Nothing is known yet about tokens and login state.
    self._token = None
    self._sysoptoken = None
    self.loggedInAs = None
    self.loginStatusKnown = False
def forceLogin(self, sysop = False):
    """Log in through login.LoginManager unless loggedin() is already true.

    After a successful login the login state is recorded on the site.
    """
    if self.loggedin(sysop = sysop):
        return
    loginMan = login.LoginManager(site = self, sysop = sysop)
    if loginMan.login(retry = True):
        self.loginStatusKnown = True
        self.loggedInAs = loginMan.username
def loggedin(self, sysop = False):
"""
Checks if we're logged in by loading a page and looking for the login
link. We assume that we're not being logged out during a bot run, so
loading the test page is only required once.

Returns True if self.loggedInAs is set, False otherwise.
"""
# Note: the cookies are loaded without the sysop argument here, while the
# test page below is fetched with it.
self._loadCookies()
if not self.loginStatusKnown:
output(u'Getting a page to check if we\'re logged in on %s' % self)
path = self.get_address('Non-existing_page')
text = self.getUrl(path, sysop = sysop)
# Search for the "my talk" link at the top
# NOTE(review): the pattern below has lost the name of its group (it is
# read as 'username' further down) and apparently the HTML around it;
# '(?P.+?)' is not a valid regular expression -- TODO restore.
mytalkR = re.compile('(?P.+?)\s*')
m = mytalkR.search(text)
if m:
self.loginStatusKnown = True
self.loggedInAs = m.group('username')
# While we're at it, check if we have got unread messages
# NOTE(review): the marker string tested below is empty, so the test is
# true for any text; the original marker seems to be lost -- TODO restore.
if '' in text:
output(u'NOTE: You have unread messages on %s' % self)
return (self.loggedInAs is not None)
def cookies(self, sysop = False):
    """Reload the cookies for the normal or sysop account and return them.

    The value is whatever _loadCookies stored in self._cookies.
    """
    # TODO: cookie caching is disabled, so the cookies are loaded on every
    # call. The disabled check was:
    #if not hasattr(self,'_cookies'):
    self._loadCookies(sysop = sysop)
    return self._cookies
def _loadCookies(self, sysop = False):
    """Retrieve session cookies for login.

    Reads the login data file of the configured user (or sysop) for this
    site and stores its lines, joined with '; ', in self._cookies. If no
    user name is configured or the file does not exist, self._cookies is set
    to None and the login status is marked as known.
    """
    try:
        if sysop:
            username = config.sysopnames[self.family.name][self.lang]
        else:
            username = config.usernames[self.family.name][self.lang]
    except KeyError:
        self._cookies = None
        self.loginStatusKnown = True
        return
    fn = 'login-data/%s-%s-%s-login.data' % (self.family.name, self.lang, username)
    if not os.path.exists(fn):
        # no login data for this account
        self._cookies = None
        self.loginStatusKnown = True
        return
    f = open(fn)
    stripped = [line.strip() for line in f.readlines()]
    self._cookies = '; '.join(stripped)
    f.close()
def getUrl(self, path, retry = True, sysop = False):
    """
    Low-level routine to get a URL from the wiki.

    Parameters:
    path  - The absolute path, without the hostname.
    retry - If True, retries loading the page when a network error
            occurs.
    sysop - If True, the sysop account's cookie will be used.

    Returns the HTML text of the page converted to unicode. With retry
    false, the exception raised while opening the URL is passed on.
    """
    authenticated = self.hostname() in config.authenticate.keys()
    if authenticated:
        uo = authenticateURLopener
    else:
        uo = MyURLopener()
        if self.cookies(sysop = sysop):
            uo.addheader('Cookie', self.cookies(sysop = sysop))
    # Try to retrieve the page until it was successfully loaded (just in
    # case the server is down or overloaded).
    # Wait for retry_idle_time minutes (growing!) between retries.
    retry_idle_time = 1
    retrieved = False
    while not retrieved:
        try:
            if authenticated:
                f = urllib2.urlopen('http://%s%s' % (self.hostname(), path))
            else:
                f = uo.open('http://%s%s' % (self.hostname(), path))
            retrieved = True
        except KeyboardInterrupt:
            raise
        except Exception:
            if not retry:
                raise
            e = sys.exc_info()[1]
            # We assume that the server is down. Wait some time, then try again.
            output(u"%s" % e)
            output(u"WARNING: Could not load 'http://%s%s'. Maybe the server or your connection is down. Retrying in %i minutes..." % (self.hostname(), path, retry_idle_time))
            time.sleep(retry_idle_time * 60)
            # Next time wait longer, but not longer than half an hour
            retry_idle_time *= 2
            if retry_idle_time > 30:
                retry_idle_time = 30
    try:
        text = f.read()
        # Find charset in the content-type meta tag
        contentType = f.info()['Content-Type']
    finally:
        # the response used to be left open
        f.close()
    R = re.compile('charset=([^\'\"]+)')
    m = R.search(contentType)
    if m:
        charset = m.group(1)
    else:
        print("WARNING: No character set found")
        # UTF-8 as default
        charset = 'utf-8'
    # Check if this is the charset we expected
    self.checkCharset(charset)
    # Convert HTML to Unicode
    try:
        text = unicode(text, charset, errors = 'strict')
    except UnicodeDecodeError:
        print(sys.exc_info()[1])
        output(u'ERROR: Invalid characters found on http://%s%s, replaced by \\ufffd.' % (self.hostname(), path))
        # We use error='replace' in case of bad encoding.
        text = unicode(text, charset, errors = 'replace')
    return text
def newpages(self, number = 10, repeat = False):
"""Generator which yields new articles subsequently.
It fetches the page at self.newpages_address() and yields one tuple
for each entry whose title has not been seen before:
(page, date, length, loggedIn, username, comment)
where page is a Page object, length is an int, loggedIn is a bool and
date, username and comment are the strings matched in the HTML.
If 'repeat' is true, the list is fetched again and again; titles
that were already yielded are skipped.
The 'number' argument is not used by this implementation.
get_throttle() is called before every fetch.
"""
# NOTE(review): 'throttle' is assigned but never read in this method.
throttle = True
seen = set()
while True:
path = self.newpages_address()
get_throttle()
html = self.getUrl(path)
# NOTE(review): the pattern below is split across two physical lines and has
# lost the names of its groups (read below as date, title, length, loggedin,
# username, comment) and apparently the HTML around them -- TODO restore.
entryR = re.compile('
(?P.+?) .+? \((?P\d+)(.+?)\) \. \. (?P)?(?P.+?)()?( \((?P.+?)\))?')
for m in entryR.finditer(html):
date = m.group('date')
title = m.group('title')
# NOTE(review): as written this replaces '"' by itself; presumably an HTML
# entity was decoded here -- TODO confirm.
title = title.replace('"', '"')
length = int(m.group('length'))
loggedIn = (m.group('loggedin') is not None)
username = m.group('username')
comment = m.group('comment')
if title not in seen:
seen.add(title)
page = Page(self, title)
yield page, date, length, loggedIn, username, comment
if not repeat:
break
def longpages(self, number = 10, repeat = False):
    """Generator which yields (page, length) tuples.

    Each pass fetches the listing at self.longpages_address() through
    self.getUrl() and yields, for every matched title not yielded before,
    a Page built from that title together with the matched length as int.
    With repeat false the generator stops after one pass; otherwise it
    keeps re-fetching, calling get_throttle() before every fetch.

    NOTE(review): the 'number' argument is not used anywhere in this body.
    """
    # NOTE(review): assigned but never read in this method.
    throttle = True
    # Titles already yielded; keeps repeated passes from yielding duplicates.
    seen = set()
    while True:
        path = self.longpages_address()
        get_throttle()
        html = self.getUrl(path)
        # NOTE(review): this pattern literal spans two physical lines, has a
        # nameless named group ("(?P\d+)") and no 'title' group, so it cannot
        # work as written. It looks like markup was stripped from it; restore
        # the pattern from the upstream file.
        entryR = re.compile('
.+? \((?P\d+)(.+?)\)')
        for m in entryR.finditer(html):
            title = m.group('title')
            length = int(m.group('length'))
            if title not in seen:
                seen.add(title)
                page = Page(self, title)
                yield page, length
        # Without repeat, one pass over the listing is all we do.
        if not repeat:
            break
def shortpages(self, number = 10, repeat = False):
    """Generator which yields (page, length) tuples.

    Each pass fetches the listing at self.shortpages_address() through
    self.getUrl() and yields, for every matched title not yielded before,
    a Page built from that title together with the matched length as int.
    With repeat false the generator stops after one pass; otherwise it
    keeps re-fetching, calling get_throttle() before every fetch.

    NOTE(review): the 'number' argument is not used anywhere in this body.
    """
    # NOTE(review): assigned but never read in this method.
    throttle = True
    # Titles already yielded; keeps repeated passes from yielding duplicates.
    seen = set()
    while True:
        path = self.shortpages_address()
        get_throttle()
        html = self.getUrl(path)
        # NOTE(review): this pattern literal spans two physical lines, has a
        # nameless named group ("(?P\d+)") and no 'title' group, so it cannot
        # work as written. It looks like markup was stripped from it; restore
        # the pattern from the upstream file.
        entryR = re.compile('
.+? \((?P\d+)(.+?)\)')
        for m in entryR.finditer(html):
            title = m.group('title')
            length = int(m.group('length'))
            if title not in seen:
                seen.add(title)
                page = Page(self, title)
                yield page, length
        # Without repeat, one pass over the listing is all we do.
        if not repeat:
            break
def categories(self, number = 10, repeat = False):
    """Generator which yields Page objects.

    Each pass fetches the listing at self.categories_address() through
    self.getUrl() and yields a Page for every matched title that has not
    been yielded before. With repeat false the generator stops after one
    pass; otherwise it keeps re-fetching, calling get_throttle() before
    every fetch.

    NOTE(review): the 'number' argument is not used anywhere in this body.
    """
    # NOTE(review): assigned but never read in this method.
    throttle = True
    # Titles already yielded; keeps repeated passes from yielding duplicates.
    seen = set()
    while True:
        path = self.categories_address()
        get_throttle()
        html = self.getUrl(path)
        # NOTE(review): this pattern literal spans two physical lines and
        # defines no 'title' group although m.group('title') is read below.
        # It looks like markup was stripped from it; restore the pattern
        # from the upstream file.
        entryR = re.compile('
.+?')
        for m in entryR.finditer(html):
            title = m.group('title')
            if title not in seen:
                seen.add(title)
                page = Page(self, title)
                yield page
        # Without repeat, one pass over the listing is all we do.
        if not repeat:
            break
def deadendpages(self, number = 10, repeat = False):
    """Generator which yields Page objects.

    Each pass fetches the listing at self.deadendpages_address() through
    self.getUrl() and yields a Page for every matched title that has not
    been yielded before. With repeat false the generator stops after one
    pass; otherwise it keeps re-fetching, calling get_throttle() before
    every fetch.

    NOTE(review): the 'number' argument is not used anywhere in this body.
    """
    # NOTE(review): assigned but never read in this method.
    throttle = True
    # Titles already yielded; keeps repeated passes from yielding duplicates.
    seen = set()
    while True:
        path = self.deadendpages_address()
        get_throttle()
        html = self.getUrl(path)
        # NOTE(review): this pattern literal spans two physical lines and
        # defines no 'title' group although m.group('title') is read below.
        # It looks like markup was stripped from it; restore the pattern
        # from the upstream file.
        entryR = re.compile('
.+?')
        for m in entryR.finditer(html):
            title = m.group('title')
            if title not in seen:
                seen.add(title)
                page = Page(self, title)
                yield page
        # Without repeat, one pass over the listing is all we do.
        if not repeat:
            break
def ancientpages(self, number = 10, repeat = False):
    """Generator which yields (page, date) tuples.

    Each pass fetches the listing at self.ancientpages_address() through
    self.getUrl() and yields, for every matched title not yielded before,
    a Page built from that title together with the text of the 'date'
    group. With repeat false the generator stops after one pass; otherwise
    it keeps re-fetching, calling get_throttle() before every fetch.

    NOTE(review): the 'number' argument is not used anywhere in this body.
    """
    # NOTE(review): assigned but never read in this method.
    throttle = True
    # Titles already yielded; keeps repeated passes from yielding duplicates.
    seen = set()
    while True:
        path = self.ancientpages_address()
        get_throttle()
        html = self.getUrl(path)
        # NOTE(review): this pattern literal spans two physical lines, has a
        # nameless named group ("(?P.+?)") and no 'title' group, so it cannot
        # work as written. It looks like markup was stripped from it; restore
        # the pattern from the upstream file.
        entryR = re.compile('
.+? (?P.+?)')
        for m in entryR.finditer(html):
            title = m.group('title')
            date = m.group('date')
            if title not in seen:
                seen.add(title)
                page = Page(self, title)
                yield page, date
        # Without repeat, one pass over the listing is all we do.
        if not repeat:
            break
def lonelypages(self, number = 10, repeat = False):
    """Generator which yields Page objects.

    Each pass fetches the listing at self.lonelypages_address() through
    self.getUrl() and yields a Page for every matched title that has not
    been yielded before. With repeat false the generator stops after one
    pass; otherwise it keeps re-fetching, calling get_throttle() before
    every fetch.

    NOTE(review): the 'number' argument is not used anywhere in this body.
    """
    # NOTE(review): assigned but never read in this method.
    throttle = True
    # Titles already yielded; keeps repeated passes from yielding duplicates.
    seen = set()
    while True:
        path = self.lonelypages_address()
        get_throttle()
        html = self.getUrl(path)
        # NOTE(review): this pattern literal spans two physical lines and
        # defines no 'title' group although m.group('title') is read below.
        # It looks like markup was stripped from it; restore the pattern
        # from the upstream file.
        entryR = re.compile('
.+?')
        for m in entryR.finditer(html):
            title = m.group('title')
            if title not in seen:
                seen.add(title)
                page = Page(self, title)
                yield page
        # Without repeat, one pass over the listing is all we do.
        if not repeat:
            break
def uncategorizedcategories(self, number = 10, repeat = False):
    """Generator which yields Page objects.

    Each pass fetches the listing at self.uncategorizedcategories_address()
    through self.getUrl() and yields a Page for every matched title that
    has not been yielded before. With repeat false the generator stops
    after one pass; otherwise it keeps re-fetching, calling get_throttle()
    before every fetch.

    NOTE(review): the 'number' argument is not used anywhere in this body.
    """
    # NOTE(review): assigned but never read in this method.
    throttle = True
    # Titles already yielded; keeps repeated passes from yielding duplicates.
    seen = set()
    while True:
        path = self.uncategorizedcategories_address()
        get_throttle()
        html = self.getUrl(path)
        # NOTE(review): this pattern literal spans two physical lines and
        # defines no 'title' group although m.group('title') is read below.
        # It looks like markup was stripped from it; restore the pattern
        # from the upstream file.
        entryR = re.compile('
.+?')
        for m in entryR.finditer(html):
            title = m.group('title')
            if title not in seen:
                seen.add(title)
                page = Page(self, title)
                yield page
        # Without repeat, one pass over the listing is all we do.
        if not repeat:
            break
def uncategorizedpages(self, number = 10, repeat = False):
    """Generator which yields Page objects.

    Each pass fetches the listing at self.uncategorizedpages_address()
    through self.getUrl() and yields a Page for every matched title that
    has not been yielded before. With repeat false the generator stops
    after one pass; otherwise it keeps re-fetching, calling get_throttle()
    before every fetch.

    NOTE(review): the 'number' argument is not used anywhere in this body.
    """
    # NOTE(review): assigned but never read in this method.
    throttle = True
    # Titles already yielded; keeps repeated passes from yielding duplicates.
    seen = set()
    while True:
        path = self.uncategorizedpages_address()
        get_throttle()
        html = self.getUrl(path)
        # NOTE(review): this pattern literal spans two physical lines and
        # defines no 'title' group although m.group('title') is read below.
        # It looks like markup was stripped from it; restore the pattern
        # from the upstream file.
        entryR = re.compile('
.+?')
        for m in entryR.finditer(html):
            title = m.group('title')
            if title not in seen:
                seen.add(title)
                page = Page(self, title)
                yield page
        # Without repeat, one pass over the listing is all we do.
        if not repeat:
            break
def unusedcategories(self, number = 10, repeat = False):
    """Generator which yields Page objects.

    Each pass fetches the listing at self.unusedcategories_address()
    through self.getUrl() and yields a Page for every matched title that
    has not been yielded before. With repeat false the generator stops
    after one pass; otherwise it keeps re-fetching, calling get_throttle()
    before every fetch.

    NOTE(review): the 'number' argument is not used anywhere in this body.
    """
    # NOTE(review): assigned but never read in this method.
    throttle = True
    # Titles already yielded; keeps repeated passes from yielding duplicates.
    seen = set()
    while True:
        path = self.unusedcategories_address()
        get_throttle()
        html = self.getUrl(path)
        # NOTE(review): this pattern literal spans two physical lines and
        # defines no 'title' group although m.group('title') is read below.
        # It looks like markup was stripped from it; restore the pattern
        # from the upstream file.
        entryR = re.compile('
.+?')
        for m in entryR.finditer(html):
            title = m.group('title')
            if title not in seen:
                seen.add(title)
                page = Page(self, title)
                yield page
        # Without repeat, one pass over the listing is all we do.
        if not repeat:
            break
def allpages(self, start = '!', namespace = 0, throttle = True):
"""Generator which yields all articles in the home language in
alphanumerical order, starting at a given page. By default,
it starts at '!', so it should yield all pages.
The objects returned by this generator are all Page()s.
It is advised not to use this directly, but to use the
AllpagesPageGenerator from pagegenerators.py instead.
"""
while True:
# encode Non-ASCII characters in hexadecimal format (e.g. %F6)
start = start.encode(self.encoding())
start = urllib.quote(start)
# load a list which contains a series of article names (always 480)
path = self.allpages_address(start, namespace)
print 'Retrieving Allpages special page for %s from %s, namespace %i' % (repr(self), start, namespace)
returned_html = self.getUrl(path)
# Try to find begin and end markers
try:
# In 1.4, another table was added above the navigational links
if self.version() < "1.4":
begin_s = '
= self.family.interwiki_putfirst_doubled[self.lang][0]:
list_of_links2 = []
for lang in list_of_links:
list_of_links2.append(lang.language())
list = []
for lang in self.family.interwiki_putfirst_doubled[self.lang][1]:
try:
list.append(list_of_links[list_of_links2.index(lang)])
except ValueError:
pass
return list
else:
return False
else:
return False
def login_address(self):
    """Return what this site's family gives as login address for self.lang."""
    fam = self.family
    return fam.login_address(self.lang)
def watchlist_address(self):
    """Return what this site's family gives as watchlist address for self.lang."""
    fam = self.family
    return fam.watchlist_address(self.lang)
def getSite(self, code):
    """Return the site for language `code`, using this site's family and user.

    Delegates to the module-level getSite() function.
    """
    options = {'code': code, 'fam': self.family, 'user': self.user}
    return getSite(**options)
def namespace(self, num):
    """Return what this site's family gives as namespace `num` for self.lang."""
    fam = self.family
    return fam.namespace(self.lang, num)
def namespaces(self):
    """Return a tuple with this site's namespace names.

    Iterates over self.family.namespaces, asks the family for the name of
    each entry in language self.lang, and collects every name that is not
    None, in iteration order.
    """
    # Collect into a list and convert once; the previous version shadowed
    # the builtin 'list', looked every name up twice and rebuilt the tuple
    # on every addition.
    names = []
    for n in self.family.namespaces:
        ns = self.family.namespace(self.lang, n)
        if ns is not None:
            names.append(ns)
    return tuple(names)
def linktrail(self):
    """Return what this site's family gives as linktrail for self.lang."""
    fam = self.family
    return fam.linktrail(self.lang)
def language(self):
    """Return this site's language code (the value of self.lang)."""
    code = self.lang
    return code
def family(self):
    """Return the value of self.family.

    NOTE(review): sibling methods read self.family as an object with
    attributes (e.g. self.family.name), so an instance attribute of the
    same name presumably hides this method - confirm whether it is
    reachable at all.
    """
    fam = self.family
    return fam
def sitename(self):
    """Return the family name and the language code, joined by a colon."""
    parts = (self.family.name, self.lang)
    return ':'.join(parts)
def languages(self):
    """Return the keys of the family's 'langs' mapping."""
    known = self.family.langs
    return known.keys()
def getToken(self, getalways = True, getagain = False, sysop = False):
    """Return the stored edit token, loading a page first if necessary.

    With sysop true the sysop token (self._sysoptoken) is meant, otherwise
    the normal one (self._token). A page is loaded when getagain is true,
    or when getalways is true and the wanted token is still empty; the
    page is the 'Sandbox' page in the family's namespace 4.

    Returns the token, or False when it is (still) empty. UserBlocked is
    passed on to the caller; any other Error raised while loading the page
    is ignored.
    """
    refresh = getagain
    if not refresh and getalways:
        if sysop:
            refresh = not self._sysoptoken
        else:
            refresh = not self._token
    if refresh:
        output(u"Getting page to get a token.")
        try:
            sandbox = Page(self, "%s:Sandbox" % self.family.namespace(self.lang, 4))
            sandbox.get(force = True, get_redirect = True, sysop = sysop)
        except UserBlocked:
            raise
        except Error:
            # best effort: the token check below reports failure as False
            pass
    if sysop:
        token = self._sysoptoken
    else:
        token = self._token
    return token or False
def putToken(self,value, sysop = False):
    """Store `value` as the sysop token if sysop is true, else as the normal token."""
    attribute = '_token'
    if sysop:
        attribute = '_sysoptoken'
    setattr(self, attribute, value)
    return
# Cache of the Site objects created by getSite(), keyed by 'family:code'.
_sites = {}

def getSite(code = None, fam = None, user=None):
    """Return the Site for the given language code and family.

    code and fam default to the module-level default_code and
    default_family when they are None. Sites are created on first request
    and then cached, so later calls with the same family and code return
    the same object.

    NOTE(review): 'user' is only used when the Site is created; it is not
    part of the cache key, so a cached Site is returned whatever 'user' is.
    """
    # 'is None' instead of '== None': identity is what is meant here and it
    # cannot be fooled by objects with a custom __eq__.
    if code is None:
        code = default_code
    if fam is None:
        fam = default_family
    key = '%s:%s'%(fam,code)
    if key not in _sites:
        _sites[key] = Site(code=code, fam=fam, user=user)
    return _sites[key]
def setSite(site):
    """Make the language and family of `site` the module-wide defaults.

    Sets the module-level default_code to site.language() and
    default_family to site.family; getSite() falls back to these when
    called without code or family.
    """
    # Without this declaration the assignments only created two locals and
    # the function had no effect at all.
    global default_code, default_family
    # language is a method; the previous version stored the bound method
    # itself instead of the language code.
    default_code = site.language()
    default_family = site.family
def argHandler(arg, moduleName):
    '''
    DEPRECATED - use handleArgs instead

    Converts one commandline parameter to unicode. If it is one of the
    global parameters (-help, -family:, -lang:, -putthrottle:, -log,
    -log:, -nolog) it is processed here and None is returned; any other
    parameter is returned as a unicode string.

    moduleName should be the name of the module calling this function. It
    is needed because -help loads that module's docstring and because -log
    uses it for the name of the logfile.
    '''
    global default_code, default_family, logfile
    # On Windows the parameters arrive encoded as windows-1252 (while input
    # is cp850); elsewhere the console encoding is used for both.
    if sys.platform == 'win32':
        encoding = 'windows-1252'
    else:
        encoding = config.console_encoding
    arg = unicode(arg, encoding)

    if arg == '-help':
        showHelp(moduleName)
        sys.exit(0)
        return None
    if arg.startswith('-family:'):
        default_family = arg[8:]
        return None
    if arg.startswith('-lang:'):
        default_code = arg[6:]
        return None
    if arg.startswith('-putthrottle:'):
        put_throttle.setDelay(int(arg[13:]),absolute = True)
        return None
    if arg == '-log':
        activateLog('%s.log' % moduleName)
        return None
    if arg.startswith('-log:'):
        activateLog(arg[5:])
        return None
    if arg == '-nolog':
        logfile = None
        return None
    # not a global parameter: hand it back to the calling script
    return arg
def handleArgs():
    '''
    Takes the commandline arguments, converts them to Unicode, processes all
    global parameters such as -lang or -log. Returns a list of all arguments
    that are not global.

    The name of the calling script (sys.argv[0] without its extension) is
    used for -help and as the default logfile name for -log.
    '''
    global default_code, default_family, logfile
    import os.path
    # get commandline arguments
    args = sys.argv
    # get the name of the module calling this function. This is
    # required because the -help option loads the module's docstring and because
    # the module name will be used for the filename of the log.
    # splitext only strips a real extension: the former rindex('.') raised
    # ValueError for a script name without a dot and cut the name short when
    # the only dot was part of a directory name.
    moduleName = os.path.splitext(args[0])[0]
    nonGlobalArgs = []
    for arg in args[1:]:
        if sys.platform=='win32':
            # Windows gives parameters encoded as windows-1252, but input
            # encoded as cp850
            arg = unicode(arg, 'windows-1252')
        else:
            # Linux uses the same encoding for both
            arg = unicode(arg, config.console_encoding)
        if arg == '-help':
            showHelp(moduleName)
            sys.exit(0)
        elif arg.startswith('-family:'):
            default_family = arg[8:]
        elif arg.startswith('-lang:'):
            default_code = arg[6:]
        elif arg.startswith('-putthrottle:'):
            put_throttle.setDelay(int(arg[13:]), absolute = True)
        elif arg == '-log':
            activateLog('%s.log' % moduleName)
        elif arg.startswith('-log:'):
            activateLog(arg[5:])
        elif arg == '-nolog':
            logfile = None
        else:
            # the argument is not global. Let the specific bot script care
            # about it.
            nonGlobalArgs.append(arg)
    return nonGlobalArgs
#########################
# Interpret configuration
#########################

# search for user interface module in the 'userinterfaces' subdirectory
sys.path.append('userinterfaces')
# Import the module named '<config.userinterface>_interface'. __import__
# does the same job as the former exec-built import statement without
# assembling source code from a configuration value.
uiModule = __import__('%s_interface' % config.userinterface)
ui = uiModule.UI()

default_family = config.family
default_code = config.mylang
logfile = None

# Check that a site can be set up for the configured defaults; a KeyError
# here is reported as a missing or incomplete user-config.py.
try:
    getSite()
except KeyError:
    print(
u"""Please create a file user-config.py, and put in there:\n
One line saying \"mylang='language'\"
One line saying \"usernames['wikipedia']['language']='yy'\"\n
...filling in your username and the language code of the wiki you want to work
on.\n
For other possible configuration variables check config.py.
""")
    sys.exit(1)
# Languages to use for comment text after the actual language but before
# en:. For example, if for language 'xx', you want the preference of
# languages to be:
# xx:, then fr:, then ru:, then en:
# you let altlang return ['fr','ru'].
# This code is used by translate() below.
# Ordered table for altlang(): each entry pairs the language codes it applies
# to with the alternative languages for those codes. The first entry that
# contains the code wins.
_ALTLANG_TABLE = (
    (('aa',), ('am',)),
    (('fa', 'so'), ('ar',)),
    (('ku',), ('ar', 'tr')),
    (('sk',), ('cs',)),
    (('nds',), ('de', 'nl')),
    (('als', 'lb'), ('de', 'fr')),
    (('an', 'ast', 'ay', 'ca', 'gn', 'nah', 'qu'), ('es',)),
    (('eu',), ('es', 'fr')),
    (('gl',), ('es', 'pt')),
    (('lad',), ('es', 'he')),
    (('br', 'ht', 'ln', 'lo', 'vi', 'wa'), ('fr',)),
    (('ie', 'oc'), ('ie', 'oc', 'fr')),
    (('co',), ('fr', 'it')),
    (('lmo', 'nap', 'sc', 'scn', 'vec'), ('it',)),
    (('rm',), ('it', 'de', 'fr')),
    (('fy',), ('nl',)),
    (('li',), ('nl', 'de')),
    (('csb',), ('pl',)),
    (('mo', 'roa-rup'), ('ro',)),
    (('av', 'be', 'cv', 'hy', 'lt', 'lv', 'tt', 'udm', 'uk'), ('ru',)),
    (('got',), ('ru', 'uk')),
    (('kk', 'ky', 'tk', 'ug', 'uz'), ('tr', 'ru')),
    (('bo', 'ja', 'ko', 'minnan', 'za', 'zh', 'zh-cn', 'zh-tw'),
     ('zh', 'zh-tw', 'zh-cn')),
    (('da',), ('nb', 'no')),
    (('is', 'no', 'nb', 'nn'), ('no', 'nb', 'nn', 'da', 'sv')),
    (('sv',), ('da', 'no', 'nb')),
    (('se',), ('no', 'nb', 'sv', 'nn', 'fi', 'da')),
    (('bug', 'id', 'jv', 'ms', 'su'), ('id', 'ms', 'jv')),
    (('bs', 'hr', 'mk', 'sh', 'sr'), ('sh', 'hr', 'sr', 'bs')),
    (('ia',), ('la', 'es', 'fr', 'it')),
    (('sa',), ('hi',)),
    (('yi',), ('he',)),
    (('ceb', 'war'), ('tl',)),
    (('bi',), ('tpi',)),
    (('tpi',), ('bi',)),
)

def altlang(code):
    """Return the list of alternative language codes for `code`.

    The result is a new list on every call; it is empty when no
    alternatives are defined for the code.
    """
    for codes, alternatives in _ALTLANG_TABLE:
        if code in codes:
            return list(alternatives)
    return []
def translate(code, dict):
    """
    Given a language code and a dictionary, returns the dictionary's value for
    key 'code' if this key exists; otherwise tries to return a value for an
    alternative language that is most applicable to use on the Wikipedia in
    language 'code'.

    The language itself is always checked first, then the languages that
    altlang() gives as alternatives, and finally English. If none of these
    is a key of the dictionary, the first of the dictionary's values is
    returned (which one that is depends on the dictionary's own order).

    code may also be an object with a 'lang' attribute (such as a site);
    that attribute is then used as the code. An empty dictionary raises
    IndexError.
    """
    # If a site is given instead of a code, use its language
    if hasattr(code,'lang'):
        code = code.lang
    # 'in' replaces the deprecated has_key()
    if code in dict:
        return dict[code]
    for alt in altlang(code):
        if alt in dict:
            return dict[alt]
    if 'en' in dict:
        return dict['en']
    # list() so that indexing also works when values() is not a list
    return list(dict.values())[0]
def showDiff(oldtext, newtext):
    """
    Prints the lines that differ between oldtext and newtext.

    Both texts are compared line by line with difflib.ndiff. Only the
    lines starting with + or - are shown. Their sign is colored, and where
    ndiff supplies a '?' hint line the positions it marks are colored too.
    The text and the per-character color list are passed to output().
    """
    # For information on difflib, see http://pydoc.org/2.3/difflib.html
    palette = {
        '+': 10, # green
        '-': 12 # red
    }
    pieces = []
    colors = []

    def emit(changed, hint = None):
        # Queue one + or - line together with its color vector; None in the
        # vector means default color.
        tint = palette[changed[0]]
        marks = [None] * len(changed)
        # colorize the + or - sign
        marks[0] = tint
        if hint is not None:
            # colorize the positions marked by the ? line
            for pos in range(min(len(hint), len(changed))):
                if hint[pos] != ' ':
                    marks[pos] = tint
        pieces.append(changed + '\n')
        colors.extend(marks)
        # one default color for the newline character
        colors.append(None)

    # The last + or - line that has not been emitted yet.
    pending = None
    for line in difflib.ndiff(oldtext.splitlines(), newtext.splitlines()):
        if line.startswith('?'):
            emit(pending, line)
        elif pending:
            emit(pending)
        pending = None
        if line[0] in ('+', '-'):
            pending = line
    # there might be one + or - line left that wasn't followed by a ? line.
    if pending:
        emit(pending)
    output(u''.join(pieces), colors = colors)
def activateLog(logname):
    """Open logs/<logname> as the module's logfile.

    The file is opened as UTF-8 for appending; if that raises IOError it
    is opened for writing instead. The open file becomes the module-level
    'logfile'.
    """
    global logfile
    target = 'logs/%s' % logname
    try:
        handle = codecs.open(target, 'a', 'utf-8')
    except IOError:
        handle = codecs.open(target, 'w', 'utf-8')
    logfile = handle
def output(text, decoder = None, colors = None, newline = True):
    """
    Works like print, but passes the text to the user interface module
    (ui.output) instead of writing it directly, and also appends it to the
    logfile when one is active (written in utf-8).

    If decoder is None, text should be a unicode string. Otherwise it
    should be encoded in the given encoding. A non-unicode text without
    decoder is reported as a bug and decoded as utf-8, falling back to
    iso8859-1.

    colors is a list of integers, one for each character of text. If a
    list entry is None, the default color will be used for the
    character at that position. Leaving colors out means no colors.

    If newline is True, a linebreak will be added after printing the text.
    """
    # A fresh list per call instead of one shared mutable default.
    if colors is None:
        colors = []
    if decoder:
        text = unicode(text, decoder)
    elif not isinstance(text, type(u'')):
        print("DBG> BUG: Non-unicode passed to wikipedia.output without decoder!")
        # print_stack() writes the stack itself and returns None; printing
        # its result only added a stray "None" line.
        traceback.print_stack()
        print("DBG> Attempting to recover, but please report this problem")
        try:
            text = unicode(text, 'utf-8')
        except UnicodeDecodeError:
            text = unicode(text, 'iso8859-1')
    if logfile:
        # save the text in a logfile (will be written in utf-8)
        logfile.write(text + '\n')
        logfile.flush()
    ui.output(text, colors = colors, newline = newline)
def input(question):
    """Pass `question` to the user interface module's input() and return its result."""
    answer = ui.input(question)
    return answer
def inputChoice(question, answers, hotkeys, default = None):
    """Pass the arguments to the user interface module's inputChoice() and return its result."""
    choice = ui.inputChoice(question, answers, hotkeys, default)
    return choice
def showHelp(moduleName = None):
    """Show the help for the global arguments, then the calling module's docstring.

    The parameter moduleName is deprecated and should be left out; the
    name is then derived from sys.argv[0] by dropping its extension. If
    the module cannot be imported or its docstring cannot be shown, a
    short apology is printed instead.
    """
    import os.path
    if not moduleName:
        # splitext instead of rindex('.'): a script name without a dot no
        # longer raises ValueError here.
        moduleName = os.path.splitext(sys.argv[0])[0]
    globalHelp =u'''
Global arguments available for all bots:
-lang:xx Set the language of the wiki you want to work on, overriding
the configuration in user-config.py. xx should be the
language code.
-family:xyz Set the family of the wiki you want to work on, e.g.
wikipedia, wiktionary, wikitravel, ...
This will override the configuration in user-config.py.
-log Enable the logfile. Logs will be stored in the logs
subdirectory.
-log:xyz Enable the logfile, using xyz as the filename.
-nolog Disable the logfile (if it's enabled by default).
-putthrottle:nn Set the minimum time (in seconds) the bot will wait between
saving pages.
'''
    output(globalHelp)
    try:
        # Import by bare module name, so that a script started with a path
        # ('dir/bot.py') is still found; the former exec-built import
        # statement could never succeed for such a name.
        module = __import__(os.path.basename(moduleName))
        output(module.__doc__, 'utf-8')
    except Exception:
        # best effort: any failure to load or show the docstring ends here,
        # but KeyboardInterrupt and SystemExit are no longer swallowed.
        output(u'Sorry, no help available for %s' % moduleName)
def stopme():
    """Drop this bot's get throttle.

    To be called when a bot does not interact with the Wiki, or has
    stopped doing so, so that it will not slow down other bots any more.
    """
    throttle = get_throttle
    throttle.drop()