python - 如何在 pyspark 中创建具有 15 分钟存储桶的时间序列?
问题描述
我正在尝试创建一个报告,以 15 分钟为增量显示一组员工工作的总分钟数。
源表有进出时间和总工作分钟数,每个员工有一条记录。
我创建了一个 RDD 逐行映射函数来循环遍历一天中的小时数,然后每 15 分钟增量一个内部循环。
每个循环都应向 RDD 行字典添加一列。
我已经确认生成的架构包含这些新列,但我在最终输出中缺少大量数据。
我不确定这是否是行迭代或堆叠的问题。
有任何想法吗?
更新的代码 -
def create_time_block_columns(row_dict):
inhour = row_dict['inhour']
outhour = row_dict['outhour']
inminute = row_dict['inminute']
outminute = row_dict['outminute']
# loop through hours of day
for i in range(24):
# loop through quarter hour blocks
for j in range(1,5):
lowerBound = (j-1)*15
upperBound = j*15
# create column names like 't_0_0', 't_0_15', t_0_30', 't_0_45', 't_1_0', etc...
timeBlockColumnName = F't_{i}_{lowerBound}'
# Add a new key in the dictionary with the new column name and value.
# initialized to 0
row_dict[timeBlockColumnName] = 0
# if the employee was currently clocked in
if (inhour <= i) & (outhour >= i):
# if the inhour is the current time block hour and the outhour is in a future time block
# this means they worked the rest of the hour
# start_during_end_after
if (i == inhour) & (outhour > i):
if (inminute >= lowerBound):
row_dict[timeBlockColumnName] = (upperBound - inminute)
else:
row_dict[timeBlockColumnName] = 15
# if the current row is completely within the current time block [hour and minutes]
# this means they worked all 15 minutes of each hour quarter
elif (i < inhour) & (i > outhour):
row_dict[timeBlockColumnName] = 15
# if the inhour is before the current timeblock hour, and outhour is the current hour
# this means they worked all minutes in the current block up-to the outminute
elif (i < inhour) & (i == outhour):
if (outminute < lowerBound):
row_dict[timeBlockColumnName] = outminute - lowerBound
else:
row_dict[timeBlockColumnName] = 15
# if the inhour and outhour are the current timeblock hour, and they are the same hour,
# we'll calculated the difference between minutes
elif (i == inhour) & (i == outhour):
if (inminute >= lowerBound) & (outminute <= upperBound):
row_dict[timeBlockColumnName] = outminute - inminute
elif (inminute < lowerBound) & (outminute >= upperBound):
row_dict[timeBlockColumnName] = 15
elif (inminute >= lowerBound) & (outminute >= upperBound):
row_dict[timeBlockColumnName] = upperBound - inminute
elif (inminute < lowerBound) & (outminute <= upperBound):
row_dict[timeBlockColumnName] = outminute - lowerBound
# else: we don't do anything because the employee wasnt clocked in
return row_dict
mappedDF = Map.apply(frame = dyF, f = create_time_block_columns).toDF()
# output some interesting logs for debugging
mappedDF.printSchema()
# Build expression to stack new columns as rows
stack_expression = F"stack({24*4}"
for i in range(24):
for j in range(1,5):
stack_expression += F", 't_{i}_{(j-1)*15}', t_{i}_{(j-1)*15}"
stack_expression += ') as (time_block, minutes_worked)'
timeBlockDF = mappedDF.select('pos_key', 'p_dob', 'dob', 'employee', 'rate', 'jobcode', 'pay', 'overpay', 'minutes', F.expr(stack_expression))
timeBlockDF = timeBlockDF.filter('minutes_worked > 0') \
.withColumn("dob",F.col("dob").cast(DateType()))
# create time block identifier column
time_pattern = r't_(\d+)_(\d+)'
timeBlockDF = timeBlockDF.withColumn('time_block_hour', F.regexp_extract('time_block', time_pattern, 1).cast(IntegerType())) \
.withColumn('time_block_min', F.regexp_extract('time_block', time_pattern, 2).cast(IntegerType())) \
.drop('time_block') \
.withColumn('time_block_time', F.concat_ws(':', F.format_string("%02d", F.col('time_block_hour')), F.format_string("%02d", F.col('time_block_min')))) \
.withColumn('time_block_temp', F.concat_ws(' ', F.col('dob'), F.col('time_block_time'))) \
.withColumn('time_block_datetime', F.to_timestamp(F.col('time_block_temp'), 'yyyy-MM-dd HH:mm')) \
.withColumn('time_block_pay', ((F.col('pay') + F.col('overpay')) / F.col('minutes')) * F.col('minutes_worked')) \
.drop('time_block_temp', 'pay', 'overpay', 'minutes')
# output some interesting logs for debugging
timeBlockDF.printSchema()
解决方案
问题出在 udf 上。
有几种情况没有被条件处理,但堆栈表达式工作正常。
这是一个工作示例[不考虑跨越午夜的班次]。
def create_time_block_columns(row_dict):
inhour = row_dict['inhour']
outhour = row_dict['outhour']
inminute = row_dict['inminute']
outminute = row_dict['outminute']
# loop through hours of day
for i in range(24):
# loop through quarter hour blocks
for j in range(1,5):
lowerBound = (j-1)*15
upperBound = j*15
# create column names like 't_0_0', 't_0_15', t_0_30', 't_0_45', 't_1_0', etc...
timeBlockColumnName = F't_{i}_{lowerBound}'
# Add a new key in the dictionary with the new column name and value.
# initialized to 0
row_dict[timeBlockColumnName] = 0
# if the employee was currently clocked in
if (inhour <= i) & (outhour >= i):
# if the inhour is the current time block hour and the outhour is in a future time block
# this means they worked the rest of the hour
# start_during_end_after
if (i == inhour) & (outhour > i):
if (inminute >= lowerBound):
row_dict[timeBlockColumnName] = (upperBound - inminute)
else:
row_dict[timeBlockColumnName] = 15
# if the current row is completely within the current time block [hour and minutes]
# this means they worked all 15 minutes of each hour quarter
elif (inhour < i) & (i < outhour):
row_dict[timeBlockColumnName] = 15
# if the inhour is before the current timeblock hour, and outhour is the current hour
# this means they worked all minutes in the current block up-to the outminute
elif (i < inhour) & (i == outhour):
if (outminute < lowerBound):
row_dict[timeBlockColumnName] = outminute - lowerBound
else:
row_dict[timeBlockColumnName] = 15
# if the inhour and outhour are the current timeblock hour, and they are the same hour,
# we'll calculated the difference between minutes
elif (i == inhour) & (i == outhour):
if (inminute >= lowerBound) & (outminute <= upperBound):
row_dict[timeBlockColumnName] = outminute - inminute
elif (inminute < lowerBound) & (outminute >= upperBound):
row_dict[timeBlockColumnName] = 15
elif (inminute >= lowerBound) & (outminute >= upperBound):
row_dict[timeBlockColumnName] = upperBound - inminute
elif (inminute < lowerBound) & (outminute <= upperBound):
row_dict[timeBlockColumnName] = outminute - lowerBound
# else: we don't do anything because the employee wasnt clocked in
return row_dict
mappedDF = Map.apply(frame = dyF, f = create_time_block_columns).toDF()
# output some interesting logs for debugging
mappedDF.printSchema()
# Build expression to stack new columns as rows
stack_expression = F"stack({24*4}"
for i in range(24):
for j in range(1,5):
stack_expression += F", 't_{i}_{(j-1)*15}', t_{i}_{(j-1)*15}"
stack_expression += ') as (time_block, minutes_worked)'
timeBlockDF = mappedDF.select('pos_key', 'p_dob', 'dob', 'employee', 'rate', 'jobcode', 'pay', 'overpay', 'minutes', F.expr(stack_expression))
timeBlockDF = timeBlockDF.filter('minutes_worked > 0') \
.withColumn("dob",F.col("dob").cast(DateType()))
推荐阅读
- python - 为什么我不能从同一目录中的类中导入函数?
- c - 我可以通过 timeval 作为睡眠的参数吗?
- python - 如果我只有带有未校准相机的基本矩阵,是否可以估计现实世界中的物体相对高度
- javascript - 如何在不暴露数据包含的内容的情况下将变量从 Node.JS 后端传递到前端 Javascript
- reactjs - 在 React Native 中动态显示数据库查询的结果
- python - 没有名为“fbprophet”的模块?
- ansible - Ansible:将任务字典作为变量传递
- html - 侧边栏不展开到底部
- aws-glue - 当我再次运行 Glue Crawler 而不更改它指向的 S3 路径中的文件时会发生什么?
- c# - T 当前属性与 IEnumerator 当前 c#