python - Problem converting a Pandas dataframe to a Spark dataframe
Question
I'm a little confused:
I have the script below. I converted my Spark DF to a Pandas DF to run my processing.
I now have an output dataframe, df6, which is exactly what I need.
I now need to write the data back to HDFS (which Pandas can't do), so I need to convert the Pandas dataframe back to Spark and write it out to a directory.
I've used the call below to do that, but unfortunately it doesn't work:
data_spark = spark_session.createDataFrame(df6)
data_spark.show()
The error I get is:
Traceback (most recent call last):
  File "/home/keenek1/domainscript/another1.py", line 338, in <module>
    spark_df.show()
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 350, in show
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o119.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 2080, .net, executor 81): org.apache.spark.SparkException:
Error from python worker:
  /usr/bin/python: No module named pyspark
Clearly a module named pyspark does exist, because I get the correct df6 output on stdout. That means Spark has created a dataframe, converted it to Pandas, and is only struggling with this last call.
Is there anything I can do about this?
from pyspark.sql import SparkSession
from pyspark.sql import *
import pyspark.sql.functions as sqlfunc
from pyspark.sql.types import *
import argparse, sys
import pandas as pd
import time
from datetime import datetime
import os
import glob

def create_session(appname):
    spark_session = SparkSession\
        .builder\
        .appName(appname)\
        .master('yarn')\
        .config("hive.metastore.uris", "thrift://domain.net:9083")\
        .enableHiveSupport()\
        .getOrCreate()
    return spark_session

### START MAIN ###
if __name__ == '__main__':
    spark_session = create_session('testing_files')
    # import file into dataframe
    start = time.time()
    #--------------------------------------------------------------------------------------------
    #-----------------------------CALCULATE DATES AND TIMES FOR QUERY----------------------------
    #--------------------------------------------------------------------------------------------
    dt_now = datetime.now()
    target_hour = int(dt_now.strftime('%s')) - 60*60*12
    today_date = datetime.fromtimestamp(target_hour).strftime('%Y%m%d')
    hour = datetime.fromtimestamp(target_hour).strftime('%H')
    #--------------------------------------------------------------------------------------------
    #-----------------------------------CREATE DF FROM FILES ------------------------------------
    #--------------------------------------------------------------------------------------------
    schema = [
        StructField('dmy', StringType(), True),
        StructField('hh', StringType(), True),
        # very long list of fields....
    ]
    final_structure = StructType(schema)
    df = spark_session.read\
        .option("header", "false")\
        .option("delimiter", "\t")\
        .csv('hdfs://nameservice/data/data/dt=20181022/hour=11/*/*', final_structure)\
        .select('domain', 'optimisedsize')
    df2 = df.filter(df.domain != '----').groupby('domain').agg(sqlfunc.sum(df.optimisedsize).alias('sdsf'))
    df2.show()
    df3 = df2.toPandas()
    #--------------------------------------------------------------------------------------------
    #-----------------------------DEFINE REQUIRED LOOKUP LISTS-----------------------------------
    #--------------------------------------------------------------------------------------------
    tld = ('co.uk', 'com', 'org', 'gov.uk', 'co', 'net', 'news', 'it', 'in', 'es', 'tw', 'pe', 'io', 'ca', 'cat', 'com.au',
           'com.ar', 'com.mt', 'com.co', 'ws', 'to', 'es', 'de', 'us', 'br', 'im', 'gr', 'cc', 'cn', 'org.uk', 'me', 'ovh', 'be',
           'tv', 'tech', '..', 'life', 'com.mx', 'pl', 'uk', 'ru', 'cz', 'st', 'info', 'mobi', 'today', 'eu', 'fi', 'jp', 'life',
           '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'earth', 'ninja', 'ie', 'im', 'ai', 'at', 'ch', 'ly', 'market', 'click',
           'fr', 'nl', 'se')
    cdns = ('akamai', 'akamaized', 'maxcdn', 'cloudflare')
    cleandomain = []
    #--------------------------------------------------------------------------------------------
    #-----------------------------SPLIT DOMAIN AT EVERY DOT--------------------------------------
    #--------------------------------------------------------------------------------------------
    index = df3.domain.str.split('.').tolist()
    #--------------------------------------------------------------------------------------------
    #------------------DEFINE FUNCTION FOR DOMAIN MANIPULATION-----------------------------------
    #--------------------------------------------------------------------------------------------
    def domfunction(x):
        # if the item can't be indexed as expected, append its raw value to cleandomain
        try:
            # numeric last label: treat it as an IP address
            if str(x[-1]).isdigit():
                try:
                    cleandomain.append(str(x[0]) + '.' + str(x[1]) + '.*.*')
                except IndexError:
                    cleandomain.append(str(x))
            # if it's in the CDN list, keep a subdomain as well
            elif len(x) > 3 and str(x[len(x)-2]).rstrip() in cdns:
                try:
                    cleandomain.append(str(x[len(x)-3]) + '.' + str(x[len(x)-2]) + '.' + str(x[len(x)-1]))
                except IndexError:
                    cleandomain.append(str(x))
            elif len(x) > 3 and str(x[len(x)-3]).rstrip() in cdns:
                try:
                    cleandomain.append(str(x[len(x)-4]) + '.' + str(x[len(x)-3]) + '.' + str(x[len(x)-2]) + '.' + str(x[len(x)-1]))
                except IndexError:
                    cleandomain.append(str(x))
            # if it's in the TLD list, do this
            elif len(x) > 2 and str(x[len(x)-2]).rstrip() + '.' + str(x[len(x)-1]).rstrip() in tld:
                try:
                    cleandomain.append(str(x[len(x)-3]) + '.' + str(x[len(x)-2]) + '.' + str(x[len(x)-1]))
                except IndexError:
                    cleandomain.append(str(x))
            elif len(x) > 2 and str(x[len(x)-1]) in tld:
                try:
                    cleandomain.append(str(x[len(x)-2]) + '.' + str(x[len(x)-1]))
                except IndexError:
                    cleandomain.append(str(x))
            # if it's not in the TLD list, keep the raw value
            else:
                cleandomain.append(str(x))
        except IndexError:
            cleandomain.append(str(x))
        except TypeError:
            cleandomain.append(str(x))
    #--------------------------------------------------------------------------------------------
    #-------------LOOP OVER ITEMS WITHIN THE INDEX & CONCAT REQUIRED ELEMENTS--------------------
    #--------------------------------------------------------------------------------------------
    for x in index:
        domfunction(x)
    #--------------------------------------------------------------------------------------------
    #-------------------------------CONFIGURE OUTPUTS--------------------------------------------
    #--------------------------------------------------------------------------------------------
    # add the column to the dataframe
    se = pd.Series(cleandomain)
    df3['newdomain2'] = se.values
    # select only the new domain column & usage, then group by
    df5 = df3.groupby(['newdomain2'], as_index=False)[['sdsf']].sum()
    df6 = df5.sort_values(['sdsf'], ascending=True)
    print(df6)
    spark_df = spark_session.createDataFrame(df6)
    spark_df.show()
    spark_df.coalesce(100).write.format("com.databricks.spark.csv").option("header", "false").option('sep', '\t').mode('append').save('hdfs://nameservice/user/keenek1/domainlookup')
    data_spark = spark_session.createDataFrame(df6)
    data_spark.show()
    print(df6)
    end = time.time()
    print("RunTime:")
    print(end - start)
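Incidentally, the domain-cleanup logic is plain Python and can be exercised on its own, away from Spark. Below is a minimal standalone sketch of domfunction's branches, with shortened lookup tuples and hypothetical sample domains (neither is from the original post):

```python
# Standalone sketch of the domain-cleanup branches in the script above.
# The tld/cdns tuples are shortened and the sample domains are hypothetical.
tld = ('co.uk', 'com', 'org', 'net', 'de')
cdns = ('akamai', 'akamaized', 'maxcdn', 'cloudflare')

def clean_domain(parts):
    """Collapse a dot-split domain into its significant trailing labels."""
    try:
        if str(parts[-1]).isdigit():
            # numeric last label: treat it as an IP, mask the last two octets
            return str(parts[0]) + '.' + str(parts[1]) + '.*.*'
        if len(parts) > 3 and str(parts[-2]).rstrip() in cdns:
            # CDN in second-to-last position: keep one extra subdomain label
            return '.'.join(str(p) for p in parts[-3:])
        if len(parts) > 3 and str(parts[-3]).rstrip() in cdns:
            return '.'.join(str(p) for p in parts[-4:])
        if len(parts) > 2 and str(parts[-2]).rstrip() + '.' + str(parts[-1]).rstrip() in tld:
            # two-label TLD such as co.uk: keep three labels
            return '.'.join(str(p) for p in parts[-3:])
        if len(parts) > 2 and str(parts[-1]) in tld:
            return '.'.join(str(p) for p in parts[-2:])
        # anything else is kept verbatim, as in the original
        return str(parts)
    except (IndexError, TypeError):
        return str(parts)

for d in ('cdn1.img.akamaized.net', 'www.news.bbc.co.uk', '10.0.0.1'):
    print(d, '->', clean_domain(d.split('.')))
```

Note that, like the original, a two-label domain such as bbc.com falls through to the raw-list fallback (the guards all require more than two labels), which may or may not be intended.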
Solution
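The key line of the traceback is `Error from python worker: /usr/bin/python: No module named pyspark`. It comes from the executors, not the driver: the driver's Python can import pyspark, but the bare `/usr/bin/python` launched on the YARN executors cannot, and `createDataFrame(df6)` followed by `show()` is the first step in this script that actually runs Python code on the executors (the earlier `filter`/`groupby`/`sum` pipeline executes entirely in the JVM). A likely fix, sketched here with a hypothetical interpreter path, is to point both driver and executors at a Python installation that has pyspark on its module path:

```shell
# Hypothetical path: substitute a Python that exists at the same location
# on every cluster node and can import pyspark.
export PYSPARK_PYTHON=/opt/anaconda/bin/python
export PYSPARK_DRIVER_PYTHON=/opt/anaconda/bin/python

spark-submit \
  --master yarn \
  --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=/opt/anaconda/bin/python \
  --conf spark.executorEnv.PYSPARK_PYTHON=/opt/anaconda/bin/python \
  another1.py
```

On Spark 2.1+ the same interpreter can alternatively be chosen with the `spark.pyspark.python` configuration property; whichever route is used, the interpreter must exist at the same path on every node.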