{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Get all revisions that are redirects\n", "\n", "For a given wiki we want to find all revisions that are redirects.\n", "We query [wmf.mediawiki_wikitext_history](https://wikitech.wikimedia.org/wiki/Analytics/Data_Lake/Content/XMLDumps/Mediawiki_wikitext_history).\n", "\n", "How:\n", "- get all redirect-aliases\n", " - for any wiki we can extract all redirect-aliases from '%s-latest-siteinfo-namespaces.json.gz'\n", "- query all revision-ids that are redirects\n", " - we look at the column 'revision_text'\n", " - we check whether the it starts with the string '#redirect [[*]]' or any of its aliases using sql's LIKE command\n", " - we only consider lower-case strings (syntax agnostic to capitalization)\n", "- regexp-extract to get redirect-page from text-field" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "import os\n", "from pyspark.sql.functions import col,regexp_extract ## for extracting redirect-page\n", "from redirect import get_redirect_aliases ## for getting redirect-aliases" ] }, { "cell_type": "code", "execution_count": 93, "metadata": {}, "outputs": [], "source": [ "## Which wiki\n", "wiki_date = '2019-07'\n", "wiki_name = 'frwiki'" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": 94, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "['#REDIRECT', '#REDIRECTION']\n", "(LOWER(revision_text) LIKE \"#redirect [[%]]%\" OR LOWER(revision_text) LIKE \"#redirection [[%]]%\")\n" ] } ], "source": [ "## select the wmf table\n", "sqlContext.sql('USE wmf')\n", "\n", "## we look up the json of the latest siteinfo-namespaces file\n", "filename = os.path.join('/mnt','data','xmldatadumps','public','%s'%wiki_name,'latest','%s-latest-siteinfo-namespaces.json.gz'%wiki_name)\n", "list_redirect_aliases = get_redirect_aliases(filename)\n", "print(list_redirect_aliases)\n", "\n", "## construct the string-matching condition\n", "str_revision_redirect_match = '('\n", "for i_redirect_alias, redirect_alias in enumerate(list_redirect_aliases):\n", " if i_redirect_alias > 0:\n", " str_revision_redirect_match += ' OR '\n", " str_revision_redirect_match += 'LOWER(revision_text) LIKE \"%s [[%%]]%%\"'%(redirect_alias.lower())\n", "str_revision_redirect_match += ')'\n", "# print(str_revision_redirect_match)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": 95, "metadata": { "scrolled": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "SELECT page_id, revision_id, substring_index(revision_text,\" [[\",1) as redirect_command, substring(revision_text, locate(\"[[\",revision_text) +2,locate(\"]]\",revision_text) - locate(\"[[\",revision_text)-2) as redirect_page_title FROM wmf.mediawiki_wikitext_history WHERE snapshot=\"2019-07\" AND wiki_db=\"frwiki\" AND (LOWER(revision_text) LIKE \"#redirect [[%]]%\" OR LOWER(revision_text) LIKE \"#redirection [[%]]%\")\n" ] } ], "source": [ "## query all revision which start with '#redirect [[*]]' or one of its redirect-aliases\n", "\n", "# query with limit and without ordering\n", "query = 'SELECT page_id, revision_id, \\\n", "substring_index(revision_text,\" [[\",1) as redirect_command, \\\n", "substring(revision_text, locate(\"[[\",revision_text) +2,locate(\"]]\",revision_text) - locate(\"[[\",revision_text)-2) as 
redirect_page_title \\\n", "FROM wmf.mediawiki_wikitext_history \\\n", "WHERE snapshot=\"%s\" \\\n", "AND wiki_db=\"%s\" \\\n", "AND %s'\\\n", "%(wiki_date,wiki_name,str_revision_redirect_match)\n", "print(query)\n", "result = sqlContext.sql(query)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Check whether we can make the queries" ] }, { "cell_type": "code", "execution_count": 96, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+-----------+----------------+--------------------+\n", "| page_id|revision_id|redirect_command| redirect_page_title|\n", "+--------+-----------+----------------+--------------------+\n", "| 231994| 3775427| #redirect| Guépard|\n", "| 2936717| 39899757| #REDIRECT| Rodolfo Siviero|\n", "| 3138668| 31660615| #REDIRECT|Discuter:Dorothy ...|\n", "| 4034735| 84625169| #REDIRECTION|Patrimoine cultur...|\n", "| 5566576| 66821896| #REDIRECTION|Liste des ancienn...|\n", "| 5576417| 66999144| #REDIRECTION|Aide:Comment défe...|\n", "| 6807454| 86741693| #REDIRECTION| Kaartin kasarmi|\n", "| 7097745| 92657854| #REDIRECTION| Viktor & Rolf|\n", "| 7099859| 92688351| #REDIRECTION| Marcien (juriste)|\n", "|10677616| 134963025| #REDIRECTION|Argyll and Bute (...|\n", "+--------+-----------+----------------+--------------------+\n", "only showing top 10 rows\n", "\n", "CPU times: user 4 ms, sys: 4 ms, total: 8 ms\n", "Wall time: 18.3 s\n" ] } ], "source": [ "%%time\n", "result.show(10)" ] }, { "cell_type": "code", "execution_count": 97, "metadata": {}, "outputs": [ { "ename": "Py4JJavaError", "evalue": "An error occurred while calling o395.count.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8629 in stage 27.0 failed 4 times, most recent failure: Lost task 8629.3 in stage 27.0 (TID 10681, an-worker1088.eqiad.wmnet, executor 394): ExecutorLostFailure (executor 394 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 6.9 GB of 6 GB physical memory used. 
Consider boosting spark.yarn.executor.memoryOverhead.\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)\n\tat scala.Option.foreach(Option.scala:257)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:363)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:938)\n\tat org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:297)\n\tat org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2770)\n\tat org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2769)\n\tat org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)\n\tat org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)\n\tat org.apache.spark.sql.Dataset.count(Dataset.scala:2769)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:238)\n\tat java.lang.Thread.run(Thread.java:748)\n", "output_type": "error", "traceback": [ "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[0;31mPy4JJavaError\u001b[0m Traceback (most recent call last)", 
"\u001b[0;32m\u001b[0m in \u001b[0;36m\u001b[0;34m()\u001b[0m\n\u001b[0;32m----> 1\u001b[0;31m \u001b[0mget_ipython\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mrun_cell_magic\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'time'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m''\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m'result.count()'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m", "\u001b[0;32m/usr/lib/python3/dist-packages/IPython/core/interactiveshell.py\u001b[0m in \u001b[0;36mrun_cell_magic\u001b[0;34m(self, magic_name, line, cell)\u001b[0m\n\u001b[1;32m 2113\u001b[0m \u001b[0mmagic_arg_s\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mvar_expand\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mline\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mstack_depth\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 2114\u001b[0m \u001b[0;32mwith\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mbuiltin_trap\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 2115\u001b[0;31m \u001b[0mresult\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mfn\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mmagic_arg_s\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mcell\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 2116\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mresult\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 2117\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;32m\u001b[0m in \u001b[0;36mtime\u001b[0;34m(self, line, cell, local_ns)\u001b[0m\n", "\u001b[0;32m/usr/lib/python3/dist-packages/IPython/core/magic.py\u001b[0m in \u001b[0;36m\u001b[0;34m(f, *a, **k)\u001b[0m\n\u001b[1;32m 186\u001b[0m \u001b[0;31m# but it's overkill for just that one bit of state.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 187\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mmagic_deco\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0marg\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 188\u001b[0;31m \u001b[0mcall\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mlambda\u001b[0m \u001b[0mf\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mk\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0mf\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mk\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 189\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 190\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mcallable\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0marg\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;32m/usr/lib/python3/dist-packages/IPython/core/magics/execution.py\u001b[0m in \u001b[0;36mtime\u001b[0;34m(self, line, cell, local_ns)\u001b[0m\n\u001b[1;32m 1174\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mmode\u001b[0m\u001b[0;34m==\u001b[0m\u001b[0;34m'eval'\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1175\u001b[0m \u001b[0mst\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mclock2\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 1176\u001b[0;31m \u001b[0mout\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0meval\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcode\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mglob\u001b[0m\u001b[0;34m,\u001b[0m 
\u001b[0mlocal_ns\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1177\u001b[0m \u001b[0mend\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mclock2\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1178\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;32m\u001b[0m in \u001b[0;36m\u001b[0;34m()\u001b[0m\n", "\u001b[0;32m/usr/lib/spark2/python/pyspark/sql/dataframe.py\u001b[0m in \u001b[0;36mcount\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 453\u001b[0m \u001b[0;36m2\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 454\u001b[0m \"\"\"\n\u001b[0;32m--> 455\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mint\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jdf\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcount\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 456\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 457\u001b[0m \u001b[0;34m@\u001b[0m\u001b[0mignore_unicode_prefix\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;32m/usr/lib/spark2/python/lib/py4j-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1255\u001b[0m \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1256\u001b[0m return_value = get_return_value(\n\u001b[0;32m-> 1257\u001b[0;31m answer, self.gateway_client, self.target_id, self.name)\n\u001b[0m\u001b[1;32m 1258\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1259\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mtemp_arg\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mtemp_args\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;32m/usr/lib/spark2/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 61\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mdeco\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 62\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 63\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mf\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 64\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mpy4j\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mprotocol\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPy4JJavaError\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 65\u001b[0m \u001b[0ms\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mjava_exception\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtoString\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;32m/usr/lib/spark2/python/lib/py4j-src.zip/py4j/protocol.py\u001b[0m in \u001b[0;36mget_return_value\u001b[0;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[1;32m 326\u001b[0m raise 
Py4JJavaError(\n\u001b[1;32m 327\u001b[0m \u001b[0;34m\"An error occurred while calling {0}{1}{2}.\\n\"\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 328\u001b[0;31m format(target_id, \".\", name), value)\n\u001b[0m\u001b[1;32m 329\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 330\u001b[0m raise Py4JError(\n", "\u001b[0;31mPy4JJavaError\u001b[0m: An error occurred while calling o395.count.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8629 in stage 27.0 failed 4 times, most recent failure: Lost task 8629.3 in stage 27.0 (TID 10681, an-worker1088.eqiad.wmnet, executor 394): ExecutorLostFailure (executor 394 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 6.9 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)\n\tat scala.Option.foreach(Option.scala:257)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:363)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:938)\n\tat org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:297)\n\tat org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2770)\n\tat org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2769)\n\tat org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)\n\tat org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)\n\tat org.apache.spark.sql.Dataset.count(Dataset.scala:2769)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:238)\n\tat java.lang.Thread.run(Thread.java:748)\n" ] } ], "source": [ "%%time\n", "result.count()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "PySpark - YARN (large)", "language": "python", "name": "spark_yarn_pyspark_large" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.5.3" } }, "nbformat": 4, "nbformat_minor": 2 }