# Get all revisions that are redirects

For a given wiki we want to find all revisions that are redirects.
We query [wmf.mediawiki_wikitext_history](https://wikitech.wikimedia.org/wiki/Analytics/Data_Lake/Content/XMLDumps/Mediawiki_wikitext_history).

How:
- get all redirect-aliases
 - for any wiki we can extract all redirect-aliases from '%s-latest-siteinfo-namespaces.json.gz'
- query all revision-ids that are redirects
 - we look at the column 'revision_text'
 - we check whether the it starts with the string '#redirect [[*]]' or any of its aliases using sql's LIKE command
 - we only consider lower-case strings (syntax agnostic to capitalization)
- regexp-extract to get redirect-page from text-field

In [11]:
import os
from pyspark.sql.functions import col,regexp_extract ## for extracting redirect-page
from redirect import get_redirect_aliases ## for getting redirect-aliases

In [93]:
## Which wiki
wiki_date = '2019-07'
wiki_name = 'frwiki'

In [94]:
## select the wmf table
sqlContext.sql('USE wmf')

## we look up the json of the latest siteinfo-namespaces file
filename = os.path.join('/mnt','data','xmldatadumps','public','%s'%wiki_name,'latest','%s-latest-siteinfo-namespaces.json.gz'%wiki_name)
list_redirect_aliases = get_redirect_aliases(filename)
print(list_redirect_aliases)

## construct the string-matching condition
str_revision_redirect_match = '('
for i_redirect_alias, redirect_alias in enumerate(list_redirect_aliases):
 if i_redirect_alias > 0:
 str_revision_redirect_match += ' OR '
 str_revision_redirect_match += 'LOWER(revision_text) LIKE "%s [[%%]]%%"'%(redirect_alias.lower())
str_revision_redirect_match += ')'
# print(str_revision_redirect_match)

['#REDIRECT', '#REDIRECTION']
(LOWER(revision_text) LIKE "#redirect [[%]]%" OR LOWER(revision_text) LIKE "#redirection [[%]]%")


In [95]:
## query all revision which start with '#redirect [[*]]' or one of its redirect-aliases

# query with limit and without ordering
query = 'SELECT page_id, revision_id, \
substring_index(revision_text," [[",1) as redirect_command, \
substring(revision_text, locate("[[",revision_text) +2,locate("]]",revision_text) - locate("[[",revision_text)-2) as redirect_page_title \
FROM wmf.mediawiki_wikitext_history \
WHERE snapshot="%s" \
AND wiki_db="%s" \
AND %s'\
%(wiki_date,wiki_name,str_revision_redirect_match)
print(query)
result = sqlContext.sql(query)

SELECT page_id, revision_id, substring_index(revision_text," [[",1) as redirect_command, substring(revision_text, locate("[[",revision_text) +2,locate("]]",revision_text) - locate("[[",revision_text)-2) as redirect_page_title FROM wmf.mediawiki_wikitext_history WHERE snapshot="2019-07" AND wiki_db="frwiki" AND (LOWER(revision_text) LIKE "#redirect [[%]]%" OR LOWER(revision_text) LIKE "#redirection [[%]]%")


## Check whether we can make the queries

In [96]:
%%time
result.show(10)

+--------+-----------+----------------+--------------------+
| page_id|revision_id|redirect_command| redirect_page_title|
+--------+-----------+----------------+--------------------+
| 231994| 3775427| #redirect| Guépard|
| 2936717| 39899757| #REDIRECT| Rodolfo Siviero|
| 3138668| 31660615| #REDIRECT|Discuter:Dorothy ...|
| 4034735| 84625169| #REDIRECTION|Patrimoine cultur...|
| 5566576| 66821896| #REDIRECTION|Liste des ancienn...|
| 5576417| 66999144| #REDIRECTION|Aide:Comment défe...|
| 6807454| 86741693| #REDIRECTION| Kaartin kasarmi|
| 7097745| 92657854| #REDIRECTION| Viktor & Rolf|
| 7099859| 92688351| #REDIRECTION| Marcien (juriste)|
|10677616| 134963025| #REDIRECTION|Argyll and Bute (...|
+--------+-----------+----------------+--------------------+
only showing top 10 rows

CPU times: user 4 ms, sys: 4 ms, total: 8 ms
Wall time: 18.3 s


In [97]:
%%time
result.count()

Py4JJavaError: An error occurred while calling o395.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8629 in stage 27.0 failed 4 times, most recent failure: Lost task 8629.3 in stage 27.0 (TID 10681, an-worker1088.eqiad.wmnet, executor 394): ExecutorLostFailure (executor 394 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 6.9 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:297)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2770)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2769)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:2769)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
