Revision: 5037 Author: rotem Date: 2008-02-16 11:09:47 +0000 (Sat, 16 Feb 2008)
Log Message: ----------- Reorganized the retrieval of user data: it is now handled by a single function.
Modified Paths: -------------- trunk/pywikipedia/family.py trunk/pywikipedia/wikipedia.py
Modified: trunk/pywikipedia/family.py =================================================================== --- trunk/pywikipedia/family.py 2008-02-15 21:00:20 UTC (rev 5036) +++ trunk/pywikipedia/family.py 2008-02-16 11:09:47 UTC (rev 5037) @@ -2963,12 +2963,6 @@ found is an existing page, in case the normal regexp does not work.""" return None
- def sandboxpage(self, code): - """Give the title of a sandbox page for a given language. It should be - a page with no edit restrictions, that is, if we can edit any page, we - should be able to edit this page.""" - return "%s:Sandbox"%self.namespace(code,4) - def has_query_api(self,code): """Is query.php installed in the wiki?""" return False
Modified: trunk/pywikipedia/wikipedia.py =================================================================== --- trunk/pywikipedia/wikipedia.py 2008-02-15 21:00:20 UTC (rev 5036) +++ trunk/pywikipedia/wikipedia.py 2008-02-16 11:09:47 UTC (rev 5037) @@ -738,12 +738,6 @@ self.moveRestriction = m.group(1); else: self.moveRestriction = '' - # Look for the edit token - tokenloc = Rwatch.search(text) - if tokenloc: - self.site().putToken(tokenloc.group(1), sysop = sysop) - elif not self.site().getToken(getalways = False): - self.site().putToken('', sysop = sysop) if change_edit_time: # Get timestamps m = re.search('value="(\d+)" name=["']wpEdittime["']', text) @@ -885,7 +879,7 @@ ns = self.namespace() return ns >= 0 and ns % 2 == 1
- def botMayEdit(self): + def botMayEdit(self, username): """Return True if this page allows bots to edit it.
This will be True if the page doesn't contain {{bots}} or @@ -907,15 +901,6 @@ except (NoPage, IsRedirectPage, SectionError): return True
- try: - if self.editRestriction == 'sysop': - self.site().forceLogin(sysop=True) - else: - self.site().forceLogin() - username = self.site()._loggedInAs - except NoUsername: - username = None - for template in templates: if template[0] == 'Nobots': return False @@ -1109,6 +1094,49 @@ onlyTemplateInclusion, redirectsOnly): yield p
+ def _getActionUser(self, action, restriction = '', sysop = False): + """ + Get the user to do an action: sysop or not sysop, or raise an exception + if the user cannot do that. + + Parameters: + * action - the action done, which is the name of the right + * restriction - the restriction level or an empty string for no restriction + * sysop - initially use sysop user? + """ + # Login + self.site().forceLogin(sysop = sysop) + + # Check permissions + if not self.site().isAllowed(action, sysop): + if sysop: + raise LockedPage(u'The sysop user is not allowed to %s in site %s' % (action, self.site())) + else: + try: + user = self._getActionUser(action, restriction, sysop = True) + output(u'The user is not allowed to %s on site %s. Using sysop account.' % (action, self.site())) + return user + except NoUsername: + raise LockedPage(u'The user is not allowed to %s on site %s, and no sysop account is defined.' % (action, self.site())) + except LockedPage: + raise + + # Check restrictions + if not self.site().isAllowed(restriction, sysop): + if sysop: + raise LockedPage(u'Page on %s is locked in a way that sysop user cannot %s it' % (self.site(), action)) + else: + try: + user = self._getActionUser(action, restriction, sysop = True) + output(u'Page is locked on %s - cannot %s, using sysop account.' % (self.site(), action)) + return user + except NoUsername: + raise LockedPage(u'Page is locked on %s - cannot %s, and no sysop account is defined.' % (self.site(), action)) + except LockedPage: + raise + + return sysop + def put_async(self, newtext, comment=None, watchArticle=None, minorEdit=True, force=False, callback=None): @@ -1151,9 +1179,17 @@ force: ignore botMayEdit() setting
""" + # Login + try: + self.get() + except: + pass + sysop = self._getActionUser(action = 'edit', restriction = self.editRestriction, sysop = False) + username = self.site().loggedInAs() + # Determine if we are allowed to edit if not force: - if not self.botMayEdit(): + if not self.botMayEdit(username): raise LockedPage(u'Not allowed to edit %s because of a restricting template' % self.aslink())
# If there is an unchecked edit restriction, we need to load the page @@ -1163,17 +1199,6 @@ self._editrestriction = False # If no comment is given for the change, use the default comment = comment or action - # Check if using sysop account - sysop = False - if self.editRestriction == 'sysop': - try: - self.site().forceLogin(sysop = True) - output(u'Page is locked, using sysop account.') - sysop = True - except NoUsername: - raise LockedPage() - else: - self.site().forceLogin() if config.cosmetic_changes and not self.isTalkPage(): old = newtext if not config.cosmetic_changes_mylang_only or (self.site().family.name == config.family and self.site().lang == config.mylang): @@ -1191,14 +1216,6 @@ import watchlist watchArticle = watchlist.isWatched(self.title(), site = self.site()) newPage = not self.exists() - # If we are a sysop, we need to re-obtain the tokens. - if sysop: - if hasattr(self, '_contents'): del self._contents - try: - self.get(force = True, get_redirect = True, - change_edit_time = True, sysop = True) - except NoPage: - pass # if posting to an Esperanto wiki, we must e.g. write Bordeauxx instead # of Bordeaux if self.site().lang == 'eo': @@ -1206,18 +1223,12 @@ return self._putPage(newtext, comment, watchArticle, minorEdit, newPage, self.site().getToken(sysop = sysop), sysop = sysop)
def _putPage(self, text, comment=None, watchArticle=False, minorEdit=True, - newPage=False, token=None, gettoken=False, sysop=False): + newPage=False, token=None, newToken=False, sysop=False): """Upload 'text' as new content of Page by filling out the edit form.
Don't use this directly, use put() instead.
""" - newTokenRetrieved = False - if self.site().versionnumber() >= 4: - if gettoken or not token: - token = self.site().getToken(getagain = gettoken, sysop = sysop) - newTokenRetrieved = True - host = self.site().hostname() # Get the address of the page on that host. address = self.site().put_address(self.urlname()) @@ -1338,24 +1349,20 @@ # reasons why this didn't work, e.g. the page might be # locked via a cascade lock. try: - # Page is restricted - try using the sysop account, unless we're using one already + # Page is locked - try using the sysop account, unless we're using one already if sysop: # Unknown permissions error raise LockedPage() else: self.site().forceLogin(sysop = True) output(u'Page is locked, retrying using sysop account.') - return self._putPage(text, comment, watchArticle, - minorEdit, newPage, token=None, - gettoken=True, sysop=True) + return self._putPage(text, comment, watchArticle, minorEdit, newPage, token=self.site().getToken(sysop = True), sysop = True) except NoUsername: raise LockedPage() - if not newTokenRetrieved and "<textarea" in data: + if not newToken and "<textarea" in data: # We might have been using an outdated token output(u"Changing page has failed. Retrying.") - return self._putPage(text = text, comment = comment, - watchArticle = watchArticle, minorEdit = minorEdit, newPage = newPage, - token = None, gettoken = True, sysop = sysop) + return self._putPage(text, comment, watchArticle, minorEdit, newPage, token=self.site().getToken(sysop = sysop, getagain = True), newToken = True, sysop = sysop) if data.find("<title>Wikimedia Error</title>") > -1: output( u"Wikimedia has technical problems; will retry in %i minute%s." @@ -1381,6 +1388,10 @@ * page is unprotected, and bot has an account for this site, or * page is protected, and bot has a sysop account for this site. 
""" + try: + self.get() + except: + pass if self.editRestriction == 'sysop': userdict = config.sysopnames else: @@ -1897,17 +1908,23 @@ throttle=True, deleteAndMove=False, safe=True): """Move this page to new title given by newtitle. If safe, don't try to move and delete if not directly requested.""" + # Login + try: + self.get() + except: + pass + sysop = self._getActionUser(action = 'move', restriction = self.moveRestriction, sysop = False) + if deleteAndMove: + sysop = self._getActionUser(action = 'delete', restriction = '', sysop = True) + if throttle: put_throttle() if reason == None: reason = input(u'Please enter a reason for the move:') if self.isTalkPage(): movetalkpage = False - if deleteAndMove: - sysop = True host = self.site().hostname() address = self.site().move_address() - self.site().forceLogin(sysop = sysop) token = self.site().getToken(self, sysop = sysop) predata = { 'wpOldTitle': self.title().encode(self.site().encoding()), @@ -1973,6 +1990,18 @@ If the user does not have admin rights and mark is True, the page is marked for deletion instead. """ + # Login + try: + self._getActionUser(action = 'delete', sysop = True) + except NoUserName: + if mark and self.exists(): + text = self.get(get_redirect = True) + output(u'Cannot delete page %s - marking the page for deletion instead:' % self.aslink()) + self.put(u'{{delete}}\n%s ~~~~\n----\n\n%s' % (reason, text), comment = reason) + return + else: + raise + if throttle: put_throttle() if reason == None: @@ -1987,16 +2016,6 @@ host = self.site().hostname() address = self.site().delete_address(self.urlname())
- try: - self.site().forceLogin(sysop = True) - except NoUsername, error: - # user hasn't entered an admin username. - output(str(error)) - if mark and self.exists(): - text = self.get(get_redirect = True) - output(u'Marking the page for deletion instead:') - self.put(u'{{delete}}\n%s ~~~~\n----\n\n%s' % (reason, text), comment = reason) - return reason = reason.encode(self.site().encoding()) token = self.site().getToken(self, sysop = True) predata = { @@ -2044,11 +2063,13 @@ later on).
""" + # Login + self._getActionUser(action = 'deletedhistory', sysop = True) + #TODO: Handle image file revisions too. output(u'Loading list of deleted revisions for [[%s]]...' % self.title())
address = self.site().undelete_view_address(self.urlname()) - self.site().forceLogin(sysop = True) text = self.site().getUrl(address, sysop = True) #TODO: Handle non-existent pages etc
@@ -2081,9 +2102,11 @@ return None
if retrieveText and not self._deletedRevs[timestamp][3] and timestamp[:2]=='ts': + # Login + self._getActionUser(action = 'delete', sysop = True) + output(u'Retrieving text of deleted revision...') address = self.site().undelete_view_address(self.urlname(),timestamp) - self.site().forceLogin(sysop = True) text = self.site().getUrl(address, sysop = True) und = re.search('<textarea readonly="1" cols="80" rows="25">(.*?)</textarea><div><form method="post"',text,re.DOTALL) if und: @@ -2123,11 +2146,13 @@ pg.undelete('This will restore only selected revisions.')
""" + # Login + self._getActionUser(action = 'undelete', sysop = True) + if throttle: put_throttle()
address = self.site().undelete_address() - self.site().forceLogin(sysop = True) token = self.site().getToken(self, sysop=True)
formdata = { @@ -2160,6 +2185,9 @@ * 'sysop'
""" + # Login + self._getActionUser(action = 'protect', sysop = True) + address = self.site().protect_address(self.urlname()) if unprotect: address = self.site().unprotect_address(self.urlname()) @@ -2177,8 +2205,6 @@ if answer in ['y', 'Y']: host = self.site().hostname()
- self.site().forceLogin(sysop = True) - token = self.site().getToken(self, sysop = True)
#Translate 'none' to '' @@ -3731,11 +3757,14 @@ self._mediawiki_messages = {} self.nocapitalize = self.lang in self.family.nocapitalize self.user = user - self._token = None - self._sysoptoken = None - self.loginStatusKnown = {} - self._loggedInAs = None - self.userGroups = [] + self._userData = [False, False] + self._userName = [None, None] + self._isLoggedIn = [None, None] + self._isBlocked = [None, None] + self._messages = [None, None] + self._rights = [None, None] + self._token = [None, None] + self._cookies = [None, None] # Calculating valid languages took quite long, so we calculate it once # in initialization instead of each time it is used. self._validlanguages = [] @@ -3753,7 +3782,12 @@ # self.conn = httplib.HTTPSConnection(self.hostname()) self.persistent_http = False
- self.sandboxpage = Page(self, self.family.sandboxpage(code)) + def _userIndex(self, sysop = False): + """Returns the internal index of the user.""" + if sysop: + return 1 + else: + return 0
def loggedInAs(self, sysop = False): """Return the current username if logged in, otherwise return None. @@ -3763,52 +3797,57 @@ loading the test page is only required once.
""" - self._loadCookies(sysop = sysop) - if not self.loginStatusKnown: - output(u'Getting a page to check if we're logged in on %s' % self) - path = self.put_address('Non-existing_page') - text = self.getUrl(path, sysop = sysop) - # Search for the "my talk" link at the top - mytalkR = re.compile('<li id="pt-userpage"><a href=".+?">(?P<username>.+?)</a></li>') - m = mytalkR.search(text) - if m: - self.loginStatusKnown = True - self._loggedInAs = m.group('username') - # While we're at it, check if we have got unread messages - if '<div class="usermessage">' in text: - output(u'NOTE: You have unread messages on %s' % self) - messages=True - else: - messages=False - # Check whether we found a token - Rwatch = re.compile(r"<input type='hidden' value="(.*?)" name="wpEditToken"") - tokenloc = Rwatch.search(text) - if tokenloc: - self.putToken(tokenloc.group(1), sysop = sysop) - return self._loggedInAs + index = self._userIndex(sysop) + if self._isLoggedIn[index] is None: + # Load the details only if you don't know the login status. + # Don't load them just because the other details aren't known. + self._load(sysop = sysop) + if self._isLoggedIn[index]: + return self._userName[index] + else: + return None
def forceLogin(self, sysop = False): """Log the user in if not already logged in.""" if not self.loggedInAs(sysop = sysop): loginMan = login.LoginManager(site = self, sysop = sysop) if loginMan.login(retry = True): - self.loginStatusKnown = True - self._loggedInAs = loginMan.username + index = self._userIndex(sysop) + self._isLoggedIn[index] = True + self._userName[index] = loginMan.username + # We know nothing about the new user (but its name) + # Old info is about the anonymous user + self._userData[index] = False
def isBlocked(self, sysop = False): """Check if the user is blocked.""" - text = self.getUrl( "%saction=query&meta=userinfo&uiprop=blockinfo" % self.api_address(), sysop = sysop ); - return text.find( "blockedby=" ) > -1; + text = self.getUrl(u'%saction=query&meta=userinfo&uiprop=blockinfo' % self.api_address(), sysop = sysop) + return text.find('blockedby=') > -1
+ def isAllowed(self, right, sysop = False): + """Check if the user has a specific right. + Among possible rights: + * Actions: edit, move, delete, protect, upload + * User levels: autoconfirmed, sysop, bot, empty string (always true) + """ + if right == '': + return True + else: + self._load(sysop = sysop) + index = self._userIndex(sysop) + return right in self._rights[index] + def cookies(self, sysop = False): """Return a string containing the user's current cookies.""" - # TODO: cookie caching is disabled - #if not hasattr(self,'_cookies'): self._loadCookies(sysop = sysop) - return self._cookies + index = self._userIndex(sysop) + return self._cookies[index]
def _loadCookies(self, sysop = False): """Retrieve session cookies for login""" + index = self._userIndex(sysop) + if self._cookies[index] is not None: + return try: if sysop: try: @@ -3822,18 +3861,18 @@ else: username = config.usernames[self.family.name][self.lang] except KeyError: - self._cookies = None - self.loginStatusKnown = True + self._cookies[index] = None + self._isLoggedIn[index] = False else: tmp = '%s-%s-%s-login.data' % ( self.family.name, self.lang, username) fn = config.datafilepath('login-data', tmp) if not os.path.exists(fn): - self._cookies = None - self.loginStatusKnown = True + self._cookies[index] = None + self._isLoggedIn[index] = False else: f = open(fn) - self._cookies = '; '.join([x.strip() for x in f.readlines()]) + self._cookies[index] = '; '.join([x.strip() for x in f.readlines()]) f.close()
def urlEncode(self, query): @@ -3943,9 +3982,12 @@
if True: #not self.persistent_http: conn.close() + + # If a wiki page, get user data + self._getUserData(data, sysop = sysop) + return response, data
- r_userGroups = re.compile(ur'var wgUserGroups = (.*);') def getUrl(self, path, retry = True, sysop = False, data = None, compress = True): """ Low-level routine to get a URL from the wiki. @@ -4069,18 +4111,128 @@ # We use error='replace' in case of bad encoding. text = unicode(text, charset, errors = 'replace')
- # Try and see whether we can extract the user groups - match = self.r_userGroups.search(text) - if match: - self.userGroups = [] - if match.group(1) != 'null': - uG = match.group(1)[1:-1].split(', ') - for group in uG: - if group.strip('"') != '*': - self.userGroups.append(group.strip('"')) + # If a wiki page, get user data + self._getUserData(text, sysop = sysop)
return text
+ def _getUserData(self, text, sysop = False): + """ + Get the user data from a wiki page data. + + Parameters: + * text - the page text + * sysop - is the user a sysop? + """ + if '<div id="globalWrapper">' not in text: + # Not a wiki page + return + + index = self._userIndex(sysop) + + # Check for blocks + blocked = False + if self.versionnumber() >= 11: + if self._userData[index]: + # Don't check every time a page is loaded + blocked = self._isBlocked[index] + else: + blocked = self.isBlocked(sysop = sysop) + else: + blocked = self.mediawiki_message('blockedtitle') in text + if blocked and not self._isBlocked[index]: + # Write a warning if not shown earlier + if sysop: + account = 'Your sysop account' + else: + account = 'Your account' + output(u'WARNING: %s on %s is blocked. Editing using this account will stop the run.' % (account, self)) + self._isBlocked[index] = blocked + + # Check for new messages + if '<div class="usermessage">' in text: + if not self._messages[index]: + # User has *new* messages + if sysop: + output(u'NOTE: You have new messages in your sysop account on %s' % self) + else: + output(u'NOTE: You have new messages on %s' % self) + self._messages[index] = True + else: + self._messages[index] = False + + # Don't perform other checks if the data was already loaded + if self._userData[index]: + return + + # Search for the the user page link at the top. + # Note that the link of anonymous users (which doesn't exist at all + # in Wikimedia sites) has the ID pt-anonuserpage, and thus won't be + # found here. 
+ userpageR = re.compile('<li id="pt-userpage"><a href=".+?">(?P<username>.+?)</a></li>') + m = userpageR.search(text) + if m: + self._isLoggedIn[index] = True + self._userName[index] = m.group('username') + else: + self._isLoggedIn[index] = False + # No idea what is the user name, and it isn't important + self._userName[index] = None + + # Check user groups, if possible (introduced in 1.10) + groupsR = re.compile(r'var wgUserGroups = ["(.+)"];') + m = groupsR.search(text) + if m: + rights = m.group(1) + rights = rights.split('", "') + if '*' in rights: + rights.remove('*') + self._rights[index] = rights + # Warnings + # Don't show warnings for not logged in users, they will just fail to + # do any action + if self._isLoggedIn[index]: + if 'bot' not in self._rights[index]: + if sysop: + output(u'Note: Your sysop account on %s does not have a bot flag. Its edits will be visible in the recent changes.' % self) + else: + output(u'WARNING: Your account on %s does not have a bot flag. Its edits will be visible in the recent changes and it may get blocked.' % self) + if sysop and 'sysop' not in self._rights[index]: + output(u'WARNING: Your sysop account on %s does not seem to have sysop rights. You may not be able to perform any sysop-restricted actions using it.' 
% self) + else: + # We don't have wgUserGroups, and can't check the rights + self._rights[index] = [] + if self._isLoggedIn[index]: + # Logged in user + self._rights[index].append('user') + # Assume bot, and thus autoconfirmed + self._rights[index].extend(['bot', 'autoconfirmed']) + if sysop: + # Assume user reported as a sysop indeed has the sysop rights + self._rights[index].append('sysop') + # Assume the user has the default rights + self._rights[index].extend(['read', 'createaccount', 'edit', 'upload', 'createpage', 'createtalk', 'move', 'upload']) + if 'bot' in self._rights[index] or 'sysop' in self._rights[index]: + self._rights[index].append('apihighlimits') + if 'sysop' in self._rights[index]: + self._rights[index].extend(['delete', 'undelete', 'block', 'protect', 'import', 'deletedhistory', 'unwatchedpages']) + + # Search for a token + tokenR = re.compile(r"<input type='hidden' value="(.*?)" name="wpEditToken"") + tokenloc = tokenR.search(text) + if tokenloc: + self._token[index] = tokenloc.group(1) + if self._rights[index] is not None: + # In this case, token and rights are loaded - user data is now loaded + self._userData[index] = True + else: + # Token not found + # Possible reason for this is the user is blocked, don't show a + # warning in this case, otherwise do show a warning + if not self._isBlocked[index]: + # Token not found + output(u'WARNING: Token not found on %s. You will not be able to edit any page.' % self) + def mediawiki_message(self, key): """Return the MediaWiki message text for key "key" """ global mwpage, tree @@ -4145,6 +4297,29 @@ except KeyError: return False
+ def _load(self, sysop = False): + """ + Loads user data. + This is only done if we didn't do get any page yet and the information + is requested, otherwise we should already have this data. + Parameters: + + * sysop - Get sysop user data? + """ + index = self._userIndex(sysop) + if self._userData[index]: + return + + if verbose: + output(u'Getting information for site %s' % self) + + # Get data + url = self.edit_address('Non-existing_page') + text = self.getUrl(url, sysop = sysop) + + # Parse data + self._getUserData(text, sysop = sysop) + def search(self, query, number = 10, namespaces = None): """Yield search results (using Special:Search page) for query.""" throttle = True @@ -5147,33 +5322,15 @@ raise NoPage
def getToken(self, getalways = True, getagain = False, sysop = False): - if getagain or (getalways and ((sysop and not self._sysoptoken) or (not sysop and not self._token))): - output(u"Getting page to get a token.") - try: - self.sandboxpage.get(force = True, get_redirect = True, sysop = sysop) - #Page(self, "Non-existing page").get(force = True, sysop = sysop) - except UserBlocked: - raise - except Error: - pass - if sysop: - if not self._sysoptoken: - return False - else: - return self._sysoptoken + index = self._userIndex(sysop) + if getagain or (getalways and self._token[index] is None): + output(u'Getting a token.') + self._load(sysop = sysop) + if self._token[index] is not None: + return self._token[index] else: - if not self._token: - return False - else: - return self._token + return False
- def putToken(self, value, sysop = False): - if sysop: - self._sysoptoken = value - else: - self._token = value - return - # Caches to provide faster access _sites = {} _namespaceCache = {}