Filtering a PySpark DataFrame by a timestamp column (IndexError: list index out of range)

Problem description

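I have a timestamp column in a PySpark DataFrame that I am trying to filter on, and I keep getting an index out of range error. This seems like a very simple task, and I have done it on several other DataFrames before, so I don't understand the problem. There are no null/None values either...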

When I convert it to a Pandas DataFrame and do the operation there, it works fine too... I'm confused.

df.show()
+-------------------+--------------+
|               time|         data |
+-------------------+--------------+
|2019-04-15 11:04:03|           foo|
|2019-04-22 13:09:18|           bar|
|2019-04-22 07:10:10|           foo|
|2019-04-23 18:00:58|           bar|
|2019-04-18 06:17:45|           foo|
|2019-04-16 15:27:37|           bar|
+-------------------+--------------+

df = df.filter(df.time>pd.Timestamp('2019-04-20'))
df.show()

Py4JJavaError                             Traceback (most recent call last)
<command-785596> in <module>()
----> 1 df.filter(df.time>pd.to_datetime('2019-04-20')).show()

/databricks/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
    350         """
    351         if isinstance(truncate, bool) and truncate:
--> 352             print(self._jdf.showString(n, 20, vertical))
    353         else:
    354             print(self._jdf.showString(n, int(truncate), vertical))

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o18822.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 42 in stage 13206.0 failed 4 times, most recent failure: Lost task 42.3 in stage 13206.0 (TID 769924, 10.111.245.96, executor 397): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 262, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 257, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/databricks/spark/python/pyspark/serializers.py", line 333, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/databricks/spark/python/pyspark/serializers.py", line 144, in dump_stream
    for obj in iterator:
  File "/databricks/spark/python/pyspark/serializers.py", line 322, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/databricks/spark/python/pyspark/worker.py", line 79, in <lambda>
    return lambda *a: f(*a)
  File "/databricks/spark/python/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "<command-780757>", line 17, in <lambda>
IndexError: list index out of range

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:317)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:66)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:271)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:620)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:249)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.writeIteratorToStream(PythonUDFRunner.scala:52)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:234)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:182)

Tags: python, pyspark

Solution


Because df is the result of a join, you need to cache the DataFrame before executing the filter. If I run df.cache() before calling filter, it works perfectly.

