pyspark parallelize() error

Problem Description

I get an error when parallelizing data with pyspark's parallelize() function. I'm using Spark 2.4.3 and Python 3.7.

data = [("James","Smith","USA","CA"),
        ("Michael","Rose","USA","NY"),
        ("Robert","Williams","USA","CA"),
        ("Maria","Jones","USA","FL")
      ]
    
columns = ["firstname","lastname","country","state"]
df = spark.createDataFrame(data = data, schema = columns)
# df = spark.createDataFrame(data = data)
    
out = sc.parallelize(df)\
        .map(lambda x: (x,1))\
        .collect()

Here is the error stack trace:

    330                 raise Py4JError(
    331                     "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
--> 332                     format(target_id, ".", name, value))
    333         else:
    334             raise Py4JError(

Py4JError: An error occurred while calling o93.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

Tags: python-3.x, apache-spark, pyspark

Solution


You are getting that error because you are trying to parallelize a PySpark DataFrame, but parallelize() expects a local Python collection (e.g. a list). The documentation describes the parallelize method as distributing a local Python collection to form an RDD, and recommends using xrange for performance if the input represents a range.

What you probably want to do instead is:

out = df.rdd\
        .map(lambda x: (x,1))\
        .collect()
