Aim:
For a given time-window we construct reading sessions in which users register a new account.
How:
We identify unique users with a standard fingerprinting technique: hashing client_ip and user_agent, for desktop (and potentially mobile) users.
We first extract all users that made at least 1 request to a 'Special:CreateAccount' page.
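We then keep only those users whose session followed a pattern that suggests a registration event: a request to 'Special:CreateAccount' made while not logged in, directly followed by a request made while logged in.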
For an example-case (1 hour):
# %load_ext autoreload
# %autoreload 2
import os, sys
import numpy as np
import datetime
from pyspark.sql import functions as F, types as T, Window
import calendar
import time
# Example time window of one hour: [2019-07-15 10:00, 2019-07-15 11:00).
date_start = datetime.datetime(year=2019, month=7, day=15, hour=10)
date_end = datetime.datetime(year=2019, month=7, day=15, hour=11)
# Window bounds as epoch seconds; calendar.timegm reads the naive
# datetimes as UTC.
ts_start, ts_end = (
    calendar.timegm(bound.timetuple()) for bound in (date_start, date_end)
)
# Epoch seconds of the start of the hour a row belongs to, rebuilt from the
# year/month/day/hour columns as a '<year>-<month>-<day> <hour>:00:00' string.
# NOTE(review): unix_timestamp parses in the Spark session time zone, while
# ts_start/ts_end are computed as UTC -- confirm the session runs in UTC.
_row_hour_string = F.concat(
    F.col('year'), F.lit('-'), F.col('month'), F.lit('-'), F.col('day'),
    F.lit(' '), F.col('hour'), F.lit(':00:00'),
)
row_timestamp = F.unix_timestamp(_row_hour_string)

# Fingerprint used as user identifier: hash of '<client_ip>-<user_agent>'
# (only client and user agent). This seems to be the default way, see
# https://meta.wikimedia.org/wiki/Research:Unique_Devices/Other_Possible_Implementations#Fingerprinting
# Alternative that additionally includes accept_language:
# user_id = F.hash(F.concat(F.col('client_ip'),F.lit('-'),F.col('user_agent'),F.lit('-'),F.col('accept_language')))
user_id = F.hash(F.concat('client_ip', F.lit('-'), 'user_agent'))

# 0/1 flag: request was made by a logged-in user
_request_is_logged_in = F.col('x_analytics_map.loggedIn') == 1
logged_in = F.when(_request_is_logged_in, 1).otherwise(0)

# 0/1 flag: requested page is Special:CreateAccount
_request_is_create_account = (
    F.col('pageview_info.page_title') == 'Special:CreateAccount'
)
page_create_account = F.when(_request_is_create_account, 1).otherwise(0)
# All requests of one fingerprinted user; used for the per-user counts.
w = Window.partitionBy(F.col('user_id'))
# Requests of one user in time order; used to look at a user's next request.
# This is partitioned by user_id rather than built from an empty
# partitionBy() with a global (user_id, ts) ordering, because an
# unpartitioned window makes Spark move every row into a single partition.
# At the end of a user's requests the look-ahead now yields NULL instead of
# the next user's row; the user_id equality test applied where this window
# is used maps both cases to 0, so results are unchanged.
w_user_ts = Window.partitionBy(F.col('user_id')).orderBy(F.col('ts'))
# maximum number of requests per session of an individual
n_p_max = 500
## All webrequests of those users (fingerprints) that requested the
## create-account page at least once within the time window.
df = (
    spark.read.table('wmf.webrequest')
    ## time window: ts_start <= row timestamp < ts_end
    .filter((row_timestamp >= ts_start) & (row_timestamp < ts_end))
    ## wiki project: English Wikipedia
    .filter(
        (F.col('normalized_host.project_class') == "wikipedia")
        & (F.col('normalized_host.project') == "en")
    )
    ## only requests marked as pageviews
    .filter(F.col('is_pageview') == 1)
    ## only agent type "user"
    .filter(F.col('agent_type') == "user")
    ## access method is one of desktop/mobile/mobile app; keep desktop
    ## (isaac filters != mobile app)
    .filter(F.col('access_method') == "desktop")
    ## optional: only traffic from inside wiki
    # .filter(F.col('referer_class') == "internal")
    ## unclear yet; done by isaac
    .filter(F.col('webrequest_source') == 'text')
    ## derived columns: user fingerprint, 0/1 flags and the page title
    .withColumn('user_id', user_id)
    .withColumn('logged_in', logged_in)
    .withColumn('page_create_account', page_create_account)
    .withColumn('page_title', F.col('pageview_info.page_title'))
    ## per-user totals over window w
    ## n_pca_by_user: number of requests to the create-account page
    ## n_p_by_user: number of requests to any page
    .withColumn('n_pca_by_user', F.sum(F.col('page_create_account')).over(w))
    .withColumn('n_p_by_user', F.count(F.lit(1)).over(w))
    ## keep users with at least 1 create-account request and at most
    ## n_p_max requests in total
    .filter((F.col('n_pca_by_user') >= 1) & (F.col('n_p_by_user') <= n_p_max))
    .orderBy('user_id', 'ts')
    ## logged_in flag of the same user's next request; 0 when the following
    ## row belongs to a different user or there is no following row
    ## (offset -1 makes lag look one row ahead)
    .withColumn(
        'logged_in_next',
        F.when(
            F.lag('user_id', -1).over(w_user_ts) == F.col('user_id'),
            F.lag('logged_in', -1).over(w_user_ts),
        ).otherwise(0),
    )
)
## Identify users whose session we accept as an account creation, i.e. the
## session contains a request that
##   1) targets the 'Special:CreateAccount' page,
##   2) was made while not logged in, and
##   3) is followed by a request made while logged in.
## The per-request check is aggregated to one row per user (new_user is 1 if
## any request of the user matches); only matching users are kept.
_registration_step = (
    (F.col('page_title') == 'Special:CreateAccount')
    & (F.col('logged_in') == 0)
    & (F.col('logged_in_next') == 1)
)
df_users = (
    df.groupBy('user_id')
    .agg(F.max(F.when(_registration_step, 1).otherwise(0)).alias('new_user'))
    .filter(F.col('new_user') == 1)
)
## Keep only the requests of users whose session was flagged as a
## registration event. A left-semi join acts as a filter: no column of
## df_users is added to the result.
## The join is done by column name because df_users is derived from df, so
## `df['user_id'] == df_users['user_id']` refers to the same underlying
## column on both sides (an ambiguous self-join predicate).
## Row order is not guaranteed after a join, so the sessions are sorted
## explicitly to come out grouped by user and in time order.
df_sessions = (
    df.join(df_users, on='user_id', how='left_semi')
    .select('user_id', 'ts', 'page_title', 'logged_in')
    .orderBy('user_id', 'ts')
)
## Execute the query by collecting the result into a pandas DataFrame and
## print the elapsed seconds. perf_counter is used instead of time.time()
## because it is monotonic, so the measured duration cannot be distorted by
## system clock adjustments.
t1 = time.perf_counter()
df_sessions = df_sessions.toPandas()
t2 = time.perf_counter()
print(t2 - t1)
## get an overview of the collected sessions
df_sessions.head()
## one group per user fingerprint
df_sessions_by_user = df_sessions.groupby('user_id')
## sorted list of the user ids that have a group
users = sorted(df_sessions_by_user.groups)
## number of unique users
len(users)
## session of an individual user (change the index to pick another user)
df_sessions_by_user.get_group(users[0])