Dataflow Prime job fails when providing resource hints for a transform

Problem description

I have a classic Dataflow template written with the Apache Beam Java SDK v2.32.0. The template simply consumes messages from a Pub/Sub subscription and writes them to Google Cloud Storage.
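
For context, the pipeline is roughly the following shape (a simplified sketch with a placeholder subscription, output path, and windowing; this is not the actual template code):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGcsPipeline {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // Read messages from the Pub/Sub subscription.
        .apply("ReadFromPubSub",
            PubsubIO.readStrings()
                .fromSubscription("projects/my-project/subscriptions/my-subscription"))
        // Window the unbounded stream so files can be finalized periodically.
        .apply("Window",
            Window.into(FixedWindows.of(Duration.standardMinutes(5))))
        // Write windowed files to Cloud Storage.
        .apply("WriteToGcs",
            TextIO.write()
                .to("gs://bucket/path/to/output")
                .withWindowedWrites()
                .withNumShards(8));

    pipeline.run();
  }
}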

The template works fine for running jobs with the Dataflow Prime experiment enabled via --additional-experiments enable_prime and with a pipeline-level resource hint supplied via --parameters=resourceHints=min_ram=8GiB:

gcloud dataflow jobs run my-job-name \
  --additional-experiments enable_prime \
  --disable-public-ips \
  --gcs-location gs://bucket/path/to/template \
  --num-workers 1  \
  --max-workers 16 \
  --parameters=resourceHints=min_ram=8GiB,other_pipeline_options=true \
  --project my-project \
  --region us-central1 \
  --service-account-email my-service-account@my-project.iam.gserviceaccount.com \
  --staging-location gs://bucket/path/to/staging \
  --subnetwork https://www.googleapis.com/compute/v1/projects/my-project/regions/us-central1/subnetworks/my-subnet
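
If it helps, my understanding is that the same pipeline-level hint can also be set programmatically through Beam's ResourceHintsOptions when constructing the pipeline (a hedged sketch; the option interface and value format below are my reading of the SDK, so verify against your Beam version):

import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.resourcehints.ResourceHintsOptions;

public class PipelineLevelHintExample {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();

    // Roughly equivalent to passing resourceHints=min_ram=8GiB as a parameter:
    // set the pipeline-level resource hints option before building the pipeline.
    options.as(ResourceHintsOptions.class)
        .setResourceHints(Arrays.asList("min_ram=8GiB"));

    Pipeline pipeline = Pipeline.create(options);
    // ... build the Pub/Sub-to-GCS pipeline as usual ...
    pipeline.run();
  }
}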

To try out Dataflow Prime's Right Fitting feature, I changed the pipeline code to include a resource hint on the FileIO transform:

class WriteGcsFileTransform
    extends PTransform<PCollection<Input>, WriteFilesResult<Destination>> {

  private static final long serialVersionUID = 1L;

  @Override
  public WriteFilesResult<Destination> expand(PCollection<Input> input) {

    return input.apply(
        FileIO.<Destination, Input>writeDynamic()
            .by(myDynamicDestinationFunction)
            .withDestinationCoder(Destination.coder())
            .withNumShards(8)
            .withNaming(myDestinationFileNamingFunction)
            .withTempDirectory("gs://bucket/path/to/temp")
            .withCompression(Compression.GZIP)
            .setResourceHints(ResourceHints.create().withMinRam("32GiB"))
        );
  }
}
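
For comparison, I believe the Beam documentation also shows attaching hints to a composite transform at the point where it is applied, rather than inside expand(); a minimal sketch of that variant (the step name and surrounding variables are illustrative):

    // Variant: attach the hint where the composite transform is applied.
    // "input" is the upstream PCollection<Input>; "WriteGcsFiles" is illustrative.
    WriteFilesResult<Destination> result =
        input.apply(
            "WriteGcsFiles",
            new WriteGcsFileTransform()
                .setResourceHints(ResourceHints.create().withMinRam("32GiB")));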

Attempting to run a job from a template built from the new code results in a continuous crash loop, and the job never runs successfully. The only recurring error log entry is:

{
  "insertId": "s=97e1ecd30e0243609d555685318325b4;i=4e1;b=6c7f5d65f3994eada5f20672dab1daf1;m=912f16c;t=5d024689cb030;x=b36751718b3d80c1",
  "jsonPayload": {
    "line": "pod_workers.go:191",
    "message": "Error syncing pod 4cf7cbf98df4b5e2d054abce7da1262b (\"df-df-hvm-my-job-name-11061310-qn51-harness-jb9f_default(4cf7c6bf982df4b5eb2d054abce7da12)\"), skipping: failed to \"StartContainer\" for \"artifact\" with CrashLoopBackOff: \"back-off 40s restarting failed container=artifact pod=df-df-hvm-my-job-name-11061310-qn51-harness-jb9f_default(4cf7c6bf982df4b5eb2d054abce7da12)\"",
    "thread": "807"
  },
  "resource": {
    "type": "dataflow_step",
    "labels": {
      "project_id": "my-project",
      "region": "us-central1",
      "step_id": "",
      "job_id": "2021-11-06_12_10_27-510057810808146686",
      "job_name": "my-job-name"
    }
  },
  "timestamp": "2021-11-06T20:14:36.052491Z",
  "severity": "ERROR",
  "labels": {
    "compute.googleapis.com/resource_type": "instance",
    "dataflow.googleapis.com/log_type": "system",
    "compute.googleapis.com/resource_id": "4695846446965678007",
    "dataflow.googleapis.com/job_name": "my-job-name",
    "dataflow.googleapis.com/job_id": "2021-11-06_12_10_27-510057810808146686",
    "dataflow.googleapis.com/region": "us-central1",
    "dataflow.googleapis.com/service_option": "prime",
    "compute.googleapis.com/resource_name": "df-hvm-my-job-name-11061310-qn51-harness-jb9f"
  },
  "logName": "projects/my-project/logs/dataflow.googleapis.com%2Fkubelet",
  "receiveTimestamp": "2021-11-06T20:14:46.471285909Z"
}

Am I using resource hints in the transform incorrectly?

Tags: java, google-cloud-dataflow, apache-beam, google-cloud-dataflow-prime

Solution

