pyspark - TypeError: StructType can not accept object in type <class 'int'> (pyspark schema)
Problem
I'm trying to convert an RDD to a DataFrame in Spark. My RDD is built by parallelizing a list of integers, and I'm stuck on the conversion to a DataFrame. It returns "TypeError: StructType can not accept object 60651 in type <class 'int'>".
Here you can see it more clearly:
# Create a schema for the dataframe
schema = StructType([StructField('zipcd', IntegerType(), True)] )
# Convert list to RDD
rdd = sc.parallelize(zip_cd) # attempted fix: enclose within []. But then another problem arises: 'length does not match: 29275 against 1'
#rdd=rdd.map(lambda x:int(x))
# Create data frame
zip_cd1 = spark.createDataFrame(rdd,schema)
#print(zip_cd1.schema)
zip_cd1.show()
It returns the following:
Py4JJavaError Traceback (most recent call last)
<ipython-input-59-13ef33f842e4> in <module>
9 zip_cd1 = spark.createDataFrame(rdd,schema)
10 #print(zip_cd1.schema)
---> 11 zip_cd1.show()
~\Anaconda3\envs\pyspark_env\lib\site-packages\pyspark\sql\dataframe.py in show(self, n, truncate, vertical)
482 """
483 if isinstance(truncate, bool) and truncate:
--> 484 print(self._jdf.showString(n, 20, vertical))
485 else:
486 print(self._jdf.showString(n, int(truncate), vertical))
~\Anaconda3\envs\pyspark_env\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
1307
1308 answer = self.gateway_client.send_command(command)
-> 1309 return_value = get_return_value(
1310 answer, self.gateway_client, self.target_id, self.name)
1311
~\Anaconda3\envs\pyspark_env\lib\site-packages\pyspark\sql\utils.py in deco(*a, **kw)
109 def deco(*a, **kw):
110 try:
--> 111 return f(*a, **kw)
112 except py4j.protocol.Py4JJavaError as e:
113 converted = convert_exception(e.java_exception)
~\Anaconda3\envs\pyspark_env\lib\site-packages\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o900.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 46.0 failed 1 times, most recent failure: Lost task 0.0 in stage 46.0 (TID 1240) (MTYCURB-HOLAP.ACS-JRZ.com executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:\Users\52269198\Anaconda3\envs\pyspark_env\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 604, in main
File "C:\Users\52269198\Anaconda3\envs\pyspark_env\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 596, in process
File "C:\Users\52269198\Anaconda3\envs\pyspark_env\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 259, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "C:\Users\52269198\Anaconda3\envs\pyspark_env\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\util.py", line 73, in wrapper
return f(*args, **kwargs)
File "C:\Users\52269198\Anaconda3\envs\pyspark_env\lib\site-packages\pyspark\sql\session.py", line 682, in prepare
verify_func(obj)
File "C:\Users\52269198\Anaconda3\envs\pyspark_env\lib\site-packages\pyspark\sql\types.py", line 1409, in verify
verify_value(obj)
File "C:\Users\52269198\Anaconda3\envs\pyspark_env\lib\site-packages\pyspark\sql\types.py", line 1396, in verify_struct
raise TypeError(new_msg("StructType can not accept object %r in type %s"
TypeError: StructType can not accept object 60651 in type <class 'int'>
zip_cd is just a list of integers, and I don't know why it causes so much trouble:
zip_cd
[60651,
60623,
60077,
60626,
60077,
0,
60651,
60644,
Solution
Your schema expects input shaped (n, 1), not (1, n): each RDD element must itself be a row (a one-element sequence), not a bare integer.
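In plain Python, the reshaping that the fix below performs looks like this (a minimal sketch; no Spark is needed to see the shape change):

```python
# A flat list of ints: each element is a bare integer, which a
# one-column StructType schema cannot accept as a row.
zip_cd = [60651, 60623, 60077, 60626, 60077, 0, 60651, 60644]

# Wrap each integer in a one-element list, so each element becomes
# a row with exactly one column -- the (n, 1) shape the schema expects.
rows = [[x] for x in zip_cd]

print(rows[:3])  # [[60651], [60623], [60077]]
```

This is exactly what `rdd.map(lambda x: [x])` does on the Spark side, one element at a time.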
zip_cd = [60651, 60623, 60077, 60626, 60077, 0, 60651, 60644]
schema = StructType([StructField('zipcd', IntegerType(), True)])
rdd = sc.parallelize(zip_cd)
rdd = rdd.map(lambda x:[x]) # transform the rdd
zip_cd1 = spark.createDataFrame(rdd,schema)
# zip_cd1 = spark.createDataFrame([[x] for x in zip_cd], schema) # list to dataframe directly
zip_cd1.show()
Result
+-----+
|zipcd|
+-----+
|60651|
|60623|
|60077|
|60626|
|60077|
| 0|
|60651|
|60644|
+-----+