In [1]:
"""
Analysis of the differences between the old and new pipeline logs for CentralNotice impressions,
covering 2019-11-05 21:00:00-22:59:59.

Commands run to generate data for analysis:
$ mkdir new
$ mkdir old
$ scp civi1001:/srv/archive/banner_logs/2019/centralnotice-impressions.20191105-2[0-3]* new/
$ scp civi1001:/srv/archive/banner_logs/2019/beaconImpressions-sampled10.tsv.20191105-2[0-3]* old/
$ gunzip new/*
$ gunzip old/*
$ cat new/*.log | jq -r 'select( .dt | test("2019-11-05T2[1-2]") ) | [ .dt, .ip, .event.campaign, .event.banner // null, .event.anonymous, .event.project, .event.db, .event.uselang, .event.device, .event.country, .event.region // null, .event.debug, .event.randomcampaign, .event.randombanner, .event.impressionEventSampleRate, .event.recordImpressionSampleRate, .event.statusCode, .event.bucket, .userAgent.browser_family, .userAgent.browser_major, .userAgent.browser_minor, .userAgent.device_family, .userAgent.is_bot, .userAgent.is_mediawiki, .userAgent.os_family, .userAgent.os_major, .userAgent.os_minor ] | @csv ' > new/single_file_new.csv
$ cat old/*.log | awk '$3 ~/^2019-11-05T2[1-2]/' > old/single_file_old.tsv
"""
In [2]:
# Imports

import re
import pandas as pd
from urllib import parse
import numpy as np
In [3]:
# Load the csv file of new logs as a Pandas data frame

col_names = [
    'dt',
    'ip',
    'campaign',
    'banner',
    'anonymous',
    'project',
    'db',
    'uselang',
    'device',
    'country',
    'region',
    'debug',
    'randomcampaign',
    'randombanner',
    'impressionEventSampleRate',
    'recordImpressionSampleRate',
    'statusCode',
    'bucket',
    'browser_family',
    'browser_major',
    'browser_minor',
    'device_family',
    'is_bot',
    'is_mediawiki',
    'os_family',
    'os_major',
    'os_minor'
]

new = pd.read_csv( 'new/single_file_new.csv', names = col_names, header = None )
In [4]:
# Add 'source' and 'id' columns
new[ 'source' ] = 'new'
new.insert( 0, 'id', 'new' + pd.Series( range( 0, len( new ) ) ).astype( str ) )

# convert dt to datetime
new[ 'dt' ] = pd.to_datetime( new[ 'dt' ] )
In [5]:
# Load the tsv file of old logs as a Pandas data frame

# Columns of the log files come from this template:
#     %{hostname} %{sequence} %{dt} %{time_firstbyte} %{ip} %{cache_status}/%{http_status} %{response_size}
#     %{http_method} http://%{uri_host}%{uri_path}%{uri_query} - %{content_type} %{referer} %{x_forwarded_for}
#     %{user_agent} %{accept_language} %{x_analytics}

col_names = [
    'hostname',
    'sequence',
    'dt', 
    'time_firstbyte',
    'ip',
    'cache_status-http_status',
    'response_size',
    'http_method',
    'url',
    '(blank)',
    'content_type',
    'referer',
    'x_forwarded_for',
    'user_agent',
    'accept_language',
    'x_analytics'
]

old = pd.read_csv( 'old/single_file_old.tsv', names = col_names, header = None, sep='\t' )
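As a quick sanity check on the column mapping, here is a purely hypothetical log line in the template format above (every value below is invented for illustration, not taken from the actual data); pairing it with col_names shows which field lands in which column:

example_line = '\t'.join( [
    'cache-host', '12345', '2019-11-05T21:00:01', '0.000123', '203.0.113.7',
    'hit-front/200', '512', 'GET',
    'http://en.wikipedia.org/beacon/impression?campaign=Example_Campaign&banner=Example_Banner',
    '-', 'image/gif', 'https://en.wikipedia.org/', '-',
    'Mozilla/5.0 (example)', 'en-US', 'nocookies=1'
] )

# Pair each invented value with its column name to confirm which field lands where
dict( zip( col_names, example_line.split( '\t' ) ) )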
In [6]:
# Get rid of columns we don't care about, and add 'source', 'nocookies' and 'id' columns
old.drop( [ 'hostname', 'sequence', 'time_firstbyte', '(blank)', ], inplace = True, axis = 1 )
old[ 'source' ] = 'old'
old[ 'nocookies' ] = old[ 'x_analytics' ].apply( lambda x: 'nocookies=1' in x )
old.insert( 0, 'id', 'old' + pd.Series( range( 0, len( old ) ) ).astype( str ) )

# convert dt to datetime
old[ 'dt' ] = pd.to_datetime( old[ 'dt' ] )
In [7]:
# Parse URL parameters to create detailed fields for old log

# As per ext.fundraiserLandingPage.LogPageview.js
url_params_to_columns = [
    'campaign',
    'banner',
    'anonymous',
    'project',
    'db',
    'uselang',
    'device',
    'country',
    'region',
    'debug',
    'randomcampaign',
    'randombanner',
    'impressionEventSampleRate',
    'recordImpressionSampleRate',
    'statusCode',
    'bucket'
]

def fields_from_url( url ):
    url_str = str( url )
    parsed_url = parse.urlsplit( url_str )
    query_dict = parse.parse_qs( parsed_url.query )
    event_data = {}

    for url_param in url_params_to_columns:
        if url_param in query_dict:
            event_data[ url_param ] = query_dict[ url_param ][ 0 ]

    return pd.Series( event_data )

old = old.join( old[ 'url' ].apply( fields_from_url ) )
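To illustrate what fields_from_url returns, here is a call on a hypothetical beacon URL (all parameter values here are invented for the example, not taken from the logs):

fields_from_url(
    'http://en.wikipedia.org/beacon/impression?campaign=Example_Campaign'
    '&banner=Example_Banner&anonymous=true&country=AT'
    '&impressionEventSampleRate=1&recordImpressionSampleRate=0.01'
)
# Returns a Series containing only the parameters present in the URL
# (campaign, banner, anonymous, country and the two sample rates), all as strings.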
In [8]:
# Fix various column types

old[ 'randomcampaign' ] = old.randomcampaign.astype( float )
old[ 'randombanner' ] = old.randombanner.astype( float )
old[ 'bucket' ] = old.bucket.astype( int )
old[ 'impressionEventSampleRate' ] = old.impressionEventSampleRate.astype( float )
old[ 'recordImpressionSampleRate' ] = old.recordImpressionSampleRate.astype( float )
# These flags arrive as lower-case strings in the URL, so compare both against 'true'
old[ 'anonymous' ] = old[ 'anonymous' ].apply( lambda x: x == 'true' )
old[ 'debug' ] = old[ 'debug' ].apply( lambda x: x == 'true' )
In [9]:
# Purge events from identified bots from new log
new = new[ new[ 'is_bot' ] == False ]

# Purge events with no cookies from old log
old = old[ old[ 'nocookies' ] == False ]
In [10]:
# Total number of events in each log:

(
    "Total events in new log: " + str( len( new ) ),
    "Total events in old log: " + str( len( old ) ),
)
Out[10]:
('Total events in new log: 34680', 'Total events in old log: 9189')
In [11]:
# Check for possible bots/scripts by looking for IPs with a large number of hits in new logs

# This will output how many hits the top IP address has
new.groupby( 'ip', as_index = False ) \
    .count()[ [ 'ip', 'dt' ] ] \
    .rename( columns = { 'dt': 'count' } ) \
    .sort_values( [ 'count' ], ascending = False )[ 'count' ] \
    .max()
Out[11]:
25
In [12]:
# Check for possible bots/scripts by looking for IPs with a large number of hits in old logs

# This will output how many hits the top IP address has
old.groupby( 'ip', as_index = False ) \
    .count()[ [ 'ip', 'dt' ] ] \
    .rename( columns = { 'dt': 'count' } ) \
    .sort_values( [ 'count' ], ascending = False )[ 'count' ] \
    .max()
Out[12]:
10
In [13]:
# Look at campaigns and sample rates for both pipelines present in the new logs

new_by_campaign_and_rates = new.groupby( [ 'campaign', 'impressionEventSampleRate', 'recordImpressionSampleRate' ], as_index = False ) \
    .count()[ [ 'campaign', 'impressionEventSampleRate', 'recordImpressionSampleRate', 'dt' ] ] \
    .rename( columns = { 'dt': 'count' } ) \
    .sort_values( [ 'campaign' ] )

new_by_campaign_and_rates
In [14]:
# Look at campaigns and sample rates for both pipelines present in the old logs

old_by_campaign_and_rates = old.groupby( [ 'campaign', 'impressionEventSampleRate', 'recordImpressionSampleRate' ], as_index = False ) \
    .count()[ [ 'campaign', 'impressionEventSampleRate', 'recordImpressionSampleRate', 'dt' ] ] \
    .rename( columns = { 'dt': 'count' } ) \
    .sort_values( [ 'campaign' ] )

old_by_campaign_and_rates
In [15]:
# Merge on campaign and rates

merged_by_campaign_and_rates = new_by_campaign_and_rates.merge(
    old_by_campaign_and_rates,
    on = [ 'campaign', 'impressionEventSampleRate', 'recordImpressionSampleRate' ],
    suffixes = [ '_new', '_old' ]
)
In [16]:
# Add comparison columns

merged_by_campaign_and_rates[ 'new_old' ] = merged_by_campaign_and_rates.apply(
    lambda x: x[ 'count_new' ] / x[ 'count_old' ],
    axis = 1
)

merged_by_campaign_and_rates[ 'new_old_expected' ] = merged_by_campaign_and_rates.apply(
    lambda x: x[ 'impressionEventSampleRate' ] / x[ 'recordImpressionSampleRate' ],
    axis = 1
)

merged_by_campaign_and_rates[ 'divergence' ] = merged_by_campaign_and_rates.apply(
    lambda x: ( x[ 'new_old' ] - x[ 'new_old_expected' ] ) / x[ 'new_old_expected' ],
    axis = 1
)

merged_by_campaign_and_rates[ 'smallest_sample' ] = merged_by_campaign_and_rates.apply(
    lambda x: min( x[ 'count_new' ], x[ 'count_old' ] ) ,
    axis = 1
)

merged_by_campaign_and_rates[ 'weighted_divergence' ] = merged_by_campaign_and_rates.apply(
    lambda x: x[ 'count_old' ] * x[ 'divergence' ],
    axis = 1
)
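To make the comparison columns concrete, here is the arithmetic on an invented row (none of these numbers come from the data above): with impressionEventSampleRate = 1 and recordImpressionSampleRate = 0.01 we expect roughly 100 new-pipeline events per old-pipeline event, so observing a ratio of 87 means the new pipeline undershot expectations by about 13%.

# Illustrative arithmetic only; all numbers here are made up
count_new, count_old = 8700, 100
impression_event_rate, record_impression_rate = 1.0, 0.01

new_old = count_new / count_old                                     # 87.0
new_old_expected = impression_event_rate / record_impression_rate   # 100.0
divergence = ( new_old - new_old_expected ) / new_old_expected      # -0.13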
In [17]:
# Filter out rows where we have too few samples in either log

merged_by_c_and_r_good = merged_by_campaign_and_rates[ merged_by_campaign_and_rates[ 'smallest_sample' ] > 100 ]
In [18]:
# Simple mean divergence, without weighting by the number of old events for each permutation

merged_by_c_and_r_good[ 'divergence' ].mean()
Out[18]:
-0.1485599516275263
In [19]:
# Calculate overall mean divergence, weighting each permutation's divergence by its number of old events

merged_by_c_and_r_good[ 'weighted_divergence' ].sum() / merged_by_c_and_r_good[ 'count_old' ].sum()
Out[19]:
-0.1256348340863063
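The weighted figure above is simply a weighted mean of the per-row divergence, using count_old as the weights; an equivalent way to compute it (which should give the same value) is:

# Weighted mean of divergence, weighted by the number of old events per row
np.average(
    merged_by_c_and_r_good[ 'divergence' ],
    weights = merged_by_c_and_r_good[ 'count_old' ]
)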
In [20]:
merged_by_c_and_r_good
In [21]:
# Create dataframes for comparing event-by-event, to check for orphan new-log events

new_for_event_merge = new[
    ( new[ 'campaign' ] == 'WMDE_Authors_Campaign_2019_Austria_Switzerland' ) &
    ( new[ 'impressionEventSampleRate' ] == 1 ) &
    ( new[ 'recordImpressionSampleRate' ] == 0.01 )
]

old_for_event_merge = old[
    ( old[ 'campaign' ] == 'WMDE_Authors_Campaign_2019_Austria_Switzerland' ) &
    ( old[ 'impressionEventSampleRate' ] == 1 ) &
    ( old[ 'recordImpressionSampleRate' ] == 0.01 )
]
In [22]:
old_for_event_merge
In [23]:
# Merging on all the fields we can

# Note: including statusCode as a merge key doesn't seem to work; the merge should still be valid without it (see the dtype check sketched after this cell)
merged_events = new_for_event_merge.merge(
    old_for_event_merge,
    on = [
        'ip',
        'campaign',
        'anonymous',
        'project',
        'db',
        'uselang',
        'device',
        'country',
        'debug',
        'randomcampaign',
        'randombanner',
        'impressionEventSampleRate',
        'recordImpressionSampleRate',
        'bucket'
    ],
    how = 'outer',
    indicator = True,
    suffixes = [ '_new', '_old' ]
)

merged_events[ 'dt_merged' ] = merged_events.apply(
    lambda x: x[ 'dt_old' ] if pd.isnull( x[ 'dt_new' ] ) else x[ 'dt_new' ],
    axis = 1
)

merged_events.sort_values( 'dt_merged', inplace = True )
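One guess (unverified) about why statusCode fails as a merge key: it was never type-converted for the old frame in In [8], so it is still a string there, while read_csv most likely inferred a numeric dtype for the new frame. A quick check would be:

# Compare the dtypes of the two statusCode columns; mismatched key dtypes are a common
# reason a pandas merge finds no matches (or errors out)
print(
    new_for_event_merge[ 'statusCode' ].dtype,
    old_for_event_merge[ 'statusCode' ].dtype
)

# If they differ, casting both sides to a common type (e.g. float or str) before merging
# would be one way to try including statusCode as a key.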
In [24]:
len( merged_events[ merged_events[ '_merge' ] == 'right_only' ] )
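The count above is of events present only in the old log (right_only). The orphan new-log events mentioned in In [21] are the left_only rows, which could be counted the same way:

# Events present only in the new log (no matching old event)
len( merged_events[ merged_events[ '_merge' ] == 'left_only' ] )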