diff --git a/FLOSSbot/bot.py b/FLOSSbot/bot.py index 66eff53..a2509e3 100644 --- a/FLOSSbot/bot.py +++ b/FLOSSbot/bot.py @@ -1,231 +1,231 @@ # # Copyright (C) 2016 Loic Dachary # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # import argparse import logging from datetime import datetime, timedelta import pywikibot log = logging.getLogger(__name__) class Bot(object): def __init__(self, args): self.args = args self.site = pywikibot.Site( code="wikidata" if not self.args.test else "test", fam="wikidata", user=self.args.user) if self.args.test: self.site.throttle.setDelays(writedelay=0) if self.args.test: self.wikidata_site = pywikibot.Site(code="wikidata", fam="wikidata") self.reset_cache() @staticmethod def get_parser(): parser = argparse.ArgumentParser(add_help=False) parser.add_argument( '--test', action='store_true', default=None, help='use test.wikidata.org instead of wikidata.org') parser.add_argument( '--user', default=None, help='wikidata user name') parser.add_argument( '--verification-delay', type=int, default=30, help='days to wait before verifying a claim again') return parser @staticmethod def factory(cls, argv): parser = argparse.ArgumentParser( parents=[Bot.get_parser()], add_help=False, conflict_handler='resolve') cls.set_subparser(parser.add_subparsers()) return cls(parser.parse_args(argv)) def debug(self, item, message): self.log(log.debug, item, message) def info(self, item, message): self.log(log.info, item, message) def error(self, item, message): self.log(log.error, item, message) def log(self, fun, item, message): fun("http://wikidata.org/wiki/" + item.getID() + " " + message) def reset_cache(self): self.entities = { 'property': {}, 'item': {}, } def lookup_entity(self, name, **kwargs): type = kwargs['type'] found = self.entities[type].get(name) if found: return found found = self.search_entity(self.site, name, **kwargs) if found: if type == 'property': found = found['id'] self.entities[type][name] = found return found # # Hardcode the desired wikidata item when there are # multiple items with the same english label and no # trivial way to disambiguate them. 
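# For instance, the 'Fossil' entry below short-circuits the label search;
# a minimal usage sketch, assuming bot is a configured Bot instance:
#
#   >>> bot.lookup_item('Fossil', type='item').getID()
#   'Q1439431'
#
# The lookup still checks that the English label of Q1439431 matches, so a
# stale entry falls through to the normal search_entities() path.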
# authoritative = { 'wikidata': { 'git': 'Q186055', 'Fossil': 'Q1439431', }, 'test': { }, } def search_entity(self, site, name, **kwargs): if name in Bot.authoritative[site.code]: candidate = pywikibot.ItemPage( site, Bot.authoritative[site.code][name], 0) if candidate.get()['labels']['en'] == name: return candidate candidates = [] for p in site.search_entities(name, 'en', **kwargs): # log.debug("looking for entity " + name + ", found " + str(p)) if p.get('label') == name: if kwargs['type'] == 'property': candidates.append(p) else: candidates.append(pywikibot.ItemPage(site, p['id'], 0)) if len(candidates) == 0: return None elif len(candidates) > 1 and kwargs['type'] == 'item': found = [] for candidate in candidates: item = candidate.get() ok = True for instance_of in item['claims'].get(self.P_instance_of, []): if (instance_of.getTarget() == self.Q_Wikimedia_disambiguation_page): log.debug("ignore disambiguation page " + candidate.getID() + " for " + name) ok = False break if ok: found.append(candidate) if len(found) != 1: raise ValueError("found multiple items for " + name + " " + str(found)) return found[0] else: return candidates[0] lookup_item = lookup_entity def lookup_property(self, name): return self.lookup_entity(self.site, name, type='property') def create_entity(self, type, name): found = self.search_entity(self.wikidata_site, name, type=type) entity = { "labels": { "en": { "language": "en", "value": name, } }, } if type == 'property': assert found, type + " " + name + " must exist in wikidata" id = found['id'] found = self.wikidata_site.loadcontent({'ids': id}, 'datatype') assert found, "datatype of " + id + " " + name + " is not found" entity['datatype'] = found[id]['datatype'] log.debug("create " + type + " " + str(entity)) self.site.editEntity({'new': type}, entity) def clear_entity_label(self, id): data = { "labels": { "en": { "language": "en", "value": "", } } } log.debug("clear " + id + " label") self.site.editEntity({'id': id}, data) self.reset_cache() def __getattribute__(self, name): if name.startswith('P_'): type = 'property' elif name.startswith('Q_'): type = 'item' else: return super(Bot, self).__getattribute__(name) label = " ".join(name.split('_')[1:]) found = self.lookup_entity(label, type=type) if not found and self.args.test: self.create_entity(type, label) for i in range(120): found = self.lookup_entity(label, type=type) if found is not None: break return found def need_verification(self, claim): now = datetime.utcnow() - if self.P_retrieved in claim.qualifiers: - previous = claim.qualifiers[self.P_retrieved][0] + if self.P_point_in_time in claim.qualifiers: + previous = claim.qualifiers[self.P_point_in_time][0] previous = previous.getTarget() previous = datetime(year=previous.year, month=previous.month, day=previous.day) return (now - previous >= timedelta(days=self.args.verification_delay)) else: return True - def set_retrieved(self, item, claim, now=datetime.utcnow()): + def set_point_in_time(self, item, claim, now=datetime.utcnow()): when = pywikibot.WbTime(now.year, now.month, now.day) - if self.P_retrieved in claim.qualifiers: - self.debug(item, "updating retrieved") - retrieved = claim.qualifiers[self.P_retrieved][0] - retrieved.setTarget(when) + if self.P_point_in_time in claim.qualifiers: + self.debug(item, "updating point-in-time") + point_in_time = claim.qualifiers[self.P_point_in_time][0] + point_in_time.setTarget(when) if not self.args.dry_run: self.site.save_claim(claim) else: - self.debug(item, "setting retrieved") - retrieved = 
pywikibot.Claim(self.site, - self.P_retrieved, - isQualifier=True) - retrieved.setTarget(when) + self.debug(item, "setting point-in-time") + point_in_time = pywikibot.Claim(self.site, + self.P_point_in_time, + isQualifier=True) + point_in_time.setTarget(when) if not self.args.dry_run: - claim.addQualifier(retrieved, bot=True) + claim.addQualifier(point_in_time, bot=True)
diff --git a/FLOSSbot/repository.py b/FLOSSbot/repository.py
index 5d4d9d0..5755c48 100644
--- a/FLOSSbot/repository.py
+++ b/FLOSSbot/repository.py
@@ -1,549 +1,549 @@
# # Copyright (C) 2016 Loic Dachary # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . #
import argparse import logging import re import textwrap import time
import pywikibot import requests from pywikibot import pagegenerators as pg from FLOSSbot import bot, util
log = logging.getLogger(__name__)
FLOSS_doc = ("https://www.wikidata.org/wiki/Wikidata:" "WikiProject_Informatics/FLOSS#source_code_repository")
class Repository(bot.Bot): cache = None
@staticmethod def get_parser(): parser = argparse.ArgumentParser(add_help=False) select = parser.add_mutually_exclusive_group() select.add_argument( '--filter', default='', choices=['no-protocol', 'no-preferred'], help='filter with a pre-defined query', ) select.add_argument( '--item', default=[], action='append', help='work on this QID (can be repeated)') return parser
@staticmethod def set_subparser(subparsers): subparsers.add_parser( 'repository', formatter_class=util.CustomFormatter, description=textwrap.dedent("""\
Verify and fix the source code repository claim.
The scope of the verifications and the associated modifications is explained below. By default all items that have at least one source code repository claim are considered. It can be restricted with the --filter or --item options.
A) Protocol
The source code repository responds to a protocol that depends on the VCS. If the protocol qualifier is missing, try a range of VCS to figure out which protocol it implements and set the protocol qualifier accordingly.
For web sites that host many repositories (such as github or sourceforge), additional heuristics are implemented to figure out the URL of the repository or the protocol. For instance, since github only hosts git repositories, the protocol is always assumed to be git. For sourceforge, the URL of the web interface to the repository is fetched to get the instructions and figure out if it is subversion, mercurial or git.
When everything fails and the protocol cannot be established with absolute certainty, an error is displayed and an editor should fix the item.
--filter no-protocol select only the items for which there exists at least one claim with no protocol qualifier
B) Preferred rank
When there are multiple source code repository URLs one of them must have the preferred rank. The aim is to display it in an infobox, therefore the URL with the http protocol should be preferred over another requiring VCS software.
--filter no-preferred select only the items for which there is more than one claim with no preferred rank
[1] {doc} """.format(doc=FLOSS_doc)),
epilog=textwrap.dedent(""" Examples:
$ FLOSSbot --verbose repository
INFO WORKING ON https://www.wikidata.org/wiki/Q403539
INFO IGNORE \ https://code.wireshark.org/review/gitweb?p=wireshark.git \ because it already has a protocol
DEBUG trying all known protocols on \ https://code.wireshark.org/review/p/wireshark.git
DEBUG :sh: timeout 30 git ls-remote \ https://code.wireshark.org/review/p/wireshark.git HEAD
DEBUG b'e8f1d2abda939f37d99f272f8a76a191c9a752b4\tHEAD'
INFO WORKING ON https://www.wikidata.org/wiki/Q4035967
DEBUG trying all known protocols on \ http://git.ceph.com/?p=ceph.git;a=summary
DEBUG :sh: timeout 30 git ls-remote \ http://git.ceph.com/?p=ceph.git;a=summary HEAD
DEBUG b"fatal: repository \ 'http://git.ceph.com/?p=ceph.git/' not found"
DEBUG b'/bin/sh: 1: HEAD: not found'
...
ERROR SKIP http://git.ceph.com/?p=ceph.git;a=summary
The first item (https://www.wikidata.org/wiki/Q403539) has two source code repository URLs. The first one already has a protocol qualifier and is left untouched. An attempt is made to retrieve the second one with the git command line and succeeds. The protocol qualifier is set to git.
The second item (WORKING ON https://www.wikidata.org/wiki/Q4035967) has a source code repository URL which is a gitweb interface to a git repository. It is not usable with any protocol, including git, and the program fails with an error so the editor can manually edit the item.
"""), help='Set protocol of the source code repository', parents=[Repository.get_parser()], add_help=False, conflict_handler='resolve', ).set_defaults( func=Repository, )
@staticmethod def factory(argv): return bot.Bot.factory(Repository, argv)
def run(self): if len(self.args.item) > 0: self.run_items() else: self.run_query()
def run_items(self): for item in self.args.item: item = pywikibot.ItemPage(self.site, item, 0) self.fixup(item) self.verify(item)
def run_query(self): if self.args.filter == 'no-protocol': query = """ SELECT DISTINCT ?item WHERE {{ ?item p:{source_code_repository} ?repo. ?repo ps:{source_code_repository} ?value. OPTIONAL {{ ?repo pq:{protocol} ?protocol }} # get the protocol FILTER(!BOUND(?protocol)) # and only keep those with no protocol }} ORDER BY ?item """.format(source_code_repository=self.P_source_code_repository, protocol=self.P_protocol)
elif self.args.filter == 'no-preferred': query = """ SELECT ?item (COUNT(?value) AS ?count) WHERE {{ ?item p:{source_code_repository} [ ps:{source_code_repository} ?value; wikibase:rank wikibase:NormalRank ]. MINUS {{ ?item p:{source_code_repository}/wikibase:rank wikibase:PreferredRank. }} }} GROUP BY ?item HAVING(?count > 1) ORDER BY ?item """.format(source_code_repository=self.P_source_code_repository)
else: query = """ SELECT DISTINCT ?item WHERE {{ ?item wdt:{source_code_repository} ?url.
}} ORDER BY ?item """.format(source_code_repository=self.P_source_code_repository) query = query + " # " + str(time.time()) log.debug(query) for item in pg.WikidataSPARQLPageGenerator(query, site=self.site, result_type=list): self.fixup(item) self.verify(item) def verify(self, item): item_dict = item.get() clm_dict = item_dict["claims"] status = {} for claim in clm_dict[self.P_source_code_repository]: url = claim.getTarget() if not self.need_verification(claim): status[url] = 'no need' continue if self.P_protocol not in claim.qualifiers: status[url] = 'no protocol' continue protocol = claim.qualifiers[self.P_protocol][0].getTarget() self.debug(item, url + " protocol " + protocol.getID() + " " + protocol.get()['labels']['en']) credentials = self.get_credentials(claim) if self.verify_protocol(url, protocol, credentials): self.info(item, "VERIFIED " + url) status[url] = 'verified' - self.set_retrieved(item, claim) + self.set_point_in_time(item, claim) else: self.error(item, "VERIFY FAIL " + url) status[url] = 'fail' return status def fixup(self, item): self.fixup_protocol(item) self.fixup_rank(item) def fixup_rank(self, item): item_dict = item.get() clm_dict = item_dict["claims"] if len(clm_dict[self.P_source_code_repository]) == 1: return False if len(clm_dict[self.P_source_code_repository]) != 2: self.debug(item, "SKIP more than two URLs is too difficult to fix") return False http = [] for claim in clm_dict[self.P_source_code_repository]: if claim.getRank() == 'preferred': self.debug(item, "SKIP because there already is a preferred URL") return False if self.P_protocol not in claim.qualifiers: continue for protocol in claim.qualifiers[self.P_protocol]: if protocol.getTarget() == self.Q_Hypertext_Transfer_Protocol: http.append(claim) if len(http) != 1: self.debug(item, "SKIP because there are " + str(len(http)) + " URLs with the http protocol") return False if not self.args.dry_run: http[0].changeRank('preferred') self.info(item, "PREFERRED set to " + http[0].getTarget()) return True def fixup_protocol(self, item): item_dict = item.get() clm_dict = item_dict["claims"] urls = [] for claim in clm_dict[self.P_source_code_repository]: urls.append(claim.getTarget()) for claim in clm_dict[self.P_source_code_repository]: url = claim.getTarget() extracted = self.extract_repository(url) if extracted and extracted not in urls: self.debug(item, "ADDING " + extracted + " as a source repository discovered in " + url) source_code_repository = pywikibot.Claim( self.site, self.P_source_code_repository, 0) source_code_repository.setTarget(extracted) if not self.args.dry_run: item.addClaim(source_code_repository) if claim.getRank() == 'normal': if not self.args.dry_run: claim.changeRank('preferred') self.info(item, "PREFERRED set to " + url) for claim in clm_dict[self.P_source_code_repository]: self.fixup_url(claim) for claim in clm_dict[self.P_source_code_repository]: if self.P_protocol in claim.qualifiers: self.debug(item, "IGNORE " + claim.getTarget() + " because it already has a protocol") continue target_protocol = self.guess_protocol(claim) if not target_protocol: self.error(item, claim.getTarget() + " misses a protocol qualifier") continue protocol = pywikibot.Claim(self.site, self.P_protocol, 0) protocol.setTarget(target_protocol) if not self.args.dry_run: claim.addQualifier(protocol, bot=True) - self.set_retrieved(item, claim) + self.set_point_in_time(item, claim) self.info(item, "SET protocol of " + claim.getTarget()) def guess_protocol_from_url(self, url): if 'github.com' in url: return self.Q_git 
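# As noted in the command description, github only hosts git repositories,
# so any github.com URL is mapped to git without probing; a hypothetical
# check, assuming bot is a configured Repository instance:
#
#   >>> bot.guess_protocol_from_url('https://github.com/ceph/ceph') == bot.Q_git
#   True
#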
if 'code.launchpad.net' in url: return self.Q_GNU_Bazaar if 'bitbucket.org' in url: return self.Q_git if url.lower().startswith('http'): known = ( 'http://bxr.su/', 'http://openbsd.su/', 'http://svn.tuxfamily.org/viewvc.cgi/', 'http://svn.filezilla-project.org/filezilla/', 'http://svn.gna.org/viewcvs/', 'http://svn.apache.org/viewvc/', 'http://svn.savannah.gnu.org/viewvc/?root=', ) if url.lower().replace('https', 'http').startswith(known): return self.Q_Hypertext_Transfer_Protocol if (re.match('https?://sourceforge.net/p/' '.*/(svn|code|code-0)/HEAD/tree/', url) or re.match('https?://sourceforge.net/p/' '.*?/.*?/ci/(default|master)/tree/', url) or re.match('https?://.*.codeplex.com/SourceControl', url)): return self.Q_Hypertext_Transfer_Protocol if url.startswith('git://'): return self.Q_git if url.startswith('svn://'): return self.Q_Apache_Subversion if url.startswith('ftp://'): return self.Q_File_Transfer_Protocol return None def verify_git(self, url): return util.sh_bool("timeout 30 git ls-remote " + url + " HEAD") def verify_hg(self, url): return util.sh_bool(""" set -e timeout 30 hg identify {url} """.format(url=url)) def verify_svn(self, url, credentials): if credentials: user = '--username=' + credentials[0] else: user = '' if credentials and len(credentials) > 1: password = '--password=' + credentials[1] else: password = '' return util.sh_bool(""" set -e timeout 30 svn info {url} {user} {password} """.format(url=url, user=user, password=password)) def verify_fossil(self, url): return util.sh_bool(""" set -e rm -fr /tmp/tmpclone timeout 30 fossil clone {url} /tmp/tmpclone | grep -q -m 1 -e 'Round-trips' """.format(url=url)) def verify_bzr(self, url): # # prefer branches over version-info because # it fails on https://golem.ph.utexas.edu/~distler/code/instiki/svn/ # with bzr: ERROR: https://golem.ph... is not a local path. # return util.sh_bool(""" set -e timeout 30 bzr branches {url} """.format(url=url)) def verify_ftp(self, url): return util.sh_bool(""" set -e timeout 30 lftp -e 'dir; quit' {url} """.format(url=url)) def verify_http(self, url): try: # # although head() would be more light weight, some # servers do not respond to it. For instance # https://src.openvz.org/projects/OVZ/ returned 405 # # The user agent is required for some servers. For # instance http://marabunta.laotracara.com/descargas/ # returns 406 if no User-Agent header is set. 
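#
# A standalone sketch of the same check (the URL is a placeholder),
# assuming only the requests import at the top of this module:
#
#   r = requests.get('http://example.org/repo/',
#                    headers={'User-Agent': 'FLOSSbot'},
#                    verify=False)
#   ok = r.status_code == requests.codes.ok
#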
# r = requests.get(url, headers={'User-Agent': 'FLOSSbot'}, verify=False) log.debug("GET " + url + " status " + str(r.status_code)) if r.status_code != requests.codes.ok: log.debug("GET " + url + " " + r.text) return r.status_code == requests.codes.ok except Exception as e: log.debug("GET failed with " + str(e)) return False def verify_protocol(self, url, protocol, credentials): if protocol == self.Q_git: return self.verify_git(url) elif protocol == self.Q_Mercurial: return self.verify_hg(url) elif protocol == self.Q_Fossil: return self.verify_fossil(url) elif protocol == self.Q_GNU_Bazaar: return self.verify_bzr(url) elif protocol == self.Q_Apache_Subversion: return self.verify_svn(url, credentials) elif (protocol == self.Q_Hypertext_Transfer_Protocol or protocol == self.Q_HTTPS): return self.verify_http(url) elif protocol == self.Q_File_Transfer_Protocol: return self.verify_ftp(url) return None def try_protocol(self, url, credentials): if self.verify_git(url): return self.Q_git elif self.verify_hg(url): return self.Q_Mercurial elif self.verify_svn(url, credentials): return self.Q_Apache_Subversion elif self.verify_bzr(url): return self.Q_GNU_Bazaar elif self.verify_fossil(url): return self.Q_Fossil return None def get_credentials(self, repository): if self.P_website_username in repository.qualifiers: credentials = repository.qualifiers[self.P_website_username][0] credentials = credentials.getTarget().split(':') else: credentials = None return credentials def guess_protocol(self, repository): url = repository.getTarget() credentials = self.get_credentials(repository) protocol = self.guess_protocol_from_url(url) if protocol: if not self.verify_protocol(url, protocol, credentials): return None else: return protocol return self.try_protocol(url, credentials) def fixup_url(self, repository): url = repository.getTarget() new_url = None if url.startswith('https://git-wip-us.apache.org/repos/asf?p='): new_url = url.replace('?p=', '/') m = re.match('http://(?:bazaar|code).launchpad.net/' '~[^/]+/([^/]+)', url) if m: new_url = "https://code.launchpad.net/" + m.group(1) if new_url: self.info(repository, "REPLACE " + url + " with " + new_url) repository.changeTarget(new_url) return True else: return False def extract_repository(self, url): m = re.match('https://(.*).codeplex.com/SourceControl/latest', url) if m: return "https://git01.codeplex.com/" + m.group(1) m = re.match('https?://svn.apache.org/viewvc/(.*)', url) if m: return "https://svn.apache.org/repos/asf/" + m.group(1) m = re.match('http://svn.savannah.gnu.org/viewvc/\?root=(.*)', url) if m: return "svn://svn.sv.gnu.org/" + m.group(1) m = re.match('https://svn.tuxfamily.org/viewvc.cgi/(\w+)_(\w+)/', url) if m: return ("svn://svn.tuxfamily.org/svnroot/" + m.group(1) + "/" + m.group(2)) m = re.match('https?://svn.filezilla-project.org/filezilla/(.*)/', url) if m: return "https://svn.filezilla-project.org/svn/" + m.group(1) m = re.match('http://svn.gna.org/viewcvs/(.*)', url) if m: return "svn://svn.gna.org/svn/" + m.group(1) if re.match('https?://sourceforge.net/p/' '.*/(git|code|code-git)/ci/(default|master)/tree/', url): try: r = requests.get(url) if r.status_code != requests.codes.ok: return None u = re.findall('git clone (git://git.code.sf.net/p/.*/' '(?:git|code(?:-git)?))', r.text) if len(u) == 1: return u[0] u = re.findall('hg clone (http://hg.code.sf.net/p/.*/code)', r.text) if len(u) >= 1: return u[0] except requests.ConnectionError as e: pass if re.match('https?://sourceforge.net/p/' '.*?/.*?/ci/(default|master)/tree/', url): try: r 
= requests.get(url) if r.status_code != requests.codes.ok: return None u = re.findall('hg clone (http://hg.code.sf.net/p/.*?) ', r.text) if len(u) >= 1: return u[0] except requests.ConnectionError as e: pass if re.match('https?://sourceforge.net/p/' '.*/(svn|code|code-0)/HEAD/tree/', url): try: r = requests.get(url) if r.status_code != requests.codes.ok: return None u = re.findall('svn checkout (svn://svn.code.sf.net.*/trunk)', r.text) if len(u) == 1: return u[0] except requests.ConnectionError as e: pass return None diff --git a/tests/test_bot.py b/tests/test_bot.py index d28537e..24fe02f 100644 --- a/tests/test_bot.py +++ b/tests/test_bot.py @@ -1,148 +1,148 @@ # -*- mode: python; coding: utf-8 -*- # # Copyright (C) 2016 Loic Dachary # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # import argparse import logging from datetime import date import pytest import pywikibot from FLOSSbot.bot import Bot from tests.wikidata import TestWikidata class TestBot(object): def setup_class(cls): logging.getLogger('FLOSSbot').setLevel(logging.DEBUG) TestWikidata().login() def test_lookup_item(self): bot = Bot(argparse.Namespace( test=True, user='FLOSSbotCI', )) assert 0 == len(bot.entities['item']) git = bot.Q_git assert 1 == len(bot.entities['item']) assert git == bot.Q_git assert bot.Q_Concurrent_Versions_System assert 2 == len(bot.entities['item']) def test_create_entity(self): bot = Bot(argparse.Namespace( test=True, user='FLOSSbotCI', )) item = bot.Q_git assert 1 == len(bot.entities['item']) bot.clear_entity_label(item.getID()) assert 0 == len(bot.entities['item']) item = bot.Q_git assert 1 == len(bot.entities['item']) property2datatype = { 'P_source_code_repository': 'url', 'P_website_username': 'string', 'P_protocol': 'wikibase-item', } wikidata_bot = Bot(argparse.Namespace( test=False, user=None, )) for (attr, datatype) in property2datatype.items(): bot.reset_cache() property = bot.__getattribute__(attr) assert 1 == len(bot.entities['property']) bot.clear_entity_label(property) assert 0 == len(bot.entities['property']) for i in range(120): if (bot.lookup_entity( attr, type='property') is None): break property = bot.__getattribute__(attr) assert 1 == len(bot.entities['property']) new_content = bot.site.loadcontent({'ids': property}, 'datatype') wikidata_property = wikidata_bot.__getattribute__(attr) wikidata_content = wikidata_bot.site.loadcontent( {'ids': wikidata_property}, 'datatype') assert (wikidata_content[wikidata_property]['datatype'] == new_content[property]['datatype']), attr assert (datatype == wikidata_content[wikidata_property]['datatype']), attr - def test_set_retrieved(self): + def test_set_point_in_time(self): bot = Bot(argparse.Namespace( test=True, user='FLOSSbotCI', dry_run=False, verification_delay=30, )) item = bot.__getattribute__('Q_' + TestWikidata.random_name()) claim = pywikibot.Claim(bot.site, bot.P_source_code_repository, 0) claim.setTarget("http://repo.com/some") item.addClaim(claim) - bot.set_retrieved(item, claim) + 
bot.set_point_in_time(item, claim) assert bot.need_verification(claim) is False - bot.set_retrieved(item, claim, date(1965, 11, 2)) + bot.set_point_in_time(item, claim, date(1965, 11, 2)) assert bot.need_verification(claim) is True bot.clear_entity_label(item.getID()) def test_search_entity(self): bot = Bot(argparse.Namespace( test=True, user='FLOSSbotCI', )) name = TestWikidata.random_name() entity = { "labels": { "en": { "language": "en", "value": name, } }, } first = bot.site.editEntity({'new': 'item'}, entity) first = pywikibot.ItemPage(bot.site, first['entity']['id'], 0) second = bot.site.editEntity({'new': 'item'}, entity) second = pywikibot.ItemPage(bot.site, second['entity']['id'], 0) with pytest.raises(ValueError) as e: bot.search_entity(bot.site, name, type='item') assert "found multiple items" in str(e.value) claim = pywikibot.Claim(bot.site, bot.P_instance_of, 0) claim.setTarget(bot.Q_Wikimedia_disambiguation_page) first.addClaim(claim) found = bot.search_entity(bot.site, name, type='item') assert found.getID() == second.getID() bot.site.editEntity({'new': 'item'}, entity) with pytest.raises(ValueError) as e: bot.search_entity(bot.site, name, type='item') assert "found multiple items" in str(e.value) Bot.authoritative['test'][name] = second.getID() found = bot.search_entity(bot.site, name, type='item') assert found.getID() == second.getID()
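The point in time qualifier written by set_point_in_time() is what need_verification() compares against --verification-delay (30 days by default). A minimal standalone sketch of that rule, using only the standard library; the helper name and values are illustrative, not part of FLOSSbot:

from datetime import datetime, timedelta

def need_verification(last_point_in_time, verification_delay=30):
    # Re-check a claim when the point in time qualifier is missing or
    # older than the configured delay, mirroring Bot.need_verification().
    if last_point_in_time is None:
        return True
    return datetime.utcnow() - last_point_in_time >= timedelta(days=verification_delay)

# Matches the expectations of test_set_point_in_time() above: a claim
# stamped today needs no verification, one stamped in 1965 does.
assert need_verification(datetime.utcnow()) is False
assert need_verification(datetime(1965, 11, 2)) is True
assert need_verification(None) is True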