spark.sql("ADD JAR hdfs://analytics-hadoop/wmf/refinery/current/artifacts/refinery-hive.jar").collect() spark.sql("CREATE TEMPORARY FUNCTION get_main_search_request AS 'org.wikimedia.analytics.refinery.hive.GetMainSearchRequestUDF'").collect() import datetime date_start = datetime.datetime(2019, 9, 10, 10) date_end = datetime.datetime(2019, 9, 10, 20) max_res = 2 wiki = 'enwiki' sample_size = 5000 max_q_by_day = 30 random_seed = 42 from pyspark.sql import functions as F, types as T, Window import calendar ts_start = calendar.timegm(date_start.timetuple()) ts_end = calendar.timegm(date_end.timetuple()) row_timestamp = F.unix_timestamp(F.concat( F.col('year'), F.lit('-'), F.col('month'), F.lit('-'), F.col('day'), F.lit(' '), F.col('hour'), F.lit(':00:00'))) w = Window.partitionBy(F.col('http.client_ip'), F.col('year'), F.col('month'), F.col('day')) w = Window.partitionBy(F.col('http.client_ip'), F.col('year'), F.col('month'), F.col('day')) df = ( spark.read.table('event.mediawiki_cirrussearch_request') .where(row_timestamp > ts_start) .where(row_timestamp < ts_end) .where(F.col('elasticsearch_requests.query_type')[0] == 'near_match') .withColumn('areq', F.expr('get_main_search_request(database, elasticsearch_requests)')) .where(F.col('areq').isNotNull()) .withColumn('q_by_day', F.count(F.lit(1)).over(w)) .where(F.col('q_by_day') <= max_q_by_day) .where(F.col('database') == 'enwiki') .groupBy(F.col('http.client_ip'), F.col('year'), F.col('month'), F.col('day')) .agg(F.collect_list(F.struct('database', 'areq.query', 'areq.hits_total', 'areq.suggestion')).alias('ip_requests')) .withColumn('random_req_index', F.floor(F.rand(random_seed) * F.size(F.col('ip_requests')))) .select(F.col('ip_requests')[F.col('random_req_index')].alias('req')) .select(F.col('req.query')) ) df.show(5)