"""
Analysis of the differences between the old and new pipeline logs for CentralNotice impressions,
for 20191105 21:00:00-22:59:59.
Commands run to generate data for analysis:
$ mkdir new
$ mkdir old
$ scp civi1001:/srv/archive/banner_logs/2019/centralnotice-impressions.20191105-2[0-3]* new/
$ scp civi1001:/srv/archive/banner_logs/2019/beaconImpressions-sampled10.tsv.20191105-2[0-3]* old/
$ gunzip new/*
$ gunzip old/*
$ cat new/*.log | jq -r 'select( .dt | test("2019-11-05T2[1-2]") ) | [ .dt, .ip, .event.campaign, .event.banner // null, .event.anonymous, .event.project, .event.db, .event.uselang, .event.device, .event.country, .event.region // null, .event.debug, .event.randomcampaign, .event.randombanner, .event.impressionEventSampleRate, .event.recordImpressionSampleRate, .event.statusCode, .event.bucket, .userAgent.browser_family, .userAgent.browser_major, .userAgent.browser_minor, .userAgent.device_family, .userAgent.is_bot, .userAgent.is_mediawiki, .userAgent.os_family, .userAgent.os_major, .userAgent.os_minor ] | @csv ' > new/single_file_new.csv
$ cat old/*.log | awk '$3 ~/^2019-11-05T2[1-2]/' > old/single_file_old.tsv
"""
# Imports
import re
import pandas as pd
from urllib import parse
import numpy as np
# Load the csv file of new logs as a Pandas data frame
col_names = [
'dt',
'ip',
'campaign',
'banner',
'anonymous',
'project',
'db',
'uselang',
'device',
'country',
'region',
'debug',
'randomcampaign',
'randombanner',
'impressionEventSampleRate',
'recordImpressionSampleRate',
'statusCode',
'bucket',
'browser_family',
'browser_major',
'browser_minor',
'device_family',
'is_bot',
'is_mediawiki',
'os_family',
'os_major',
'os_minor'
]
new = pd.read_csv( 'new/single_file_new.csv', names = col_names, header = None )
# Add 'source' and 'id' columns
new[ 'source' ] = 'new'
new.insert( 0, 'id', 'new' + pd.Series( range( 0, len( new ) ) ).astype( str ) )
# convert dt to datetime
new[ 'dt' ] = pd.to_datetime( new[ 'dt' ] )
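# Sanity check: the jq filter above should have kept only events from the
# 21:00-22:59 window
assert new[ 'dt' ].dt.hour.isin( [ 21, 22 ] ).all()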
# Load the tsv file of old logs as a Pandas data frame
# Columns of the log files come from this template:
# %{hostname} %{sequence} %{dt} %{time_firstbyte} %{ip} %{cache_status}/%{http_status} %{response_size}
# %{http_method} http://%{uri_host}%{uri_path}%{uri_query} - %{content_type} %{referer} %{x_forwarded_for}
# %{user_agent} %{accept_language} %{x_analytics}
col_names = [
'hostname',
'sequence',
'dt',
'time_firstbyte',
'ip',
'cache_status-http_status',
'response_size',
'http_method',
'url',
'(blank)',
'content_type',
'referer',
'x_forwarded_for',
'user_agent',
'accept_language',
'x_analytics'
]
old = pd.read_csv( 'old/single_file_old.tsv', names = col_names, header = None, sep='\t' )
# Get rid of columns we don't care about, and add 'source', 'nocookies' and 'id' columns
old.drop( [ 'hostname', 'sequence', 'time_firstbyte', '(blank)', ], inplace = True, axis = 1 )
old[ 'source' ] = 'old'
old[ 'nocookies' ] = old[ 'x_analytics' ].apply( lambda x: 'nocookies=1' in x )
old.insert( 0, 'id', 'old' + pd.Series( range( 0, len( old ) ) ).astype( str ) )
# convert dt to datetime
old[ 'dt' ] = pd.to_datetime( old[ 'dt' ] )
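# Sanity check: the awk filter above should likewise have kept only events from
# the 21:00-22:59 window
assert old[ 'dt' ].dt.hour.isin( [ 21, 22 ] ).all()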
# Parse URL parameters to create detailed fields for old log
# As per ext.fundraiserLandingPage.LogPageview.js
url_params_to_columns = [
'campaign',
'banner',
'anonymous',
'project',
'db',
'uselang',
'device',
'country',
'region',
'debug',
'randomcampaign',
'randombanner',
'impressionEventSampleRate',
'recordImpressionSampleRate',
'statusCode',
'bucket'
]
def fields_from_url( url ):
url_str = str( url )
parsed_url = parse.urlsplit( url_str )
query_dict = parse.parse_qs( parsed_url.query )
event_data = {}
for url_param in url_params_to_columns:
if url_param in query_dict:
event_data[ url_param ] = query_dict[ url_param ][ 0 ]
return pd.Series( event_data )
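# Quick illustration on a made-up beacon URL (the parameter values here are
# hypothetical, not taken from the logs); this yields a Series with
# campaign, banner and bucket fields
fields_from_url( 'http://en.wikipedia.org/beacon/impression?campaign=Example_Campaign&banner=ExampleBanner&bucket=0' )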
old = old.join( old[ 'url' ].apply( fields_from_url ) )
# Fix various column types
old[ 'randomcampaign' ] = old.randomcampaign.astype( float )
old[ 'randombanner' ] = old.randombanner.astype( float )
old[ 'bucket' ] = old.bucket.astype( int )
old[ 'impressionEventSampleRate' ] = old.impressionEventSampleRate.astype( float )
old[ 'recordImpressionSampleRate' ] = old.recordImpressionSampleRate.astype( float )
# Boolean URL parameters arrive as the lowercase strings 'true'/'false'
old[ 'anonymous' ] = old[ 'anonymous' ].apply( lambda x: x == 'true' )
old[ 'debug' ] = old[ 'debug' ].apply( lambda x: x == 'true' )
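# Note: statusCode is not converted, so in the old frame it remains the string
# parsed from the URL, while in the new frame pandas likely read it as numeric.
# This matters for the event-by-event merge below.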
# Purge events from identified bots from new log
new = new[ new[ 'is_bot' ] == False ]
# Purge events with no cookies from old log
old = old[ old[ 'nocookies' ] == False ]
# Total number of events in each log:
(
"Total events in new log: " + str( len( new ) ),
"Total events in old log: " + str( len( old ) ),
)
# Check for possible bots/scripts by looking for IPs with a large number of hits in new logs
# This will output how many hits the top IP address has
new.groupby( 'ip', as_index = False ) \
.count()[ [ 'ip', 'dt' ] ] \
.rename( columns = { 'dt': 'count' } ) \
.sort_values( [ 'count' ], ascending = False )[ 'count' ] \
.max()
# Check for possible bots/scripts by looking for IPs with a large number of hits in old logs
# This will output how many hits the top IP address has
old.groupby( 'ip', as_index = False ) \
.count()[ [ 'ip', 'dt' ] ] \
.rename( columns = { 'dt': 'count' } ) \
.sort_values( [ 'count' ], ascending = False )[ 'count' ] \
.max()
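# A fuller view of the same check: the ten most frequent IPs in each log
# (groupby().size() counts rows per IP; nlargest() takes the top ten)
new.groupby( 'ip' ).size().nlargest( 10 )
old.groupby( 'ip' ).size().nlargest( 10 )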
# Look at campaigns and the sample rates for both pipelines, as recorded in the new logs
new_by_campaign_and_rates = new.groupby( [ 'campaign', 'impressionEventSampleRate', 'recordImpressionSampleRate' ], as_index = False ) \
.count()[ [ 'campaign', 'impressionEventSampleRate', 'recordImpressionSampleRate', 'dt' ] ] \
.rename( columns = { 'dt': 'count' } ) \
.sort_values( [ 'campaign' ] )
new_by_campaign_and_rates
# Look at campaigns and the sample rates for both pipelines, as recorded in the old logs
old_by_campaign_and_rates = old.groupby( [ 'campaign', 'impressionEventSampleRate', 'recordImpressionSampleRate' ], as_index = False ) \
.count()[ [ 'campaign', 'impressionEventSampleRate', 'recordImpressionSampleRate', 'dt' ] ] \
.rename( columns = { 'dt': 'count' } ) \
.sort_values( [ 'campaign' ] )
old_by_campaign_and_rates
# Merge on campaign and rates
merged_by_campaign_and_rates = new_by_campaign_and_rates.merge(
old_by_campaign_and_rates,
on = [ 'campaign', 'impressionEventSampleRate', 'recordImpressionSampleRate' ],
suffixes = [ '_new', '_old' ]
)
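# Note: merge() defaults to an inner join, so campaign/rate combinations that
# appear in only one of the two logs are dropped from the comparison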
# Add comparison columns
merged_by_campaign_and_rates[ 'new_old' ] = merged_by_campaign_and_rates.apply(
lambda x: x[ 'count_new' ] / x[ 'count_old' ],
axis = 1
)
merged_by_campaign_and_rates[ 'new_old_expected' ] = merged_by_campaign_and_rates.apply(
lambda x: x[ 'impressionEventSampleRate' ] / x[ 'recordImpressionSampleRate' ],
axis = 1
)
merged_by_campaign_and_rates[ 'divergence' ] = merged_by_campaign_and_rates.apply(
lambda x: ( x[ 'new_old' ] - x[ 'new_old_expected' ] ) / x[ 'new_old_expected' ],
axis = 1
)
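# new_old_expected is the count ratio the sample rates imply: new events are
# sampled at impressionEventSampleRate and old events at recordImpressionSampleRate,
# so with identical traffic we'd expect count_new / count_old to equal their quotient.
# divergence is the relative difference between the observed and expected ratios;
# 0 means the two pipelines agree exactly.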
merged_by_campaign_and_rates[ 'smallest_sample' ] = merged_by_campaign_and_rates.apply(
lambda x: min( x[ 'count_new' ], x[ 'count_old' ] ) ,
axis = 1
)
merged_by_campaign_and_rates[ 'weighted_divergence' ] = merged_by_campaign_and_rates.apply(
lambda x: x[ 'count_old' ] * x[ 'divergence' ],
axis = 1
)
# Filter out rows where we have too few samples in either log
merged_by_c_and_r_good = merged_by_campaign_and_rates[ merged_by_campaign_and_rates[ 'smallest_sample' ] > 100 ]
# Simple mean divergence without weighting based on the number of old events for each permutation
merged_by_c_and_r_good[ 'divergence' ].mean()
# Calculate overall mean divergence, weighting divergence for each permutation based on number of old events
merged_by_c_and_r_good[ 'weighted_divergence' ].sum() / merged_by_c_and_r_good[ 'count_old' ].sum()
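# i.e. sum_i( count_old_i * divergence_i ) / sum_i( count_old_i ): a mean of the
# per-permutation divergences, each weighted by its old-event count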
merged_by_c_and_r_good
# Create dataframes for comparing event-by-event, to check for events orphaned in either log
new_for_event_merge = new[
( new[ 'campaign' ] == 'WMDE_Authors_Campaign_2019_Austria_Switzerland' ) &
( new[ 'impressionEventSampleRate' ] == 1 ) &
( new[ 'recordImpressionSampleRate' ] == 0.01)
]
old_for_event_merge = old[
( old[ 'campaign' ] == 'WMDE_Authors_Campaign_2019_Austria_Switzerland' ) &
( old[ 'impressionEventSampleRate' ] == 1 ) &
( old[ 'recordImpressionSampleRate' ] == 0.01)
]
old_for_event_merge
# Merging on all the fields we can
# Note: the merge fails if statusCode is included, probably because statusCode is
# numeric in the new frame but still a string in the old frame (it was never
# converted above). The comparison should still be valid without it.
merged_events = new_for_event_merge.merge(
old_for_event_merge,
on = [
'ip',
'campaign',
'anonymous',
'project',
'db',
'uselang',
'device',
'country',
'debug',
'randomcampaign',
'randombanner',
'impressionEventSampleRate',
'recordImpressionSampleRate',
'bucket'
],
how = 'outer',
indicator = True,
suffixes = [ '_new', '_old' ]
)
merged_events[ 'dt_merged' ] = merged_events.apply(
lambda x: x[ 'dt_old' ] if pd.isnull( x[ 'dt_new' ] ) else x[ 'dt_new' ],
axis = 1
)
merged_events.sort_values( 'dt_merged', inplace = True )
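# indicator = True added a '_merge' column: 'left_only' marks events found only
# in the new log, 'right_only' events found only in the old log, 'both' matches.
# Count of old events with no matching new event: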
len( merged_events[ merged_events[ '_merge' ] == 'right_only' ] )
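# And the corresponding count of orphan new events (present in the new log but
# unmatched in the old one):
len( merged_events[ merged_events[ '_merge' ] == 'left_only' ] )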