10 - Groupby/Aggregate


标题: “10 - 分组/聚合” 权重: 12


此时,我们已经通过删除异常值、无效值并添加新特征来清理了数据集。在开始训练预测模型之前,还有几个步骤需要完成。由于我们对每小时的预测感兴趣,我们需要统计每小时每个站点的行程次数,并聚合(取平均值)所有指标,如距离、持续时间、小费和总金额。

截断时间戳

我们不需要时间戳中的分钟和秒,所以我们将它们删除。Data Wrangler中没有内置的过滤转换,所以我们创建一个自定义转换。

要创建自定义转换,请按照以下步骤操作:

  • 单击转换元素集合旁边的加号,选择"添加转换”。
  • 在TRANSFORMS菜单中单击橙色的"+ 添加步骤"按钮。
  • 选择自定义转换。
  • 在下拉菜单中选择"Python (PySpark)",并使用下面提供的代码片段。此代码将创建一个新列,其中包含截断的时间戳,并删除原始的pickup列。
from pyspark.sql.functions import col date_trunc
df = df.withColumn('pickup_time' date_trunc("hour"col("tpep_pickup_datetime")))
df = df.drop("tpep_pickup_datetime")
  • 选择预览。
  • 选择添加以保存步骤。

将此转换应用于数据集时,我们可以看到到目前为止的所有当前步骤,并预览包含新列"pickup_time"且没有旧列"tpep_pickup_datetime"的结果数据集。

统计每小时每个站点的行程次数

目前,我们只有关于每次行程的信息,但我们不知道每小时从每个站点进行了多少次行程。最简单的方法是统计每个站点ID每小时的记录数。虽然Data Wrangler提供了GroupBy转换,但内置转换不支持多列分组,所以我们使用自定义转换。

要创建自定义转换,请按照以下步骤操作:

  • 单击转换元素集合旁边的加号,选择"添加转换”。
  • 在TRANSFORMS菜单中单击橙色的"+ 添加步骤"按钮。
  • 选择自定义转换。
  • 在下拉菜单中选择"Python (PySpark)",并使用下面提供的代码片段。此代码将创建一个新列,其中包含每个位置每个时间戳的行程次数。
from pyspark.sql import functions as f
from pyspark.sql import Window
df = df.withColumn('count' f.count('duration').over(Window.partitionBy([f.col("pickup_time") f.col("PULocationID")])))
  • 选择预览。
  • 选择添加以保存步骤。

将此转换应用于数据集时,我们可以看到到目前为止的所有当前步骤,并预览包含新列"count"的结果数据集。

单击"返回数据流"以返回到块图编辑器窗口。