Data Processing and Feature Engineering

In this section, we will walk through the following steps:

Data Pipeline

Extracting the Raw Data

Set the input and output paths:

import os
import boto3
import json
import subprocess

# Query the EC2 instance metadata service for the region this notebook runs in
emr_instance_info = subprocess.run(
    ["curl", "-s", "http://169.254.169.254/latest/dynamic/instance-identity/document"],
    capture_output=True).stdout.decode('UTF-8')
emr_region = json.loads(emr_instance_info)["region"]

# Read the S3 data locations from SSM Parameter Store
session = boto3.Session(region_name=emr_region)
ssm = session.client('ssm')
s3_client = boto3.client("s3")

bucket = ssm.get_parameter(Name="/aik/data-bucket")["Parameter"]["Value"]

bid_source = ssm.get_parameter(Name="/aik/bid_source")["Parameter"]["Value"]
imp_source = ssm.get_parameter(Name="/aik/imp_source")["Parameter"]["Value"]

output_train = ssm.get_parameter(Name="/aik/output_train")["Parameter"]["Value"]
output_test = ssm.get_parameter(Name="/aik/output_test")["Parameter"]["Value"]
output_verify = ssm.get_parameter(Name="/aik/output_verify")["Parameter"]["Value"]
output_transformed = ssm.get_parameter(Name="/aik/output_transformed")["Parameter"]["Value"]
inference_data = ssm.get_parameter(Name="/aik/inference_data")["Parameter"]["Value"]
pipelineModelArtifactPath = ssm.get_parameter(Name="/aik/pipelineModelArtifactPath")["Parameter"]["Value"]


Preparing the Bidding Data

Define the schema for the bidding data:

bid_columns = [
    "BidID",
    "Timestamp",
    "iPinYouID",
    "UserAgent",
    "IP",
    "RegionID",
    "CityID",
    "AdExchange",
    "Domain",
    "URL",
    "AnonymousURL",
    "AdSlotID",
    "AdSlotWidth",
    "AdSlotHeight",
    "AdSlotVisibility",
    "AdSlotFormat",
    "AdSlotFloorPrice",
    "CreativeID",
    "BiddingPrice",
    "AdvertiserID",
    "UserProfileIDs",
]

bid_schema = ""
for col in bid_columns:
    if bid_schema != "":
        bid_schema += ", "
    if col == 'Region ID':
        bid_schema += f"`{col}` long"
    elif col == 'City ID':
        bid_schema += f"`{col}` long"
    else:
        bid_schema += f"`{col}` string"
print(bid_schema)

# `BidID` string, `Timestamp` string, `iPinYouID` string, `UserAgent` string, `IP` string, `RegionID` string, `CityID` string, `AdExchange` string, `Domain` string, `URL` string, `AnonymousURL` string, `AdSlotID` string, `AdSlotWidth` string, `AdSlotHeight` string, `AdSlotVisibility` string, `AdSlotFormat` string, `AdSlotFloorPrice` string, `CreativeID` string, `BiddingPrice` string, `AdvertiserID` string, `UserProfileIDs` string

Read the data:

print(bid_source)
bid_df = spark.read.option("delimiter", "\t").format("csv").load(
    bid_source,
    inferSchema=False,
    header=False,
    schema=bid_schema)

bid_df.show(2)


s3://aik-sagemaker-emr-sagemakeremrproducttrainingmodel-dzr1jnmdzj7r/raw/ipinyou-data/training1st/bid.20130311.txt.bz2
+--------------------+-----------------+--------------------+--------------------+-------------+--------+------+----------+--------------------+--------------------+------------+----------+-----------+------------+----------------+------------+----------------+--------------------+------------+------------+--------------+
|               BidID|        Timestamp|           iPinYouID|           UserAgent|           IP|RegionID|CityID|AdExchange|              Domain|                 URL|AnonymousURL|  AdSlotID|AdSlotWidth|AdSlotHeight|AdSlotVisibility|AdSlotFormat|AdSlotFloorPrice|          CreativeID|BiddingPrice|AdvertiserID|UserProfileIDs|
+--------------------+-----------------+--------------------+--------------------+-------------+--------+------+----------+--------------------+--------------------+------------+----------+-----------+------------+----------------+------------+----------------+--------------------+------------+------------+--------------+
|e3d962536ef3ac709...|20130311172101557|37a6259cc0c1dae29...|Mozilla/4.0 (comp...|219.232.120.*|       1|     1|         2|DF9blS9bQqsIFYB4uA5R|b6c5272dfc63032f6...|        null|2006366309|        728|          90|               1|           0|               5|5aca4c5f29e59e425...|         300|        null|          null|
|f2ce7b51f499eae08...|20130311172101567|37a6259cc0c1dae29...|Mozilla/4.0 (comp...|  60.220.37.*|      15|    19|         2|trqRTJLILec7gspy5SqW|2244fa274b2900c2d...|        null|3572180553|        728|          90|               2|           0|               5|5aca4c5f29e59e425...|         300|        null|          null|
+--------------------+-----------------+--------------------+--------------------+-------------+--------+------+----------+--------------------+--------------------+------------+----------+-----------+------------+----------------+------------+----------------+--------------------+------------+------------+--------------+
only showing top 2 rows

We will only use a subset of the available data (20130311). In addition, instead of using the raw timestamp, we want to decompose it into the day of the week and the hour of the day, which gives better insight into the underlying data. So we now transform the DataFrame:

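The original notebook cell is only shown as a screenshot; below is a minimal sketch of the transformation, assuming the Timestamp format yyyyMMddHHmmssSSS and the column names from the schema printed later in this section:

from pyspark.sql.functions import substring, to_date, dayofweek

# Timestamp looks like 20130311172101557 (yyyyMMddHHmmssSSS)
bid_df = (bid_df
    .withColumn("dow", dayofweek(to_date(substring("Timestamp", 1, 8), "yyyyMMdd")))
    .withColumn("hour", substring("Timestamp", 9, 2))
    # keep only the columns that are used downstream
    .select("BidID", "dow", "hour", "RegionID", "CityID", "Domain",
            "AdvertiserID", "UserAgent"))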

Preparing the Impression Data

imp_columns = ["BidID", # V
"Timestamp",
"LogType", 
"iPinYouID",
"UserAgent",
"IP",
"RegionID", 
"CityID", 
"AdExchange",
"Domain",
"URL",
"AnonymousURL",
"AdSlotID",
"AdSlotWidth",
"AdSlotHeight",
"AdSlotVisibility",
"AdSlotFormat",
"AdSlotFloorPrice",
"CreativeID",
"BiddingPrice",
"PayingPrice",
"LandingPageURL",
"AdvertiserID",
"UserProfileIDs"]

imp_schema = ""
for col in imp_columns:
    if imp_schema != "":
        imp_schema += ", "
    if col == 'BiddingPrice':
        imp_schema += f"`{col}` long"
    elif col == 'PayingPrice':
        imp_schema += f"`{col}` long"
    else:
        imp_schema += f"`{col}` string"
        
print(imp_schema)
# `BidID` string, `Timestamp` string, `LogType` string, `iPinYouID` string, `UserAgent` string, `IP` string, `RegionID` string, `CityID` string, `AdExchange` string, `Domain` string, `URL` string, `AnonymousURL` string, `AdSlotID` string, `AdSlotWidth` string, `AdSlotHeight` string, `AdSlotVisibility` string, `AdSlotFormat` string, `AdSlotFloorPrice` string, `CreativeID` string, `BiddingPrice` long, `PayingPrice` long, `LandingPageURL` string, `AdvertiserID` string, `UserProfileIDs` string

Read the data:

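The notebook cell is a screenshot in the original; a minimal sketch, mirroring the bid data read above (the DataFrame name imp_df is an assumption):

imp_df = spark.read.option("delimiter", "\t").format("csv").load(
    imp_source,
    inferSchema=False,
    header=False,
    schema=imp_schema)

imp_df.show(2)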

We transform this data as well, this time only dropping the columns that are no longer used:

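A minimal sketch; the exact set of retained columns is an assumption based on the joined schema shown later (only BidID, BiddingPrice, and PayingPrice from the impression data survive the join):

imp_df = imp_df.select("BidID", "BiddingPrice", "PayingPrice")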

Joining the DataFrames

So far we have been working with the bid and impression data in separate DataFrames. Now we want to join them into a single DataFrame.

Let's take a look at the schemas of the bid and impression data:

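The original shows this as a screenshot; the two schemas can be printed with:

bid_df.printSchema()
imp_df.printSchema()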

We can see that both DataFrames share a common column, BidID, so we will use it to join them:

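A minimal sketch of the join. A left outer join from the bid side is an assumption, but it is consistent with the rows shown later whose PayingPrice is null (bids that did not win an impression); the result df is what the encoding step below operates on:

df = bid_df.join(imp_df, on="BidID", how="left")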

Feature Encoding

In the next step, the notebook encodes the remaining data columns (known as features in machine learning) so that they can be used to train a machine learning model.

Next, we encode our features so that we can use them for model training. Here we use pyspark.ml to index the categorical features:

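A minimal sketch of the encoding pipeline, assuming StringIndexer stages whose output columns match the schema printed below (IndexAdvertiserID, IndexDomain, IndexRegionID, IndexCityID); handleInvalid="keep" is an assumption:

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

indexers = [
    StringIndexer(inputCol=c, outputCol=f"Index{c}", handleInvalid="keep")
    for c in ["AdvertiserID", "Domain", "RegionID", "CityID"]
]
pipeline = Pipeline(stages=indexers)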

Fit the pipeline model and transform the DataFrame:

pipelineModel = pipeline.fit(df)

# write out the not-yet-encoded data for later use
df.write.parquet(output_transformed, mode="overwrite")
df.write.json(inference_data, mode="overwrite")

# transform the DataFrame
df = pipelineModel.transform(df)
df.printSchema()
df.show(3)

root
 |-- BidID: string (nullable = true)
 |-- dow: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- RegionID: string (nullable = true)
 |-- CityID: string (nullable = true)
 |-- Domain: string (nullable = true)
 |-- AdvertiserID: string (nullable = true)
 |-- BiddingPrice: long (nullable = true)
 |-- PayingPrice: long (nullable = true)
 |-- UserAgent: string (nullable = true)
 |-- IndexAdvertiserID: double (nullable = false)
 |-- IndexDomain: double (nullable = false)
 |-- IndexRegionID: double (nullable = false)
 |-- IndexCityID: double (nullable = false)

+--------------------+---+----+--------+------+-------------------+------------+------------+-----------+--------------------+-----------------+-----------+-------------+-----------+
|               BidID|dow|hour|RegionID|CityID|             Domain|AdvertiserID|BiddingPrice|PayingPrice|           UserAgent|IndexAdvertiserID|IndexDomain|IndexRegionID|IndexCityID|
+--------------------+---+----+--------+------+-------------------+------------+------------+-----------+--------------------+-----------------+-----------+-------------+-----------+
|1001fc699496978c8...|  2|  03|       0|     0|DFpETuxoGQdcFNKbuKz|        null|        null|       null|                null|              0.0|        3.0|         10.0|        3.0|
|100323a5105c9872d...|  2|  00|     164|   164| ersbQv1RdoTy1m58uG|        null|         300|        160|Mozilla/5.0 (comp...|              0.0|        0.0|          8.0|       39.0|
|1007b252c0c8e5923...|  2|  08|      80|    85|    trqRTua8XIl7gsz|        null|        null|       null|                null|              0.0|       63.0|          4.0|        7.0|
+--------------------+---+----+--------+------+-------------------+------------+------------+-----------+--------------------+-----------------+-----------+-------------+-----------+
only showing top 3 rows

Serialize the pipeline model with MLeap and upload it to S3:

import mleap.pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer

# serialize the fitted pipeline to a local bundle
pipelineModel.serializeToBundle("jar:file:/tmp/pipelineModel.zip", df)

# upload the pipeline model to the S3 bucket
from urllib.parse import urlparse

parsed_pipeline_model_path = urlparse(pipelineModelArtifactPath, allow_fragments=False)
s3_client.upload_file("/tmp/pipelineModel.zip", parsed_pipeline_model_path.netloc,
                      parsed_pipeline_model_path.path.lstrip('/'))

Extracting Features from the UserAgent Field

Another set of valuable features can be derived from the UserAgent field in the data. In fact, the type of device a user is on matters a great deal in determining whether a bid request is likely to succeed. We can derive the device type from the user agent string with woothee (https://github.com/woothee/woothee).

import woothee
from random import random

# Map the parsed user-agent category to a numeric device type id
DEVICE_TYPE_IDS = {
    "smartphone": 0,
    "mobilephone": 1,
    "appliance": 2,
    "pc": 3,
    "crawler": 4,
    "misc": 5,
}

def parse_ua_to_device_type(user_agent_str):
    category = woothee.parse(user_agent_str)["category"]
    if category in DEVICE_TYPE_IDS:
        return DEVICE_TYPE_IDS[category]
    return int(random() * 10) % 6  # missing value imputed with a random device_type_id
    
from pyspark.sql.functions import udf, when, col
from pyspark.sql.types import IntegerType

cols_parse_ua_to_device_type = udf(parse_ua_to_device_type, IntegerType())

# Replace null user agents with "", cast hour to an integer, derive the device
# type, and derive the label: 1 if the bid won an impression (PayingPrice is
# set), 0 otherwise.
df_new = df.select(["dow", "hour", "UserAgent", "PayingPrice", "IndexAdvertiserID",
                    "IndexDomain", "IndexRegionID", "IndexCityID"])\
        .withColumn("UserAgent", when(col("UserAgent").isNull(), "").otherwise(col("UserAgent")))\
        .withColumn("hour", df["hour"].cast(IntegerType()))\
        .withColumn("device_type_id", cols_parse_ua_to_device_type("UserAgent"))\
        .withColumn("label", when(col("PayingPrice").isNull(), 0).otherwise(1))\
        .drop("UserAgent").drop("PayingPrice")

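The resulting DataFrame (shown as a screenshot in the original) can be inspected with:

df_new.show(3)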

Check the distribution of the device types:

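A minimal sketch of such a check (the original cell is a screenshot):

df_new.groupBy("device_type_id").count().orderBy("device_type_id").show()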

Generating the Training, Validation, and Test Datasets

For model training we need to split the prepared dataset into three different subsets, all of which are stored as parquet files in S3:

  • The training dataset is used to train the model
  • The validation dataset is used to validate the model during training
  • The test dataset is used to test the final model after the training step

First, we make sure that the label column is the first column, since XGBoost requires this for training:

For training with columnar input, the algorithm assumes that the target variable (label) is the first column https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html

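A minimal sketch of the reordering (the original cell is a screenshot):

cols = df_new.columns
cols.remove("label")
df_new = df_new.select(["label"] + cols)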

Now we will split the data into three different sets:

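The original cell is a screenshot. The split below references df_new_shuffled, so presumably the rows were shuffled first; a minimal sketch (the seed is an assumption):

from pyspark.sql.functions import rand

df_new_shuffled = df_new.orderBy(rand(seed=42))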

Check the distribution of the data:

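What exactly the screenshot showed is not recoverable; one plausible check is the label balance:

df_new_shuffled.groupBy("label").count().show()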

Finally, we split the dataset into 80% training, 10% validation, and 10% test.

splits = df_new_shuffled.randomSplit([0.8, 0.1, 0.1], 42)
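randomSplit returns the three DataFrames in the order of the given weights; unpacking them (the names are assumptions):

train_df, validation_df, test_df = splits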


Upload the Three Datasets to S3

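A minimal sketch, assuming each split is written as parquet to the corresponding SSM-provided path from the beginning of this section (mapping output_verify to the validation set is an assumption):

train_df.write.parquet(output_train, mode="overwrite")
validation_df.write.parquet(output_verify, mode="overwrite")
test_df.write.parquet(output_test, mode="overwrite")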

Once the upload is complete, we can check it:

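For example, by listing the uploaded objects with the s3_client created earlier (a sketch; the prefix is derived from output_train):

from urllib.parse import urlparse

parsed = urlparse(output_train, allow_fragments=False)
response = s3_client.list_objects_v2(Bucket=parsed.netloc, Prefix=parsed.path.lstrip("/"))
for obj in response.get("Contents", []):
    print(obj["Key"])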

At the end of this section, we have successfully prepared the data for model training.