How to retry a handled failure in an AWS Step Function

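Problem description

I have a step function that basically does the following:

  1. [START] Run a Lambda that starts an Athena query and returns the QueryID
  2. [POLL] Run a Lambda that checks the status of the query and returns the status information
  3. Choice state:
    • If STATUS = SUCCESS -> [PROCESS_RESULTS]
    • If STATUS = RUNNING -> wait 30 seconds, then [POLL]
    • If STATUS = FAILED -> [FAILURE_STEP]

STATUS = FAILED is not the same as a Lambda failure: the Lambda returned a proper result, but that result says the query failed. Lately I have been seeing more "Query exhausted resources at this scale factor" errors from Athena, but the query almost always succeeds on a second attempt. I would like to retry [START] with the pause and exponential backoff defined in the Step Functions documentation, something like: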

   "Retry": [ {
      "ErrorEquals": [ "ErrorA", "ErrorB" ],
      "IntervalSeconds": 1,
      "BackoffRate": 2.0,
      "MaxAttempts": 2
   } ]

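But that only seems to apply to real errors such as exceptions. Is there a way to do this based on a result that is technically not an error?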

Tags: aws-step-functions

Solution

You can add another Choice state to the flow:

States:
  start_long_running_query:
    Type: Task
    Resource:
      Fn::GetAtt: [StartLongRunningQueryLambdaFunction, Arn]
    Next: wait_query_execution
  wait_query_execution:
    Type: Wait
    Seconds: 2
    Next: get_query_status
  get_query_status:
    Type: Task
    Resource:
      Fn::GetAtt: [GetLongRunningQueryStatusLambdaFunction, Arn]
    Next: check_query_status
  check_query_status:
    Type: Choice
    Default: wait_query_execution # RUNNING
    Choices:
      - Or:
          - Variable: "$.AthenaQueryStatusInformation.QueryState"
            StringEquals: FAILED
          - Variable: "$.AthenaQueryStatusInformation.QueryState"
            StringEquals: CANCELLED
        Next: maybe_retry
      - Variable: "$.AthenaQueryStatusInformation.QueryState"
        StringEquals: SUCCEEDED
        Next: query_succeded
  maybe_retry:
    Type: Choice
    Default: query_failed
    Choices:
      - Variable: "$.retry"
        BooleanEquals: true
        Next: start_long_running_query
  query_failed:
    Type: Pass
    Result: "This is a fallback for failed code"
    End: true
  query_succeded:
    Type: Pass
    Result: "This is a fallback for success code"
    End: true
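
To make the routing above easier to follow, the two Choice states can be modelled as a plain Python function. This is only an illustrative local model, not a Step Functions API: the function name next_state is hypothetical, and the sketch assumes the status Lambda always sets the retry flag before maybe_retry is reached.

```python
def next_state(state, data):
    """Return the state that follows `state`, mirroring the Choice rules above."""
    if state == "check_query_status":
        query_state = data["AthenaQueryStatusInformation"]["QueryState"]
        if query_state in ("FAILED", "CANCELLED"):
            return "maybe_retry"
        if query_state == "SUCCEEDED":
            return "query_succeded"
        # Default branch: the query is still queued or running, so poll again.
        return "wait_query_execution"
    if state == "maybe_retry":
        # Assumption: the status Lambda has already written data["retry"].
        if data["retry"] is True:
            return "start_long_running_query"
        return "query_failed"
    raise ValueError("not a Choice state in this sketch: " + state)


failed = {"AthenaQueryStatusInformation": {"QueryState": "FAILED"}, "retry": True}
print(next_state("check_query_status", failed))
print(next_state("maybe_retry", failed))
```

A failed query therefore loops back to start_long_running_query only while retry is true; once the Lambda sets it to false, the flow ends in query_failed.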

Then, in your Lambda code, you have to maintain a counter (retry_cnt) and a flag (retry) so that they are passed along to the other steps:

import os

# Maximum number of retries, read from the Lambda environment (defaults to 1).
max_retry = int(os.getenv('max_retry', '1'))

def start_long_running_query(event, context):

    # Your code here...

    # Initialize the counter: it must be taken from the event if present,
    # otherwise every retry would reset it and the loop would never end.
    event["retry_cnt"] = event.get("retry_cnt", 0)

    return event

def get_long_running_query_status(event, context):

    # Your code here...
    # (status_information is expected to be set by this elided code; the Choice
    # state reads it from $.AthenaQueryStatusInformation in the returned event.)

    if status_information['QueryState'] in ('FAILED', 'CANCELLED'):
        event["retry_cnt"] = int(event["retry_cnt"]) + 1
        # Retry only while the counter has not exceeded max_retry.
        event["retry"] = event["retry_cnt"] <= max_retry
    else:
        event["retry"] = False

    return event
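
The question also asks for a pause with exponential backoff, which the counter alone does not provide. One possible way to get it, sketched here under assumptions, is to have the status Lambda also compute a delay and let a Wait state placed before start_long_running_query read it through SecondsPath. The helper names backoff_seconds and apply_retry_decision and the field wait_seconds are hypothetical; the parameters mirror IntervalSeconds and BackoffRate from the Retry block in the question.

```python
def backoff_seconds(retry_cnt, interval_seconds=1, backoff_rate=2.0):
    """Delay before retry number `retry_cnt` (1-based): interval * rate^(n-1)."""
    return int(interval_seconds * backoff_rate ** (retry_cnt - 1))


def apply_retry_decision(event, query_state, max_retry=2):
    """Update retry_cnt, retry and wait_seconds on the event for a given query state."""
    if query_state in ("FAILED", "CANCELLED"):
        event["retry_cnt"] = int(event.get("retry_cnt", 0)) + 1
        event["retry"] = event["retry_cnt"] <= max_retry
        if event["retry"]:
            # Hypothetical field, intended for a Wait state with
            # SecondsPath: "$.wait_seconds" before the query is restarted.
            event["wait_seconds"] = backoff_seconds(event["retry_cnt"])
    else:
        event["retry"] = False
    return event


event = apply_retry_decision({"retry_cnt": 0}, "FAILED")
print(event["retry"], event["wait_seconds"])
```

With the defaults shown, the first retry waits 1 second and the second waits 2 seconds, after which retry becomes false and the flow falls through to query_failed.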
