首页 > 解决方案 > 使用 Spark 和本地 AWS Glue 实施从 Kinesis -> RDS 移动数据

问题描述

我有一个在本地运行 AWS Glue 实施的 Spark 项目。

我收听 Kinesis 流,因此当数据以 JSON 格式到达时,我可以正确存储到 S3。我想存储在 AWS RDS 中,而不是存储在 S3 中。

我曾尝试使用:

dataFrame.write
          .format("jdbc")
          .option("url","jdbc:mysql://aurora.cluster.region.rds.amazonaws.com:3306/database")
          .option("user","user")
          .option("password","password")
          .option("dbtable","test-table")
          .option("driver","com.mysql.jdbc.Driver")
          .save()

Spark 项目使用 AWS 粘合作业从 Kinesis 流中获取数据。

我想将数据添加到 Aurora 数据库。

它失败并出现错误

Caused by: java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL
 server version for the right syntax to use near '-glue-table (`label2` TEXT , `customerid` TEXT , `sales` TEXT , `name` TEXT )' a
t line 1

这是我正在使用的测试数据帧dataFrame.show()

+------+----------+-----+--------------------+
|label2|customerid|sales|                name|
+------+----------+-----+--------------------+
| test6|      test| test|streamingtesttest...|
+------+----------+-----+--------------------+

标签: apache-sparkaws-glueamazon-aurora

解决方案


使用 Spark DynamicFrame 代替 DataFrame 并使用glueContext sink 发布到Aurora:

所以最终的代码可能是:

lazy val mysqlJsonOption = jsonOptions(MYSQL_AURORA_URI)

//Write to Aurora
val dynamicFrame = DynamicFrame(joined, glueContext)
glueContext.getSink("mysql", mysqlJsonOption).writeDynamicFrame(dynamicFrame)

推荐阅读