首页 > 解决方案 > 如何在 pyspark 中创建具有 15 分钟存储桶的时间序列?

问题描述

我正在尝试创建一个报告,以 15 分钟为增量显示一组员工工作的总分钟数。

源表有进出时间和总工作分钟数,每个员工有一条记录。

我创建了一个 RDD 逐行映射函数来循环遍历一天中的小时数,然后每 15 分钟增量一个内部循环。

每个循环都应向 RDD 行字典添加一列。
我已经确认生成的架构包含这些新列,但我在最终输出中缺少大量数据。
我不确定这是否是行迭代或堆叠的问题。

这是起始模式 -
（此处原有“起始模式”的截图，图片未随文本保留）

有任何想法吗?

最终模式 -
（此处原有“最终模式”的截图，图片未随文本保留）

更新的代码 -

def create_time_block_columns(row_dict):
    """Add 96 columns 't_<hour>_<minute>' (one per 15-minute block of the day)
    to the row, each holding the minutes worked inside that block.

    NOTE(review): this is the original (buggy) version from the question; the
    inline NOTE(review) comments mark the branches that misbehave.

    :param row_dict: mutable dict with 'inhour', 'inminute', 'outhour' and
        'outminute' keys describing the clock-in/clock-out times.
    :return: the same dict with the 96 time-block columns added.
    """
    inhour = row_dict['inhour']
    outhour = row_dict['outhour']
    inminute = row_dict['inminute']
    outminute = row_dict['outminute']
    # loop through hours of day
    for i in range(24):
        # loop through quarter hour blocks
        for j in range(1,5):
            lowerBound = (j-1)*15
            upperBound = j*15
            # create column names like 't_0_0', 't_0_15', t_0_30', 't_0_45', 't_1_0', etc...
            timeBlockColumnName = F't_{i}_{lowerBound}'
            # Add a new key in the dictionary with the new column name and value. 
            # initialized to 0
            row_dict[timeBlockColumnName] = 0

            # if the employee was currently clocked in
            if (inhour <= i) & (outhour >= i):
                # if the inhour is the current time block hour and the outhour is in a future time block
                # this means they worked the rest of the hour
                # start_during_end_after
                if (i == inhour) & (outhour > i):
                    if (inminute >= lowerBound):
                        row_dict[timeBlockColumnName] = (upperBound - inminute)
                    else:
                        row_dict[timeBlockColumnName] = 15

                # if the current row is completely within the current time block [hour and minutes]
                # this means they worked all 15 minutes of each hour quarter
                # NOTE(review): inverted comparison — under the outer guard
                # inhour <= i <= outhour, `(i < inhour) & (i > outhour)` can
                # never be true; it should be `(inhour < i) & (i < outhour)`,
                # so fully-worked middle hours are silently left at 0.
                elif (i < inhour) & (i > outhour):
                    row_dict[timeBlockColumnName] = 15

                # if the inhour is before the current timeblock hour, and outhour is the current hour
                # this means they worked all minutes in the current block up-to the outminute
                # NOTE(review): `(i < inhour)` is likewise unreachable here
                # (the outer guard requires inhour <= i); should be
                # `(inhour < i)`.  Also, when outminute < lowerBound the
                # assignment below yields a negative value instead of 0.
                elif (i < inhour) & (i == outhour):
                    if (outminute < lowerBound):
                        row_dict[timeBlockColumnName] = outminute - lowerBound
                    else:
                        row_dict[timeBlockColumnName] = 15
                
                # if the inhour and outhour are the current timeblock hour, and they are the same hour, 
                # we'll calculate the difference between minutes
                # NOTE(review): the third branch can yield a negative value
                # when inminute > upperBound (e.g. a 14:50-14:55 shift
                # evaluated for the 14:00-14:15 block gives 15 - 50 = -35).
                elif (i == inhour) & (i == outhour):
                    if (inminute >= lowerBound) & (outminute <= upperBound):
                        row_dict[timeBlockColumnName] = outminute - inminute
                    elif (inminute < lowerBound) & (outminute >= upperBound):
                        row_dict[timeBlockColumnName] = 15
                    elif (inminute >= lowerBound) & (outminute >=  upperBound):
                        row_dict[timeBlockColumnName] = upperBound - inminute
                    elif (inminute < lowerBound) & (outminute <= upperBound):
                        row_dict[timeBlockColumnName] = outminute - lowerBound 
            # else: we don't do anything because the employee wasn't clocked in
    return row_dict

# Apply the row-mapping function to every record of the DynamicFrame `dyF`
# (Map is presumably the AWS Glue Map transform, per the question's aws-glue
# tag — confirm against the script's imports), then convert the result to a
# Spark DataFrame for the stack step below.
mappedDF = Map.apply(frame = dyF, f = create_time_block_columns).toDF()

# output some interesting logs for debugging
mappedDF.printSchema()


# Build a Spark SQL stack() expression that unpivots the 96 time-block
# columns into (time_block, minutes_worked) rows:
#   stack(96, 't_0_0', t_0_0, 't_0_15', t_0_15, ...) as (...)
_stack_pairs = ", ".join(
    f"'t_{hr}_{qtr * 15}', t_{hr}_{qtr * 15}"
    for hr in range(24)
    for qtr in range(4)
)
stack_expression = f"stack({24 * 4}, {_stack_pairs}) as (time_block, minutes_worked)"

# Keep the identifying columns and explode the 96 time-block columns into
# (time_block, minutes_worked) rows via the stack() SQL expression built above.
timeBlockDF = mappedDF.select('pos_key', 'p_dob', 'dob', 'employee', 'rate', 'jobcode', 'pay', 'overpay', 'minutes', F.expr(stack_expression))
# Drop empty blocks and cast the business-date column to a proper DATE type.
timeBlockDF = timeBlockDF.filter('minutes_worked > 0') \
    .withColumn("dob",F.col("dob").cast(DateType())) 

# create time block identifier column
# 't_<hour>_<minute>' -> capture group 1 = hour, capture group 2 = minute
time_pattern = r't_(\d+)_(\d+)'
# Derive, per time block: its hour/minute parts, an HH:mm label, a full
# timestamp on the business date ('dob'), and the pay prorated to the block:
# time_block_pay = (pay + overpay) / minutes * minutes_worked.
# (Comments cannot be interleaved below: backslash continuations do not
# permit comment lines between them.)
timeBlockDF = timeBlockDF.withColumn('time_block_hour', F.regexp_extract('time_block', time_pattern, 1).cast(IntegerType())) \
    .withColumn('time_block_min', F.regexp_extract('time_block', time_pattern, 2).cast(IntegerType())) \
    .drop('time_block') \
    .withColumn('time_block_time', F.concat_ws(':', F.format_string("%02d", F.col('time_block_hour')), F.format_string("%02d", F.col('time_block_min')))) \
    .withColumn('time_block_temp', F.concat_ws(' ', F.col('dob'), F.col('time_block_time'))) \
    .withColumn('time_block_datetime', F.to_timestamp(F.col('time_block_temp'), 'yyyy-MM-dd HH:mm')) \
    .withColumn('time_block_pay', ((F.col('pay') + F.col('overpay')) / F.col('minutes')) * F.col('minutes_worked')) \
    .drop('time_block_temp', 'pay', 'overpay', 'minutes')

# output some interesting logs for debugging
timeBlockDF.printSchema()

标签: python、apache-spark、pyspark、apache-spark-sql、aws-glue

解决方案


问题出在 udf 上。
有几种情况没有被条件处理,但堆栈表达式工作正常。

这是一个工作示例[不考虑跨越午夜的班次]。

def create_time_block_columns(row_dict):
    """Add 96 columns 't_<hour>_<minute>' (one per 15-minute block of the
    day, e.g. 't_0_0', 't_0_15', ..., 't_23_45') to the row, each holding the
    number of minutes worked inside that block.

    Shifts that cross midnight are NOT handled: the clock-out time must be
    at or after the clock-in time on the same day.

    :param row_dict: mutable dict with 'inhour', 'inminute', 'outhour' and
        'outminute' keys describing the clock-in/clock-out times.
    :return: the same dict with the 96 time-block columns added.
    """
    # Express clock-in/out as minutes since midnight.  Each block's worked
    # minutes then reduce to one interval-intersection formula, which fixes
    # the unreachable `(i < inhour)` branches of the posted code (shifts
    # ending in a later hour recorded 0 minutes for their final hour) and the
    # branches that could assign negative values.
    clock_in = row_dict['inhour'] * 60 + row_dict['inminute']
    clock_out = row_dict['outhour'] * 60 + row_dict['outminute']
    # loop through hours of day
    for hour in range(24):
        # loop through the four quarter-hour blocks of this hour
        for quarter in range(4):
            block_start = hour * 60 + quarter * 15
            block_end = block_start + 15
            # column names like 't_0_0', 't_0_15', 't_0_30', 't_0_45', 't_1_0', etc...
            column_name = f't_{hour}_{quarter * 15}'
            # minutes worked in this block = length of the intersection of
            # [clock_in, clock_out] with [block_start, block_end], clamped at
            # 0 when there is no overlap (employee not clocked in).
            row_dict[column_name] = max(
                0, min(clock_out, block_end) - max(clock_in, block_start)
            )
    return row_dict

# Apply the row-mapping function to every record of the DynamicFrame `dyF`
# (Map is presumably the AWS Glue Map transform, per the question's aws-glue
# tag — confirm against the script's imports), then convert the result to a
# Spark DataFrame for the stack step below.
mappedDF = Map.apply(frame = dyF, f = create_time_block_columns).toDF()

# output some interesting logs for debugging
mappedDF.printSchema()

# Build a Spark SQL stack() expression that unpivots the 96 time-block
# columns into (time_block, minutes_worked) rows.
_block_columns = [f"t_{h}_{m}" for h in range(24) for m in (0, 15, 30, 45)]
stack_expression = (
    f"stack({len(_block_columns)}, "
    + ", ".join(f"'{col}', {col}" for col in _block_columns)
    + ") as (time_block, minutes_worked)"
)

# Keep the identifying columns and explode the 96 time-block columns into
# (time_block, minutes_worked) rows via the stack() SQL expression built above.
timeBlockDF = mappedDF.select('pos_key', 'p_dob', 'dob', 'employee', 'rate', 'jobcode', 'pay', 'overpay', 'minutes', F.expr(stack_expression))
# Drop empty blocks and cast the business-date column to a proper DATE type.
timeBlockDF = timeBlockDF.filter('minutes_worked > 0') \
    .withColumn("dob",F.col("dob").cast(DateType())) 


推荐阅读