http://www.mediawiki.org/wiki/Special:Code/pywikipedia/11611
Revision: 11611 Author: xqt Date: 2013-06-02 11:37:43 +0000 (Sun, 02 Jun 2013) Log Message: ----------- normalize username, new method exists(), some PEP8 changes
Modified Paths: -------------- trunk/pywikipedia/userlib.py
Modified: trunk/pywikipedia/userlib.py =================================================================== --- trunk/pywikipedia/userlib.py 2013-06-02 09:17:57 UTC (rev 11610) +++ trunk/pywikipedia/userlib.py 2013-06-02 11:37:43 UTC (rev 11611) @@ -3,7 +3,7 @@ Library to work with users, their pages and talk pages. """ # -# (C) Pywikipedia bot team, 2008-2012 +# (C) Pywikipedia bot team, 2008-2013 # # Distributed under the terms of the MIT license. # @@ -11,7 +11,8 @@
import re import wikipedia as pywikibot -import query, config +import config +import query
class AutoblockUser(pywikibot.Error): @@ -20,6 +21,7 @@ an action is requested on a virtual autoblock user that's not available for him (i.e. roughly everything except unblock). """ + pass
class UserActionRefuse(pywikibot.Error): pass @@ -82,6 +84,7 @@ self._name = name else: self._name = name[0].upper() + name[1:] + self._name = self._name.replace('_', ' ') # None means not loaded self._blocked = None self._groups = None @@ -89,7 +92,6 @@ #if self.site().versionnumber() >= 16: # self._urToken = None
- def site(self): return self._site
@@ -115,6 +117,9 @@ def isAnonymous(self): return ip_regexp.match(self.username.split("/")[0]) is not None
+ def exists(self): + return self.isAnonymous() or self.isRegistered() + def __str__(self): return (u'%s:%s' % (self.site(), self.name())).encode(config.console_encoding, @@ -125,7 +130,6 @@
def _load(self): getall(self.site(), [self], force=True) - return
def registrationTime(self, force=False): if self._registrationTime < 0 or force: @@ -185,10 +189,12 @@ if self._isAutoblock: #This user is probably being queried for purpose of lifting #an autoblock, so has no user pages per se. - raise AutoblockUser("This is an autoblock ID, you can only use to unblock it.") + raise AutoblockUser( + u"This is an autoblock ID, you can only use to unblock it.") if subpage: subpage = u'/' + subpage - return pywikibot.Page(self.site(), self.name() + subpage, defaultNamespace=2) + return pywikibot.Page(self.site(), self.name() + subpage, + defaultNamespace=2)
def getUserTalkPage(self, subpage=u''): """ Return a pywikibot.Page object corresponding to this user's main @@ -201,7 +207,8 @@ if self._isAutoblock: #This user is probably being queried for purpose of lifting #an autoblock, so has no user talk pages per se. - raise AutoblockUser("This is an autoblock ID, you can only use to unblock it.") + raise AutoblockUser( + u"This is an autoblock ID, you can only use to unblock it.") if subpage: subpage = u'/' + subpage return pywikibot.Page(self.site(), self.name() + subpage, @@ -251,19 +258,19 @@ return True return False
- def sendMailOld(self, subject = u'', text = u'', ccMe = False): + def sendMailOld(self, subject=u'', text=u'', ccMe=False): address = self.site().put_address('Special:EmailUser') predata = { - "wpSubject" : subject, - "wpText" : text, - 'wpSend' : "Send", - 'wpCCMe' : '0', + "wpSubject": subject, + "wpText": text, + 'wpSend': "Send", + 'wpCCMe': '0', } if ccMe: predata['wpCCMe'] = '1' predata['wpEditToken'] = self.site().getToken()
- response, data = self.site().postForm(address, predata, sysop = False) + response, data = self.site().postForm(address, predata, sysop=False) if data: if 'var wgAction = "success";' in data: pywikibot.output(u'Email sent.') @@ -275,7 +282,6 @@ pywikibot.output(u'No data found.') return False
- @pywikibot.deprecated('contributions()') def editedPages(self, limit=500): """ Deprecated function that wraps 'contributions' for backwards @@ -307,7 +313,7 @@ 'action': 'query', 'list': 'usercontribs', 'ucuser': self.name(), - 'ucprop': ['ids','title','timestamp','comment'],# 'size','flags'], + 'ucprop': ['ids', 'title', 'timestamp', 'comment'], # 'size','flags'], 'uclimit': limit, 'ucdir': 'older', } @@ -331,8 +337,7 @@ ts = pywikibot.parsetime2stamp(contrib['timestamp']) yield (pywikibot.Page(self.site(), contrib['title'], defaultNamespace=contrib['ns']), - contrib['revid'], ts, contrib.get('comment', None) - ) + contrib['revid'], ts, contrib.get('comment', None)) nbresults += 1 if nbresults >= limit: break @@ -358,15 +363,21 @@ yield c return
- for item in self.site().logpages(number, mode='upload', user=self.username, dump=True): - yield pywikibot.ImagePage(self.site(), item['title']), item['timestamp'], item['comment'], item['pageid'] > 0 + for item in self.site().logpages(number, mode='upload', + user=self.username, dump=True): + yield pywikibot.ImagePage(self.site(), item['title']), \ + item['timestamp'], item['comment'], item['pageid'] > 0 return
- def _uploadedImagesOld(self, number = 10): + def _uploadedImagesOld(self, number=10): """Yield ImagePages from Special:Log&type=upload"""
- regexp = re.compile('<li[^>]*>(?P<date>.+?)\s+<a href=.*?>(?P<user>.+?)</a> .* uploaded "<a href=".*?"(?P<new> class="new")? title="(Image|File):(?P<image>.+?)"\s*>(?:.*?<span class="comment">(?P<comment>.*?)</span>)?', re.UNICODE) - path = self.site().log_address(number, mode = 'upload', user = self.name()) + regexp = re.compile( + '<li[^>]*>(?P<date>.+?)\s+<a href=.*?>(?P<user>.+?)</a> ' + '.* uploaded "<a href=".*?"(?P<new> class="new")? ' + 'title="(Image|File):(?P<image>.+?)"\s*>' + '(?:.*?<span class="comment">(?P<comment>.*?)</span>)?', re.UNICODE) + path = self.site().log_address(number, mode='upload', user=self.name()) html = self.site().getUrl(path) redlink_key = self.site().mediawiki_message('red-link-title') redlink_tail_len = None @@ -382,11 +393,12 @@
date = m.group('date') comment = m.group('comment') or '' - yield pywikibot.ImagePage(self.site(), image), date, comment, deleted + yield pywikibot.ImagePage(self.site(), + image), date, comment, deleted
def block(self, expiry=None, reason=None, anon=True, noCreate=False, - onAutoblock=False, banMail=False, watchUser=False, allowUsertalk=True, - reBlock=False, hidename=False): + onAutoblock=False, banMail=False, watchUser=False, + allowUsertalk=True, reBlock=False, hidename=False): """ Block the user by API.
@@ -418,7 +430,8 @@ if not self.site().isAllowed('block', sysop=True): raise UserActionRefuse('You don't have permission to block') if not expiry: - expiry = pywikibot.input(u'Please enter the expiry time for the block:') + expiry = pywikibot.input( + u'Please enter the expiry time for the block:') if not reason: reason = pywikibot.input(u'Please enter a reason for the block:')
@@ -429,7 +442,7 @@ params = { 'action': 'block', 'user': self.name(), - 'token': self.site().getToken(self, sysop = True), + 'token': self.site().getToken(self, sysop=True), 'reason': reason, } if expiry: @@ -450,14 +463,14 @@ params['reblock'] = 1
data = query.GetData(params, self.site(), sysop=True) - if 'error' in data: #error occured + if 'error' in data: # error occured errCode = data['error']['code'] if errCode == 'alreadyblocked': raise AlreadyBlocked() elif errCode == 'blockedasrange': raise AlreadyBlocked("Range Blocked") - #elif errCode == 'invalidrange': - # pass +## elif errCode == 'invalidrange': +## pass elif errCode == 'invalidexpiry': raise BlockError("Invaild expiry") elif errCode == 'pastexpiry ': @@ -465,24 +478,24 @@ elif errCode == 'cantblock-email': raise BlockError("You don't have permission to ban mail")
- elif 'block' in data: #success - return True + elif 'block' in data: # success + return True else: pywikibot.output("Unknown Error, result: %s" % data) raise BlockError raise False
- def _blockOld(self, expiry, reason, anonOnly, noSignup, enableAutoblock, emailBan, - watchUser, allowUsertalk): + def _blockOld(self, expiry, reason, anonOnly, noSignup, enableAutoblock, + emailBan, watchUser, allowUsertalk): """ Internal use to block the user by web page. Don't use this function directly.
"""
- token = self.site().getToken(self, sysop = True) + token = self.site().getToken(self, sysop=True) pywikibot.output(u"Blocking [[User:%s]]..." % self.name()) - boolStr = ['0','1'] + boolStr = ['0', '1'] predata = { 'wpBlockAddress': self.name(), 'wpBlockOther': expiry, @@ -498,13 +511,15 @@ } address = self.site().block_address()
- response, data = self.site().postForm(address, predata, sysop = True) + response, data = self.site().postForm(address, predata, sysop=True) if data: - if self.site().mediawiki_message('ipb_already_blocked').replace('$1', self.name()) in data: + if self.site().mediawiki_message( + 'ipb_already_blocked').replace('$1', self.name()) in data: raise AlreadyBlockedError
raise BlockError return True + def unblock(self, reason=None): """ Unblock the user. @@ -519,38 +534,41 @@ blockID = self._getBlockID() if not self.site().has_api() or self.site().versionnumber() < 12: return self._unblockOld(blockID, reason) - self._unblock(blockID,reason) + self._unblock(blockID, reason)
def _getBlockID(self): pywikibot.output(u"Getting block id for [[User:%s]]..." % self.name()) if self.isAnonymous(): - usertype="ip" + usertype = ip" else: - usertype="users" + usertype = "users" if not self.site().has_api() or self.site().versionnumber() < 12: return getBlockIDOld() - data = self.site().blocksearch_address(self.name(),usertype) + data = self.site().blocksearch_address(self.name(), usertype) try: bIDre = data[1]["query"]["blocks"][0]["id"] except IndexError: pywikibot.output(data) raise BlockIDError return bIDre + def _unblock(self, blockID, reason): pywikibot.output(u"Unblocking [[User:%s]]..." % self.name()) params = { - 'action':'unblock', - 'reason':reason, + 'action': 'unblock', + 'reason': reason, 'id': blockID, - 'token': self.site().getToken(self, sysop = True), + 'token': self.site().getToken(self, sysop=True), } - data = query.GetData(params, self.site(), back_response=True,sysop=True) + data = query.GetData(params, self.site(), back_response=True, + sysop=True) if 'error' in data: pywikibot.output("Error happend: %s" % str(data['error'])) return True + def _unblockOld(self, blockID, reason): pywikibot.output(u"Unblocking [[User:%s]]..." % self.name()) - token = self.site().getToken(self, sysop = True) + token = self.site().getToken(self, sysop=True) predata = { 'id': blockID, 'wpUnblockReason': reason, @@ -558,12 +576,14 @@ 'wpEditToken': token, } address = self.site().unblock_address() - response, data = self.site().postForm(address, predata, sysop = True) + response, data = self.site().postForm(address, predata, sysop=True) if response.code != 302: - if self.site().mediawiki_message('ipb_cant_unblock').replace('$1',blockID) in data: + if self.site().mediawiki_message( + 'ipb_cant_unblock').replace('$1', blockID) in data: raise AlreadyUnblockedError raise UnblockError, data return True + def getBlockIDOld(self): self.family.blocksearch_address(self.lang, s) address = self.site().blocksearch_address(self.name()) @@ -573,6 +593,8 @@ pywikibot.output(data) raise BlockIDError return bIDre.group(1) + + def getall(site, users, throttle=True, force=False): """Bulk-retrieve users data from site
@@ -585,9 +607,9 @@ pywikibot.output(u'Getting %d users data from %s...' % (len(users), site))
- if len(users) > 250: # max load prevents HTTPError 400 + if len(users) > 250: # max load prevents HTTPError 400 for urg in range(0, len(users), 250): - if urg == range(0, len(users), 250)[-1]: #latest + if urg == range(0, len(users), 250)[-1]: # latest k = users[urg:] _GetAllUI(site, k, throttle, force).run() users[urg:] = k @@ -600,6 +622,7 @@
class _GetAllUI(object): + def __init__(self, site, users, throttle, force): self.site = site self.users = [] @@ -636,7 +659,8 @@ else: uj._groups = [] if x['registration']: - uj._registrationTime = pywikibot.parsetime2stamp(x['registration']) + uj._registrationTime = pywikibot.parsetime2stamp( + x['registration']) else: uj._registrationTime = 0 uj._mailable = ("emailable" in x) @@ -648,13 +672,16 @@ params = { 'action': 'query', 'list': 'users', - 'usprop': ['blockinfo', 'groups', 'editcount', 'registration', 'emailable', 'gender'], + 'usprop': ['blockinfo', 'groups', 'editcount', 'registration', + 'emailable', 'gender'], 'ususers': u'|'.join([n.name() for n in self.users]), } data = query.GetData(params, self.site) for user in data['query']['users']: if u'invalid' in user: - raise InvalidUser("User name '%s' is invalid. IP addresses are not supported." % user['name']) + raise InvalidUser( + "User name '%s' is invalid. IP addresses are not supported." + % user['name']) users[user['name']] = user return users