首页 > 解决方案 > 如何计算 Apache Beam 的百分比变化?即pandas.DataFrame.pct_change

问题描述

我是 Apache Beam 的新手,在这个看似非常简单的事情上停留了几个小时:

如何在 Apache Beam 中完成pandas.DataFrame.pct_change ?

我正在从 CSV 读取数据(使用beam.io.ReadFromText),比如:

0    90 
1    91 
2    85

我想把它变成行之间的百分比变化,即

0         NaN
1    0.011111
2   -0.065934

如何在Apache Beam管道中做到这一点?

祝一切顺利!

标签: pandasapache-beam-ioapache-beam

解决方案


Beam 相对于 Pandas 的主要优势是能够并行化很多操作。并行性也发生在读取中,因此没有像 Pandas 中那样简单的“下一个”概念。

这就是为什么需要固定顺序的操作(例如 Pandas 的所有滚动功能)在 Beam(和其他并行 ETL 框架)中更难执行的主要原因。他们几乎需要将所有元素发送给同一个工作人员并在那里执行操作,因此您正在失去 Beam 的优势,并且使用 Pandas 可能会更好。

但是,由于您有一个row告诉我们顺序的字段,我们可以使用该row字段作为解决方法,timestamps并且SlidingWindows不会丢失并行性。

由于组合器(我们对事物进行分组的方式)不是可交换/关联的,因此我们需要高级组合器。在这两个答案中有更多关于这个概念的信息1 2

p = beam.Pipeline()

class RollingChange(beam.CombineFn):
    def create_accumulator(self):
        return []

    def add_input(self, list, input):
        list.append(input)
        return list

    def merge_accumulators(self, accumulators):
        final_list = []
        for list in accumulators:
            final_list += list
        return final_list

    def extract_output(self, list_of_list):
        if len(list_of_list) == 2:
            first = list_of_list[0]
            second = list_of_list[1]
            second["change"] = second["value"] / first["value"] - 1
            return second
        elif len(list_of_list) == 1 and list_of_list[0]['row'] == 0:
            list_of_list[0]["change"] = 0
            return list_of_list[0]
        else:
            pass

elements = [
    {"row": 0, "value": 90},
    {"row": 1, "value": 91},
    {"row": 2, "value": 85},
    {"row": 3, "value": 100},
    {"row": 4, "value": 200}
]

(p | Create(elements)
 | Map(lambda x: window.TimestampedValue(x, x['row'])) # adds row as timestamp for windows
 | WindowInto(window.SlidingWindows(2, 1))
 | beam.core.CombineGlobally(RollingChange()).without_defaults()
 | beam.core.Filter(lambda x: x != None) # filters the last row (4)
 | Map(print))

p.run()

这个的输出是(注意顺序可能会改变)

{'row': 1, 'value': 91, 'change': 0.011111111111111072}
{'row': 0, 'value': 90, 'change': 0}
{'row': 2, 'value': 85, 'change': -0.06593406593406592}
{'row': 3, 'value': 100, 'change': 0.17647058823529416}
{'row': 4, 'value': 200, 'change': 1.0}

推荐阅读