首页 > 解决方案 > 将多行从 pyspark 插入到 cosmosdb

问题描述

我正在尝试在 pyspark 中的数据框中插入多行。这是我的代码:

首先我导入包:

import pydocumentdb
from pydocumentdb import document_client
from pydocumentdb import documents

然后,我定义了 connectionPolicy:

connectionPolicy = documents.ConnectionPolicy()
connectionPolicy.EnableEndpointDiscovery
connectionPolicy.PreferredLocations = {"Western Europe"}

凭据:

masterKey = 'yourmasterkey'
host = 'https://testcosmosdbasdada.documents.azure.com:443/'
client = document_client.DocumentClient(host,{'masterKey': masterKey}, connectionPolicy)

然后我定义一个数据库和一个集合的名称:

databaseId = 'pruebadb'
collectionId = 'collection1'

dbLink = 'dbs/' + databaseId
collLink = dbLink + '/colls/' + collectionId

注意:我应该在 Azure 套件中创建一个具有此名称的数据库和集合。然后我可以使用 CreateDocument 或 UpsertDocument。我将使用 UpsertDocument。

client.UpsertDocument(collLink,{'attribute1': 4}, options=None)

这行得通!正如您在文档中看到的: https ://docs.microsoft.com/en-us/python/api/pydocumentdb/pydocumentdb.document_client.documentclient?view=azure-python#upsertdocument

但是我不知道如何一次插入一些行。这些证明不起作用:

1)

client.UpsertDocument(collLink,[{'attribute1': 4},{'attribute1': 2}], options=None)

'list' 对象没有属性 'get'

2)

client.UpsertDocument(collLink,[{'attribute1': 4},{'attribute1': 2}], options=None)

'list' 对象没有属性 'get'

3)

df = spark.read.csv('/FileStore/tables/points.csv', sep=';', header=True)
client.UpsertDocument(collLink, df, options=None)

'list' 对象没有属性 'get'

这些证明不起作用,因为我需要一个字典作为 UpsertDocument() 的第二个参数。

为了做到这一点,pydocumentdb 或其他 python 库有什么功能吗?

使用 pyspark 将数据从数据帧插入 CosmosDB 的最佳性能方法是什么?

标签: pysparkazure-cosmosdb

解决方案


您可以DataFrameWriter为此使用 Spark MongoDB 连接器提供的 API,而不是依赖 CosmosDB API。

下面的代码应该可以工作:

df.write.format("com.mongodb.spark.sql.DefaultSource")\
        .option("uri", "<CosmosDB URI>")\
        .option("database","CosmosDB Database Name")\
        .option("collection","CosmosDB Collection Name")\
        .mode("append").save()

您需要通过在 spark-submit 命令中使用--jars参数或参数将 Spark-MongoDB 连接器添加到类路径。--packages

前任:spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.0.0 <YOUR_SRC_FILE>.py

有关 DataFrameWriter API 的更多信息,请参见:http ://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter


推荐阅读