Revision: 8004 Author: xqt Date: 2010-03-13 16:37:27 +0000 (Sat, 13 Mar 2010)
Log Message: ----------- use site.has_api()
Modified Paths: -------------- trunk/pywikipedia/userlib.py
Modified: trunk/pywikipedia/userlib.py =================================================================== --- trunk/pywikipedia/userlib.py 2010-03-13 16:14:53 UTC (rev 8003) +++ trunk/pywikipedia/userlib.py 2010-03-13 16:37:27 UTC (rev 8004) @@ -12,8 +12,6 @@ import re import wikipedia, query
- - class AutoblockUser(wikipedia.Error): """ The class AutoblockUserError is an exception that is raised whenever @@ -63,48 +61,47 @@ #if self.site().versionnumber() >= 16: # self._urToken = None if name[0] == '#': - #This user is probably being queried for purpose of lifting an autoblock. - wikipedia.output("This is an autoblock ID, you can only use to unblock it.") - - + # This user is probably being queried for purpose of lifting an + # autoblock. + wikipedia.output( + "This is an autoblock ID, you can only use to unblock it.")
- def site(self): return self._site - + def name(self): return self._name - + def __str__(self): return u'%s:%s' % (self.site() , self.name() ) - + def __repr__(self): return self.__str__()
def _load(self): getall(self.site(), [self], force=True) return - + def registrationTime(self, force = False): if not hasattr(self, '_registrationTime') or force: self._load() return self._registrationTime - + def editCount(self, force = False): if not hasattr(self, '_editcount') or force: self._load() return self._editcount - + def isBlocked(self, force = False): if not self._blocked or force: self._load() return self._blocked - + def groups(self, force = False): if not self._groups or force: self._load() return self._groups - + def getUserPage(self, subpage=''): if self.name()[0] == '#': #This user is probably being queried for purpose of lifting @@ -135,15 +132,12 @@ self._load() if not self._mailable: raise UserActionRefuse("This user is not mailable") - if not self.site().isAllowed('sendemail'): raise UserActionRefuse("You don't have permission to send mail") - - if wikipedia.config.use_api and self.site().versionnumber() >= 14: - pass # will handle NotImplementedError later - else: + + if not self.site().has_api() or self.site().versionnumber() < 14: return self.sendMailOld(subject, text, ccMe) - + params = { 'action': 'emailuser', 'target': self.name(), @@ -153,22 +147,19 @@ } if ccMe: params['ccme'] = 1 - result = query.GetData(params, self.site()) if 'error' in result: code = result['error']['code'] if code == 'usermaildisabled ': wikipedia.output("User mail has been disabled") #elif code == '': - # - + # elif 'emailuser' in result: if result['emailuser']['result'] == 'Success': wikipedia.output(u'Email sent.') return True - return False - + def sendMailOld(self, subject = u'', text = u'', ccMe = False): addr = self.site().put_address('Special:EmailUser') predata = { @@ -179,11 +170,9 @@ } if ccMe: predata['wpCCMe'] = '1' - predata['wpEditToken'] = self.site().getToken() - - response, data = self.site().postForm(address, predata, sysop = False)
+ response, data = self.site().postForm(address, predata, sysop = False) if data: if 'var wgAction = "success";' in data: wikipedia.output(u'Email sent.') @@ -200,11 +189,12 @@ """ Yields pages that the user has edited, with an upper bound of ``limit''. Pages returned are not guaranteed to be unique (straight Special:Contributions parsing, in chunks of 500 items).""" + if not self.site().has_api(): + raise NotImplementedError # please stay this in comment until the regex is fixed - #if wikipedia.config.use_api: - #for pg, oldid, date, comment in self._ContributionsOld(limit): - # yield pg, oldid, date, comment - #return + # for pg, oldid, date, comment in self._ContributionsOld(limit): + # yield pg, oldid, date, comment + # return
params = { 'action': 'query', @@ -218,7 +208,6 @@ params['uclimit'] = wikipedia.config.special_page_limit if limit > 5000 and self.site().isAllowed('apihighlimits'): params['uclimit'] = 5000 - if namespace: params['ucnamespace'] = namespace # An user is likely to contribute on several pages, @@ -250,29 +239,23 @@ #This user is probably being queried for purpose of lifting #an autoblock, so has no contribs. raise AutoblockUser - # #TODO: fix contribRX regex # offset = 0 step = min(limit,500) older_str = None - if self.site().versionnumber() <= 11: older_str = self.site().mediawiki_message('sp-contributions-older') else: older_str = self.site().mediawiki_message('pager-older-n') - if older_str.startswith('{{PLURAL:$1'): older_str = older_str[13:] older_str = older_str[older_str.find('|')+1:] older_str = older_str[:-2] older_str = older_str.replace('$1',str(step)) - address = self.site().contribs_address(self.name(),limit=step) contribRX = re.compile(r'<li[^>]*> *<a href="(?P<url>[^"]*?)" title="[^"]+">(?P<date>[^<]+)</a>.*>%s</a>) *(<span class="[^"]+">[A-Za-z]</span>)* *<a href="[^"]+" (class="[^"]+" )?title="[^"]+">(?P<title>[^<]+)</a> *(?P<comment>.*?)(?P<top><strong> *(top) *</strong>)? *(<span class="mw-rollback-link">[<a href="[^"]+token=(?P<rollbackToken>[^"]+)%2B%5C".*%s</a>]</span>)? *</li>' % (self.site().mediawiki_message('diff'),self.site().mediawiki_message('rollback') ) ) - - while offset < limit: data = self.site().getUrl(address) for pg in contribRX.finditer(data): @@ -284,7 +267,6 @@ top = None if pg.group('top'): top = True - # top, new, minor, should all go in a flags field yield wikipedia.Page(self.site(), pg.group('title')), oldid, date, comment
@@ -296,19 +278,13 @@ address = nextRX.group('address').replace('&amp;','&') else: break - + def uploadedImages(self, number = 10): - try: - if wikipedia.config.use_api and self.site().versionnumber() >= 11: - apitest = self.site().api_address() - del apitest - else: - raise NotImplementedError #No enable api or version not support - except NotImplementedError: + if not self.site().has_api() or self.site().versionnumber() < 11: for p,t,c,a in self._uploadedImagesOld(number): yield p,t,c,a return - + params = { 'action': 'query', 'list': 'logevents', @@ -322,29 +298,24 @@ for info in data['query']['logevents']: count += 1 yield wikipedia.ImagePage(self.site(), info['title']), info['timestamp'], info['comment'], False - + if count >= number: break - if 'query-continue' in data and count < number: params['lestart'] = data['query-continue']['logevents']['lestart'] else: break - - + def _uploadedImagesOld(self, number = 10): """Yield ImagePages from Special:Log&type=upload"""
regexp = re.compile('<li[^>]*>(?P<date>.+?)\s+<a href=.*?>(?P<user>.+?)</a> .* uploaded "<a href=".*?"(?P<new> class="new")? title="(Image|File):(?P<image>.+?)"\s*>(?:.*?<span class="comment">(?P<comment>.*?)</span>)?', re.UNICODE) - path = self.site().log_address(number, mode = 'upload', user = self.name()) html = self.site().getUrl(path) - redlink_key = self.site().mediawiki_message('red-link-title') redlink_tail_len = None if redlink_key.startswith('$1 '): redlink_tail_len = len(redlink_key[3:]) - for m in regexp.finditer(html): image = m.group('image') deleted = False @@ -355,7 +326,6 @@
date = m.group('date') comment = m.group('comment') or '' - yield wikipedia.ImagePage(self.site(), image), date, comment, deleted
def block(self, expiry = None, reason = None, anon= True, noCreate = False, @@ -384,24 +354,17 @@ raise AutoblockUser if self.isBlocked() and not reBlock: raise AlreadyBlocked() - + self.site()._getActionUser('block', sysop=True) - if not expiry: expiry = wikipedia.input(u'Please enter the expiry time for the block:') if not reason: reason = wikipedia.input(u'Please enter a reason for the block:') - - try: - if wikipedia.config.use_api and self.site().versionnumber() >= 12: - x = self.site().api_address() - del x - else: - raise NotImplementedError - except NotImplementedError: + + if not self.site().has_api() or self.site().versionnumber() < 12: return self._blockOld(expiry, reason, anon, noCreate, - onAutoblock, banMail, watchUser, allowUsertalk, reBlock) - + onAutoblock, banMail, watchUser, + allowUsertalk, reBlock) params = { 'action': 'block', 'user': self.name(), @@ -425,7 +388,7 @@ params['reblock'] = 1 if allowUsertalk: params['allowusertalk'] = 1 - + data = query.GetData(params, self.site(), sysop=True) if 'error' in data: #error occured errCode = data['error']['code'] @@ -441,7 +404,7 @@ raise BlockError("expiry time is the past") elif errCode == 'cantblock-email': raise BlockError("You don't have permission to ban mail") - + elif 'block' in data: #success return True else: @@ -461,16 +424,12 @@ #an autoblock, so can't be blocked. raise AutoblockUser sefl.site()._getActionUser('block', sysop=True) - if expiry is None: expiry = input(u'Please enter the expiry time for the block:') if reason is None: reason = input(u'Please enter a reason for the block:') - token = self.site().getToken(self, sysop = True) - wikipedia.output(u"Blocking [[User:%s]]..." % self.name()) - boolStr = ['0','1'] predata = { 'wpBlockAddress': self.name(), @@ -485,10 +444,9 @@ 'wpBlock': 'Block this user', 'wpEditToken': token } - address = self.site().block_address() - response, data = self.site().postForm(address, predata, sysop = True)
+ response, data = self.site().postForm(address, predata, sysop = True) if data: if self.site().mediawiki_message('ipb_already_blocked').replace('$1', self.name()) in data: raise AlreadyBlockedError @@ -508,26 +466,21 @@ blockID = self.name()[1:] else: blockID = self._getBlockID() - self._unblock(blockID,reason)
def _getBlockID(self): wikipedia.output(u"Getting block id for [[User:%s]]..." % self.name()) - address = self.site().blocksearch_address(self.name()) data = self.site().getUrl(address) bIDre = re.search(r'action=unblock&id=(\d+)', data) if not bIDre: wikipedia.output(data) raise BlockIDError - return bIDre.group(1)
def _unblock(self, blockID, reason): wikipedia.output(u"Unblocking [[User:%s]]..." % self.name()) - token = self.site().getToken(self, sysop = True) - predata = { 'id': blockID, 'wpUnblockReason': reason, @@ -535,7 +488,6 @@ 'wpEditToken': token, } address = self.site().unblock_address() - response, data = self.site().postForm(address, predata, sysop = True) if response.code != 302: if self.site().mediawiki_message('ipb_cant_unblock').replace('$1',blockID) in data: @@ -573,13 +525,12 @@ self.throttle = throttle self.force = force self.sleeptime = 15 - for user in users: if not hasattr(user, '_editcount') or force: self.users.append(user) elif wikipedia.verbose: wikipedia.output(u"BUGWARNING: %s already done!" % user.name()) - + def run(self): if self.users: while True: @@ -608,7 +559,7 @@ uj._mailable = ("emailable" in x) uj._blocked = ('blockedby' in x) #if self._blocked: #Get block ID - + def getData(self): users = {} params = { @@ -637,7 +588,6 @@ >>> wikipedia.output(exampleUser.getUserPage('Lipsum').get()) >>> wikipedia.output(exampleUser.getUserTalkPage().get()) """) - # unit tests import tests.test_userlib import unittest