apache-spark - Converting a Spark Structured DataFrame to pandas using pandas_udf
Problem description
I need to read csv files as a stream and then convert the result to a pandas dataframe.
This is what I have done so far:
DataShema = StructType([ StructField("TimeStamp", LongType(), True), \
                         StructField("Count", IntegerType(), True), \
                         StructField("Reading", FloatType(), True) ])

group_columns = ['TimeStamp','Count','Reading']

@pandas_udf(DataShema, PandasUDFType.GROUPED_MAP)
def get_pdf(pdf):
    return pd.DataFrame([pdf[group_columns]],columns=[group_columns])

# getting Surge data from the files
SrgDF = spark \
    .readStream \
    .schema(DataShema) \
    .csv("ProcessdedData/SurgeAcc")

mydf = SrgDF.groupby(group_columns).apply(get_pdf)

qrySrg = SrgDF \
    .writeStream \
    .format("console") \
    .start() \
    .awaitTermination()
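From another source (Convert Spark Structure Streaming DataFrames to Pandas DataFrame) I understand that converting a structured streaming DataFrame to pandas directly is not possible, and it seems that pandas_udf is the right approach, but I cannot figure out exactly how to achieve this. I need to pass the pandas dataframe to my functions.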
Edit
When I run the code (changing the query to use mydf rather than SrgDF), I get the following error:
pyspark.sql.utils.StreamingQueryException: 'Writing job aborted.\n=== Streaming Query ===\nIdentifier: [id = 18a15e9e-9762-4464-b6d1-cb2db8d0ac41, runId = e3da131e-00d1-4fed-82fc-65bf377c3f99]\nCurrent Committed Offsets: {}\nCurrent Available Offsets: {FileStreamSource[file:/home/mls5/Work_Research/Codes/Misc/Python/MachineLearning_ArtificialIntelligence/00_Examples/01_ApacheSpark/01_ComfortApp/ProcessdedData/SurgeAcc]: {"logOffset":0}}\n\nCurrent State: ACTIVE\nThread State: RUNNABLE\n\nLogical Plan:\nFlatMapGroupsInPandas [Count#1], get_pdf(TimeStamp#0L, Count#1, Reading#2), [TimeStamp#10L, Count#11, Reading#12]\n+- Project [Count#1, TimeStamp#0L, Count#1, Reading#2]\n +- StreamingExecutionRelation FileStreamSource[file:/home/mls5/Work_Research/Codes/Misc/Python/MachineLearning_ArtificialIntelligence/00_Examples/01_ApacheSpark/01_ComfortApp/ProcessdedData/SurgeAcc], [TimeStamp#0L, Count#1, Reading#2]\n'
19/05/20 18:32:29 ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
/usr/local/lib/python3.6/dist-packages/pyarrow/__init__.py:152: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
warnings.warn("pyarrow.open_stream is deprecated, please use "
Edit 2
Here is the code to reproduce the error:
import sys
from pyspark import SparkContext
from pyspark.sql import Row, SparkSession, SQLContext
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.streaming import StreamingContext
from pyspark.sql.types import *
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pyarrow as pa
import glob

#####################################################################################
if __name__ == '__main__' :
    spark = SparkSession \
        .builder \
        .appName("RealTimeIMUAnalysis") \
        .getOrCreate()
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")

    # reduce verbosity
    sc = spark.sparkContext
    sc.setLogLevel("ERROR")

    ##############################################################################
    # using the saved files to do the Analysis
    DataShema = StructType([ StructField("TimeStamp", LongType(), True), \
                             StructField("Count", IntegerType(), True), \
                             StructField("Reading", FloatType(), True) ])

    group_columns = ['TimeStamp','Count','Reading']

    @pandas_udf(DataShema, PandasUDFType.GROUPED_MAP)
    def get_pdf(pdf):
        return pd.DataFrame([pdf[group_columns]],columns=[group_columns])

    # getting Surge data from the files
    SrgDF = spark \
        .readStream \
        .schema(DataShema) \
        .csv("SurgeAcc")

    mydf = SrgDF.groupby('Count').apply(get_pdf)
    #print(mydf)

    qrySrg = mydf \
        .writeStream \
        .format("console") \
        .start() \
        .awaitTermination()
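To run it, you need to create a folder named SurgeAcc in the directory where the code is located, and create a csv file inside it with the following format: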
TimeStamp,Count,Reading
1557011317299,45148,-0.015494
1557011317299,45153,-0.015963
1557011319511,45201,-0.015494
1557011319511,45221,-0.015494
1557011315134,45092,-0.015494
1557011315135,45107,-0.014085
1557011317299,45158,-0.015963
1557011317299,45163,-0.015494
1557011317299,45168,-0.015024
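The setup described above can also be scripted. The following is a minimal sketch using only the Python standard library; the helper name write_sample_csv and the file name sample.csv are assumptions made for illustration and do not come from the original post. Note that the reader in the post does not set the header option, so the header line is likely to be parsed as an ordinary (null) data row.

```python
import csv
from pathlib import Path

# Sample rows copied from the post: (TimeStamp, Count, Reading).
SAMPLE_ROWS = [
    (1557011317299, 45148, -0.015494),
    (1557011317299, 45153, -0.015963),
    (1557011319511, 45201, -0.015494),
    (1557011319511, 45221, -0.015494),
    (1557011315134, 45092, -0.015494),
    (1557011315135, 45107, -0.014085),
    (1557011317299, 45158, -0.015963),
    (1557011317299, 45163, -0.015494),
    (1557011317299, 45168, -0.015024),
]


def write_sample_csv(base_dir=".", file_name="sample.csv"):
    """Create <base_dir>/SurgeAcc/<file_name> holding a header and the sample rows.

    Both the function name and the default file name are hypothetical.
    """
    folder = Path(base_dir) / "SurgeAcc"
    folder.mkdir(parents=True, exist_ok=True)
    target = folder / file_name
    with target.open("w", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerow(["TimeStamp", "Count", "Reading"])
        writer.writerows(SAMPLE_ROWS)
    return target
```

Calling write_sample_csv() from the directory that holds the script creates ./SurgeAcc/sample.csv, which is the folder the streaming reader in the reproduction code points at.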
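Solution
The pandas dataframe returned by your pandas_udf does not match the specified schema.
Note that the input to a grouped-map pandas_udf is a pandas dataframe, and it must also return a pandas dataframe.
You can use all pandas functions inside a pandas_udf. The only thing you need to make sure of is that ReturnDataShema matches the actual output of the function.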
ReturnDataShema = StructType([StructField("TimeStamp", LongType(), True), \
                              StructField("Count", IntegerType(), True), \
                              StructField("Reading", FloatType(), True), \
                              StructField("TotalCount", FloatType(), True)])

@pandas_udf(ReturnDataShema, PandasUDFType.GROUPED_MAP)
def get_pdf(pdf):
    # The following statement is what causes the schema mismatch
    # return pd.DataFrame([pdf[group_columns]],columns=[group_columns])
    # If you want to return all the rows of the pandas dataframe unchanged
    # (with the original three-column DataShema as the return schema),
    # you can simply
    # return pdf
    # If you want to do any aggregations, you can do it like below, or use pandas query,
    # but make sure the returned pandas dataframe complies with ReturnDataShema
    total_count = float(pdf['Count'].sum())
    # .iloc[0] takes the first row of the group regardless of the index labels;
    # the unnamed (integer) column labels are matched to ReturnDataShema by position
    return pd.DataFrame([(pdf['TimeStamp'].iloc[0], pdf['Count'].iloc[0],
                          pdf['Reading'].iloc[0], total_count)])
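To see what "complies with ReturnDataShema" means before involving Spark at all, the body of the function can be exercised in plain pandas. The sketch below is only an illustration: EXPECTED_COLUMNS is a hand-written mirror of ReturnDataShema (assuming the usual LongType/int64, IntegerType/int32, FloatType/float32 correspondence), and summarize_group and schema_problems are hypothetical helper names, not part of pandas or PySpark.

```python
import pandas as pd

# Hand-written mirror of ReturnDataShema: column name -> pandas dtype.
# Spark never reads this mapping; it exists only for the local check below.
EXPECTED_COLUMNS = {
    "TimeStamp": "int64",    # LongType
    "Count": "int32",        # IntegerType
    "Reading": "float32",    # FloatType
    "TotalCount": "float32", # FloatType
}


def summarize_group(pdf):
    """Plain-pandas version of the aggregation: one output row per group."""
    row = {
        "TimeStamp": pdf["TimeStamp"].iloc[0],
        "Count": pdf["Count"].iloc[0],
        "Reading": pdf["Reading"].iloc[0],
        "TotalCount": pdf["Count"].sum(),
    }
    out = pd.DataFrame([row], columns=list(EXPECTED_COLUMNS))
    # Cast every column to the dtype the return schema expects.
    return out.astype(EXPECTED_COLUMNS)


def schema_problems(out, expected=EXPECTED_COLUMNS):
    """Return a list of human-readable mismatches; empty means the frame fits."""
    if list(out.columns) != list(expected):
        return [f"columns {list(out.columns)} != {list(expected)}"]
    return [
        f"{name}: dtype {out[name].dtype} != {dtype}"
        for name, dtype in expected.items()
        if str(out[name].dtype) != dtype
    ]
```

Running schema_problems(summarize_group(group)) on a small hand-built group should return an empty list, while passing a frame that lacks the TotalCount column reports the column mismatch. This is a local sanity check only; Spark performs its own conversion when the UDF actually runs.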