# User:DeltaBot/source/noclaims
#
# From Wikidata
# Jump to navigation Jump to search
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# licensed under CC-Zero: https://creativecommons.org/publicdomain/zero/1.0

from dataclasses import dataclass, field
from json.decoder import JSONDecodeError
from os.path import expanduser
from time import sleep, strftime
from typing import Generator

import mariadb
import pywikibot as pwb
import requests


# pywikibot config
# NOTE: pwb.Site() is evaluated at import time, so importing this module
# already touches the pywikibot user config (and possibly the network).
SITE = pwb.Site('wikidata', 'wikidata')
REPO = SITE.data_repository()
EDIT_SUMMARY = 'Bot: Updating database report'

# wdqs config (Wikidata Query Service SPARQL endpoint)
WDQS_ENDPOINT = 'https://query.wikidata.org/bigdata/namespace/wdq/sparql'
WDQS_USERAGENT = f'{requests.utils.default_user_agent()} (noclaims.py via User:DeltaBot at Wikidata; mailto:tools.deltabot@toolforge.org)'
# entity URI prefix, stripped from SPARQL results to obtain bare Q-ids
WD = 'http://www.wikidata.org/entity/'

# Wikidata replica config (Toolforge analytics replica; credentials come
# from the user's replica.my.cnf)
DB_DEFAULT_FILE = f'{expanduser("~")}/replica.my.cnf'
WIKIDATA_REPLICA_HOST = 'wikidatawiki.analytics.db.svc.wikimedia.cloud'
WIKIDATA_REPLICA_DB = 'wikidatawiki_p'

# templates for output
# The '<only' + 'include>' concatenation presumably keeps the literal tag out
# of this source text, which is itself published on a wiki page -- the
# runtime string is a normal <onlyinclude> wrapper around the timestamp.
HEADER = 'A list of items with a sitelink to {lang_code} but without any statements. Data as of <only' + 'include>{timestamp}</onlyinclude>.\n\n'
TABLE_ROW = '* [[{qid}]] - [[{interwiki_prefix}{page_title}]]\n'
# doubled braces produce literal {{...}} template syntax in the output wikitext
TABLE_ROW_OVERVIEW = '{{{{TR otherreport|{dbname}|{lang}|{family}|{url}|{iw}|{ps_lang}|{ps_family}|{cnt_without}|{idx}|{cnt}}}}}\n'
HEADER_OVERVIEW = '{{{{Wikidata:Database reports/without claims by site/header|{timestamp}}}}}\n'
FOOTER_OVERVIEW = '{{Wikidata:Database reports/without claims by site/footer}} __NOINDEX__'

# queries

# Total number of sitelinks per client site, largest sites first.
# ips_site_id is a VARBINARY, hence the CONVERT to get a str back.
QUERY_1 = """SELECT
    CONVERT(ips_site_id USING utf8) AS dbname,
    COUNT(*) AS cnt
FROM
    wb_items_per_site
GROUP BY
    ips_site_id
ORDER BY
    cnt DESC"""

# Number of items that have a sitelink to the given site but zero claims
# (page_props row with pp_propname='wb-claims' and sortkey/value 0).
QUERY_2 = """SELECT
    COUNT(*) AS cnt
FROM
    page_props,
    wb_items_per_site,
    page
WHERE
    pp_sortkey=0
    AND pp_propname='wb-claims'
    AND pp_page=page_id
    AND CONCAT("Q", ips_item_id)=page_title
    AND page_namespace=0
    AND ips_site_id=%(dbname)s"""

# Up to 1000 items without any statements that are sitelinked to the wiki
# at {url}, newest (highest numeric Q-id) first.  Doubled braces are literal
# braces; only {url} is substituted via str.format.
SPARQL_QUERY = """SELECT ?item ?lemma WHERE {{
  ?sitelink schema:about ?item;
            schema:isPartOf <{url}/>;
            schema:name ?lemma .
  ?item wikibase:statements "0"^^xsd:integer .
}} ORDER BY DESC(xsd:integer(SUBSTR(STR(?item), STRLEN("http://www.wikidata.org/entity/Q")+1))) LIMIT 1000"""


# Interwiki prefix per project family (empty string means: no prefix needed,
# as for Wikipedia, where the bare language code suffices).
INTERWIKI_MAP = {
    'wikipedia' : '',
    'wikibooks' : 'b:',
    'wikiquote' : 'q:',
    'wiktionary' : 'wikt:',
    'wikinews' : 'n:',
    'wikisource' : 's:',
    'wikiversity' : 'v:',
    'wikivoyage' : 'voy:',
    'wikidata' : 'd:',
    'commons' : 'c:',
    'meta' : 'm:',
    'species' : 'species:',
}


@dataclass
class Project:
    """A Wikimedia wiki identified by its database name.

    On construction the remaining attributes are filled in from the
    meta_p.wiki replica table; projects without Wikidata support are
    rejected with a RuntimeWarning.
    """

    dbname:str

    # looked up in meta_p.wiki by __post_init__
    lang:str = field(init=False)
    family:str = field(init=False)
    url:str = field(init=False)

    # (lang, family) pair in the form petscan expects
    ps_params:tuple = field(init=False)

    # prefix for building interwiki links to pages on this wiki
    interwiki_prefix:str = field(init=False)

    # projects that must never appear in the reports
    _IGNORED = ('mediawikiwiki', 'sourceswiki', 'ruwikimedia', 'sewikimedia', 'incubatorwiki', 'wikimaniawiki', 'outreachwiki')

    # content families whose petscan/interwiki data follows the regular
    # "<lang>.<family>" pattern
    _CONTENT_FAMILIES = ('wikipedia', 'wikibooks', 'wikiquote', 'wiktionary', 'wikinews', 'wikisource', 'wikiversity', 'wikivoyage')


    def __post_init__(self) -> None:
        if self.dbname in self._IGNORED:
            raise RuntimeWarning(f'Project {self.dbname} is being ignored')

        self._init_meta_params()
        self.ps_params = self._init_petscan_params()
        self.interwiki_prefix = self._init_interwiki_prefix()


    def _init_meta_params(self) -> None:
        """Fill lang, family and url from the meta_p.wiki table."""
        query = """SELECT lang, family, url FROM wiki WHERE is_closed=0 AND has_wikidata=1 AND dbname=%(dbname)s LIMIT 1"""
        with Replica('meta.analytics.db.svc.wikimedia.cloud', 'meta_p') as (_, cur):
            cur.execute(query, {'dbname' : self.dbname })
            if cur.rowcount != 1:
                raise RuntimeError(f'Found {cur.rowcount} results for dbname {self.dbname}')
            rows = cur.fetchall()

        meta = rows[0]
        self.lang = meta.get('lang')  # TODO: what about nb -> no?
        self.family = meta.get('family')
        self.url = meta.get('url')


    def _init_petscan_params(self) -> tuple[str, str]:
        """Return the (lang, family) pair petscan uses for this wiki."""
        if self.family in self._CONTENT_FAMILIES:
            return (self.lang, self.family)

        special_cases = {
            'wikidatawiki' : ('wikidata', 'wikimedia'),
            'specieswiki' : ('species', 'wikimedia'),
            'commonswiki' : ('commons', 'wikimedia'),
            'metawiki' : ('meta', 'wikimedia'),
        }
        if self.dbname in special_cases:
            return special_cases[self.dbname]

        raise RuntimeError(f'No petscan parameters found for project {self.dbname}')


    def _init_interwiki_prefix(self) -> str:
        """Return the interwiki prefix used in links to this wiki."""
        if self.family in self._CONTENT_FAMILIES:
            return f':{INTERWIKI_MAP.get(self.family, "")}{self.lang}:'

        special_cases = {
            'wikidatawiki' : '',
            'specieswiki' : ':species:',
            'commonswiki' : ':c:',
            'metawiki' : ':m:',
        }
        if self.dbname in special_cases:
            return special_cases[self.dbname]

        raise RuntimeError(f'No interwiki prefix found for project {self.dbname}')


class Replica:
    """Context manager around a MariaDB replica connection.

    ``with Replica(...) as (conn, cur):`` yields the connection together
    with a dict-row cursor; both are closed again on exit.
    """

    def __init__(self, host:str=WIKIDATA_REPLICA_HOST, dbname:str=WIKIDATA_REPLICA_DB) -> None:
        # credentials are read from the default file (replica.my.cnf)
        self.connection = mariadb.connect(
            default_file=DB_DEFAULT_FILE,
            host=host,
            database=dbname,
        )
        # dictionary=True makes fetch* return dicts keyed by column name
        self.cursor = self.connection.cursor(dictionary=True)

    def __enter__(self):
        return (self.connection, self.cursor)

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # close the cursor before its connection
        for resource in (self.cursor, self.connection):
            resource.close()


def query_wdqs(query:str, *, retry_credit:int=3) -> dict:
    """Run a SPARQL query against WDQS and return the parsed JSON payload.

    Retries up to ``retry_credit`` times (sleeping 60s in between) when the
    request times out or when the service returned an unparseable body after
    running close to its 60s query deadline.

    Raises RuntimeError when the credit is exhausted or the response cannot
    be parsed as JSON.
    """
    if retry_credit <= 0:
        raise RuntimeError('No credit left for querying')

    try:
        response = requests.post(
            url=WDQS_ENDPOINT,
            data={
                'query' : query,
                'format' : 'json',
            },
            headers={
                'Accept' : 'application/sparql-results+json',
                'User-Agent': WDQS_USERAGENT,
            },
            # WDQS aborts queries after ~60s; without a timeout a stalled
            # connection would hang the bot forever
            timeout=90,
        )
    except requests.exceptions.Timeout as exception:
        if retry_credit-1 > 0:
            sleep(60)
            return query_wdqs(query, retry_credit=retry_credit-1)

        raise RuntimeError('No response from SPARQL endpoint') from exception

    try:
        payload = response.json()
    except JSONDecodeError as exception:
        # a long elapsed time suggests the query hit the WDQS deadline and
        # the body is an error page -- worth retrying after a pause
        if response.elapsed.total_seconds() > 55 and retry_credit-1 > 0:
            sleep(60)
            return query_wdqs(query, retry_credit=retry_credit-1)

        raise RuntimeError('Cannot parse result from SPARQL endpoint') from exception

    return payload


def query_wdqs_generator(query:str) -> Generator[dict, None, None]:
    """Yield the result bindings of a WDQS SPARQL query one at a time."""
    payload = query_wdqs(query)
    yield from payload.get('results', {}).get('bindings', [])


def make_report(project:Project) -> str:
    """Build the per-project report body: one list row per item that has a
    sitelink to *project* but no statements."""
    rows = []
    sparql = SPARQL_QUERY.format(url=project.url)

    for binding in query_wdqs_generator(sparql):
        item_qid = binding.get('item', {}).get('value', '').replace(WD, '')
        lemma = binding.get('lemma', {}).get('value', '')
        rows.append(TABLE_ROW.format(
            qid=item_qid,
            interwiki_prefix=project.interwiki_prefix,
            page_title=lemma
        ))

    return ''.join(rows)


def make_overview() -> tuple[str, int, int]:
    """Build the overview table body.

    Returns a tuple ``(text, idx, maxvalue)``: the wikitext rows, the
    number of projects listed, and the largest without-claims count seen.
    """
    # total sitelink count per client site, largest sites first (QUERY_1)
    with Replica() as (_, cur):
        cur.execute(QUERY_1)
        results = cur.fetchall()

    idx = 0
    maxvalue = 0
    text = ''

    for row in results:
        dbname = row.get('dbname')
        cnt = row.get('cnt')
        if dbname is None or cnt is None:
            continue

        print(dbname, cnt)
        try:
            project = Project(dbname)
        except (RuntimeError, RuntimeWarning) as exception:
            # ignored project, or no usable meta_p/petscan/interwiki data
            print(exception)
            continue

        idx += 1
        # count of sitelinked items without any claims for this site (QUERY_2)
        with Replica() as (_, cur):
            cur.execute(QUERY_2, { 'dbname' : dbname })

            for cnt_without in cur:
                # NOTE(review): this rebinds `cnt`, clobbering the total
                # sitelink count from QUERY_1, so {cnt_without} and {cnt}
                # below receive the same without-claims value.  Presumably
                # {cnt} was meant to stay the QUERY_1 total -- confirm
                # against the "TR otherreport" template before changing.
                cnt = cnt_without.get('cnt')
                if cnt is None:
                    continue

                if cnt > maxvalue:
                    maxvalue = cnt

                text += TABLE_ROW_OVERVIEW.format(
                    dbname=dbname,
                    lang=project.lang,
                    family=project.family,
                    url=project.url,
                    iw=project.interwiki_prefix,
                    ps_lang=project.ps_params[0],
                    ps_family=project.ps_params[1],
                    cnt_without=cnt,
                    idx=idx,
                    cnt=cnt
                )

    return text, idx, maxvalue


def main():
    """Update the per-wiki reports for a fixed set of projects, then the
    overview page."""
    dbnames = [
        'dewiki', 'enwiki', 'eowiki', 'etwiki', 'frwiki', 'jawiki',
        'nlwiki', 'ptwiki', 'ruwiki', 'svwiki', 'huwiki', 'simplewiki',
    ]
    for dbname in dbnames:
        project = Project(dbname)
        body = make_report(project)
        header = HEADER.format(lang_code=project.lang, timestamp=strftime("%Y-%m-%d %H:%M (%Z)"))

        page = pwb.Page(SITE, f'Wikidata:Database reports/without claims by site/{dbname}')
        page.text = f'{header}{body}'
        page.save(summary=EDIT_SUMMARY, minor=False)

    report, idx, max_value = make_overview()
    stat = f'{{{{DR otherreport|max={max_value}|reportlength={idx}}}}}\n'
    header = HEADER_OVERVIEW.format(timestamp=strftime("%Y-%m-%d %H:%M (%Z)"))
    summary = f'Bot: Updating database report: reportlength: {idx}; max: {max_value}'

    overview_page = pwb.Page(SITE, 'Wikidata:Database reports/without claims by site')
    overview_page.text = stat + header + report + FOOTER_OVERVIEW
    overview_page.save(summary=summary, minor=False)


if __name__=='__main__':
    main()