pandas - 如何计算 Apache Beam 的百分比变化?即pandas.DataFrame.pct_change
问题描述
我是 Apache Beam 的新手,在这个看似非常简单的事情上停留了几个小时:
如何在 Apache Beam 中完成pandas.DataFrame.pct_change ?
我正在从 CSV 读取数据(使用beam.io.ReadFromText),比如:
0 90
1 91
2 85
我想把它变成行之间的百分比变化,即
0 NaN
1 0.011111
2 -0.065934
如何在Apache Beam管道中做到这一点?
祝一切顺利!
解决方案
Beam 相对于 Pandas 的主要优势是能够并行化很多操作。并行性也发生在读取中,因此没有像 Pandas 中那样简单的“下一个”概念。
这就是为什么需要固定顺序的操作(例如 Pandas 的所有滚动功能)在 Beam(和其他并行 ETL 框架)中更难执行的主要原因。他们几乎需要将所有元素发送给同一个工作人员并在那里执行操作,因此您正在失去 Beam 的优势,并且使用 Pandas 可能会更好。
但是,由于您有一个row
告诉我们顺序的字段,我们可以使用该row
字段作为解决方法,timestamps
并且SlidingWindows
不会丢失并行性。
由于组合器(我们对事物进行分组的方式)不是可交换/关联的,因此我们需要高级组合器。在这两个答案中有更多关于这个概念的信息1 2
p = beam.Pipeline()
class RollingChange(beam.CombineFn):
def create_accumulator(self):
return []
def add_input(self, list, input):
list.append(input)
return list
def merge_accumulators(self, accumulators):
final_list = []
for list in accumulators:
final_list += list
return final_list
def extract_output(self, list_of_list):
if len(list_of_list) == 2:
first = list_of_list[0]
second = list_of_list[1]
second["change"] = second["value"] / first["value"] - 1
return second
elif len(list_of_list) == 1 and list_of_list[0]['row'] == 0:
list_of_list[0]["change"] = 0
return list_of_list[0]
else:
pass
elements = [
{"row": 0, "value": 90},
{"row": 1, "value": 91},
{"row": 2, "value": 85},
{"row": 3, "value": 100},
{"row": 4, "value": 200}
]
(p | Create(elements)
| Map(lambda x: window.TimestampedValue(x, x['row'])) # adds row as timestamp for windows
| WindowInto(window.SlidingWindows(2, 1))
| beam.core.CombineGlobally(RollingChange()).without_defaults()
| beam.core.Filter(lambda x: x != None) # filters the last row (4)
| Map(print))
p.run()
这个的输出是(注意顺序可能会改变)
{'row': 1, 'value': 91, 'change': 0.011111111111111072}
{'row': 0, 'value': 90, 'change': 0}
{'row': 2, 'value': 85, 'change': -0.06593406593406592}
{'row': 3, 'value': 100, 'change': 0.17647058823529416}
{'row': 4, 'value': 200, 'change': 1.0}
推荐阅读
- javascript - “未定义的选择console.log(数据[])”
- algorithm - 递归调用和普通调用的区别
- xamarin - Xamarin:如何通过 USB 读取模拟音频?
- intellij-idea - 显示 UML 类图自动填充类依赖项
- python - Python matplotlib.animation Jupyter Notebook
- machine-learning - keras 适合解码器,然后适合自动编码器
- php - Excel同时导入将空行添加到PHP中的数据库中
- c# - .NET 5 和 SPA 路由
- android - 如何仅在下拉菜单中获取月份并将默认值设置为当前月份?
- java - Android Volley 在服务器上没有被识别为 POST 与 php