Revision: 5874 Author: russblau Date: 2008-09-04 18:35:41 +0000 (Thu, 04 Sep 2008)
Log Message: ----------- More Site methods, and minor fixes in data.api
Modified Paths: -------------- branches/rewrite/pywikibot/data/api.py branches/rewrite/pywikibot/site.py
Modified: branches/rewrite/pywikibot/data/api.py =================================================================== --- branches/rewrite/pywikibot/data/api.py 2008-09-04 15:30:59 UTC (rev 5873) +++ branches/rewrite/pywikibot/data/api.py 2008-09-04 18:35:41 UTC (rev 5874) @@ -282,12 +282,10 @@ except KeyError: self.site = pywikibot.Site() # make sure request type is valid, and get limit key if any - if "generator" in kwargs: - self.module = kwargs["generator"] - elif "list" in kwargs: - self.module = kwargs["list"] - elif "prop" in kwargs: - self.module = kwargs["prop"] + for modtype in ("generator", "list", "prop", "meta"): + if modtype in kwargs: + self.module = kwargs[modtype] + break else: raise Error("%s: No query module name found in arguments." % self.__class__.__name__)
Modified: branches/rewrite/pywikibot/site.py =================================================================== --- branches/rewrite/pywikibot/site.py 2008-09-04 15:30:59 UTC (rev 5873) +++ branches/rewrite/pywikibot/site.py 2008-09-04 18:35:41 UTC (rev 5874) @@ -350,6 +350,7 @@ 15: [u"Category talk"], } self.sitelock = threading.Lock() + self._msgcache = {} return
# ANYTHING BELOW THIS POINT IS NOT YET IMPLEMENTED IN __init__() @@ -417,7 +418,8 @@ - blockinfo: present if user is blocked (dict)
""" - if not hasattr(self, "_userinfo") or "rights" not in self._userinfo: + if not hasattr(self, "_userinfo") or "rights" not in self._userinfo \ + or self._userinfo['name'] != self.user(): uirequest = api.Request( site=self, action="query", @@ -432,6 +434,79 @@ self._userinfo = uidata['query']['userinfo'] return self._userinfo
+ def is_blocked(self, sysop=False): + """Return true if and only if user is blocked. + + @param sysop: If true, log in to sysop account (if available) + + """ + if not self.logged_in(sysop): + self.login(sysop) + return 'blockinfo' in self._userinfo + + def isBlocked(self, sysop=False): + """Deprecated; retained for backwards-compatibility""" + logger.debug("Site.isBlocked() method is deprecated; use is_blocked()") + return self.is_blocked(sysop) + + def has_right(self, right, sysop=False): + """Return true if and only if the user has a specific right. + + Possible values of 'right' may vary depending on wiki settings, + but will usually include: + + * Actions: edit, move, delete, protect, upload + * User levels: autoconfirmed, sysop, bot + + """ + if not self.logged_in(sysop): + self.login(sysop) + return right.lower() in self._userinfo['rights'] + + def isAllowed(self, right, sysop=False): + """Deprecated; retained for backwards-compatibility""" + logger.debug("Site.isAllowed() method is deprecated; use has_right()") + return self.has_right(right, sysop) + + def has_group(self, group, sysop=False): + """Return true if and only if the user is a member of specified group. + + Possible values of 'group' may vary depending on wiki settings, + but will usually include bot. 
+ + """ + if not self.logged_in(sysop): + self.login(sysop) + return group.lower() in self._userinfo['groups'] + + def messages(self, sysop=False): + """Returns true if the user has new messages, and false otherwise.""" + if not self.logged_in(sysop): + self.login(sysop) + return 'hasmsg' in self._userinfo + + def mediawiki_message(self, key): + """Return the MediaWiki message text for key "key" """ + if not key in self._msgcache: + msg_query = api.QueryGenerator(site=self, meta="allmessages", + amfilter=key) + for msg in msg_query: + if msg['name'] == key and not 'missing' in msg: + self._msgcache[key] = msg['*'] + break + else: + raise KeyError("Site %(self)s has no message '%(key)s'" + % locals()) + return self._msgcache[key] + + def has_mediawiki_message(self, key): + """Return True iff this site defines a MediaWiki message for 'key'.""" + try: + v = self.mediawiki_message(key) + return True + except KeyError: + return False + def getcurrenttimestamp(self): """Return (Mediawiki) timestamp, {{CURRENTTIMESTAMP}}, the server time.
@@ -1039,7 +1114,7 @@ logger.debug("allpages: the 'throttle' parameter is deprecated.") if includeRedirects is not None: logger.debug( - "allpages: the 'includeRedirect' parameter is deprecated.") + "allpages: the 'includeRedirects' parameter is deprecated.") if includeRedirects: if includeRedirects == "only": filterredirs = True @@ -1074,6 +1149,17 @@ apgen.request["gapdir"] = "descending" return apgen
+ def prefixindex(self, prefix, namespace=0, includeredirects=True): + """Yield all pages with a given prefix. Deprecated. + + Use allpages() with the prefix= parameter instead of this method. + + """ + logger.debug("Site.prefixindex() is deprecated; use allpages instead.") + return self.allpages(prefix=prefix, namespace=namespace, + includeRedirects=includeredirects) + + def alllinks(self, start="!", prefix="", namespace=0, unique=False, limit=None, fromids=False): """Iterate all links to pages (which need not exist) in one namespace. @@ -2023,34 +2109,7 @@ #### METHODS NOT IMPLEMENTED YET (but may be delegated to Family object) #### class NotImplementedYet:
- def isBlocked(self, sysop=False): - """Check if the user is blocked.""" - try: - text = self.getUrl(u'%saction=query&meta=userinfo&uiprop=blockinfo' - % self.api_address(), sysop=sysop) - return text.find('blockedby=') > -1 - except NotImplementedError: - return False - - def isAllowed(self, right, sysop = False): - """Check if the user has a specific right. - Among possible rights: - * Actions: edit, move, delete, protect, upload - * User levels: autoconfirmed, sysop, bot, empty string (always true) - """ - if right == '' or right == None: - return True - else: - self._load(sysop = sysop) - index = self._userIndex(sysop) - return right in self._rights[index] - - def messages(self, sysop = False): - """Returns true if the user has new messages, and false otherwise.""" - self._load(sysop = sysop) - index = self._userIndex(sysop) - return self._messages[index] - + # TODO: is this needed any more? can it be obtained from the http module? def cookies(self, sysop = False): """Return a string containing the user's current cookies.""" self._loadCookies(sysop = sysop) @@ -2089,447 +2148,6 @@ self._cookies[index] = '; '.join([x.strip() for x in f.readlines()]) f.close()
- def urlEncode(self, query): - """Encode a query so that it can be sent using an http POST request.""" - if not query: - return None - if hasattr(query, 'iteritems'): - iterator = query.iteritems() - else: - iterator = iter(query) - l = [] - wpEditToken = None - for key, value in iterator: - if isinstance(key, unicode): - key = key.encode('utf-8') - if isinstance(value, unicode): - value = value.encode('utf-8') - key = urllib.quote(key) - value = urllib.quote(value) - if key == 'wpEditToken': - wpEditToken = value - continue - l.append(key + '=' + value) - - # wpEditToken is explicitly added as last value. - # If a premature connection abort occurs while putting, the server will - # not have received an edit token and thus refuse saving the page - if wpEditToken != None: - l.append('wpEditToken=' + wpEditToken) - return '&'.join(l) - - def postForm(self, address, predata, sysop=False, useCookie=True): - """Post http form data to the given address at this site. - - address is the absolute path without hostname. - predata is a dict or any iterable that can be converted to a dict, - containing keys and values for the http form. - - Return a (response, data) tuple, where response is the HTTP - response object and data is a Unicode string containing the - body of the response. - - """ - data = self.urlEncode(predata) - try: - return self.postData(address, data, sysop=sysop, - useCookie=useCookie) - except socket.error, e: - raise ServerError(e) - - def postData(self, address, data, - contentType='application/x-www-form-urlencoded', - sysop=False, useCookie=True, compress=True): - """Post encoded data to the given http address at this site. - - address is the absolute path without hostname. - data is an ASCII string that has been URL-encoded. - - Returns a (response, data) tuple where response is the HTTP - response object and data is a Unicode string containing the - body of the response. 
- """ - - # TODO: add the authenticate stuff here - - if False: #self.persistent_http: - conn = self.conn - else: - # Encode all of this into a HTTP request - if self.protocol() == 'http': - conn = httplib.HTTPConnection(self.hostname()) - elif self.protocol() == 'https': - conn = httplib.HTTPSConnection(self.hostname()) - # otherwise, it will crash, as other protocols are not supported - - conn.putrequest('POST', address) - conn.putheader('Content-Length', str(len(data))) - conn.putheader('Content-type', contentType) - conn.putheader('User-agent', useragent) - if useCookie and self.cookies(sysop = sysop): - conn.putheader('Cookie', self.cookies(sysop = sysop)) - if False: #self.persistent_http: - conn.putheader('Connection', 'Keep-Alive') - if compress: - conn.putheader('Accept-encoding', 'gzip') - conn.endheaders() - conn.send(data) - - # Prepare the return values - # Note that this can raise network exceptions which are not - # caught here. - try: - response = conn.getresponse() - except httplib.BadStatusLine: - # Blub. - conn.close() - conn.connect() - return self.postData(address, data, contentType, sysop, useCookie) - - data = response.read() - - if compress and response.getheader('Content-Encoding') == 'gzip': - data = decompress_gzip(data) - - data = data.decode(self.encoding()) - response.close() - - if True: #not self.persistent_http: - conn.close() - - # If a wiki page, get user data - self._getUserData(data, sysop = sysop) - - return response, data - - def getUrl(self, path, retry = True, sysop = False, data = None, compress = True): - """ - Low-level routine to get a URL from the wiki. - - Parameters: - path - The absolute path, without the hostname. - retry - If True, retries loading the page when a network error - occurs. - sysop - If True, the sysop account's cookie will be used. - data - An optional dict providing extra post request parameters - - Returns the HTML text of the page converted to unicode. 
- """ - if False: #self.persistent_http and not data: - self.conn.putrequest('GET', path) - self.conn.putheader('User-agent', useragent) - self.conn.putheader('Cookie', self.cookies(sysop = sysop)) - self.conn.putheader('Connection', 'Keep-Alive') - if compress: - self.conn.putheader('Accept-encoding', 'gzip') - self.conn.endheaders() - - # Prepare the return values - # Note that this can raise network exceptions which are not - # caught here. - try: - response = self.conn.getresponse() - except httplib.BadStatusLine: - # Blub. - self.conn.close() - self.conn.connect() - return self.getUrl(path, retry, sysop, data, compress) - - text = response.read() - headers = dict(response.getheaders()) - - else: - if self.hostname() in config.authenticate.keys(): - uo = authenticateURLopener - else: - uo = MyURLopener() - if self.cookies(sysop = sysop): - uo.addheader('Cookie', self.cookies(sysop = sysop)) - if compress: - uo.addheader('Accept-encoding', 'gzip') - - url = '%s://%s%s' % (self.protocol(), self.hostname(), path) - data = self.urlEncode(data) - - # Try to retrieve the page until it was successfully loaded (just in - # case the server is down or overloaded). - # Wait for retry_idle_time minutes (growing!) between retries. - retry_idle_time = 1 - retrieved = False - while not retrieved: - try: - if self.hostname() in config.authenticate.keys(): - if False: # compress: - request = urllib2.Request(url, data) - request.add_header('Accept-encoding', 'gzip') - opener = urllib2.build_opener() - f = opener.open(request) - else: - f = urllib2.urlopen(url, data) - else: - f = uo.open(url, data) - retrieved = True - except KeyboardInterrupt: - raise - except Exception, e: - if retry: - # We assume that the server is down. Wait some time, then try again. - output(u"%s" % e) - output(u"""\ -WARNING: Could not open '%s://%s%s'. Maybe the server or -your connection is down. 
Retrying in %i minutes...""" - % (self.protocol(), self.hostname(), path, - retry_idle_time)) - time.sleep(retry_idle_time * 60) - # Next time wait longer, but not longer than half an hour - retry_idle_time *= 2 - if retry_idle_time > 30: - retry_idle_time = 30 - else: - raise - text = f.read() - - headers = f.info() - - contentType = headers.get('content-type', '') - contentEncoding = headers.get('content-encoding', '') - - # Ensure that all sent data is received - if int(headers.get('content-length', '0')) != len(text) and 'content-length' in headers: - output(u'Warning! len(text) does not match content-length: %s != %s' % \ - (len(text), headers.get('content-length'))) - if False: #self.persistent_http - self.conn.close() - self.conn.connect() - return self.getUrl(path, retry, sysop, data, compress) - - if compress and contentEncoding == 'gzip': - text = decompress_gzip(text) - - R = re.compile('charset=([^'";]+)') - m = R.search(contentType) - if m: - charset = m.group(1) - else: - output(u"WARNING: No character set found.") - # UTF-8 as default - charset = 'utf-8' - # Check if this is the charset we expected - self.checkCharset(charset) - # Convert HTML to Unicode - try: - text = unicode(text, charset, errors = 'strict') - except UnicodeDecodeError, e: - print e - output(u'ERROR: Invalid characters found on %s://%s%s, replaced by \ufffd.' % (self.protocol(), self.hostname(), path)) - # We use error='replace' in case of bad encoding. - text = unicode(text, charset, errors = 'replace') - - # If a wiki page, get user data - self._getUserData(text, sysop = sysop) - - return text - - def _getUserData(self, text, sysop = False): - """ - Get the user data from a wiki page data. - - Parameters: - * text - the page text - * sysop - is the user a sysop? 
- """ - if '<div id="globalWrapper">' not in text: - # Not a wiki page - return - - index = self._userIndex(sysop) - - # Check for blocks - but only if version is 1.11 (userinfo is available) - # and the user data was not yet loaded - if self.versionnumber() >= 11 and not self._userData[index]: - blocked = self.isBlocked(sysop = sysop) - if blocked and not self._isBlocked[index]: - # Write a warning if not shown earlier - if sysop: - account = 'Your sysop account' - else: - account = 'Your account' - output(u'WARNING: %s on %s is blocked. Editing using this account will stop the run.' % (account, self)) - self._isBlocked[index] = blocked - - # Check for new messages - if '<div class="usermessage">' in text: - if not self._messages[index]: - # User has *new* messages - if sysop: - output(u'NOTE: You have new messages in your sysop account on %s' % self) - else: - output(u'NOTE: You have new messages on %s' % self) - self._messages[index] = True - else: - self._messages[index] = False - - # Don't perform other checks if the data was already loaded - if self._userData[index]: - return - - # Search for the the user page link at the top. - # Note that the link of anonymous users (which doesn't exist at all - # in Wikimedia sites) has the ID pt-anonuserpage, and thus won't be - # found here. 
- userpageR = re.compile('<li id="pt-userpage"><a href=".+?">(?P<username>.+?)</a></li>') - m = userpageR.search(text) - if m: - self._isLoggedIn[index] = True - self._userName[index] = m.group('username') - else: - self._isLoggedIn[index] = False - # No idea what is the user name, and it isn't important - self._userName[index] = None - - # Check user groups, if possible (introduced in 1.10) - groupsR = re.compile(r'var wgUserGroups = \["(.+)"\];') - m = groupsR.search(text) - if m: - rights = m.group(1) - rights = rights.split('", "') - if '*' in rights: - rights.remove('*') - self._rights[index] = rights - # Warnings - # Don't show warnings for not logged in users, they will just fail to - # do any action - if self._isLoggedIn[index]: - if 'bot' not in self._rights[index]: - if sysop: - output(u'Note: Your sysop account on %s does not have a bot flag. Its edits will be visible in the recent changes.' % self) - else: - output(u'WARNING: Your account on %s does not have a bot flag. Its edits will be visible in the recent changes and it may get blocked.' % self) - if sysop and 'sysop' not in self._rights[index]: - output(u'WARNING: Your sysop account on %s does not seem to have sysop rights. You may not be able to perform any sysop-restricted actions using it.' 
% self) - else: - # We don't have wgUserGroups, and can't check the rights - self._rights[index] = [] - if self._isLoggedIn[index]: - # Logged in user - self._rights[index].append('user') - # Assume bot, and thus autoconfirmed - self._rights[index].extend(['bot', 'autoconfirmed']) - if sysop: - # Assume user reported as a sysop indeed has the sysop rights - self._rights[index].append('sysop') - # Assume the user has the default rights - self._rights[index].extend(['read', 'createaccount', 'edit', 'upload', 'createpage', 'createtalk', 'move', 'upload']) - if 'bot' in self._rights[index] or 'sysop' in self._rights[index]: - self._rights[index].append('apihighlimits') - if 'sysop' in self._rights[index]: - self._rights[index].extend(['delete', 'undelete', 'block', 'protect', 'import', 'deletedhistory', 'unwatchedpages']) - - # Search for a token - tokenR = re.compile(r"<input type='hidden' value=\"(.*?)\" name=\"wpEditToken\"") - tokenloc = tokenR.search(text) - if tokenloc: - self._token[index] = tokenloc.group(1) - if self._rights[index] is not None: - # In this case, token and rights are loaded - user data is now loaded - self._userData[index] = True - else: - # Token not found - # Possible reason for this is the user is blocked, don't show a - # warning in this case, otherwise do show a warning - # Another possible reason is that the page cannot be edited - ensure - # there is a textarea and the tab "view source" is not shown - if u'<textarea' in text and u'<li id="ca-viewsource"' not in text and not self._isBlocked[index]: - # Token not found - output(u'WARNING: Token not found on %s. You will not be able to edit any page.' 
% self) - - def mediawiki_message(self, key): - """Return the MediaWiki message text for key "key" """ - global mwpage, tree - if key.lower() not in self._mediawiki_messages.keys() \ - and not hasattr(self, "_phploaded"): - get_throttle() - mwpage = self.getUrl("%s?title=%s:%s&action=edit" - % (self.path(), urllib.quote( - self.namespace(8).replace(' ', '_').encode( - self.encoding())), - key)) - tree = BeautifulSoup(mwpage, - convertEntities=BeautifulSoup.HTML_ENTITIES, - parseOnlyThese=SoupStrainer("textarea")) - if tree.textarea is not None and tree.textarea.string is not None: - value = tree.textarea.string.strip() - else: - value = None - if value: - self._mediawiki_messages[key.lower()] = value - else: - self._mediawiki_messages[key.lower()] = None - # Fallback in case MediaWiki: page method doesn't work - if verbose: - output( - u"Retrieving mediawiki messages from Special:Allmessages") - retry_idle_time = 1 - while True: - get_throttle() - phppage = self.getUrl(self.get_address("Special:Allmessages") - + "&ot=php") - Rphpvals = re.compile(r"(?ms)'([^']*)' => '(.*?[^\\])',") - count = 0 - for (phpkey, phpval) in Rphpvals.findall(phppage): - count += 1 - self._mediawiki_messages[str(phpkey).lower()] = phpval - if count == 0: - # No messages could be added. - # We assume that the server is down. - # Wait some time, then try again. - output('WARNING: No messages found in Special:Allmessages. Maybe the server is down. Retrying in %i minutes...' 
% retry_idle_time) - time.sleep(retry_idle_time * 60) - # Next time wait longer, but not longer than half an hour - retry_idle_time *= 2 - if retry_idle_time > 30: - retry_idle_time = 30 - continue - break - self._phploaded = True - - key = key.lower() - if self._mediawiki_messages[key] is None: - raise KeyError("MediaWiki key '%s' does not exist on %s" - % (key, self)) - return self._mediawiki_messages[key] - - def has_mediawiki_message(self, key): - """Return True iff this site defines a MediaWiki message for 'key'.""" - try: - v = self.mediawiki_message(key) - return True - except KeyError: - return False - - def _load(self, sysop = False): - """ - Loads user data. - This is only done if we didn't do get any page yet and the information - is requested, otherwise we should already have this data. - - Parameters: - * sysop - Get sysop user data? - """ - index = self._userIndex(sysop) - if self._userData[index]: - return - - if verbose: - output(u'Getting information for site %s' % self) - - # Get data - url = self.edit_address('Non-existing_page') - text = self.getUrl(url, sysop = sysop) - - # Parse data - self._getUserData(text, sysop = sysop) - # TODO: avoid code duplication for the following methods def newpages(self, number = 10, get_redirect = False, repeat = False): """Yield new articles (as Page objects) from Special:Newpages. @@ -2850,29 +2468,6 @@ if not repeat: break
- def prefixindex(self, prefix, namespace=0, includeredirects=True): - """Yield all pages with a given prefix. - - Parameters: - prefix The prefix of the pages. - namespace Namespace number; defaults to 0. - MediaWiki software will only return pages in one namespace - at a time. - - If includeredirects is False, redirects will not be found. - If includeredirects equals the string 'only', only redirects - will be found. Note that this has not been tested on older - versions of the MediaWiki code. - - It is advised not to use this directly, but to use the - PrefixingPageGenerator from pagegenerators.py instead. - """ - for page in self.allpages(start = prefix, namespace = namespace, includeredirects = includeredirects): - if page.titleWithoutNamespace().startswith(prefix): - yield page - else: - break - def linksearch(self, siteurl): """Yield Pages from results of Special:Linksearch for 'siteurl'.""" if siteurl.startswith('*.'): @@ -3062,14 +2657,3 @@ self.namespace(14)+':'+self.family.disambcatname[self.code]) except KeyError: raise NoPage(u'No page %s.' % page) - - def getToken(self, getalways = True, getagain = False, sysop = False): - index = self._userIndex(sysop) - if getagain or (getalways and self._token[index] is None): - output(u'Getting a token.') - self._load(sysop = sysop) - if self._token[index] is not None: - return self._token[index] - else: - return False -