python - Pyspark 数据框或镶木地板文件到 DynamoDB
问题描述
我想将 pyspark 数据框或 parquet 文件放入 DynamoDB 表中
我拥有的 pyspark 数据框有 30MM 行和 20 列
解决方案 1:使用 boto3、pandas 和批量写入(Amazon DynamoDB)
有了这个,我读取了 parquet 文件并将其传递给 pandas,然后我将逐行放入 DynamoDB 表中,但这花费了太长时间,非常慢
import boto3
dynamodb = boto3.resource('dynamodb', region_name='name')
table = dynamodb.Table('DynamoDB_table_name')
with table.batch_writer() as batch:
for index, row in pandas_dataframe.iterrows():
batch.put_item(
Item = {
'column_name_DynamoDB_table': int(row['column_name_in_pandas_dataframe']),
...
}
)
解决方案 2:使用 boto3、pyspark 和 SQL(how-to-write-pyspark-dataframe-to-dynamodb-table)
在这里,我在解决方案中描述的第 3 步中不断收到错误,ParseException错误,我查看了亚马逊文档,我看到代码是正确的(EMR_Hive_Commands.html),也许它不是 SQL 代码,那是我的错误,但如果不是,我不知道是哪种语言
-- Step 1
DROP TABLE IF EXISTS TEMP;
CREATE TABLE TEMP(
column_name_DynamoDB_table type,
... )
STORED AS ORC;
--step 2.1
pyspark_dataframe.createOrReplaceTempView("df")
--step 2.2
INSERT INTO temp
SELECT *
FROM df
--step 3
CREATE TABLE TEMPTODYNAMO(
column_name_DynamoDB_table type,
... )
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ( "dynamodb.table.name" ="temp-to-dynamo" ,
"dynamodb.column.mapping" = "col1:column_name_DynamoDB_table,...");
我不断收到的错误:
Error in SQL statement: ParseException:
Operation not allowed: STORED BY(line 22, pos 0)
== SQL ==
CREATE TABLE TEMPTODYNAMO(
column_name_DynamoDB_table type,
... )
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
^^^
TBLPROPERTIES ( "dynamodb.table.name" ="temp-to-dynamo" ,
"dynamodb.column.mapping" = "col1:column_name_DynamoDB_table,...")
解决方案 3:使用 boto3、pyspark 和 com.audienceproject ( Spark+DynamoDB )
我不明白在代码中放什么,页面中显示的python代码是:
# Python
# Load a DataFrame from a Dynamo table. Only incurs the cost of a single scan for schema inference.
dynamoDf = spark.read.option("tableName", "SomeTableName") \
.format("dynamodb") \
.load() # <-- DataFrame of Row objects with inferred schema.
# Scan the table for the first 100 items (the order is arbitrary) and print them.
dynamoDf.show(100)
# write to some other table overwriting existing item with same keys
dynamoDf.write.option("tableName", "SomeOtherTable") \
.format("dynamodb") \
.save()
但我真的不知道把我的 DynamoDB 表的名称和我的 pyspark 数据框放在哪里
更新:我试过了
pysaprk_dataframe.write.option("tableName", "name_DynamoDB_table") \
.format("dynamodb") \
.save()
并得到这个错误:
AnalysisException: TableProvider implementation dynamodb cannot be written with ErrorIfExists mode, please use Append or Overwrite modes instead
问候
解决方案
已尝试解决方案#3,使用以下代码片段并使其正常工作
代码更改是添加模式('append')
dynamoDf.write.mode('append').option("tableName","db_dev_users_v2") \
.option("region",region) \
.format("dynamodb") \
.save()
推荐阅读
- excel - 从 Excel 进行 Outlook 查询时不显示事件
- sql - Oracle SQL:Oracle 可以解释的最大日期差异是否存在
- form-submit - formSubmitReply 函数,错误提示“Exception: Service requested too many times in the same day: email”
- spring-boot - 使用 okhttp3 Mockwebserver 进行 Spring webclient 测试
- python - 绘制具有不同大小数组的多行的最小值数组
- c - c 函数 log_init 中的段错误
- jquery - 如何从 MVC 中的视图打印整个表格
- ios - SwiftUI:iCloud 文档和 CloudKit
- google-chrome - 如何仅使用 chrome 导出响应正文
- javascript - 如何在 jqplot 图表中绘制确切的 Xaxis?