text - 无法在 Pyspark 中执行用户定义的函数 RegexTokenizer
问题描述
我正在尝试使用 Pyspark 使用数据中的文本功能执行文本分类。下面是我的文本预处理代码,该代码无法执行用户定义的函数 RegexTokenizer。
tokenizer = RegexTokenizer(inputCol = "text", outputCol = "words", pattern = "\\W")
add_stopwords = StopWordsRemover.loadDefaultStopWords("english")
remover = StopWordsRemover(inputCol = "words", outputCol = "filtered").setStopWords(add_stopwords)
label_stringIdx = StringIndexer(inputCol = "label", outputCol = "target")
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=1000, minDF=5)
#pipleline for text pre-processing
pipeline = Pipeline(stages=[tokenizer,remover, countVectors, label_stringIdx])
#fit the dat for the pipeline
pipelineFit = pipeline.fit(dataset)
dataset = pipelineFit.transform(dataset)
dataset.show()
错误是:
/usr/local/lib/python3.6/dist-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o589.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 92.0 failed 1 times, most recent failure: Lost task 2.0 in stage 92.0 (TID 1317, 1e1a151fa0f5, executor driver): org.apache.spark.SparkException: Failed to execute user defined function(RegexTokenizer$$Lambda$3017/0x000000084123e040: (string) => array<string>)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException
at org.apache.spark.ml.feature.RegexTokenizer.$anonfun$createTransformFunc$2(Tokenizer.scala:146)
... 19 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
at org.apache.spark.rdd.RDD.count(RDD.scala:1227)
at org.apache.spark.ml.feature.CountVectorizer.fit(CountVectorizer.scala:233)
at org.apache.spark.ml.feature.CountVectorizer.fit(CountVectorizer.scala:149)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function(RegexTokenizer$$Lambda$3017/0x000000084123e040: (string) => array<string>)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
Caused by: java.lang.NullPointerException
at org.apache.spark.ml.feature.RegexTokenizer.$anonfun$createTransformFunc$2(Tokenizer.scala:146)
... 19 more
非常感谢任何帮助。
解决方案
这个问题是因为您null
在输入列中有值text
。
val tokenizer = new RegexTokenizer()
.setInputCol("text")
.setOutputCol("words")
.setPattern("\\W")
val inputDF = Seq("this is text", null).toDF("text")
inputDF.show(false)
/**
* +------------+
* |text |
* +------------+
* |this is text|
* |null |
* +------------+
*/
// this fails
tokenizer.transform(inputDF)
.show(false)
/**
* Caused by: java.lang.NullPointerException
* at org.apache.spark.ml.feature.RegexTokenizer$$anonfun$createTransformFunc$2.apply(Tokenizer.scala:143)
* at org.apache.spark.ml.feature.RegexTokenizer$$anonfun$createTransformFunc$2.apply(Tokenizer.scala:141)
*/
删除空行或估算空列
// replace nulls from text col to specified string as below
val nullImputedDF = inputDF.na.fill("I am null imputed", Seq("text"))
tokenizer.transform(nullImputedDF)
.show(false)
/**
* +-----------------+----------------------+
* |text |words |
* +-----------------+----------------------+
* |this is text |[this, is, text] |
* |I am null imputed|[i, am, null, imputed]|
* +-----------------+----------------------+
*/
推荐阅读
- vb.net - 在后台轻松运行功能
- spring-boot - 在微服务项目中使用 Spring Boot 云配置服务器的配置服务器配置无效
- javascript - 将 JavaScript 代码附加到 div 后如何运行它们
- reactjs - React toastify toast 每次调用都会出现 4 次
- powershell - 从段落powershell中过滤单词
- javascript - Javascript函数无法更新变量
- regex - 使用正则表达式匹配mongodb集合中文档中的项目数组的字符串
- flutter - 如何在 index.html 中为 Flutter Web 配置 href url
- authentication - 登录后我想重定向到 jitsiReact 页面
- css - 带有bs4的col中的中心元素