aws-glue - Py4JJavaError: An error occurred while calling o67.getDynamicFrame. java.lang.reflect.InvocationTargetException
Problem description
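I'm using a DynamicFrame to process nested JSON files that contain struct-type fields. When I run the job, I get this error:
Py4JJavaError: An error occurred while calling o67.getDynamicFrame. java.lang.reflect.InvocationTargetException
Where am I going wrong? Any ideas?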
Here is the code in my Glue job:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame, DynamicFrameReader, DynamicFrameWriter, DynamicFrameCollection
from pyspark.sql.functions import lit
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "experimentdb", table_name = "experiment", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "experimentdb", table_name = "experiment", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("id", "string", "id", "string"), ("identifier", "string", "identifier", "string"), ("session_count", "long", "session_count", "long"), ("language", "string", "language", "string"), ("timezone", "long", "timezone", "long"), ("game_version", "string", "game_version", "string"), ("device_os", "string", "device_os", "string"), ("device_type", "long", "device_type", "long"), ("device_model", "string", "device_model", "string"), ("ad_id", "string", "ad_id", "string"), ("tags.phone_number", "string", "`tags.phone_number`", "string"), ("tags.real_name", "string", "`tags.real_name`", "string"), ("tags.email", "string", "`tags.email`", "string"), ("tags.onboardingStatus", "string", "`tags.onboardingStatus`", "string"), ("tags.dfuStatus", "string", "`tags.dfuStatus`", "string"), ("tags.activityStatus", "string", "`tags.activityStatus`", "string"), ("tags.lastOperationPerformed", "string", "`tags.lastOperationPerformed`", "string"), ("last_active", "string", "last_active", "string"), ("playtime", "long", "playtime", "long"), ("amount_spent", "double", "amount_spent", "double"), ("created_at", "string", "created_at", "string"), ("invalid_identifier", "string", "invalid_identifier", "string"), ("badge_count", "long", "badge_count", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [
    ("id", "string", "id", "string"),
    ("identifier", "string", "identifier", "string"),
    ("session_count", "long", "session_count", "long"),
    ("language", "string", "language", "string"),
    ("timezone", "long", "timezone", "long"),
    ("game_version", "string", "game_version", "string"),
    ("device_os", "string", "device_os", "string"),
    ("device_type", "long", "device_type", "long"),
    ("device_model", "string", "device_model", "string"),
    ("ad_id", "string", "ad_id", "string"),
    ("tags.phone_number", "string", "`tags.phone_number`", "string"),
    ("tags.real_name", "string", "`tags.real_name`", "string"),
    ("tags.email", "string", "`tags.email`", "string"),
    ("tags.onboardingStatus", "string", "`tags.onboardingStatus`", "string"),
    ("tags.dfuStatus", "string", "`tags.dfuStatus`", "string"),
    ("tags.activityStatus", "string", "`tags.activityStatus`", "string"),
    ("tags.lastOperationPerformed", "string", "`tags.lastOperationPerformed`", "string"),
    ("last_active", "string", "last_active", "string"),
    ("playtime", "long", "playtime", "long"),
    ("amount_spent", "double", "amount_spent", "double"),
    ("created_at", "string", "created_at", "string"),
    ("invalid_identifier", "string", "invalid_identifier", "string"),
    ("badge_count", "long", "badge_count", "long")], transformation_ctx = "applymapping1")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://output_data"}, format = "csv", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://output_data"}, format = "csv", transformation_ctx = "datasink2")
job.commit()
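The script spells out the same field list twice (once in the `## @args` annotation and once in the `ApplyMapping.apply` call), and every tuple keeps its source type. A minimal sketch of a hypothetical helper, `build_identity_mappings` (not part of the awsglue API), that generates the same 4-tuples (source path, source type, target name, target type) from a single field-to-type dict, reusing the question's backtick convention for the dotted `tags.*` targets. It only removes the repetition; it is not a fix for the `InvocationTargetException`.

```python
from typing import Dict, List, Tuple

# One ApplyMapping entry: (source_path, source_type, target_name, target_type).
Mapping = Tuple[str, str, str, str]

# Field list copied from the question's script (source path -> Glue type).
FIELDS: Dict[str, str] = {
    "id": "string",
    "identifier": "string",
    "session_count": "long",
    "language": "string",
    "timezone": "long",
    "game_version": "string",
    "device_os": "string",
    "device_type": "long",
    "device_model": "string",
    "ad_id": "string",
    "tags.phone_number": "string",
    "tags.real_name": "string",
    "tags.email": "string",
    "tags.onboardingStatus": "string",
    "tags.dfuStatus": "string",
    "tags.activityStatus": "string",
    "tags.lastOperationPerformed": "string",
    "last_active": "string",
    "playtime": "long",
    "amount_spent": "double",
    "created_at": "string",
    "invalid_identifier": "string",
    "badge_count": "long",
}


def build_identity_mappings(fields: Dict[str, str]) -> List[Mapping]:
    """Hypothetical helper: one mapping tuple per field, type unchanged."""
    mappings: List[Mapping] = []
    for source_path, glue_type in fields.items():
        # Same convention as the question's script: a nested path such as
        # "tags.email" is mapped to a target name containing a dot, which
        # the script wraps in backticks.
        target_name = f"`{source_path}`" if "." in source_path else source_path
        mappings.append((source_path, glue_type, target_name, glue_type))
    return mappings
```

In the job it would be passed as `mappings = build_identity_mappings(FIELDS)` to `ApplyMapping.apply`; the tuples come out in the dict's insertion order, matching the hand-written list.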
Solution
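It looks like you're hitting a connection error. Since S3 is the only data source you use, and you haven't created a VPC endpoint for S3, I suspect that's the cause.
Unfortunately, the Glue error logs don't give much useful information, so this is only an assumption. Please create a VPC S3 endpoint and try again.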
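A minimal boto3 sketch of the suggested fix, assuming the job reaches S3 through a Glue connection whose subnet sits in a VPC without an S3 route. `s3_gateway_endpoint_request` and `ensure_s3_gateway_endpoint` are hypothetical helper names, and the region, VPC ID and route-table IDs are placeholders you would look up for that subnet. It uses the EC2 `describe_vpc_endpoints` and `create_vpc_endpoint` calls and reuses an S3 gateway endpoint if the VPC already has one.

```python
from typing import Any, Dict, List, Optional


def s3_gateway_endpoint_request(region: str, vpc_id: str,
                                route_table_ids: List[str]) -> Dict[str, Any]:
    """Keyword arguments for EC2 CreateVpcEndpoint (S3 gateway endpoint)."""
    return {
        "VpcEndpointType": "Gateway",
        "VpcId": vpc_id,
        "ServiceName": f"com.amazonaws.{region}.s3",
        # A gateway endpoint adds S3 routes to these route tables, so they
        # should be the ones associated with the Glue connection's subnet.
        "RouteTableIds": list(route_table_ids),
    }


def ensure_s3_gateway_endpoint(region: str, vpc_id: str,
                               route_table_ids: List[str],
                               ec2_client: Optional[Any] = None) -> str:
    """Return the ID of an S3 gateway endpoint in vpc_id, creating one if needed."""
    if ec2_client is None:
        import boto3  # only needed when no client is injected
        ec2_client = boto3.client("ec2", region_name=region)

    existing = ec2_client.describe_vpc_endpoints(Filters=[
        {"Name": "vpc-id", "Values": [vpc_id]},
        {"Name": "service-name", "Values": [f"com.amazonaws.{region}.s3"]},
        {"Name": "vpc-endpoint-type", "Values": ["Gateway"]},
        {"Name": "vpc-endpoint-state", "Values": ["pending", "available"]},
    ])["VpcEndpoints"]
    if existing:
        return existing[0]["VpcEndpointId"]

    response = ec2_client.create_vpc_endpoint(
        **s3_gateway_endpoint_request(region, vpc_id, route_table_ids))
    return response["VpcEndpoint"]["VpcEndpointId"]
```

For example, `ensure_s3_gateway_endpoint("us-east-1", "vpc-0123", ["rtb-0456"])` with your own IDs; the same endpoint can also be created from the VPC console.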