pyspark - AWS Glue 作业方法 pyWriteDynamicFrame 不存在
问题描述
我的目标是从现有目录表中读取数据框,进行一些转换并从中创建一个新表。所以根据https://docs.aws.amazon.com/glue/latest/dg/update-from-job.html,我使用的sink.writeFrame
方法是:
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "my_db", table_name = "table1", transformation_ctx = "datasource0")
datasource1 = datasource0.toDF().withColumn("date", current_date().cast("string"))
datasource2 = DynamicFrame.fromDF(datasource1, glueContext, "datasource2")
sink = glueContext.getSink(connection_type="s3", path="s3://my_bucket/output", enableUpdateCatalog=True)
sink.setFormat("json")
sink.setCatalogInfo(catalogDatabase='my_db', catalogTableName='table2')
sink.writeFrame(datasource2)
job.commit()
但结果我得到一个误导性错误,该方法 pyWriteDynamicFrame 不存在:
Traceback (most recent call last):
File "/tmp/test", line 39, in <module>
sink.writeFrame(datasource1)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 31, in writeFrame
return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 332, in get_return_value
format(target_id, ".", name, value))
py4j.protocol.Py4JError: An error occurred while calling o75.pyWriteDynamicFrame. Trace:
py4j.Py4JException: Method pyWriteDynamicFrame([class org.apache.spark.sql.Dataset, class java.lang.String, class java.lang.String]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
版本:Spark:2.4,Python:3,胶水:2
解决方案
您可以使用 Glue 原生转换Map类,该类将通过将函数应用于输入 DynamicFrame 中的所有记录来构建新的 DynamicFrame。
因此,在您的情况下,您可以使用下面的代码片段来导出列日期。
from datetime import datetime
def addDate(d):
d["date"] = datetime.today()
return d
datasource1 = Map.apply(frame = datasource0, f = addDate)
推荐阅读
- sed - 从模式空间的开头重复搜索 sed
- angular - 使用打字稿创建泛型类以保存键值数组(字典)
- python - pip 抛出错误“utf-8”编解码器无法解码字节 0xec
- reactjs - 异步调度值的 TypeScript 类型
- ruby-on-rails - 如何接收非通用 facebook 个人资料图片?
- javascript - 如何根据文本匹配在区分大小写的情况下过滤和突出显示 JSON 对象的动态嵌套数组中的文本
- python - 从侧边栏小部件 selenium python 中抓取评论
- c# - C# 父子窗体
- c++ - 无法使用 setter 编辑存储在链表节点中的数据
- ios - Swift:如何处理只有返回类型不同且结果可丢弃的方法重载?