Submitting an AWS Data Pipeline job to a non-default queue via the Java SDK fails

Problem description

I have configured my EMR cluster with a second Hadoop queue: the "default" queue plus a queue I named "gold". When my EMR cluster starts, the queue exists and is running, as I can see in the Hadoop YARN web UI.
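For context, a queue like this can be declared at cluster launch through EMR's capacity-scheduler configuration classification. The sketch below (AWS SDK for Java) is only an illustration, not my actual launch code: the helper class name and the 50/50 capacity split are assumptions, and the only value taken from my setup is the queue name "gold".

import java.util.HashMap;
import java.util.Map;
import com.amazonaws.services.elasticmapreduce.model.Configuration;

public class GoldQueueConfig {

  // Declares a second YARN queue named "gold" next to "default" via the
  // capacity-scheduler classification. The 50/50 capacity split is an
  // illustrative assumption, not taken from the real cluster.
  public static Configuration goldQueueConfiguration() {
    Map<String, String> props = new HashMap<>();
    props.put("yarn.scheduler.capacity.root.queues", "default,gold");
    props.put("yarn.scheduler.capacity.root.default.capacity", "50");
    props.put("yarn.scheduler.capacity.root.gold.capacity", "50");

    return new Configuration()
        .withClassification("capacity-scheduler")
        .withProperties(props);
  }
}

The resulting Configuration would then be passed to RunJobFlowRequest.withConfigurations(...) when the cluster is created.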

My Java code creates a pipeline job (a HiveActivity in this case), and I add a list of Field objects to the pipeline object. These fields are the pipeline's settings.

One such field (optional according to the documentation at https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-hiveactivity.html) is "hadoopQueue". When I do not set this field, the "default" queue is used. The job completes and everything works fine.

When I set the field with this code,

//hadoopQueue
field = new Field();
field.setKey("hadoopQueue");
field.setStringValue("gold");
fields.add(field);

the job does not complete, and I get the following error in the Data Pipeline logs:

Exception in thread "main" java.lang.RuntimeException: Local file does not exist.
at com.amazon.elasticmapreduce.scriptrunner.ScriptRunner.fetchFile(ScriptRunner.java:30)
at com.amazon.elasticmapreduce.scriptrunner.ScriptRunner.main(ScriptRunner.java:56)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:239)
at org.apache.hadoop.util.RunJar.main(RunJar.java:153)

I also get this error if I explicitly set the queue to "default":

//hadoopQueue
  field = new Field();
  field.setKey("hadoopQueue");
  field.setStringValue("default");
  fields.add(field);

It only seems to work when I leave this option unset entirely, which means I cannot specify a different queue.

Has anyone managed to use the "hadoopQueue" option successfully this way? For reference, here is the full method:

/**
 * Defines a HiveActivity on an existing pipeline, activates the pipeline,
 * and returns the id of the activity object.
 */
public static String scheduleDataPipelineActivity(DataPipeline dpl, String pipelineId, String script, String stepName,
  Map<String,String> params) {

// Build the pipeline definition request
PutPipelineDefinitionRequest putDefReq = new PutPipelineDefinitionRequest();
putDefReq.setPipelineId(pipelineId);
List<PipelineObject> objects = new ArrayList<>();

// Fields for the "Default" pipeline object (pipeline-wide settings)
List<Field> fields = new ArrayList<>();

Field field = new Field();
field.setKey("failureAndRerunMode");
field.setStringValue("CASCADE");
fields.add(field);

field = new Field();
field.setKey("resourceRole");
field.setStringValue("DataPipelineDefaultResourceRole");
fields.add(field);

field = new Field();
field.setKey("role");
field.setStringValue("DataPipelineDefaultRole");
fields.add(field);

field = new Field();
field.setKey("pipelineLogUri");
field.setStringValue("s3://" + getBucketName() + "/logs");
fields.add(field);

field = new Field();
field.setKey("scheduleType");
field.setStringValue("ONDEMAND");
fields.add(field);

if ((params != null) && (params.size() > 0)) {
  for (Map.Entry<String,String> entry : params.entrySet()) {
    field = new Field();
    field.setKey("scriptVariable");
    field.setStringValue(entry.getKey() + "=" + entry.getValue());
    fields.add(field);
  }
}

// The "Default" object carries the pipeline-wide settings
PipelineObject po = new PipelineObject().withName("Default").withId("Default");
po.setFields(fields);
objects.add(po);

// Fields for the HiveActivity object
fields = new ArrayList<>();
field = new Field();
field.setKey("stage");
field.setStringValue("false");
fields.add(field);

// Reference the script by S3 URI, or inline it as hiveScript if it is short enough
field = new Field();
if (script.startsWith("s3://")) {
  field.setKey("scriptUri");
}
else if (script.length() > 2048) {
  // The script is too long to inline: write it to a temp file, upload it to S3,
  // and reference it by URI instead
  field.setKey("scriptUri");
  Writer writer = null;
  try {
    String hiveQL = UUID.randomUUID().toString() + ".q";
    File localFile = new File(hiveQL);
    writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(localFile), "UTF-8"));
    writer.write(script);
    writer.flush();
    writer.close();
    writer = null;

    String s3Key = "working/" + hiveQL;
    script = "s3://" + getBucketName() + "/" + s3Key;

    AmazonS3 s3 = getS3Client(null);
    s3.putObject(getBucketName(), s3Key, localFile);
    if (!localFile.delete()) {
      LOGGER.error("Unable to delete temporary file: " + hiveQL);
    }
  }
  catch (IOException e) {
    LOGGER.error(e.getLocalizedMessage(), e);
    throw new RuntimeException(e.getLocalizedMessage(), e);
  }
  finally {
    if (writer != null) {
      try {
        writer.close();
      }
      catch(IOException e) {
        LOGGER.error(e.getLocalizedMessage(), e);
      }
    }
  }
}
else {
  field.setKey("hiveScript");
}
field.setStringValue(script);
fields.add(field);

field = new Field();
field.setKey("workerGroup");
field.setStringValue(EMR_INSTANCE);
fields.add(field);

//hadoopQueue
field = new Field();
field.setKey("hadoopQueue");
field.setStringValue("gold");
fields.add(field);

field = new Field();
field.setKey("type");
field.setStringValue("HiveActivity");
fields.add(field);

String hiveId = UUID.randomUUID().toString();
po = new PipelineObject().withName(stepName).withId(hiveId);
po.setFields(fields);
objects.add(po);

// Submit the definition and collect any validation errors and warnings
putDefReq.setPipelineObjects(objects);
PutPipelineDefinitionResult putPipelineResult = dpl.putPipelineDefinition(putDefReq);
List<ValidationError> errors = putPipelineResult.getValidationErrors();
int errorCount = 0;
if ((errors != null) && (errors.size() > 0)) {
  for (ValidationError error : errors) {
    List<String> errorStrs = error.getErrors();
    for (String errorStr : errorStrs) {
      LOGGER.error(errorStr);
      errorCount++;
    }
  }
}

List<ValidationWarning> warnings = putPipelineResult.getValidationWarnings();
if ((warnings != null) && (warnings.size() > 0)) {
  for (ValidationWarning warning : warnings) {
    List<String> warningStrs = warning.getWarnings();
    for (String warningStr : warningStrs) {
      LOGGER.warn(warningStr);
    }
  }
}

if (errorCount > 0) {
  LOGGER.fatal("BAD STUFF HAPPENED!!!!");
  throw new DataPipelineValidationException("Validation errors detected for hive activity (check log file for details): " +
      "step=" + stepName + "\terror count=" + Integer.toString(errorCount) + "\t" + pipelineId);
}

// No validation errors: activate the pipeline
ActivatePipelineRequest activateReq = new ActivatePipelineRequest();
activateReq.setPipelineId(pipelineId);
dpl.activatePipeline(activateReq);

try {
  Thread.sleep(2500L);
}
catch (InterruptedException e) {
  LOGGER.error(e.getLocalizedMessage(), e);
}

return hiveId;
}
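For completeness, this is roughly how the method gets called. The snippet is a placeholder rather than my real invocation: the pipeline id, script path, step name and script variable are made up, it assumes the same class context (getBucketName(), etc.), and client construction via DataPipelineClientBuilder is just one option.

// Placeholder invocation; all argument values below are illustrative.
DataPipeline dpl = DataPipelineClientBuilder.defaultClient();

Map<String,String> params = new HashMap<>();
params.put("DT", "2019-01-01");   // becomes a scriptVariable field: DT=2019-01-01

String hiveId = scheduleDataPipelineActivity(
    dpl,
    "df-0123456789ABCDEFGHIJ",                        // placeholder pipeline id
    "s3://" + getBucketName() + "/scripts/report.q",  // scriptUri form of the script argument
    "daily-report",                                   // step name
    params);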

Tags: amazon-web-services, hadoop, hive, amazon-emr, amazon-data-pipeline

Solution

