In this section, we will walk through the following process:
Set up the input and output paths:
import os
import boto3
import json
import subprocess
# Look up the Region this EMR cluster runs in via the EC2 instance metadata service
emr_instance_info = subprocess.run(
    ["curl", "-s", "169.254.169.254/latest/dynamic/instance-identity/document"],
    capture_output=True).stdout.decode('UTF-8')
emr_region = json.loads(emr_instance_info)["region"]
session = boto3.Session(region_name=emr_region)
ssm = session.client('ssm')
s3_client = boto3.client("s3")
bucket = ssm.get_parameter(Name="/aik/data-bucket")["Parameter"]["Value"]
bid_source = ssm.get_parameter(Name="/aik/bid_source")["Parameter"]["Value"]
imp_source = ssm.get_parameter(Name="/aik/imp_source")["Parameter"]["Value"]
output_train = ssm.get_parameter(Name="/aik/output_train")["Parameter"]["Value"]
output_test = ssm.get_parameter(Name="/aik/output_test")["Parameter"]["Value"]
output_verify = ssm.get_parameter(Name="/aik/output_verify")["Parameter"]["Value"]
output_transformed = ssm.get_parameter(Name="/aik/output_verify")["Parameter"]["Value"]
inference_data = ssm.get_parameter(Name="/aik/inference_data")["Parameter"]["Value"]
pipelineModelArtifactPath = ssm.get_parameter(Name="/aik/pipelineModelArtifactPath")["Parameter"]["Value"]
Set up the schema for the bidding data:
bid_columns = [
"BidID",
"Timestamp",
"iPinYouID",
"UserAgent",
"IP",
"RegionID",
"CityID",
"AdExchange",
"Domain",
"URL",
"AnonymousURL",
"AdSlotID",
"AdSlotWidth",
"AdSlotHeight",
"AdSlotVisibility",
"AdSlotFormat",
"AdSlotFloorPrice",
"CreativeID",
"BiddingPrice",
"AdvertiserID", # V
"UserProfileIDs"
]
bid_schema = ""
for col in bid_columns:
if bid_schema != "":
bid_schema += ", "
if col == 'Region ID':
bid_schema += f"`{col}` long"
elif col == 'City ID':
bid_schema += f"`{col}` long"
else:
bid_schema += f"`{col}` string"
print(bid_schema)
# `BidID` string, `Timestamp` string, `iPinYouID` string, `UserAgent` string, `IP` string, `RegionID` string, `CityID` string, `AdExchange` string, `Domain` string, `URL` string, `AnonymousURL` string, `AdSlotID` string, `AdSlotWidth` string, `AdSlotHeight` string, `AdSlotVisibility` string, `AdSlotFormat` string, `AdSlotFloorPrice` string, `CreativeID` string, `BiddingPrice` string, `AdvertiserID` string, `UserProfileIDs` string
Read the data:
print(bid_source)
bid_df = spark.read.option("delimiter", "\t").format("csv").load(
    bid_source,
    inferSchema=False,
    header=False,
    schema=bid_schema)
bid_df.show(2)
s3://aik-sagemaker-emr-sagemakeremrproducttrainingmodel-dzr1jnmdzj7r/raw/ipinyou-data/training1st/bid.20130311.txt.bz2
+--------------------+-----------------+--------------------+--------------------+-------------+--------+------+----------+--------------------+--------------------+------------+----------+-----------+------------+----------------+------------+----------------+--------------------+------------+------------+--------------+
| BidID| Timestamp| iPinYouID| UserAgent| IP|RegionID|CityID|AdExchange| Domain| URL|AnonymousURL| AdSlotID|AdSlotWidth|AdSlotHeight|AdSlotVisibility|AdSlotFormat|AdSlotFloorPrice| CreativeID|BiddingPrice|AdvertiserID|UserProfileIDs|
+--------------------+-----------------+--------------------+--------------------+-------------+--------+------+----------+--------------------+--------------------+------------+----------+-----------+------------+----------------+------------+----------------+--------------------+------------+------------+--------------+
|e3d962536ef3ac709...|20130311172101557|37a6259cc0c1dae29...|Mozilla/4.0 (comp...|219.232.120.*| 1| 1| 2|DF9blS9bQqsIFYB4uA5R|b6c5272dfc63032f6...| null|2006366309| 728| 90| 1| 0| 5|5aca4c5f29e59e425...| 300| null| null|
|f2ce7b51f499eae08...|20130311172101567|37a6259cc0c1dae29...|Mozilla/4.0 (comp...| 60.220.37.*| 15| 19| 2|trqRTJLILec7gspy5SqW|2244fa274b2900c2d...| null|3572180553| 728| 90| 2| 0| 5|5aca4c5f29e59e425...| 300| null| null|
+--------------------+-----------------+--------------------+--------------------+-------------+--------+------+----------+--------------------+--------------------+------------+----------+-----------+------------+----------------+------------+----------------+--------------------+------------+------------+--------------+
only showing top 2 rows
We will only use a subset of the available data (20130311). In addition, we will not use the raw Timestamp; instead we want to break it down into a day of the week and an hour of the day, which gives better insight into the underlying data. So we now transform the DataFrame:
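The transformation itself is not shown in this excerpt. A minimal sketch, assuming the Timestamp column uses the yyyyMMddHHmmssSSS layout seen in the sample rows above (the sample dow value of 2 matches Spark's dayofweek, where Sunday is 1):

from pyspark.sql.functions import dayofweek, substring, to_date

# Derive day-of-week and hour-of-day from the raw Timestamp, then drop it
bid_df = (bid_df
    .withColumn("dow", dayofweek(to_date(substring("Timestamp", 1, 8), "yyyyMMdd")))
    .withColumn("hour", substring("Timestamp", 9, 2))
    .drop("Timestamp"))

Next, set up the schema for the impression data: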
imp_columns = ["BidID", # V
"Timestamp",
"LogType",
"iPinYouID",
"UserAgent",
"IP",
"RegionID",
"CityID",
"AdExchange",
"Domain",
"URL",
"AnonymousURL",
"AdSlotID",
"AdSlotWidth",
"AdSlotHeight",
"AdSlotVisibility",
"AdSlotFormat",
"AdSlotFloorPrice",
"CreativeID",
"BiddingPrice",
"PayingPrice",
"LandingPageURL",
"AdvertiserID",
"UserProfileIDs"]
imp_schema = ""
for col in imp_columns:
if imp_schema != "":
imp_schema += ", "
if col == 'BiddingPrice':
imp_schema += f"`{col}` long"
elif col == 'PayingPrice':
imp_schema += f"`{col}` long"
else:
imp_schema += f"`{col}` string"
print(imp_schema)
# `BidID` string, `Timestamp` string, `LogType` string, `iPinYouID` string, `UserAgent` string, `IP` string, `RegionID` string, `CityID` string, `AdExchange` string, `Domain` string, `URL` string, `AnonymousURL` string, `AdSlotID` string, `AdSlotWidth` string, `AdSlotHeight` string, `AdSlotVisibility` string, `AdSlotFormat` string, `AdSlotFloorPrice` string, `CreativeID` string, `BiddingPrice` long, `PayingPrice` long, `LandingPageURL` string, `AdvertiserID` string, `UserProfileIDs` string
Read the data:
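The read is not shown in this excerpt; a minimal sketch, mirroring the bid-data read above:

print(imp_source)
imp_df = spark.read.option("delimiter", "\t").format("csv").load(
    imp_source,
    inferSchema=False,
    header=False,
    schema=imp_schema)
imp_df.show(2)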
Transform the data again, this time only dropping the columns that are no longer used:
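A sketch of that step; the exact set of retained columns is an assumption based on the joined schema shown further below:

# Keep only the join key and the price columns; every other impression column
# is assumed unused downstream (see the joined schema below)
imp_df = imp_df.select("BidID", "BiddingPrice", "PayingPrice")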
So far we have been working with the bid and impression data in separate DataFrames. Now we want to join them into a single DataFrame.
Let's take a look at the schemas of the bid and impression data:
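For example:

bid_df.printSchema()
imp_df.printSchema()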
We can see that both DataFrames share a common column, BidID, so we will use it to join the DataFrames:
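The join might look like the following sketch; the left join is an assumption, but it is consistent with the null BiddingPrice/PayingPrice rows in the output below:

# Keep every bid; bids without a matching impression get null price columns,
# which later turn into label = 0
df = bid_df.join(imp_df, on="BidID", how="left")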
In the next step, the notebook encodes the remaining data columns (called features in machine learning) so that they can be used to train a machine learning model. Here we use pyspark.ml to encode the features:
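The pipeline definition is not shown in this excerpt. A minimal sketch that would produce the Index* columns in the schema printed below (handleInvalid="keep" is an assumption):

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

# One StringIndexer per categorical column, producing the Index* columns
indexers = [
    StringIndexer(inputCol=c, outputCol=f"Index{c}", handleInvalid="keep")
    for c in ["AdvertiserID", "Domain", "RegionID", "CityID"]
]
pipeline = Pipeline(stages=indexers)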
Fit the pipeline, persist the data, and transform the DataFrame:
# Fit the feature-encoding pipeline on the prepared DataFrame
pipelineModel = pipeline.fit(df)
# Persist the prepared data to S3 for later use
df.write.parquet(output_transformed, mode="overwrite")
df.write.json(inference_data, mode="overwrite")
# Transform the DataFrame with the fitted pipeline
df = pipelineModel.transform(df)
df.printSchema()
df.show(3)
root
|-- BidID: string (nullable = true)
|-- dow: integer (nullable = true)
|-- hour: string (nullable = true)
|-- RegionID: string (nullable = true)
|-- CityID: string (nullable = true)
|-- Domain: string (nullable = true)
|-- AdvertiserID: string (nullable = true)
|-- BiddingPrice: long (nullable = true)
|-- PayingPrice: long (nullable = true)
|-- UserAgent: string (nullable = true)
|-- IndexAdvertiserID: double (nullable = false)
|-- IndexDomain: double (nullable = false)
|-- IndexRegionID: double (nullable = false)
|-- IndexCityID: double (nullable = false)
+--------------------+---+----+--------+------+-------------------+------------+------------+-----------+--------------------+-----------------+-----------+-------------+-----------+
| BidID|dow|hour|RegionID|CityID| Domain|AdvertiserID|BiddingPrice|PayingPrice| UserAgent|IndexAdvertiserID|IndexDomain|IndexRegionID|IndexCityID|
+--------------------+---+----+--------+------+-------------------+------------+------------+-----------+--------------------+-----------------+-----------+-------------+-----------+
|1001fc699496978c8...| 2| 03| 0| 0|DFpETuxoGQdcFNKbuKz| null| null| null| null| 0.0| 3.0| 10.0| 3.0|
|100323a5105c9872d...| 2| 00| 164| 164| ersbQv1RdoTy1m58uG| null| 300| 160|Mozilla/5.0 (comp...| 0.0| 0.0| 8.0| 39.0|
|1007b252c0c8e5923...| 2| 08| 80| 85| trqRTua8XIl7gsz| null| null| null| null| 0.0| 63.0| 4.0| 7.0|
+--------------------+---+----+--------+------+-------------------+------------+------------+-----------+--------------------+-----------------+-----------+-------------+-----------+
only showing top 3 rows
Serialize the pipeline model with MLeap and upload it to S3:
import mleap.pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer
pipelineModel.serializeToBundle("jar:file:/tmp/pipelineModel.zip", df)
# Upload pipeline model to the S3 bucket
from urllib.parse import urlparse
parsed_pipeline_model_path = urlparse(pipelineModelArtifactPath, allow_fragments=False)
s3_client.upload_file("/tmp/pipelineModel.zip", parsed_pipeline_model_path.netloc, parsed_pipeline_model_path.path.lstrip('/'))
Another set of valuable features can be derived from the UserAgent field in the data. In fact, the type of device a user is on matters a great deal for determining whether a bid request is likely to succeed. We can derive the device type from the user agent information using woothee (https://github.com/woothee/woothee).
import woothee
from random import random

def parse_ua_to_device_type(user_agent_str):
    # Map a user-agent string to a numeric device type ID
    ua = woothee.parse(user_agent_str)
    category = ua['category']
    if category == 'smartphone':
        return 0
    if category == 'mobilephone':
        return 1
    if category == 'appliance':
        return 2
    if category == 'pc':
        return 3
    if category == 'crawler':
        return 4
    if category == 'misc':
        return 5
    return int(random() * 10) % 6  # missing value imputed with a random device_type_id
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType
from pyspark.sql.functions import when
from pyspark.sql.functions import col
cols_parse_ua_to_device_type = udf(parse_ua_to_device_type, IntegerType())
df_new = df.select(["dow", "hour", "UserAgent", "PayingPrice", "IndexAdvertiserID",
                    "IndexDomain", "IndexRegionID", "IndexCityID"]) \
    .withColumn('UserAgent', when(col('UserAgent').isNull(), "").otherwise(col('UserAgent'))) \
    .withColumn("hour", df["hour"].cast(IntegerType())) \
    .withColumn("device_type_id", cols_parse_ua_to_device_type("UserAgent")) \
    .withColumn('label', when(col('PayingPrice').isNull(), 0).otherwise(1)) \
    .drop("UserAgent").drop("PayingPrice")
Look at the distribution of device types:
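For example:

df_new.groupBy("device_type_id").count().orderBy("device_type_id").show()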
For machine learning model training, we need to split the prepared dataset into three different subsets, all of which are stored as Parquet files in S3:
First, we make sure the label column comes first, because XGBoost requires this during training later:
For training with columnar input, the algorithm assumes that the target variable (label) is the first column https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html
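The reordering and shuffle that produce df_new_shuffled are not shown in this excerpt; a sketch:

from pyspark.sql.functions import rand

# Put label first, then shuffle the rows with a fixed seed
cols = ["label"] + [c for c in df_new.columns if c != "label"]
df_new_shuffled = df_new.select(cols).orderBy(rand(seed=42))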
Now we will split the data into three different sets:
Look at the distribution of the data:
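For instance, the label balance:

df_new_shuffled.groupBy("label").count().show()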
Finally, we split the dataset into 80% training, 10% validation, and 10% test.
splits = df_new_shuffled.randomSplit([0.8, 0.1, 0.1], 42)
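Writing the splits to S3 might look like the following sketch; the variable names and the mapping of splits to output paths are assumptions:

train_df, verify_df, test_df = splits

# Store each split as a Parquet file in its S3 location (parameters loaded earlier)
train_df.write.parquet(output_train, mode="overwrite")
verify_df.write.parquet(output_verify, mode="overwrite")
test_df.write.parquet(output_test, mode="overwrite")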
Once the upload has completed, you can check the result:
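For example, by listing the objects under the training prefix (assuming output_train is an s3://bucket/prefix URI):

from urllib.parse import urlparse

parsed_train = urlparse(output_train, allow_fragments=False)
response = s3_client.list_objects_v2(
    Bucket=parsed_train.netloc, Prefix=parsed_train.path.lstrip('/'))
for obj in response.get('Contents', []):
    print(obj['Key'])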
By the end of this section, we have successfully prepared the data for model training.