json - 将 json(或 csv)文档上传到 Azure 数据工厂并在 Cosmos DB 中创建图形
问题描述
我正在尝试使用 Azure 数据工厂将 json 或 csv 文件从我的 blob 存储移动到我的 Cosmos DB 图形数据库。
通过正确格式化 json 文件,我可以上传顶点,但不知道如何创建边。将边缘硬编码到 json 文件中不起作用。这是我的 json 文件中的顶点之一:
{
"id": "o0001",
"label": "Order",
"type": "vertex",
"Product2": 1.0,
"Product3": 1.0,
"Product4": 1.0,
"Product5": 1.0,
"Product6": 1.0,
"Product7": 1.0,
"Product8": 1.0,
"Product24": 1.0,
"Product25": 1.0,
"Product26": 1.0,
"Product27": 1.0
}
这是一个优势:
{
"label": "purchased",
"type": "edge",
"inVLabel": "Product",
"outVLabel": "Order",
"inV": "Product2",
"outV": "o0001"
}
一切都作为顶点导入。有人知道如何上传顶点和边吗?
解决方案
您可以将 json 转换为数据框,然后执行以下步骤在 Cosmos DB 中添加记录,其中 DataFrame 中的一行相当于 Cosmos 中的 1 个顶点。
单击下面的链接和下载选项,然后选择
uber.jar https://search.maven.org/artifact/com.microsoft.azure/azure-cosmosdb-spark_2.3.0_2.11/1.2.2/jar
然后添加您的依赖项。spark-shell --master yarn --executor-cores 5 --executor-memory 10g --num-executors 10 --driver-memory 10g --jars "path/to/jar/dependency/azure-cosmosdb-spark_2.3.0 _2.11-1.2.2-uber.jar" --packages "com.google.guava:guava:18.0,com.google.code.gson:gson:2.3.1,com.microsoft.azure:azure-documentdb: 1.16.1"
这是相同的代码:
import org.apache.spark.sql.types._ import org.apache.spark.sql.Row val data = Seq( Row(2, "Abb"), Row(4, "Bcc"), Row(6, "Cdd") ) val schema = List( StructField("partitionKey", IntegerType, true), StructField("name", StringType, true) ) val DF = spark.createDataFrame( spark.sparkContext.parallelize(data), StructType(schema) ) val writeConfig = Map("Endpoint" -> "https://*******.documents.azure.com:443/", "Masterkey" -> "**************", "Database" -> "db_name", "Collection" -> "collection_name", "Upsert" -> "true", "query_pagesize" -> "100000", "bulkimport"-> "true", "WritingBatchSize"-> "1000", "ConnectionMaxPoolSize"-> "100", "partitionkeydefinition"-> "/partitionKey") DF.write.format("com.microsoft.azure.cosmosdb.spark").mode("overwrite").options(writeConfig).save()
希望能帮助到你。
推荐阅读
- javascript - 解构具有相同名称属性的内部对象
- python - 如何将 pandas NaT 日期时间值正确插入到我的 postgresql 表中
- html - 无法与引导程序对齐到右边距
- python - 尝试创建循环遍历列表中的每个项目以在单独的列表中查找任何匹配项,然后替换匹配项
- javascript - 带有修改的查询字符串的 JS 重新加载页面
- c - fscanf 作为 while 语句
- javascript - Can't call a function inside another function Javascript
- django - How can i set pagination dynamically in Django Rest Framework?
- angular - Pass observable from one component to another via service
- sql - Combine CASE and IN