pyspark - 将多行从 pyspark 插入到 cosmosdb
问题描述
我正在尝试在 pyspark 中的数据框中插入多行。这是我的代码:
首先我导入包:
import pydocumentdb
from pydocumentdb import document_client
from pydocumentdb import documents
然后,我定义了 connectionPolicy:
connectionPolicy = documents.ConnectionPolicy()
connectionPolicy.EnableEndpointDiscovery
connectionPolicy.PreferredLocations = {"Western Europe"}
凭据:
masterKey = 'yourmasterkey'
host = 'https://testcosmosdbasdada.documents.azure.com:443/'
client = document_client.DocumentClient(host,{'masterKey': masterKey}, connectionPolicy)
然后我定义一个数据库和一个集合的名称:
databaseId = 'pruebadb'
collectionId = 'collection1'
dbLink = 'dbs/' + databaseId
collLink = dbLink + '/colls/' + collectionId
注意:我应该在 Azure 套件中创建一个具有此名称的数据库和集合。然后我可以使用 CreateDocument 或 UpsertDocument。我将使用 UpsertDocument。
client.UpsertDocument(collLink,{'attribute1': 4}, options=None)
这行得通!正如您在文档中看到的: https ://docs.microsoft.com/en-us/python/api/pydocumentdb/pydocumentdb.document_client.documentclient?view=azure-python#upsertdocument
但是我不知道如何一次插入一些行。这些证明不起作用:
1)
client.UpsertDocument(collLink,[{'attribute1': 4},{'attribute1': 2}], options=None)
'list' 对象没有属性 'get'
2)
client.UpsertDocument(collLink,[{'attribute1': 4},{'attribute1': 2}], options=None)
'list' 对象没有属性 'get'
3)
df = spark.read.csv('/FileStore/tables/points.csv', sep=';', header=True)
client.UpsertDocument(collLink, df, options=None)
'list' 对象没有属性 'get'
这些证明不起作用,因为我需要一个字典作为 UpsertDocument() 的第二个参数。
为了做到这一点,pydocumentdb 或其他 python 库有什么功能吗?
使用 pyspark 将数据从数据帧插入 CosmosDB 的最佳性能方法是什么?
解决方案
您可以DataFrameWriter
为此使用 Spark MongoDB 连接器提供的 API,而不是依赖 CosmosDB API。
下面的代码应该可以工作:
df.write.format("com.mongodb.spark.sql.DefaultSource")\
.option("uri", "<CosmosDB URI>")\
.option("database","CosmosDB Database Name")\
.option("collection","CosmosDB Collection Name")\
.mode("append").save()
您需要通过在 spark-submit 命令中使用--jars
参数或参数将 Spark-MongoDB 连接器添加到类路径。--packages
前任:spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.0.0 <YOUR_SRC_FILE>.py
有关 DataFrameWriter API 的更多信息,请参见:http ://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter
推荐阅读
- javascript - 无法使用 three.js 更改我的 3D 模型的颜色
- c# - C#/Selenium .Net 转换 ro .Core 错误
- python - 来自 fortran 的数字太大:ValueError:无法将字符串转换为浮点数:'0.22738+109'
- javascript - For Loop Google App Script Javascript 的更快替代方案
- html - 当用户在日历上选择其他日期时,PrimeNG 日历清除当前/今天日期
- android - 带有片段的 NavigationDrawer 内的 Android TabLayout
- android - 谷歌 Android kotlin 代码实验室;我可以使用 RecyclerView.ViewHolder 替换所有 PhotoGridAdapter.MarsPropertyViewHolder 吗?
- reactjs - 如何在 Infinite Scroll 的电子商务产品负载上加载品牌
- kubernetes - 使用路径正则表达式重写在 EKS 和 nginx 入口控制器中不起作用
- angular - Angular 8 - 在 PUT 调用中修改 request.body 中的属性时,拦截器抛出错误