python - 一周期傅立叶窗优化。我的代码效率低下
问题描述
再会
编辑: 我想要什么:从电力系统(PS)上的任何电流/电压波形中,我想要过滤后的 50Hz(基本)RMS 值幅度(以及实际上它们的角度)。测量的电流包含从 100Hz 到 1250Hz 的所有谐波,具体取决于设备。无法使用具有这些谐波的波形进行分析和计算,您的误差会变得如此之大(取决于设备),以至于 PS 保护设备计算出不正确的数量。附加的信号还涉及许多其他频率分量。
我的目标:PS保护继电器很特殊,可以在很短的时间内计算出20ms的窗口。我不是想得到这个。我正在使用外部记录技术并测试继电器看到的内容是否真实并且它们是否正常运行。因此,我需要做他们所做的事情,只保持 50Hz 的值,没有任何谐波和 DC。
重要的预期结果:给定信号中可能存在的任何频率分量,我想查看任何给定谐波的幅度(150,250 - 基波的第 3 次谐波幅度和第 5 次谐波)以及 DC 的幅度。这将告诉我哪种类型的 PS 设备可能注入这些频率。重要的是我提供了一个频率,答案是该频率的向量,只有所有其他值被过滤掉。RMS-of-the-fundal 与 RMS 不同,因数为 4000A(仅 50Hz)和 4500A(包括其他频率)
此代码计算给定频率的单周期傅立叶值 (RMS)。我认为有时称为傅立叶滤波器?我将它用于电力系统 50Hz/0Hz/150Hz 类似物分析。(答案已经过测试并且是正确的基本 RMS 值。(https://users.wpi.edu/~goulet/Matlab/overlap/trigfs.html)
对于大样本,代码非常慢。对于 55000 个数据点,需要 12 秒。对于 3 个电压和 3 个电流,这变得非常缓慢。我每天看 100 条记录。
我该如何增强它?有哪些 Python 提示和技巧/库可以附加我的列表/数组。(也可以随意重写或使用代码)。我使用代码在给定时间从信号中挑选出某些值。(这就像从一个专门的电力系统分析程序中读取值) 编辑:我如何加载文件并使用它们,代码可以粘贴它:
import matplotlib.pyplot as plt
import csv
import math
import numpy as np
import cmath
# FILES ATTACHED TO POST
filenamecfg = r"E:/Python_Practise/2019-10-21 13-54-38-482.CFG"
filename = r"E:/Python_Practise/2019-10-21 13-54-38-482.DAT"
t = []
IR = []
newIR=[]
with open(filenamecfg,'r') as csvfile1:
cfgfile = [row for row in csv.reader(csvfile1, delimiter=',')]
numberofchannels=int(np.array(cfgfile)[1][0])
scaleval = float(np.array(cfgfile)[3][5])
scalevalI = float(np.array(cfgfile)[8][5])
samplingfreq = float(np.array(cfgfile)[numberofchannels+4][0])
numsamples = int(np.array(cfgfile)[numberofchannels+4][1])
freq = float(np.array(cfgfile)[numberofchannels+2][0])
intsample = int(samplingfreq/freq)
#TODO neeeed to get number of samples and frequency and detect
#automatically
#scaleval = np.array(cfgfile)[3]
print('multiplier:',scaleval)
print('SampFrq:',samplingfreq)
print('NumSamples:',numsamples)
print('Freq:',freq)
with open(filename,'r') as csvfile:
plots = csv.reader(csvfile, delimiter=',')
for row in plots:
t.append(float(row[1])/1000000) #get time from us to s
IR.append(float(row[6]))
newIR = np.array(IR) * scalevalI
t = np.array(t)
def mag_and_theta_for_given_freq(f,IVsignal,Tsignal,samples): #samples are the sample window size you want to caclulate for (256 in my case)
# f in hertz, IVsignal, Tsignal in numpy.array
timegap = Tsignal[2]-Tsignal[3]
pi = math.pi
w = 2*pi*f
Xr = []
Xc = []
Cplx = []
mag = []
theta = []
#print("Calculating for frequency:",f)
for i in range(len(IVsignal)-samples):
newspan = range(i,i+samples)
timewindow = Tsignal[newspan]
#print("this is my time: ",timewindow)
Sig20ms = IVsignal[newspan]
N = len(Sig20ms) #get number of samples of my current Freq
RealI = np.multiply(Sig20ms, np.cos(w*timewindow)) #Get Real and Imaginary part of any signal for given frequency
ImagI = -1*np.multiply(Sig20ms, np.sin(w*timewindow)) #Filters and calculates 1 WINDOW RMS (root mean square value).
#calculate for whole signal and create a new vector. This is the RMS vector (used everywhere in power system analysis)
Xr.append((math.sqrt(2)/N)*sum(RealI)) ### TAKES SO MUCH TIME
Xc.append((math.sqrt(2)/N)*sum(ImagI)) ## these steps make RMS
Cplx.append(complex(Xr[i],Xc[i]))
mag.append(abs(Cplx[i]))
theta.append(np.angle(Cplx[i]))#th*180/pi # this can be used to get Degrees if necessary
#also for freq 0 (DC) id the offset is negative how do I return a negative to indicate this when i'm using MAGnitude or Absolute value
return Cplx,mag,theta #mag[:,1]#,theta # BUT THE MAGNITUDE WILL NEVER BE zero
myZ,magn,th = mag_and_theta_for_given_freq(freq,newIR,t,intsample)
plt.plot(newIR[0:30000],'b',linewidth=0.4)#, label='CFG has been loaded!')
plt.plot(magn[0:30000],'r',linewidth=1)
plt.show()
鉴于附加的文件,粘贴的代码运行顺利
编辑:请在此处找到测试 csvfile 和 COMTRADE TEST 文件:CSV: https ://drive.google.com/open?id=18zc4Ms_MtYAeTBm7tNQTcQkTnFWQ4LUu
比较https://drive.google.com/file/d/1j3mcBrljgerqIeJo7eiwWo9eDu_ocv9x/view?usp=sharing https://drive.google.com/file/d/1pwYm2yj2x8sKYQUcw3dPy_a9GrqAgFtD/view?usp=sharing
解决方案
前言
正如我在之前的评论中所说:
您的代码主要依赖于一个
for
包含大量索引和标量操作的循环。您已经导入numpy
,因此您应该利用矢量化。
这个答案是您解决方案的开始。
轻量级 MCVE
首先,我们为 MCVE 创建一个试验信号:
import numpy as np
# Synthetic signal sampler: 5s sampled as 400 Hz
fs = 400 # Hz
t = 5 # s
t = np.linspace(0, t, fs*t+1)
# Synthetic Signal: Amplitude is about 325V @50Hz
A = 325 # V
f = 50 # Hz
y = A*np.sin(2*f*np.pi*t) # V
然后我们可以使用通常的公式计算这个信号的RMS :
# Actual definition of RMS:
yrms = np.sqrt(np.mean(y**2)) # 229.75 V
或者,我们可以使用DFT来计算它(rfft
在 中实现numpy.fft
):
# RMS using FFT:
Y = np.fft.rfft(y)/y.size
Yrms = np.sqrt(np.real(Y[0]**2 + np.sum(Y[1:]*np.conj(Y[1:]))/2)) # 229.64 V
可以在此处找到最后一个公式为何有效的演示。这是有效的,因为Parseval 定理意味着傅里叶变换确实节约了能量。
两个版本都使用向量化函数,无需将实部和虚部分开进行计算,然后重新组合成一个复数。
MCVE:开窗
我怀疑您想将此函数用作 RMS 值即将发生变化的长期时间序列的窗口。pandas
然后我们可以使用提供时间序列商品的库来解决这个问题。
import pandas as pd
我们封装 RMS 函数:
def rms(y):
Y = 2*np.fft.rfft(y)/y.size
return np.sqrt(np.real(Y[0]**2 + np.sum(Y[1:]*np.conj(Y[1:]))/2))
我们生成一个阻尼信号(可变 RMS)
y = np.exp(-0.1*t)*A*np.sin(2*f*np.pi*t)
我们将试验信号包装到数据帧中以利用rolling
orresample
方法:
df = pd.DataFrame(y, index=t*pd.Timedelta('1s'), columns=['signal'])
信号的滚动 RMS 是:
df['rms'] = df.rolling(int(fs/f)).agg(rms)
周期性采样的 RMS 为:
df['signal'].resample('1s').agg(rms)
后者返回:
00:00:00 2.187840e+02
00:00:01 1.979639e+02
00:00:02 1.791252e+02
00:00:03 1.620792e+02
00:00:04 1.466553e+02
信号调理
为了满足您只保留基波 (50 Hz) 的需求,一个简单的解决方案可能是线性去趋势(以消除恒定和线性趋势),然后是巴特沃斯 滤波器(带通滤波器)。
我们生成具有其他频率和线性趋势的合成信号:
y = np.exp(-0.1*t)*A*(np.sin(2*f*np.pi*t) \
+ 0.2*np.sin(8*f*np.pi*t) + 0.1*np.sin(16*f*np.pi*t)) \
+ A/20*t + A/100
然后我们调节信号:
from scipy import signal
yd = signal.detrend(y, type='linear')
filt = signal.butter(5, [40,60], btype='band', fs=fs, output='sos', analog=False)
yfilt = signal.sosfilt(filt, yd)
从图形上看,它导致:
它继续在 RMS 计算之前应用信号调理。
推荐阅读
- java - 如何使用打开的相机android在相机预览屏幕中添加镜像效果
- sql - 防止在表更新时同时触发多个触发器
- java - 使用 Apache POI 在 Excel 中嵌入 PDF 和 Word 文档
- javascript - 如何将数据传递到 chartjs 标签?
- cypress - 赛普拉斯:如果条件满足,我需要退出 for 循环
- java - 在java Android中检查字符串是否仅包含数字和空格
- rabbitmq - 当我们使用 RabbitMQ 发布消息时如何打印订阅者的状态
- jupyter-notebook - 如何像我们在 pycharm 或可视代码中一样在 jupyter notebook 中拥有自动完成功能?
- kubernetes - 通过 Helm 安装 ingress-nginx - 检索密码时出错
- python - 在 Python 中使用文件属性时在控制台上显示打印