python - 向 dask 数据框添加一列,通过滚动窗口计算它
问题描述
假设我有以下代码来生成一个虚拟的 dask 数据帧:
import pandas as pd
import dask.dataframe as dd
pandas_dataframe = pd.DataFrame({'A' : [0,500,1000], 'B': [-100, 200, 300] , 'C' : [0,0,1.0] } )
test_data_frame = dd.from_pandas( pandas_dataframe, npartitions= 1 )
理想情况下,我想知道向数据框中添加另一列的推荐方法是什么,通过滚动窗口以惰性方式计算列内容。
我想出了以下方法:
import numpy as np
import dask.delayed as delay
@delay
def coupled_operation_example(dask_dataframe,
list_of_input_lbls,
fcn,
window_size,
init_value,
output_lbl):
def preallocate_channel_data(vector_length, first_components):
vector_out = np.zeros(len(dask_dataframe))
vector_out[0:len(first_components)] = first_components
return vector_out
def create_output_signal(relevant_data, fcn, window_size , initiated_vec):
## to be written; fcn would be a fcn accepting the sliding window
initiatied_vec = preallocate_channel_data(len(dask_dataframe, init_value))
relevant_data = dask_dataframe[list_of_input_lbls]
my_output_signal = create_output_signal(relevant_data, fcn, window_size, initiated_vec)
我正在写这篇文章,确信 dask 数据框将允许我进行一些切片:他们没有。因此,我的第一个选择是将计算中涉及的列提取为 numpy 数组,但它们会被热切地评估。我认为性能上的损失将是巨大的。目前我使用 h5py 从 h5 数据创建 dask 数据帧:所以一切都是惰性的,直到我编写输出文件。
到目前为止,我只处理某一行的数据;所以我一直在使用:
test_data_frame .apply(fcn, axis =1, meta = float)
我认为滚动窗口没有等效的功能方法;我对吗?我想要 F# 或 Haskell 中的 Seq.windowed 之类的东西。任何建议高度赞赏。
解决方案
我试图通过关闭来解决它。一旦我完成了代码,我将发布一些数据的基准测试。现在我有以下玩具示例,它似乎有效:因为 dask 数据框的应用方法似乎保留了行顺序。
import numpy as np
import pandas as pd
import dask.dataframe as dd
number_of_components = 30
df = pd.DataFrame(np.random.randint(0,number_of_components,size=(number_of_components, 2)), columns=list('AB'))
my_data_frame = dd.from_pandas(df, npartitions = 1 )
def sumPrevious( previousState ) :
def getValue(row):
nonlocal previousState
something = row['A'] - previousState
previousState = row['A']
return something
return getValue
given_func = sumPrevious(1 )
out = my_data_frame.apply(given_func, axis = 1 , meta = float)
df['computed'] = out.compute()
现在坏消息是,我试图通过这个新函数将它抽象出来,传递状态并使用任意宽度的滚动窗口:
def generalised_coupled_computation(previous_state , coupled_computation, previous_state_update) :
def inner_function(actual_state):
nonlocal previous_state
actual_value = coupled_computation(actual_state , previous_state )
previous_state = previous_state_update(actual_state, previous_state)
return actual_value
return inner_function
假设我们用以下方式初始化函数:
init_state = df.loc[0]
coupled_computation = lambda act,prev : act['A'] - prev['A']
new_update = lambda act, prev : act
given_func3 = generalised_coupled_computation(init_state , coupled_computation, new_update )
out3 = my_data_frame.apply(given_func3, axis = 1 , meta = float)
尝试运行它并为意外做好准备:第一个元素是错误的,可能是一些指针的问题,给定奇怪的结果。有什么见解吗?
无论如何,如果一个人通过原始类型,它似乎可以工作。
更新:
the solution is in using copy:
import copy as copy
def new_update(act, previous):
return copy.copy(act)
Now the functions behaves as expected; of course it is necessary to adapt the function updates and the coupled computation function if one needs a more coupled logic
推荐阅读
- hadoop - 启用kerberos后无法启动journalnode
- r - 绘图中的垂直偏移 - ggplot 和 geom_line
- android - StatefulWidget 作为列表项在 ListView.builder 中的每个滚动上重新创建
- flutter - 提供者状态管理实施。将函数放入 create: property
- python-3.x - tensorflow 和 keras - 从 SQS 进行生产中的快速推理
- android - 使用 ViewModelProvider(this) 初始化 ViewModel 实例时出现问题
- ubuntu - Letsencrypt 证书的续订看起来不错,但页面上的证书不会更新
- javascript - 为什么错误处理在 nodemailer 中不起作用?
- python - * 在 python 的打印函数中
- java - 为什么我的按钮在 Android Studio xml 中没有水平对齐?