Revision: 5037
Author: rotem
Date: 2008-02-16 11:09:47 +0000 (Sat, 16 Feb 2008)
Log Message:
-----------
Reorganized user data retrieval: it is now handled by a single function.
Modified Paths:
--------------
trunk/pywikipedia/family.py
trunk/pywikipedia/wikipedia.py
Modified: trunk/pywikipedia/family.py
===================================================================
--- trunk/pywikipedia/family.py 2008-02-15 21:00:20 UTC (rev 5036)
+++ trunk/pywikipedia/family.py 2008-02-16 11:09:47 UTC (rev 5037)
@@ -2963,12 +2963,6 @@
found is an existing page, in case the normal regexp does not work."""
return None
- def sandboxpage(self, code):
- """Give the title of a sandbox page for a given language. It should be
- a page with no edit restrictions, that is, if we can edit any page, we
- should be able to edit this page."""
- return "%s:Sandbox"%self.namespace(code,4)
-
def has_query_api(self,code):
"""Is query.php installed in the wiki?"""
return False
Modified: trunk/pywikipedia/wikipedia.py
===================================================================
--- trunk/pywikipedia/wikipedia.py 2008-02-15 21:00:20 UTC (rev 5036)
+++ trunk/pywikipedia/wikipedia.py 2008-02-16 11:09:47 UTC (rev 5037)
@@ -738,12 +738,6 @@
self.moveRestriction = m.group(1);
else:
self.moveRestriction = ''
- # Look for the edit token
- tokenloc = Rwatch.search(text)
- if tokenloc:
- self.site().putToken(tokenloc.group(1), sysop = sysop)
- elif not self.site().getToken(getalways = False):
- self.site().putToken('', sysop = sysop)
if change_edit_time:
# Get timestamps
m = re.search('value="(\d+)" name=["\']wpEdittime["\']', text)
@@ -885,7 +879,7 @@
ns = self.namespace()
return ns >= 0 and ns % 2 == 1
- def botMayEdit(self):
+ def botMayEdit(self, username):
"""Return True if this page allows bots to edit it.
This will be True if the page doesn't contain {{bots}} or
@@ -907,15 +901,6 @@
except (NoPage, IsRedirectPage, SectionError):
return True
- try:
- if self.editRestriction == 'sysop':
- self.site().forceLogin(sysop=True)
- else:
- self.site().forceLogin()
- username = self.site()._loggedInAs
- except NoUsername:
- username = None
-
for template in templates:
if template[0] == 'Nobots':
return False
@@ -1109,6 +1094,49 @@
onlyTemplateInclusion, redirectsOnly):
yield p
+ def _getActionUser(self, action, restriction = '', sysop = False):
+ """
+ Get the user to do an action: sysop or not sysop, or raise an exception
+ if the user cannot do that.
+
+ Logs in first, then checks that the account holds the right named by
+ `action` and the right named by `restriction`. If the non-sysop account
+ fails either check, the same checks are retried with the sysop account.
+
+ Parameters:
+ * action - the action done, which is the name of the right
+ * restriction - the restriction level or an empty string for no restriction
+ * sysop - initially use sysop user?
+
+ Returns the sysop flag of the account that passed both checks: the
+ given `sysop` value, or the result of the retry with the sysop account.
+
+ Raises LockedPage if the sysop account fails a check, or if the retry
+ with the sysop account fails with NoUsername. An exception raised by
+ the initial forceLogin() call is not handled here.
+ """
+ # Login
+ self.site().forceLogin(sysop = sysop)
+
+ # Check permissions
+ if not self.site().isAllowed(action, sysop):
+ if sysop:
+ raise LockedPage(u'The sysop user is not allowed to %s in site %s' % (action, self.site()))
+ else:
+ try:
+ # Retry the whole check with the sysop account
+ user = self._getActionUser(action, restriction, sysop = True)
+ output(u'The user is not allowed to %s on site %s. Using sysop account.' % (action, self.site()))
+ return user
+ except NoUsername:
+ raise LockedPage(u'The user is not allowed to %s on site %s, and no sysop account is defined.' % (action, self.site()))
+ except LockedPage:
+ # The sysop retry already produced a LockedPage; pass it on unchanged
+ raise
+
+ # Check restrictions
+ if not self.site().isAllowed(restriction, sysop):
+ if sysop:
+ raise LockedPage(u'Page on %s is locked in a way that sysop user cannot %s it' % (self.site(), action))
+ else:
+ try:
+ # Retry the whole check with the sysop account
+ user = self._getActionUser(action, restriction, sysop = True)
+ output(u'Page is locked on %s - cannot %s, using sysop account.' % (self.site(), action))
+ return user
+ except NoUsername:
+ raise LockedPage(u'Page is locked on %s - cannot %s, and no sysop account is defined.' % (self.site(), action))
+ except LockedPage:
+ # The sysop retry already produced a LockedPage; pass it on unchanged
+ raise
+
+ return sysop
+
def put_async(self, newtext,
comment=None, watchArticle=None, minorEdit=True, force=False,
callback=None):
@@ -1151,9 +1179,17 @@
force: ignore botMayEdit() setting
"""
+ # Login
+ try:
+ self.get()
+ except:
+ pass
+ sysop = self._getActionUser(action = 'edit', restriction = self.editRestriction, sysop = False)
+ username = self.site().loggedInAs()
+
# Determine if we are allowed to edit
if not force:
- if not self.botMayEdit():
+ if not self.botMayEdit(username):
raise LockedPage(u'Not allowed to edit %s because of a restricting template' % self.aslink())
# If there is an unchecked edit restriction, we need to load the page
@@ -1163,17 +1199,6 @@
self._editrestriction = False
# If no comment is given for the change, use the default
comment = comment or action
- # Check if using sysop account
- sysop = False
- if self.editRestriction == 'sysop':
- try:
- self.site().forceLogin(sysop = True)
- output(u'Page is locked, using sysop account.')
- sysop = True
- except NoUsername:
- raise LockedPage()
- else:
- self.site().forceLogin()
if config.cosmetic_changes and not self.isTalkPage():
old = newtext
if not config.cosmetic_changes_mylang_only or (self.site().family.name == config.family and self.site().lang == config.mylang):
@@ -1191,14 +1216,6 @@
import watchlist
watchArticle = watchlist.isWatched(self.title(), site = self.site())
newPage = not self.exists()
- # If we are a sysop, we need to re-obtain the tokens.
- if sysop:
- if hasattr(self, '_contents'): del self._contents
- try:
- self.get(force = True, get_redirect = True,
- change_edit_time = True, sysop = True)
- except NoPage:
- pass
# if posting to an Esperanto wiki, we must e.g. write Bordeauxx instead
# of Bordeaux
if self.site().lang == 'eo':
@@ -1206,18 +1223,12 @@
return self._putPage(newtext, comment, watchArticle, minorEdit, newPage, self.site().getToken(sysop = sysop), sysop = sysop)
def _putPage(self, text, comment=None, watchArticle=False, minorEdit=True,
- newPage=False, token=None, gettoken=False, sysop=False):
+ newPage=False, token=None, newToken=False, sysop=False):
"""Upload 'text' as new content of Page by filling out the edit form.
Don't use this directly, use put() instead.
"""
- newTokenRetrieved = False
- if self.site().versionnumber() >= 4:
- if gettoken or not token:
- token = self.site().getToken(getagain = gettoken, sysop = sysop)
- newTokenRetrieved = True
-
host = self.site().hostname()
# Get the address of the page on that host.
address = self.site().put_address(self.urlname())
@@ -1338,24 +1349,20 @@
# reasons why this didn't work, e.g. the page might be
# locked via a cascade lock.
try:
- # Page is restricted - try using the sysop account, unless we're using one already
+ # Page is locked - try using the sysop account, unless we're using one already
if sysop:
# Unknown permissions error
raise LockedPage()
else:
self.site().forceLogin(sysop = True)
output(u'Page is locked, retrying using sysop account.')
- return self._putPage(text, comment, watchArticle,
- minorEdit, newPage, token=None,
- gettoken=True, sysop=True)
+ return self._putPage(text, comment, watchArticle, minorEdit, newPage, token=self.site().getToken(sysop = True), sysop = True)
except NoUsername:
raise LockedPage()
- if not newTokenRetrieved and "<textarea" in data:
+ if not newToken and "<textarea" in data:
# We might have been using an outdated token
output(u"Changing page has failed. Retrying.")
- return self._putPage(text = text, comment = comment,
- watchArticle = watchArticle, minorEdit = minorEdit, newPage = newPage,
- token = None, gettoken = True, sysop = sysop)
+ return self._putPage(text, comment, watchArticle, minorEdit, newPage, token=self.site().getToken(sysop = sysop, getagain = True), newToken = True, sysop = sysop)
if data.find("<title>Wikimedia Error</title>") > -1:
output(
u"Wikimedia has technical problems; will retry in %i minute%s."
@@ -1381,6 +1388,10 @@
* page is unprotected, and bot has an account for this site, or
* page is protected, and bot has a sysop account for this site.
"""
+ try:
+ self.get()
+ except:
+ pass
if self.editRestriction == 'sysop':
userdict = config.sysopnames
else:
@@ -1897,17 +1908,23 @@
throttle=True, deleteAndMove=False, safe=True):
"""Move this page to new title given by newtitle. If safe, don't try
to move and delete if not directly requested."""
+ # Login
+ try:
+ self.get()
+ except:
+ pass
+ sysop = self._getActionUser(action = 'move', restriction = self.moveRestriction, sysop = False)
+ if deleteAndMove:
+ sysop = self._getActionUser(action = 'delete', restriction = '', sysop = True)
+
if throttle:
put_throttle()
if reason == None:
reason = input(u'Please enter a reason for the move:')
if self.isTalkPage():
movetalkpage = False
- if deleteAndMove:
- sysop = True
host = self.site().hostname()
address = self.site().move_address()
- self.site().forceLogin(sysop = sysop)
token = self.site().getToken(self, sysop = sysop)
predata = {
'wpOldTitle': self.title().encode(self.site().encoding()),
@@ -1973,6 +1990,18 @@
If the user does not have admin rights and mark is True,
the page is marked for deletion instead.
"""
+ # Login
+ try:
+ self._getActionUser(action = 'delete', sysop = True)
+ # The exception class is spelled NoUsername in every other handler of
+ # this change; a misspelled name here would raise NameError instead of
+ # reaching the "mark for deletion" fallback below.
+ except NoUsername:
+ if mark and self.exists():
+ text = self.get(get_redirect = True)
+ output(u'Cannot delete page %s - marking the page for deletion instead:' % self.aslink())
+ self.put(u'{{delete}}\n%s ~~~~\n----\n\n%s' % (reason, text), comment = reason)
+ return
+ else:
+ raise
+
if throttle:
put_throttle()
if reason == None:
@@ -1987,16 +2016,6 @@
host = self.site().hostname()
address = self.site().delete_address(self.urlname())
- try:
- self.site().forceLogin(sysop = True)
- except NoUsername, error:
- # user hasn't entered an admin username.
- output(str(error))
- if mark and self.exists():
- text = self.get(get_redirect = True)
- output(u'Marking the page for deletion instead:')
- self.put(u'{{delete}}\n%s ~~~~\n----\n\n%s' % (reason, text), comment = reason)
- return
reason = reason.encode(self.site().encoding())
token = self.site().getToken(self, sysop = True)
predata = {
@@ -2044,11 +2063,13 @@
later on).
"""
+ # Login
+ self._getActionUser(action = 'deletedhistory', sysop = True)
+
#TODO: Handle image file revisions too.
output(u'Loading list of deleted revisions for [[%s]]...' % self.title())
address = self.site().undelete_view_address(self.urlname())
- self.site().forceLogin(sysop = True)
text = self.site().getUrl(address, sysop = True)
#TODO: Handle non-existent pages etc
@@ -2081,9 +2102,11 @@
return None
if retrieveText and not self._deletedRevs[timestamp][3] and timestamp[:2]=='ts':
+ # Login
+ self._getActionUser(action = 'delete', sysop = True)
+
output(u'Retrieving text of deleted revision...')
address = self.site().undelete_view_address(self.urlname(),timestamp)
- self.site().forceLogin(sysop = True)
text = self.site().getUrl(address, sysop = True)
und = re.search('<textarea readonly="1" cols="80" rows="25">(.*?)</textarea><div><form method="post"',text,re.DOTALL)
if und:
@@ -2123,11 +2146,13 @@
pg.undelete('This will restore only selected revisions.')
"""
+ # Login
+ self._getActionUser(action = 'undelete', sysop = True)
+
if throttle:
put_throttle()
address = self.site().undelete_address()
- self.site().forceLogin(sysop = True)
token = self.site().getToken(self, sysop=True)
formdata = {
@@ -2160,6 +2185,9 @@
* 'sysop'
"""
+ # Login
+ self._getActionUser(action = 'protect', sysop = True)
+
address = self.site().protect_address(self.urlname())
if unprotect:
address = self.site().unprotect_address(self.urlname())
@@ -2177,8 +2205,6 @@
if answer in ['y', 'Y']:
host = self.site().hostname()
- self.site().forceLogin(sysop = True)
-
token = self.site().getToken(self, sysop = True)
#Translate 'none' to ''
@@ -3731,11 +3757,14 @@
self._mediawiki_messages = {}
self.nocapitalize = self.lang in self.family.nocapitalize
self.user = user
- self._token = None
- self._sysoptoken = None
- self.loginStatusKnown = {}
- self._loggedInAs = None
- self.userGroups = []
+ self._userData = [False, False]
+ self._userName = [None, None]
+ self._isLoggedIn = [None, None]
+ self._isBlocked = [None, None]
+ self._messages = [None, None]
+ self._rights = [None, None]
+ self._token = [None, None]
+ self._cookies = [None, None]
# Calculating valid languages took quite long, so we calculate it once
# in initialization instead of each time it is used.
self._validlanguages = []
@@ -3753,7 +3782,12 @@
# self.conn = httplib.HTTPSConnection(self.hostname())
self.persistent_http = False
- self.sandboxpage = Page(self, self.family.sandboxpage(code))
+ def _userIndex(self, sysop = False):
+ """Returns the internal index of the user.
+
+ The two-element per-user lists (_userData, _userName, _isLoggedIn,
+ _isBlocked, _messages, _rights, _token, _cookies) are indexed with this
+ value: 1 selects the sysop entry, 0 selects the regular user's entry.
+ """
+ if sysop:
+ return 1
+ else:
+ return 0
def loggedInAs(self, sysop = False):
"""Return the current username if logged in, otherwise return None.
@@ -3763,52 +3797,57 @@
loading the test page is only required once.
"""
- self._loadCookies(sysop = sysop)
- if not self.loginStatusKnown:
- output(u'Getting a page to check if we\'re logged in on %s' % self)
- path = self.put_address('Non-existing_page')
- text = self.getUrl(path, sysop = sysop)
- # Search for the "my talk" link at the top
- mytalkR = re.compile('<li id="pt-userpage"><a href=".+?">(?P<username>.+?)</a></li>')
- m = mytalkR.search(text)
- if m:
- self.loginStatusKnown = True
- self._loggedInAs = m.group('username')
- # While we're at it, check if we have got unread messages
- if '<div class="usermessage">' in text:
- output(u'NOTE: You have unread messages on %s' % self)
- messages=True
- else:
- messages=False
- # Check whether we found a token
- Rwatch = re.compile(r"\<input type='hidden' value=\"(.*?)\" name=\"wpEditToken\"")
- tokenloc = Rwatch.search(text)
- if tokenloc:
- self.putToken(tokenloc.group(1), sysop = sysop)
- return self._loggedInAs
+ index = self._userIndex(sysop)
+ if self._isLoggedIn[index] is None:
+ # Load the details only if you don't know the login status.
+ # Don't load them just because the other details aren't known.
+ self._load(sysop = sysop)
+ if self._isLoggedIn[index]:
+ return self._userName[index]
+ else:
+ return None
def forceLogin(self, sysop = False):
"""Log the user in if not already logged in."""
if not self.loggedInAs(sysop = sysop):
loginMan = login.LoginManager(site = self, sysop = sysop)
if loginMan.login(retry = True):
- self.loginStatusKnown = True
- self._loggedInAs = loginMan.username
+ index = self._userIndex(sysop)
+ self._isLoggedIn[index] = True
+ self._userName[index] = loginMan.username
+ # We know nothing about the new user (but its name)
+ # Old info is about the anonymous user
+ self._userData[index] = False
def isBlocked(self, sysop = False):
"""Check if the user is blocked."""
- text = self.getUrl( "%saction=query&meta=userinfo&uiprop=blockinfo" % self.api_address(), sysop = sysop );
- return text.find( "blockedby=" ) > -1;
+ text = self.getUrl(u'%saction=query&meta=userinfo&uiprop=blockinfo' % self.api_address(), sysop = sysop)
+ return text.find('blockedby=') > -1
+ def isAllowed(self, right, sysop = False):
+ """Check if the user has a specific right.
+ Among possible rights:
+ * Actions: edit, move, delete, protect, upload
+ * User levels: autoconfirmed, sysop, bot, empty string (always true)
+
+ Parameters:
+ * right - name of the right to look up
+ * sysop - check the sysop account instead of the regular one?
+
+ Returns True for an empty right; otherwise whether `right` appears in
+ the rights list stored for that account.
+ """
+ if right == '':
+ return True
+ else:
+ # Make sure the user data was loaded before reading the rights
+ self._load(sysop = sysop)
+ index = self._userIndex(sysop)
+ # NOTE(review): _rights[index] starts as None; this lookup assumes
+ # _load() always fills it in - confirm for responses that are not wiki pages
+ return right in self._rights[index]
+
+
def cookies(self, sysop = False):
"""Return a string containing the user's current cookies."""
- # TODO: cookie caching is disabled
- #if not hasattr(self,'_cookies'):
self._loadCookies(sysop = sysop)
- return self._cookies
+ index = self._userIndex(sysop)
+ return self._cookies[index]
def _loadCookies(self, sysop = False):
"""Retrieve session cookies for login"""
+ index = self._userIndex(sysop)
+ if self._cookies[index] is not None:
+ return
try:
if sysop:
try:
@@ -3822,18 +3861,18 @@
else:
username = config.usernames[self.family.name][self.lang]
except KeyError:
- self._cookies = None
- self.loginStatusKnown = True
+ self._cookies[index] = None
+ self._isLoggedIn[index] = False
else:
tmp = '%s-%s-%s-login.data' % (
self.family.name, self.lang, username)
fn = config.datafilepath('login-data', tmp)
if not os.path.exists(fn):
- self._cookies = None
- self.loginStatusKnown = True
+ self._cookies[index] = None
+ self._isLoggedIn[index] = False
else:
f = open(fn)
- self._cookies = '; '.join([x.strip() for x in f.readlines()])
+ self._cookies[index] = '; '.join([x.strip() for x in f.readlines()])
f.close()
def urlEncode(self, query):
@@ -3943,9 +3982,12 @@
if True: #not self.persistent_http:
conn.close()
+
+ # If a wiki page, get user data
+ self._getUserData(data, sysop = sysop)
+
return response, data
- r_userGroups = re.compile(ur'var wgUserGroups \= (.*)\;')
def getUrl(self, path, retry = True, sysop = False, data = None, compress = True):
"""
Low-level routine to get a URL from the wiki.
@@ -4069,18 +4111,128 @@
# We use error='replace' in case of bad encoding.
text = unicode(text, charset, errors = 'replace')
- # Try and see whether we can extract the user groups
- match = self.r_userGroups.search(text)
- if match:
- self.userGroups = []
- if match.group(1) != 'null':
- uG = match.group(1)[1:-1].split(', ')
- for group in uG:
- if group.strip('"') != '*':
- self.userGroups.append(group.strip('"'))
+ # If a wiki page, get user data
+ self._getUserData(text, sysop = sysop)
return text
+ def _getUserData(self, text, sysop = False):
+ """
+ Get the user data from a wiki page data.
+
+ Does nothing unless the text contains the '<div id="globalWrapper">'
+ marker. Otherwise it refreshes the block status and the new-messages
+ flag and, unless the user data was already loaded, also records the
+ login status, user name, rights and edit token found in the text.
+
+ Parameters:
+ * text - the page text
+ * sysop - is the user a sysop?
+ """
+ if '<div id="globalWrapper">' not in text:
+ # Not a wiki page
+ return
+
+ index = self._userIndex(sysop)
+
+ # Check for blocks
+ blocked = False
+ if self.versionnumber() >= 11:
+ if self._userData[index]:
+ # Don't check every time a page is loaded
+ blocked = self._isBlocked[index]
+ else:
+ blocked = self.isBlocked(sysop = sysop)
+ else:
+ blocked = self.mediawiki_message('blockedtitle') in text
+ if blocked and not self._isBlocked[index]:
+ # Write a warning if not shown earlier
+ if sysop:
+ account = 'Your sysop account'
+ else:
+ account = 'Your account'
+ output(u'WARNING: %s on %s is blocked. Editing using this account will stop the run.' % (account, self))
+ self._isBlocked[index] = blocked
+
+ # Check for new messages
+ if '<div class="usermessage">' in text:
+ if not self._messages[index]:
+ # User has *new* messages
+ if sysop:
+ output(u'NOTE: You have new messages in your sysop account on %s' % self)
+ else:
+ output(u'NOTE: You have new messages on %s' % self)
+ self._messages[index] = True
+ else:
+ self._messages[index] = False
+
+ # Don't perform other checks if the data was already loaded
+ if self._userData[index]:
+ return
+
+ # Search for the user page link at the top.
+ # Note that the link of anonymous users (which doesn't exist at all
+ # in Wikimedia sites) has the ID pt-anonuserpage, and thus won't be
+ # found here.
+ userpageR = re.compile('<li id="pt-userpage"><a href=".+?">(?P<username>.+?)</a></li>')
+ m = userpageR.search(text)
+ if m:
+ self._isLoggedIn[index] = True
+ self._userName[index] = m.group('username')
+ else:
+ self._isLoggedIn[index] = False
+ # No idea what is the user name, and it isn't important
+ self._userName[index] = None
+
+ # Check user groups, if possible (introduced in 1.10)
+ groupsR = re.compile(r'var wgUserGroups = \[\"(.+)\"\];')
+ m = groupsR.search(text)
+ if m:
+ # The captured text is the inside of the JavaScript array; splitting on
+ # '", "' yields the individual group names
+ rights = m.group(1)
+ rights = rights.split('", "')
+ if '*' in rights:
+ rights.remove('*')
+ self._rights[index] = rights
+ # Warnings
+ # Don't show warnings for not logged in users, they will just fail to
+ # do any action
+ if self._isLoggedIn[index]:
+ if 'bot' not in self._rights[index]:
+ if sysop:
+ output(u'Note: Your sysop account on %s does not have a bot flag. Its edits will be visible in the recent changes.' % self)
+ else:
+ output(u'WARNING: Your account on %s does not have a bot flag. Its edits will be visible in the recent changes and it may get blocked.' % self)
+ if sysop and 'sysop' not in self._rights[index]:
+ output(u'WARNING: Your sysop account on %s does not seem to have sysop rights. You may not be able to perform any sysop-restricted actions using it.' % self)
+ else:
+ # We don't have wgUserGroups, and can't check the rights
+ self._rights[index] = []
+ if self._isLoggedIn[index]:
+ # Logged in user
+ self._rights[index].append('user')
+ # Assume bot, and thus autoconfirmed
+ self._rights[index].extend(['bot', 'autoconfirmed'])
+ if sysop:
+ # Assume user reported as a sysop indeed has the sysop rights
+ self._rights[index].append('sysop')
+ # Assume the user has the default rights
+ # NOTE(review): 'upload' is listed twice in the list below
+ self._rights[index].extend(['read', 'createaccount', 'edit', 'upload', 'createpage', 'createtalk', 'move', 'upload'])
+ if 'bot' in self._rights[index] or 'sysop' in self._rights[index]:
+ self._rights[index].append('apihighlimits')
+ if 'sysop' in self._rights[index]:
+ self._rights[index].extend(['delete', 'undelete', 'block', 'protect', 'import', 'deletedhistory', 'unwatchedpages'])
+
+ # Search for a token
+ tokenR = re.compile(r"\<input type='hidden' value=\"(.*?)\" name=\"wpEditToken\"")
+ tokenloc = tokenR.search(text)
+ if tokenloc:
+ self._token[index] = tokenloc.group(1)
+ if self._rights[index] is not None:
+ # In this case, token and rights are loaded - user data is now loaded
+ self._userData[index] = True
+ else:
+ # Token not found
+ # Possible reason for this is the user is blocked, don't show a
+ # warning in this case, otherwise do show a warning
+ if not self._isBlocked[index]:
+ # Token not found
+ output(u'WARNING: Token not found on %s. You will not be able to edit any page.' % self)
+
+
def mediawiki_message(self, key):
"""Return the MediaWiki message text for key "key" """
global mwpage, tree
@@ -4145,6 +4297,29 @@
except KeyError:
return False
+ def _load(self, sysop = False):
+ """
+ Loads user data.
+ This is only done if we haven't fetched any page yet and the information
+ is requested; otherwise we should already have this data.
+
+ Parameters:
+ * sysop - Get sysop user data?
+ """
+ index = self._userIndex(sysop)
+ # Nothing to do once the user data for this account is marked as loaded
+ if self._userData[index]:
+ return
+
+ if verbose:
+ output(u'Getting information for site %s' % self)
+
+ # Get data
+ url = self.edit_address('Non-existing_page')
+ text = self.getUrl(url, sysop = sysop)
+
+ # Parse data
+ self._getUserData(text, sysop = sysop)
+
+
def search(self, query, number = 10, namespaces = None):
"""Yield search results (using Special:Search page) for query."""
throttle = True
@@ -5147,33 +5322,15 @@
raise NoPage
def getToken(self, getalways = True, getagain = False, sysop = False):
- if getagain or (getalways and ((sysop and not self._sysoptoken) or (not sysop and not self._token))):
- output(u"Getting page to get a token.")
- try:
- self.sandboxpage.get(force = True, get_redirect = True, sysop = sysop)
- #Page(self, "Non-existing page").get(force = True, sysop = sysop)
- except UserBlocked:
- raise
- except Error:
- pass
- if sysop:
- if not self._sysoptoken:
- return False
- else:
- return self._sysoptoken
+ """Return the edit token of the regular or sysop user, or False if none is known.
+
+ Parameters:
+ * getalways - load the user data if no token is stored yet?
+ * getagain - discard the loaded state and fetch the user data again?
+ * sysop - return the sysop user's token?
+ """
+ index = self._userIndex(sysop)
+ # _load() returns immediately once the user data is marked as loaded, so
+ # a forced refresh has to clear that mark first; otherwise the same
+ # (possibly outdated) token would be returned again.
+ if getagain: self._userData[index] = False
+ if getagain or (getalways and self._token[index] is None):
+ output(u'Getting a token.')
+ self._load(sysop = sysop)
+ if self._token[index] is not None:
+ return self._token[index]
else:
- if not self._token:
- return False
- else:
- return self._token
+ return False
- def putToken(self, value, sysop = False):
- if sysop:
- self._sysoptoken = value
- else:
- self._token = value
- return
-
# Caches to provide faster access
_sites = {}
_namespaceCache = {}