Revision: 5874
Author: russblau
Date: 2008-09-04 18:35:41 +0000 (Thu, 04 Sep 2008)
Log Message:
-----------
More Site methods, and minor fixes in data.api
Modified Paths:
--------------
branches/rewrite/pywikibot/data/api.py
branches/rewrite/pywikibot/site.py
Modified: branches/rewrite/pywikibot/data/api.py
===================================================================
--- branches/rewrite/pywikibot/data/api.py 2008-09-04 15:30:59 UTC (rev 5873)
+++ branches/rewrite/pywikibot/data/api.py 2008-09-04 18:35:41 UTC (rev 5874)
@@ -282,12 +282,10 @@
except KeyError:
self.site = pywikibot.Site()
# make sure request type is valid, and get limit key if any
- if "generator" in kwargs:
- self.module = kwargs["generator"]
- elif "list" in kwargs:
- self.module = kwargs["list"]
- elif "prop" in kwargs:
- self.module = kwargs["prop"]
+ for modtype in ("generator", "list", "prop", "meta"):
+ if modtype in kwargs:
+ self.module = kwargs[modtype]
+ break
else:
raise Error("%s: No query module name found in arguments."
% self.__class__.__name__)
Modified: branches/rewrite/pywikibot/site.py
===================================================================
--- branches/rewrite/pywikibot/site.py 2008-09-04 15:30:59 UTC (rev 5873)
+++ branches/rewrite/pywikibot/site.py 2008-09-04 18:35:41 UTC (rev 5874)
@@ -350,6 +350,7 @@
15: [u"Category talk"],
}
self.sitelock = threading.Lock()
+ self._msgcache = {}
return
# ANYTHING BELOW THIS POINT IS NOT YET IMPLEMENTED IN __init__()
@@ -417,7 +418,8 @@
- blockinfo: present if user is blocked (dict)
"""
- if not hasattr(self, "_userinfo") or "rights" not in self._userinfo:
+ if not hasattr(self, "_userinfo") or "rights" not in self._userinfo \
+ or self._userinfo['name'] != self.user():
uirequest = api.Request(
site=self,
action="query",
@@ -432,6 +434,79 @@
self._userinfo = uidata['query']['userinfo']
return self._userinfo
+ def is_blocked(self, sysop=False):
+ """Return true if and only if user is blocked.
+
+ @param sysop: If true, log in to sysop account (if available)
+
+ """
+ if not self.logged_in(sysop):
+ self.login(sysop)
+ return 'blockinfo' in self._userinfo
+
+ def isBlocked(self, sysop=False):
+ """Deprecated; retained for backwards-compatibility"""
+ logger.debug("Site.isBlocked() method is deprecated; use is_blocked()")
+ return self.is_blocked(sysop)
+
+ def has_right(self, right, sysop=False):
+ """Return true if and only if the user has a specific right.
+
+ Possible values of 'right' may vary depending on wiki settings,
+ but will usually include:
+
+ * Actions: edit, move, delete, protect, upload
+ * User levels: autoconfirmed, sysop, bot
+
+ """
+ if not self.logged_in(sysop):
+ self.login(sysop)
+ return right.lower() in self._userinfo['rights']
+
+ def isAllowed(self, right, sysop=False):
+ """Deprecated; retained for backwards-compatibility"""
+ logger.debug("Site.isAllowed() method is deprecated; use has_right()")
+ return self.has_right(right, sysop)
+
+ def has_group(self, group, sysop=False):
+ """Return true if and only if the user is a member of specified group.
+
+ Possible values of 'group' may vary depending on wiki settings,
+ but will usually include bot.
+
+ """
+ if not self.logged_in(sysop):
+ self.login(sysop)
+ return group.lower() in self._userinfo['groups']
+
+ def messages(self, sysop=False):
+ """Returns true if the user has new messages, and false otherwise."""
+ if not self.logged_in(sysop):
+ self.login(sysop)
+ return 'hasmsg' in self._userinfo
+
+ def mediawiki_message(self, key):
+ """Return the MediaWiki message text for key "key" """
+ if not key in self._msgcache:
+ msg_query = api.QueryGenerator(site=self, meta="allmessages",
+ amfilter=key)
+ for msg in msg_query:
+ if msg['name'] == key and not 'missing' in msg:
+ self._msgcache[key] = msg['*']
+ break
+ else:
+ raise KeyError("Site %(self)s has no message '%(key)s'"
+ % locals())
+ return self._msgcache[key]
+
+ def has_mediawiki_message(self, key):
+ """Return True iff this site defines a MediaWiki message for 'key'."""
+ try:
+ v = self.mediawiki_message(key)
+ return True
+ except KeyError:
+ return False
+
def getcurrenttimestamp(self):
"""Return (Mediawiki) timestamp, {{CURRENTTIMESTAMP}}, the server time.
@@ -1039,7 +1114,7 @@
logger.debug("allpages: the 'throttle' parameter is deprecated.")
if includeRedirects is not None:
logger.debug(
- "allpages: the 'includeRedirect' parameter is deprecated.")
+ "allpages: the 'includeRedirects' parameter is deprecated.")
if includeRedirects:
if includeRedirects == "only":
filterredirs = True
@@ -1074,6 +1149,17 @@
apgen.request["gapdir"] = "descending"
return apgen
+ def prefixindex(self, prefix, namespace=0, includeredirects=True):
+ """Yield all pages with a given prefix. Deprecated.
+
+ Use allpages() with the prefix= parameter instead of this method.
+
+ """
+ logger.debug("Site.prefixindex() is deprecated; use allpages instead.")
+ return self.allpages(prefix=prefix, namespace=namespace,
+ includeRedirects=includeredirects)
+
+
def alllinks(self, start="!", prefix="", namespace=0, unique=False,
limit=None, fromids=False):
"""Iterate all links to pages (which need not exist) in one namespace.
@@ -2023,34 +2109,7 @@
#### METHODS NOT IMPLEMENTED YET (but may be delegated to Family object) ####
class NotImplementedYet:
- def isBlocked(self, sysop=False):
- """Check if the user is blocked."""
- try:
- text = self.getUrl(u'%saction=query&meta=userinfo&uiprop=blockinfo'
- % self.api_address(), sysop=sysop)
- return text.find('blockedby=') > -1
- except NotImplementedError:
- return False
-
- def isAllowed(self, right, sysop = False):
- """Check if the user has a specific right.
- Among possible rights:
- * Actions: edit, move, delete, protect, upload
- * User levels: autoconfirmed, sysop, bot, empty string (always true)
- """
- if right == '' or right == None:
- return True
- else:
- self._load(sysop = sysop)
- index = self._userIndex(sysop)
- return right in self._rights[index]
-
- def messages(self, sysop = False):
- """Returns true if the user has new messages, and false otherwise."""
- self._load(sysop = sysop)
- index = self._userIndex(sysop)
- return self._messages[index]
-
+ # TODO: is this needed any more? can it be obtained from the http module?
def cookies(self, sysop = False):
"""Return a string containing the user's current cookies."""
self._loadCookies(sysop = sysop)
@@ -2089,447 +2148,6 @@
self._cookies[index] = '; '.join([x.strip() for x in f.readlines()])
f.close()
- def urlEncode(self, query):
- """Encode a query so that it can be sent using an http POST request."""
- if not query:
- return None
- if hasattr(query, 'iteritems'):
- iterator = query.iteritems()
- else:
- iterator = iter(query)
- l = []
- wpEditToken = None
- for key, value in iterator:
- if isinstance(key, unicode):
- key = key.encode('utf-8')
- if isinstance(value, unicode):
- value = value.encode('utf-8')
- key = urllib.quote(key)
- value = urllib.quote(value)
- if key == 'wpEditToken':
- wpEditToken = value
- continue
- l.append(key + '=' + value)
-
- # wpEditToken is explicicmy added as last value.
- # If a premature connection abort occurs while putting, the server will
- # not have received an edit token and thus refuse saving the page
- if wpEditToken != None:
- l.append('wpEditToken=' + wpEditToken)
- return '&'.join(l)
-
- def postForm(self, address, predata, sysop=False, useCookie=True):
- """Post http form data to the given address at this site.
-
- address is the absolute path without hostname.
- predata is a dict or any iterable that can be converted to a dict,
- containing keys and values for the http form.
-
- Return a (response, data) tuple, where response is the HTTP
- response object and data is a Unicode string containing the
- body of the response.
-
- """
- data = self.urlEncode(predata)
- try:
- return self.postData(address, data, sysop=sysop,
- useCookie=useCookie)
- except socket.error, e:
- raise ServerError(e)
-
- def postData(self, address, data,
- contentType='application/x-www-form-urlencoded',
- sysop=False, useCookie=True, compress=True):
- """Post encoded data to the given http address at this site.
-
- address is the absolute path without hostname.
- data is an ASCII string that has been URL-encoded.
-
- Returns a (response, data) tuple where response is the HTTP
- response object and data is a Unicode string containing the
- body of the response.
- """
-
- # TODO: add the authenticate stuff here
-
- if False: #self.persistent_http:
- conn = self.conn
- else:
- # Encode all of this into a HTTP request
- if self.protocol() == 'http':
- conn = httplib.HTTPConnection(self.hostname())
- elif self.protocol() == 'https':
- conn = httplib.HTTPSConnection(self.hostname())
- # otherwise, it will crash, as other protocols are not supported
-
- conn.putrequest('POST', address)
- conn.putheader('Content-Length', str(len(data)))
- conn.putheader('Content-type', contentType)
- conn.putheader('User-agent', useragent)
- if useCookie and self.cookies(sysop = sysop):
- conn.putheader('Cookie', self.cookies(sysop = sysop))
- if False: #self.persistent_http:
- conn.putheader('Connection', 'Keep-Alive')
- if compress:
- conn.putheader('Accept-encoding', 'gzip')
- conn.endheaders()
- conn.send(data)
-
- # Prepare the return values
- # Note that this can raise network exceptions which are not
- # caught here.
- try:
- response = conn.getresponse()
- except httplib.BadStatusLine:
- # Blub.
- conn.close()
- conn.connect()
- return self.postData(address, data, contentType, sysop, useCookie)
-
- data = response.read()
-
- if compress and response.getheader('Content-Encoding') == 'gzip':
- data = decompress_gzip(data)
-
- data = data.decode(self.encoding())
- response.close()
-
- if True: #not self.persistent_http:
- conn.close()
-
- # If a wiki page, get user data
- self._getUserData(data, sysop = sysop)
-
- return response, data
-
- def getUrl(self, path, retry = True, sysop = False, data = None, compress = True):
- """
- Low-level routine to get a URL from the wiki.
-
- Parameters:
- path - The absolute path, without the hostname.
- retry - If True, retries loading the page when a network error
- occurs.
- sysop - If True, the sysop account's cookie will be used.
- data - An optional dict providing extra post request parameters
-
- Returns the HTML text of the page converted to unicode.
- """
- if False: #self.persistent_http and not data:
- self.conn.putrequest('GET', path)
- self.conn.putheader('User-agent', useragent)
- self.conn.putheader('Cookie', self.cookies(sysop = sysop))
- self.conn.putheader('Connection', 'Keep-Alive')
- if compress:
- self.conn.putheader('Accept-encoding', 'gzip')
- self.conn.endheaders()
-
- # Prepare the return values
- # Note that this can raise network exceptions which are not
- # caught here.
- try:
- response = self.conn.getresponse()
- except httplib.BadStatusLine:
- # Blub.
- self.conn.close()
- self.conn.connect()
- return self.getUrl(path, retry, sysop, data, compress)
-
- text = response.read()
- headers = dict(response.getheaders())
-
- else:
- if self.hostname() in config.authenticate.keys():
- uo = authenticateURLopener
- else:
- uo = MyURLopener()
- if self.cookies(sysop = sysop):
- uo.addheader('Cookie', self.cookies(sysop = sysop))
- if compress:
- uo.addheader('Accept-encoding', 'gzip')
-
- url = '%s://%s%s' % (self.protocol(), self.hostname(), path)
- data = self.urlEncode(data)
-
- # Try to retrieve the page until it was successfully loaded (just in
- # case the server is down or overloaded).
- # Wait for retry_idle_time minutes (growing!) between retries.
- retry_idle_time = 1
- retrieved = False
- while not retrieved:
- try:
- if self.hostname() in config.authenticate.keys():
- if False: # compress:
- request = urllib2.Request(url, data)
- request.add_header('Accept-encoding', 'gzip')
- opener = urllib2.build_opener()
- f = opener.open(request)
- else:
- f = urllib2.urlopen(url, data)
- else:
- f = uo.open(url, data)
- retrieved = True
- except KeyboardInterrupt:
- raise
- except Exception, e:
- if retry:
- # We assume that the server is down. Wait some time, then try again.
- output(u"%s" % e)
- output(u"""\
-WARNING: Could not open '%s://%s%s'. Maybe the server or
-your connection is down. Retrying in %i minutes..."""
- % (self.protocol(), self.hostname(), path,
- retry_idle_time))
- time.sleep(retry_idle_time * 60)
- # Next time wait longer, but not longer than half an hour
- retry_idle_time *= 2
- if retry_idle_time > 30:
- retry_idle_time = 30
- else:
- raise
- text = f.read()
-
- headers = f.info()
-
- contentType = headers.get('content-type', '')
- contentEncoding = headers.get('content-encoding', '')
-
- # Ensure that all sent data is received
- if int(headers.get('content-length', '0')) != len(text) and 'content-length' in headers:
- output(u'Warning! len(text) does not match content-length: %s != %s' % \
- (len(text), headers.get('content-length')))
- if False: #self.persistent_http
- self.conn.close()
- self.conn.connect()
- return self.getUrl(path, retry, sysop, data, compress)
-
- if compress and contentEncoding == 'gzip':
- text = decompress_gzip(text)
-
- R = re.compile('charset=([^\'\";]+)')
- m = R.search(contentType)
- if m:
- charset = m.group(1)
- else:
- output(u"WARNING: No character set found.")
- # UTF-8 as default
- charset = 'utf-8'
- # Check if this is the charset we expected
- self.checkCharset(charset)
- # Convert HTML to Unicode
- try:
- text = unicode(text, charset, errors = 'strict')
- except UnicodeDecodeError, e:
- print e
- output(u'ERROR: Invalid characters found on %s://%s%s, replaced by \\ufffd.' % (self.protocol(), self.hostname(), path))
- # We use error='replace' in case of bad encoding.
- text = unicode(text, charset, errors = 'replace')
-
- # If a wiki page, get user data
- self._getUserData(text, sysop = sysop)
-
- return text
-
- def _getUserData(self, text, sysop = False):
- """
- Get the user data from a wiki page data.
-
- Parameters:
- * text - the page text
- * sysop - is the user a sysop?
- """
- if '<div id="globalWrapper">' not in text:
- # Not a wiki page
- return
-
- index = self._userIndex(sysop)
-
- # Check for blocks - but only if version is 1.11 (userinfo is available)
- # and the user data was not yet loaded
- if self.versionnumber() >= 11 and not self._userData[index]:
- blocked = self.isBlocked(sysop = sysop)
- if blocked and not self._isBlocked[index]:
- # Write a warning if not shown earlier
- if sysop:
- account = 'Your sysop account'
- else:
- account = 'Your account'
- output(u'WARNING: %s on %s is blocked. Editing using this account will stop the run.' % (account, self))
- self._isBlocked[index] = blocked
-
- # Check for new messages
- if '<div class="usermessage">' in text:
- if not self._messages[index]:
- # User has *new* messages
- if sysop:
- output(u'NOTE: You have new messages in your sysop account on %s' % self)
- else:
- output(u'NOTE: You have new messages on %s' % self)
- self._messages[index] = True
- else:
- self._messages[index] = False
-
- # Don't perform other checks if the data was already loaded
- if self._userData[index]:
- return
-
- # Search for the the user page link at the top.
- # Note that the link of anonymous users (which doesn't exist at all
- # in Wikimedia sites) has the ID pt-anonuserpage, and thus won't be
- # found here.
- userpageR = re.compile('<li id="pt-userpage"><a href=".+?">(?P<username>.+?)</a></li>')
- m = userpageR.search(text)
- if m:
- self._isLoggedIn[index] = True
- self._userName[index] = m.group('username')
- else:
- self._isLoggedIn[index] = False
- # No idea what is the user name, and it isn't important
- self._userName[index] = None
-
- # Check user groups, if possible (introduced in 1.10)
- groupsR = re.compile(r'var wgUserGroups = \[\"(.+)\"\];')
- m = groupsR.search(text)
- if m:
- rights = m.group(1)
- rights = rights.split('", "')
- if '*' in rights:
- rights.remove('*')
- self._rights[index] = rights
- # Warnings
- # Don't show warnings for not logged in users, they will just fail to
- # do any action
- if self._isLoggedIn[index]:
- if 'bot' not in self._rights[index]:
- if sysop:
- output(u'Note: Your sysop account on %s does not have a bot flag. Its edits will be visible in the recent changes.' % self)
- else:
- output(u'WARNING: Your account on %s does not have a bot flag. Its edits will be visible in the recent changes and it may get blocked.' % self)
- if sysop and 'sysop' not in self._rights[index]:
- output(u'WARNING: Your sysop account on %s does not seem to have sysop rights. You may not be able to perform any sysop-restricted actions using it.' % self)
- else:
- # We don't have wgUserGroups, and can't check the rights
- self._rights[index] = []
- if self._isLoggedIn[index]:
- # Logged in user
- self._rights[index].append('user')
- # Assume bot, and thus autoconfirmed
- self._rights[index].extend(['bot', 'autoconfirmed'])
- if sysop:
- # Assume user reported as a sysop indeed has the sysop rights
- self._rights[index].append('sysop')
- # Assume the user has the default rights
- self._rights[index].extend(['read', 'createaccount', 'edit', 'upload', 'createpage', 'createtalk', 'move', 'upload'])
- if 'bot' in self._rights[index] or 'sysop' in self._rights[index]:
- self._rights[index].append('apihighlimits')
- if 'sysop' in self._rights[index]:
- self._rights[index].extend(['delete', 'undelete', 'block', 'protect', 'import', 'deletedhistory', 'unwatchedpages'])
-
- # Search for a token
- tokenR = re.compile(r"\<input type='hidden' value=\"(.*?)\" name=\"wpEditToken\"")
- tokenloc = tokenR.search(text)
- if tokenloc:
- self._token[index] = tokenloc.group(1)
- if self._rights[index] is not None:
- # In this case, token and rights are loaded - user data is now loaded
- self._userData[index] = True
- else:
- # Token not found
- # Possible reason for this is the user is blocked, don't show a
- # warning in this case, otherwise do show a warning
- # Another possible reason is that the page cannot be edited - ensure
- # there is a textarea and the tab "view source" is not shown
- if u'<textarea' in text and u'<li id="ca-viewsource"' not in text and not self._isBlocked[index]:
- # Token not found
- output(u'WARNING: Token not found on %s. You will not be able to edit any page.' % self)
-
- def mediawiki_message(self, key):
- """Return the MediaWiki message text for key "key" """
- global mwpage, tree
- if key.lower() not in self._mediawiki_messages.keys() \
- and not hasattr(self, "_phploaded"):
- get_throttle()
- mwpage = self.getUrl("%s?title=%s:%s&action=edit"
- % (self.path(), urllib.quote(
- self.namespace(8).replace(' ', '_').encode(
- self.encoding())),
- key))
- tree = BeautifulSoup(mwpage,
- convertEntities=BeautifulSoup.HTML_ENTITIES,
- parseOnlyThese=SoupStrainer("textarea"))
- if tree.textarea is not None and tree.textarea.string is not None:
- value = tree.textarea.string.strip()
- else:
- value = None
- if value:
- self._mediawiki_messages[key.lower()] = value
- else:
- self._mediawiki_messages[key.lower()] = None
- # Fallback in case MediaWiki: page method doesn't work
- if verbose:
- output(
- u"Retrieving mediawiki messages from Special:Allmessages")
- retry_idle_time = 1
- while True:
- get_throttle()
- phppage = self.getUrl(self.get_address("Special:Allmessages")
- + "&ot=php")
- Rphpvals = re.compile(r"(?ms)'([^']*)' => '(.*?[^\\])',")
- count = 0
- for (phpkey, phpval) in Rphpvals.findall(phppage):
- count += 1
- self._mediawiki_messages[str(phpkey).lower()] = phpval
- if count == 0:
- # No messages could be added.
- # We assume that the server is down.
- # Wait some time, then try again.
- output('WARNING: No messages found in Special:Allmessages. Maybe the server is down. Retrying in %i minutes...' % retry_idle_time)
- time.sleep(retry_idle_time * 60)
- # Next time wait longer, but not longer than half an hour
- retry_idle_time *= 2
- if retry_idle_time > 30:
- retry_idle_time = 30
- continue
- break
- self._phploaded = True
-
- key = key.lower()
- if self._mediawiki_messages[key] is None:
- raise KeyError("MediaWiki key '%s' does not exist on %s"
- % (key, self))
- return self._mediawiki_messages[key]
-
- def has_mediawiki_message(self, key):
- """Return True iff this site defines a MediaWiki message for 'key'."""
- try:
- v = self.mediawiki_message(key)
- return True
- except KeyError:
- return False
-
- def _load(self, sysop = False):
- """
- Loads user data.
- This is only done if we didn't do get any page yet and the information
- is requested, otherwise we should already have this data.
-
- Parameters:
- * sysop - Get sysop user data?
- """
- index = self._userIndex(sysop)
- if self._userData[index]:
- return
-
- if verbose:
- output(u'Getting information for site %s' % self)
-
- # Get data
- url = self.edit_address('Non-existing_page')
- text = self.getUrl(url, sysop = sysop)
-
- # Parse data
- self._getUserData(text, sysop = sysop)
-
# TODO: avoid code duplication for the following methods
def newpages(self, number = 10, get_redirect = False, repeat = False):
"""Yield new articles (as Page objects) from Special:Newpages.
@@ -2850,29 +2468,6 @@
if not repeat:
break
- def prefixindex(self, prefix, namespace=0, includeredirects=True):
- """Yield all pages with a given prefix.
-
- Parameters:
- prefix The prefix of the pages.
- namespace Namespace number; defaults to 0.
- MediaWiki software will only return pages in one namespace
- at a time.
-
- If includeredirects is False, redirects will not be found.
- If includeredirects equals the string 'only', only redirects
- will be found. Note that this has not been tested on older
- versions of the MediaWiki code.
-
- It is advised not to use this directly, but to use the
- PrefixingPageGenerator from pagegenerators.py instead.
- """
- for page in self.allpages(start = prefix, namespace = namespace, includeredirects = includeredirects):
- if page.titleWithoutNamespace().startswith(prefix):
- yield page
- else:
- break
-
def linksearch(self, siteurl):
"""Yield Pages from results of Special:Linksearch for 'siteurl'."""
if siteurl.startswith('*.'):
@@ -3062,14 +2657,3 @@
self.namespace(14)+':'+self.family.disambcatname[self.code])
except KeyError:
raise NoPage(u'No page %s.' % page)
-
- def getToken(self, getalways = True, getagain = False, sysop = False):
- index = self._userIndex(sysop)
- if getagain or (getalways and self._token[index] is None):
- output(u'Getting a token.')
- self._load(sysop = sysop)
- if self._token[index] is not None:
- return self._token[index]
- else:
- return False
-