Returning the correct label from the post-annotation Lambda of an AWS SageMaker Ground Truth custom labeling job

Problem description

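I am running a SageMaker labeling job with a custom data type, but the AWS web console does not show the correct label. It should show the selected label, "Native", but instead I get the <labelattributename>, which is "new-test-14".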

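After Ground Truth runs the post-annotation Lambda, it appears to modify the metadata before returning the data object. The data object it returns has no class-name key in its metadata attribute, even when I hard-code the Lambda to return an object that contains it.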

My manifest file looks like this:

{"source-ref" : "s3://<file-name>", "text" : "Hello world"}
{"source-ref" : "s3://<file-name>", "text" : "Hello world"}

The worker response looks like this:

{"answers":[{"acceptanceTime":"2021-05-18T16:08:29.473Z","answerContent":{"new-test-14":{"label":"Native"}},"submissionTime":"2021-05-18T16:09:15.960Z","timeSpentInSeconds":46.487,"workerId":"private.us-east-1.ea05a03fcd679cbb","workerMetadata":{"identityData":{"identityProviderType":"Cognito","issuer":"https://cognito-idp.us-east-1.amazonaws.com/us-east-1_XPxQ9txEq","sub":"edc59ce1-e09d-4551-9e0d-a240465ea14a"}}}]}
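For reference, the label can be pulled out of a response shaped like the one above with a few lines of Python. This is only a sketch: `extract_labels` is a hypothetical helper, not part of any AWS SDK, and the sample is a trimmed copy of the response shown here. The key to look up under `answerContent` is passed in explicitly, because it depends on how the worker template names its form element.

```python
import json

# Trimmed copy of the worker response shown above.
worker_response = json.loads("""
{"answers": [{"answerContent": {"new-test-14": {"label": "Native"}},
              "workerId": "private.us-east-1.ea05a03fcd679cbb"}]}
""")


def extract_labels(response, field_name):
    """Return (workerId, label) pairs for the given answerContent key.

    extract_labels is a hypothetical helper used only for illustration.
    """
    pairs = []
    for answer in response["answers"]:
        content = answer["answerContent"]
        # Skip answers that do not carry the requested key.
        if field_name in content:
            pairs.append((answer["workerId"], content[field_name]["label"]))
    return pairs


labels = extract_labels(worker_response, "new-test-14")
print(labels)
```

In this sketch, looking up a key that is not present in the answer content (for example `new_test`) yields an empty list instead of raising a `KeyError`.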

This worker response is processed by my post-annotation Lambda, which is modeled on this AWS sample Ground Truth recipe. Here is my code:

import json
import sys
import boto3
from datetime import datetime



def lambda_handler(event, context):


    # Event received
    print("Received event: " + json.dumps(event, indent=2))

    labeling_job_arn = event["labelingJobArn"]
    label_attribute_name = event["labelAttributeName"]

    label_categories = None
    if "labelCategories" in event:
        label_categories = event["labelCategories"]
        print(" Label Categories are : {}".format(label_categories))

    payload = event["payload"]
    role_arn = event["roleArn"]

    output_config = None # Output s3 location. You can choose to write your annotation to this location
    if "outputConfig" in event:
        output_config = event["outputConfig"]

    # If you specified a KMS key in your labeling job, you can use the key to write
    # consolidated_output to s3 location specified in outputConfig.
    # kms_key_id = None
    # if "kmsKeyId" in event:
    #     kms_key_id = event["kmsKeyId"]

    # # Create s3 client object
    # s3_client = S3Client(role_arn, kms_key_id)
    s3_client = boto3.client('s3')

    # Perform consolidation
    return do_consolidation(labeling_job_arn, payload, label_attribute_name, s3_client)


def do_consolidation(labeling_job_arn, payload, label_attribute_name, s3_client):
    """
        Core Logic for consolidation

    :param labeling_job_arn: labeling job ARN
    :param payload:  payload data for consolidation
    :param label_attribute_name: identifier for labels in output JSON
    :param s3_client: S3 helper class
    :return: output JSON string
    """

    # Extract payload data
    if "s3Uri" in payload:
        s3_ref = payload["s3Uri"]
        payload_bucket, payload_key = s3_ref.split('/',2)[-1].split('/',1)
        payload = json.loads(s3_client.get_object(Bucket=payload_bucket, Key=payload_key)['Body'].read())
        # print(payload)

    # Payload data contains a list of data objects.
    # Iterate over it to consolidate annotations for individual data object.
    consolidated_output = []
    success_count = 0  # Number of data objects that were successfully consolidated
    failure_count = 0  # Number of data objects that failed in consolidation

    for p in range(len(payload)):
        response = None
        try:
            dataset_object_id = payload[p]['datasetObjectId']
            log_prefix = "[{}] data object id [{}] :".format(labeling_job_arn, dataset_object_id)
            print("{} Consolidating annotations BEGIN ".format(log_prefix))

            annotations = payload[p]['annotations']
            # print("{} Received Annotations from all workers {}".format(log_prefix, annotations))

            # Iterate over annotations and collect the label chosen by each worker
            annotationsFromAllWorkers = []
            for i in range(len(annotations)):
                worker_id = annotations[i]["workerId"]
                annotation_data = annotations[i]["annotationData"]
                # "content" is a JSON string, so it has to be parsed first
                annotation_content = annotation_data["content"]
                annotation_content_json = json.loads(annotation_content)
                # This key must match the key under which the worker's answer is stored
                annotation_job = annotation_content_json["new_test"]
                annotation_label = annotation_job["label"]
                worker_annotation = {
                    "workerId": worker_id,
                    "annotationData": {
                        "content": {
                            "annotatedResult": {
                                "instances": [{"label": annotation_label}]
                            }
                        }
                    }
                }
                annotationsFromAllWorkers.append(worker_annotation)

            consolidated_annotation = {"annotationsFromAllWorkers": annotationsFromAllWorkers} # TODO : Add your consolidation logic

            # Build consolidation response object for an individual data object
            response = {
                "datasetObjectId": dataset_object_id,
                "consolidatedAnnotation": {
                    "content": {
                        label_attribute_name: consolidated_annotation,
                        label_attribute_name + "-metadata": {
                            "class-name": "Native",
                            "confidence": 0.00,
                            "human-annotated": "yes",
                            "creation-date": datetime.strftime(datetime.now(), "%Y-%m-%dT%H:%M:%S"),
                            "type": "groundtruth/custom"
                        }
                    }
                }
            }

            success_count += 1
            # print("{} Consolidating annotations END ".format(log_prefix))

            # Append individual data object response to the list of responses.
            if response is not None:
                consolidated_output.append(response)

        except Exception:
            # Count the failure and keep going with the remaining data objects
            failure_count += 1
            print(" Consolidation failed for dataobject {}".format(p))
            print(" Unexpected error: Consolidation failed." + str(sys.exc_info()[0]))

    print("Consolidation Complete. Success Count {}  Failure Count {}".format(success_count, failure_count))

    print(" -- Consolidated Output -- ")
    print(consolidated_output)
    print(" ------------------------- ")
    return consolidated_output
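The `s3Uri` handling in `do_consolidation` splits the URI into a bucket and a key with a chained `split`. A slightly more explicit way to do the same thing is sketched below, under the assumption that the URI always has the form `s3://bucket/key`. `split_s3_uri` is a hypothetical helper, and the bucket and key in the example are made-up placeholders.

```python
from urllib.parse import urlparse


def split_s3_uri(s3_uri):
    """Split 's3://bucket/key/with/slashes' into (bucket, key).

    split_s3_uri is a hypothetical helper used only for illustration.
    """
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError("not an S3 URI: {}".format(s3_uri))
    # The path starts with a slash that is not part of the object key.
    return parsed.netloc, parsed.path.lstrip("/")


# The bucket and key below are made-up placeholders.
example_uri = "s3://example-bucket/annotations/consolidation-request/data.json"
bucket, key = split_s3_uri(example_uri)

# The one-liner used in do_consolidation, applied to the same input.
legacy_bucket, legacy_key = example_uri.split('/', 2)[-1].split('/', 1)
print(bucket, key)
```

The helper additionally rejects strings that do not start with `s3://`, which the chained `split` would silently accept.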

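As you can see above, the do_consolidation method returns a hard-coded object that includes the "Native" class name, and the lambda_handler method returns that same object. This is the post-annotation function's response: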

[{
    "datasetObjectId": "4",
    "consolidatedAnnotation": {
        "content": {
            "new-test-14": {
                "annotationsFromAllWorkers": [{
                    "workerId": "private.us-east-1.ea05a03fcd679cbb",
                    "annotationData": {
                        "content": {
                            "annotatedResult": {
                                "instances": [{
                                    "label": "Native"
                                }]
                            }
                        }
                    }
                }]
            },
            "new-test-14-metadata": {
                "class-name": "Native",
                "confidence": 0,
                "human-annotated": "yes",
                "creation-date": "2021-05-19T07:06:06",
                "type": "groundtruth/custom"
            }
        }
    }
}]

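As you can see, the post-annotation function's return value has the "Native" class name in its metadata, so I expected the class name to appear in the data object's metadata, but it does not. Here is a screenshot of the data object summary: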

[Screenshot: labeled object summary]

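It seems Ground Truth overwrites the metadata, so the object no longer contains the correct label. I suspect this is why my label shows up as the label attribute name, "new-test-14", instead of the correct label, "Native". Here is a screenshot of the labeling job in the AWS web console: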

[Screenshot: labeling job]

The web console should show the label "Native" in the Label column, but I get the <labelattributename>, "new-test-14", there instead.

Below is the output.manifest file that Ground Truth generates at the end:

{
    "source-ref": "s3://<file-name>",
    "text": "Hello world",
    "new-test-14": {
        "annotationsFromAllWorkers": [{
            "workerId": "private.us-east-1.ea05a03fcd679ert",
            "annotationData": {
                "content": {
                    "annotatedResult": {
                        "label": "Native"
                    }
                }
            }
        }]
    },
    "new-test-14-metadata": {
        "type": "groundtruth/custom",
        "job-name": "new-test-14",
        "human-annotated": "yes",
        "creation-date": "2021-05-18T12:34:17.400000"
    }
}
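To make the difference concrete, the metadata returned by the Lambda can be compared key by key with the metadata in output.manifest. The sketch below only restates the two snippets shown in this post as Python dictionaries. Note that the two snippets carry different timestamps, so they may come from different runs.

```python
# Metadata returned by the post-annotation Lambda (copied from the response above).
returned_metadata = {
    "class-name": "Native",
    "confidence": 0,
    "human-annotated": "yes",
    "creation-date": "2021-05-19T07:06:06",
    "type": "groundtruth/custom",
}

# Metadata that ended up in output.manifest (copied from the file above).
manifest_metadata = {
    "type": "groundtruth/custom",
    "job-name": "new-test-14",
    "human-annotated": "yes",
    "creation-date": "2021-05-18T12:34:17.400000",
}

# Keys the Lambda returned that are absent from the manifest.
missing = sorted(set(returned_metadata) - set(manifest_metadata))
# Keys present in the manifest that the Lambda never returned.
added = sorted(set(manifest_metadata) - set(returned_metadata))
# Keys present on both sides whose values differ.
changed = sorted(
    key for key in set(returned_metadata) & set(manifest_metadata)
    if returned_metadata[key] != manifest_metadata[key]
)

print("missing:", missing)
print("added:", added)
print("changed:", changed)
```

For the two snippets in this post, `class-name` and `confidence` are missing from the manifest, `job-name` was added, and `creation-date` differs.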

What should I return from the post-annotation function? Am I missing something in my response? How do I get the correct label to appear in the AWS web console?

Tags: amazon-web-services, aws-lambda, amazon-sagemaker

Solution

