import org.apache.spark.sql.{ Encoders, SaveMode }

// Glob over the PrefUpdate parquet files of every year/month/day/hour partition.
val readPath = "/wmf/data/event/PrefUpdate/year=*/month=*/day=*/hour=*/*.parquet"

// SQL predicate keeping only events whose `event.property` is in the allowed list.
// The literal is split across lines for readability; the resulting string is
// unchanged.
val propertyWhitelistFilter =
  "event.property in (" +
    "'skin', 'mfMode', 'mf_amc_optin', 'VectorSkinVersion', " +
    "'popups', 'popupsreferencepreviews', " +
    "'discussiontools-betaenable', 'betafeatures-auto-enroll' , " +
    "'echo-notifications-blacklist', 'email-blacklist', " +
    "'growthexperiments-help-panel-tog-help-panel', " +
    "'growthexperiments-homepage-enable', 'growthexperiments-homepage-pt-link')"

/** The `useragent` struct of a PrefUpdate row. */
case class UserAgent(
  browser_family: String,
  browser_major: String,
  browser_minor: String,
  device_family: String,
  is_bot: Option[Boolean],
  is_mediawiki: Option[Boolean],
  os_family: String,
  os_major: String,
  os_minor: String,
  wmf_app_version: String
)

/** The `event` struct of a PrefUpdate row. */
case class Event(
  isDefault: Boolean,
  property: String,
  saveTimestamp: String,
  userId: Option[Long],
  value: String,
  version: String,
  bucketedUserEditCount: Option[String] = None
)

/**
 * One PrefUpdate row, including its `year`/`month`/`day`/`hour` partition columns.
 * The read schema used below is derived from this class.
 */
case class PrefUpdate(
  dt: String,
  event: Event,
  recvfrom: String,
  revision: Long,
  schema: String,
  seqid: Long,
  useragent: UserAgent,
  uuid: String,
  webhost: String,
  wiki: String,
  ip: Option[String] = None,
  geocoded_data: Option[Map[String, String]] = None,
  topic: Option[String] = None,
  year: Long,
  month: Long,
  day: Long,
  hour: Long
) {

  /**
   * Returns a copy of this row with the following fields blanked:
   *  - `event.saveTimestamp`, `event.userId`, `event.value`,
   *    `event.bucketedUserEditCount`
   *  - every field of `useragent`
   *  - `ip`
   *  - `geocoded_data` (replaced by an empty map)
   *
   * All other fields are carried over unchanged.
   *
   * `String` fields are blanked with `null` because their declared type leaves no
   * other way to express "absent"; `Option` fields are blanked with `None` so the
   * sanitized object stays safe to use from Scala code.
   *
   * @return the sanitized copy
   */
  def sanitize(): PrefUpdate =
    copy(
      event = event.copy(
        saveTimestamp = null,
        userId = None,
        value = null,
        bucketedUserEditCount = None
      ),
      useragent = UserAgent(
        browser_family = null,
        browser_major = null,
        browser_minor = null,
        device_family = null,
        is_bot = None,
        is_mediawiki = None,
        os_family = null,
        os_major = null,
        os_minor = null,
        wmf_app_version = null
      ),
      ip = None,
      geocoded_data = Some(Map.empty[String, String])
    )
}

// Explicit read schema derived from the case class.
val schema = Encoders.product[PrefUpdate].schema

// Read -> filter on allowed properties -> sanitize -> overwrite the target table,
// partitioned by year/month/day/hour.
// NOTE(review): `spark` and the implicit encoders needed by `.as[...]`/`.map` are
// not defined in this file -- presumably supplied by the spark-shell session; confirm.
spark.read.schema(schema).parquet(readPath).
  where(propertyWhitelistFilter).
  as[PrefUpdate].
  map(_.sanitize()).
  write.
  partitionBy("year", "month", "day", "hour").
  mode(SaveMode.Overwrite).
  saveAsTable("analytics.prefupdate_filtered")

// *** Scratch Pad ***
// Exploratory snippets, intentionally kept commented out: they reference a `df`
// that is never defined in this file and would redefine `readPath`.
// (`cast(String)` was corrected to `cast(StringType)`; `String` is not a Spark
// DataType.)
//
// //val readPath = s"/wmf/data/event_sanitized/PrefUpdate/year=*/month=*/day=*/hour=*/*.parquet"
// //saveAsTable("milimetric.prefupdate_sanitized_filtered")
//
// val readPath = s"/wmf/data/event_sanitized/PrefUpdate/year=2017/month=11/day=30/hour=10/*.parquet"
// val readPathRecent = s"/wmf/data/event/PrefUpdate/year=2020/month=8/day=1/hour=10/*.parquet"
//
// // repartition(tempPartitions, col("wiki"), col("time_bucket")).
// // sortWithinPartitions(col("wiki"), col("time_bucket"), col("timestamp")).
//
// df.select("*", "event.*").
//   drop('event).
//   withColumn("saveTimestamp", lit(null).cast(StringType)).
//   withColumn("userId", lit(null).cast(LongType)).
//   withColumn("value", lit(null).cast(StringType)).
//   // TODO:
//   //   null useragent (struct)
//   //   null ip (String)
//   //   empty geocoded_data (map)
//   withColumn("event", struct("isDefault", "property", "saveTimestamp", "userId", "value", "version")).
//   drop("isDefault", "property", "saveTimestamp", "userId", "value", "version").
//   printSchema()