java - Apache Beam: No Runner was specified and the DirectRunner was not found on the classpath
Problem description
I am building a Gradle Java project with the Apache Beam code below and running it in Eclipse Oxygen.
package com.xxxx.beam;

import java.io.IOException;
import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkPipelineResult;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileIO.ReadableFile;

public class ApacheBeamTestProject {

    public void modelExecution() {
        // PipelineOptionsFactory.create() returns a plain PipelineOptions proxy, so casting it
        // to SparkContextOptions fails; as(...) builds a proxy of the requested interface.
        SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
        options.setSparkMaster("xxxxxxxxx");
        JavaSparkContext sc = options.getProvidedSparkContext();
        JavaLinearRegressionWithSGDExample.runJavaLinearRegressionWithSGDExample(sc);

        Pipeline p = Pipeline.create(options);
        p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
            // withCompression can be omitted - by default compression is detected from the filename.
            .apply(FileIO.readMatches())
            // Turn each matched file into (file name, file contents).
            .apply(MapElements.via(
                new SimpleFunction<ReadableFile, KV<String, String>>() {
                    private static final long serialVersionUID = -5715607038612883677L;

                    // SimpleFunction calls apply(); a method with any other name is never invoked.
                    @Override
                    public KV<String, String> apply(ReadableFile f) {
                        String temp = null;
                        try {
                            temp = f.readFullyAsUTF8String();
                        } catch (IOException e) {
                            // The exception is swallowed, so an unreadable file gets a null value.
                        }
                        return KV.of(f.getMetadata().resourceId().toString(), temp);
                    }
                }))
            // FileIO.write() still needs a sink (.via(...)) and an output location (.to(...)).
            .apply(FileIO.write());

        SparkPipelineResult result = (SparkPipelineResult) p.run();
        result.getState();
    }

    public static void main(String[] args) throws IOException {
        System.out.println("Test log");
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(options);
        p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
            // withCompression can be omitted - by default compression is detected from the filename.
            .apply(FileIO.readMatches())
            // Turn each matched file into (file name, file contents).
            .apply(MapElements.via(
                new SimpleFunction<ReadableFile, KV<String, String>>() {
                    private static final long serialVersionUID = -5715607038612883677L;

                    @Override
                    public KV<String, String> apply(ReadableFile f) {
                        String temp = null;
                        try {
                            temp = f.readFullyAsUTF8String();
                        } catch (IOException e) {
                            // The exception is swallowed, so an unreadable file gets a null value.
                        }
                        return KV.of(f.getMetadata().resourceId().toString(), temp);
                    }
                }))
            // FileIO.write() still needs a sink (.via(...)) and an output location (.to(...)).
            .apply(FileIO.write());
        p.run();
    }
}
When I run this project in Eclipse, I get the following error:
Test log
Exception in thread "main" java.lang.IllegalArgumentException: No Runner was specified and the DirectRunner was not found on the classpath.
Specify a runner by either:
Explicitly specifying a runner by providing the 'runner' property
Adding the DirectRunner to the classpath
Calling 'PipelineOptions.setRunner(PipelineRunner)' directly
at org.apache.beam.sdk.options.PipelineOptions$DirectRunner.create(PipelineOptions.java:291)
at org.apache.beam.sdk.options.PipelineOptions$DirectRunner.create(PipelineOptions.java:281)
at org.apache.beam.sdk.options.ProxyInvocationHandler.returnDefaultHelper(ProxyInvocationHandler.java:591)
at org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:532)
at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:155)
at org.apache.beam.sdk.options.PipelineOptionsValidator.validate(PipelineOptionsValidator.java:95)
at org.apache.beam.sdk.options.PipelineOptionsValidator.validate(PipelineOptionsValidator.java:49)
at org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:44)
at org.apache.beam.sdk.Pipeline.create(Pipeline.java:150)
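The project does not contain a pom.xml file; Gradle has set up all the dependencies. I am not sure how to resolve this error. Can anyone suggest a fix?
Solution
It seems you are trying to use the DirectRunner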
and it is not on your application's classpath. You can provide it by adding the beam-runners-direct-java dependency to your application:
https://mvnrepository.com/artifact/org.apache.beam/beam-runners-direct-java
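The stack trace shows where the message comes from: when no runner is set, the default for the runner option is computed by PipelineOptions$DirectRunner.create, which roughly tries to load the class org.apache.beam.runners.direct.DirectRunner by name and throws this IllegalArgumentException if it cannot. To confirm what actually ends up on the runtime classpath after the Gradle/Eclipse setup, you can do the same lookup yourself. Below is a minimal, JDK-only sketch; the class RunnerClasspathCheck and its methods are hypothetical names for illustration, while the two runner class names are the real Beam ones.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical diagnostic: reports which Beam runner classes can be loaded right now. */
final class RunnerClasspathCheck {

    // Fully qualified names of the Beam runners discussed in this question.
    static final String DIRECT_RUNNER = "org.apache.beam.runners.direct.DirectRunner";
    static final String SPARK_RUNNER = "org.apache.beam.runners.spark.SparkRunner";

    /** Returns true if the named class can be loaded by this class's class loader. */
    static boolean isOnClasspath(String className) {
        try {
            // initialize = false: only check that the class can be loaded, skip static initializers.
            Class.forName(className, false, RunnerClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError covers a class that exists but whose own dependencies are missing.
            return false;
        }
    }

    /** Maps each runner class name to whether it is currently loadable, in a fixed order. */
    static Map<String, Boolean> report() {
        Map<String, Boolean> result = new LinkedHashMap<>();
        result.put(DIRECT_RUNNER, isOnClasspath(DIRECT_RUNNER));
        result.put(SPARK_RUNNER, isOnClasspath(SPARK_RUNNER));
        return result;
    }

    public static void main(String[] args) {
        report().forEach((name, found) ->
            System.out.println(name + (found ? " -> found" : " -> NOT on classpath")));
    }
}
```

Run it with the same classpath Eclipse uses for the Beam project. If the DirectRunner line reports "NOT on classpath", the beam-runners-direct-java dependency is not reaching the runtime classpath, and Beam will fail exactly as in the question whenever no runner is set.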
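Edit (answered in the comments): You are trying to run this code on Spark, but you have not specified that in the PipelineOptions. By default Beam tries to run the code on the DirectRunner, which I believe is why you are getting this error. Calling
options.setRunner(SparkRunner.class);
before creating the pipeline sets the correct runner and fixes the problem (SparkRunner is org.apache.beam.runners.spark.SparkRunner from the beam-runners-spark module).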
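The error message also lists a second option: providing the 'runner' property, for example the command-line argument --runner=SparkRunner. That only works if the options are built from the arguments with PipelineOptionsFactory.fromArgs(args) instead of PipelineOptionsFactory.create(), which ignores args as in the main method above, and the named runner must still be on the classpath. The sketch below is a hypothetical, JDK-only helper (RunnerArgs and withDefaultRunner are made-up names) that falls back to SparkRunner when the caller did not choose a runner; it assumes the --runner=Name form and does not recognise other spellings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: make sure a --runner flag is present before handing args to Beam. */
final class RunnerArgs {

    /**
     * Returns args unchanged if it already contains a --runner=... argument,
     * otherwise returns a new array with --runner=<defaultRunner> prepended.
     */
    static String[] withDefaultRunner(String[] args, String defaultRunner) {
        for (String arg : args) {
            if (arg.startsWith("--runner=")) {
                return args; // an explicit choice on the command line wins
            }
        }
        List<String> result = new ArrayList<>();
        result.add("--runner=" + defaultRunner);
        result.addAll(Arrays.asList(args));
        return result.toArray(new String[0]);
    }

    public static void main(String[] args) {
        String[] effective = withDefaultRunner(args, "SparkRunner");
        // In the real program these arguments would be passed to PipelineOptionsFactory.fromArgs(...).
        System.out.println(String.join(" ", effective));
    }
}
```

In the question's main method this would become something like PipelineOptions options = PipelineOptionsFactory.fromArgs(RunnerArgs.withDefaultRunner(args, "SparkRunner")).withValidation().create(); so that the runner can still be overridden from the command line (for example --runner=DirectRunner for local testing, once beam-runners-direct-java is on the classpath).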