import datetime
import functools
from IPython.core.display import display, HTML, Markdown
import matplotlib.pyplot as plt
def displayHtml(self, limit=20):
    """Render the first `limit` rows of a Spark DataFrame as an HTML table.

    Converts a bounded sample to pandas and displays it inline; `escape=False`
    lets any HTML embedded in cell values render, `index=False` hides the
    pandas row index.
    """
    sample = self.limit(limit).toPandas()
    table_html = sample.to_html(escape=False, index=False)
    display(HTML(table_html))

# Monkey-patch onto pyspark's DataFrame so every frame gains .displayHtml().
# NOTE(review): assumes `DataFrame` (pyspark.sql.DataFrame) was imported in an
# earlier cell — confirm.
DataFrame.displayHtml = displayHtml
def _column_equals_filter(column_name):
    """Build a filter function `f(df, value)` keeping rows where `column_name` == value.

    Replaces four copy-pasted lambda assignments (PEP 8 discourages binding
    lambdas to names) with a single factory.
    """
    def _filter(df, value):
        return df.where(F.col(column_name) == value)
    return _filter

# Partition-column filters; each takes (df, value) and returns the filtered frame.
filter_year = _column_equals_filter('year')
filter_month = _column_equals_filter('month')
filter_day = _column_equals_filter('day')
filter_hour = _column_equals_filter('hour')
def filter_date_pattern(df, date_pattern):
    """Filter `df` on its date partition columns using a slash-separated pattern.

    `date_pattern` is 'YYYY', 'YYYY/MM', 'YYYY/MM/DD' or 'YYYY/MM/DD/HH';
    each component present narrows the corresponding partition column.
    The original version stopped at the day component even though
    `filter_hour` exists — an hour component is now applied instead of being
    silently ignored (backward-compatible for 1–3 component patterns).

    Raises ValueError if any component is not an integer.
    """
    parts = [int(v) for v in date_pattern.split('/')]
    # zip truncates to however many components the pattern supplies.
    for apply_filter, value in zip(
            (filter_year, filter_month, filter_day, filter_hour), parts):
        df = apply_filter(df, value)
    return df
# Kafka-backed revision-create event stream, as ingested into the event Hive DB.
# NOTE(review): assumes `spark` is an active SparkSession from an earlier cell — confirm.
event_revisions = spark.sql(f"""select * from event.mediawiki_revision_create""")
# Monthly sqoop snapshot of the MediaWiki databases to compare events against.
snapshot='2021-02'
# Union of live revisions (wmf_raw.mediawiki_revision) and deleted revisions
# (wmf_raw.mediawiki_archive), with the archive's ar_* columns renamed to the
# rev_* schema and an `archived` flag distinguishing the two sources.
mediawiki_revisions = spark.sql(f"""
select
rev_id,
rev_page,
rev_text_id,
rev_comment,
rev_user,
rev_user_text,
rev_timestamp,
rev_minor_edit,
rev_deleted,
rev_len,
rev_parent_id,
rev_sha1,
rev_content_model,
rev_content_format,
rev_actor,
rev_comment_id,
wiki_db,
false as archived
from wmf_raw.mediawiki_revision
where snapshot = '{snapshot}'
union all
select
ar_rev_id as rev_id,
ar_page_id as rev_page,
ar_text_id as rev_text_id,
ar_comment as rev_comment,
ar_user as rev_user,
ar_user_text as rev_user_text,
ar_timestamp as rev_timestamp,
ar_minor_edit as rev_minor_edit,
ar_deleted as rev_deleted,
ar_len as rev_len,
ar_parent_id as rev_parent_id,
ar_sha1 as rev_sha1,
ar_content_model as rev_content_model,
ar_content_format as rev_content_format,
ar_actor as rev_actor,
ar_comment_id as rev_comment_id,
wiki_db,
true as archived
from wmf_raw.mediawiki_archive
where snapshot = '{snapshot}'""")
# unused fields in archived table
# ar_namespace,
# ar_title,
# ar_text,
# ar_flags,
# Analysis window: February 2021, in the YYYY/MM shape filter_date_pattern expects.
pattern = '2021/02'
# Events table is filtered via its integer year/month partition columns.
events = filter_date_pattern(event_revisions, pattern)
# rev_timestamp is a yyyyMMddHHmmss string, so stripping the '/' from the
# pattern yields the '202102%' LIKE prefix for the same month.
# NOTE(review): assumes `F` is pyspark.sql.functions, imported earlier — confirm.
revisions = mediawiki_revisions.where(F.col('rev_timestamp').like(f"{pattern.replace('/','')}%"))
#%%
# filter for mediawiki revisions that are not present in the kafka revision event streams
# Left join keeps every MediaWiki revision; a null event-side rev_timestamp
# after the join means no matching kafka event exists for that rev_id.
# NOTE(review): rev_timestamp is disambiguated with qualified names
# ('event.mediawiki_revision_create...', 'wmf_raw.mediawiki_revision...') —
# confirm Spark resolves these qualifiers against the sql() sources as intended.
# (Variable names keep the original "mediakiki" typo; they are reused throughout.)
joined_mediakiki_revisions = (revisions
.join(events, on=['rev_id'], how='left')
.withColumn('missing_event', F.col('event.mediawiki_revision_create.rev_timestamp').isNull())
# Parse MediaWiki's yyyyMMddHHmmss string into a real timestamp for time bucketing.
.withColumn('timestamp', F.to_timestamp('wmf_raw.mediawiki_revision.rev_timestamp', 'yyyyMMddHHmmss'))
).cache()
# Subset of revisions that never produced a kafka event.
missing_mediakiki_revisions = joined_mediakiki_revisions.where(F.col('missing_event'))
def count_missing(df):
    """Aggregate missing-event statistics over `df`.

    Works on a plain DataFrame or a GroupedData (both expose .agg). Returns a
    frame with `missing` (rows flagged missing_event), `total` (all rows),
    and `missing %` (missing as a percentage of total).
    """
    missing_count = F.sum(F.col('missing_event').cast('int')).alias('missing')
    total_count = F.count('*').alias('total')
    counts = df.agg(missing_count, total_count)
    return counts.withColumn('missing %', F.col('missing') / F.col('total') * 100)
# --- Report: missing-event rates sliced several ways -------------------------
display(Markdown('## Missing kafka events present in mediawiki revisions'))
display(Markdown('### All projects'))
count_missing(joined_mediakiki_revisions).displayHtml()
# Wikidata dominates edit volume, so most breakdowns below exclude it.
display(Markdown('#### Without wikidata'))
count_missing(joined_mediakiki_revisions.where(F.col('wiki_db')!='wikidatawiki')).displayHtml()
display(Markdown('### Project '))
# Per-wiki breakdown (count_missing accepts the GroupedData directly).
by_project = count_missing(joined_mediakiki_revisions.groupBy(['wiki_db']))
display(Markdown('#### Top by missing events'))
(by_project
.orderBy('missing',ascending=False)
.displayHtml(10))
display(Markdown('#### Top by total revisions'))
(by_project
.orderBy('total',ascending=False)
.displayHtml(10))
display(Markdown('#### Top by percentage of events missing'))
# Minimum-volume threshold so tiny wikis don't dominate the percentage ranking.
(by_project
.where(F.col('total')>1000)
.orderBy('missing %',ascending=False)
.displayHtml(10))
display(Markdown('### Minor edits '))
(count_missing(joined_mediakiki_revisions.where(F.col('wiki_db')!='wikidatawiki').groupBy(['wmf_raw.mediawiki_revision.rev_minor_edit']))
.orderBy('missing %',ascending=False)
.displayHtml())
display(Markdown('#### No parent revision '))
# rev_parent_id == 0 marks a page's first revision (no parent).
parent_0_col = (F.col('wmf_raw.mediawiki_revision.rev_parent_id')==0).alias('rev_parent_is_0')
(count_missing(joined_mediakiki_revisions.where(F.col('wiki_db')!='wikidatawiki').groupBy([parent_0_col]))
.orderBy('missing %',ascending=False)
.displayHtml())
(count_missing(joined_mediakiki_revisions.where(F.col('wiki_db')!='wikidatawiki').groupBy(['wmf_raw.mediawiki_revision.rev_minor_edit', parent_0_col]))
.orderBy('missing %',ascending=False)
.displayHtml())
display(Markdown('#### Archived'))
(count_missing(joined_mediakiki_revisions.where(F.col('wiki_db')!='wikidatawiki').groupBy(['archived']))
.orderBy('missing %',ascending=False)
.displayHtml())
(count_missing(joined_mediakiki_revisions.where(F.col('wiki_db')!='wikidatawiki').groupBy(['wmf_raw.mediawiki_revision.rev_minor_edit', 'archived']))
.orderBy('missing %',ascending=False)
.displayHtml())
(count_missing(joined_mediakiki_revisions.where(F.col('wiki_db')!='wikidatawiki').groupBy(['archived',parent_0_col]))
.orderBy('missing %',ascending=False)
.displayHtml())
# High-volume wikis kept for the timeseries plots (one legend line each).
top_wikis = ['commonswiki','enwiki','frwiki','dewiki','eswiki','zhwiki','itwiki','ruwiki','arzwiki']
# Narrow projection feeding ts_plot(): the missing flag, the pivot dimensions,
# and several time buckets derived from the parsed `timestamp` column.
groupby_cols = (joined_mediakiki_revisions
.select([
'missing_event',
'wiki_db',
'wmf_raw.mediawiki_revision.rev_minor_edit',
'archived',
'wmf_raw.mediawiki_revision.rev_page',
# Absolute hour since the epoch — x-axis for the month-long timeseries.
(F.unix_timestamp('timestamp')/60/60).cast('int').alias('hour_bucket'),
F.hour('timestamp').alias('hour'),
F.minute('timestamp').alias('minute'),
(F.col('wmf_raw.mediawiki_revision.rev_parent_id')==0).alias('rev_parent_is_0')])
# NOTE(review): redundant — wikidatawiki is not in top_wikis, so the isin()
# filter below already excludes it; kept for clarity/safety.
.where(F.col('wiki_db')!='wikidatawiki')
.where(F.col('wiki_db').isin(top_wikis)))
def named_col(col):
    """Return a string column rendering each value as '<col> = <value>'.

    Used to label pivot columns so plot legends read e.g. 'archived = true'.
    Replaces the original Python UDF, which shipped every row through a
    Python worker just to format a string, with native column expressions.
    `coalesce` keeps the UDF's behavior of rendering nulls as 'None'.

    NOTE(review): booleans now render Spark-style ('true'/'false') instead of
    Python str() style ('True'/'False') in legend labels — cosmetic only.
    """
    prefix = F.lit(f'{col} = ')
    value_as_string = F.coalesce(F.col(col).cast('string'), F.lit('None'))
    return F.concat(prefix, value_as_string)
def ts_plot(xbucket, pivot_on):
    """Plot the %% of missing kafka events per `xbucket`, one line per `pivot_on` value.

    Reads the module-level `groupby_cols` projection; relabels the pivot
    column via named_col() so legend entries are self-describing, pivots to
    one column per value, and plots the result with pandas/matplotlib.
    """
    labelled = groupby_cols.withColumn(pivot_on, named_col(pivot_on))
    missing_pct = F.sum(F.col('missing_event').cast('int')) / F.count('*') * 100
    pivoted = (labelled
               .groupby(xbucket)
               .pivot(pivot_on)
               .agg(missing_pct)
               .orderBy(xbucket))
    pivoted.toPandas().plot(title='% missing kafka events', x=xbucket)
display(Markdown('## Timeseries hourly buckets'))
# Wide canvas for the month-long hourly series.
plt.rcParams["figure.figsize"] = (20,15)
ts_plot('hour_bucket', 'rev_minor_edit')
ts_plot('hour_bucket', 'archived')
ts_plot('hour_bucket', 'rev_parent_is_0')
ts_plot('hour_bucket', 'wiki_db')
# Smaller canvas for the per-hour-of-day / per-minute-of-hour plots.
plt.rcParams["figure.figsize"] = (6,4)
# Fixed: heading string had an unbalanced closing parenthesis.
display(Markdown('### hour of day, minute of hour'))
ts_plot('hour', 'rev_minor_edit')
ts_plot('minute', 'rev_minor_edit')
ts_plot('hour', 'archived')
ts_plot('minute', 'archived')
ts_plot('hour', 'rev_parent_is_0')
ts_plot('minute', 'rev_parent_is_0')