首页 > 解决方案 > Neo4j 基于具有某些属性的其他标签创建新标签

问题描述

我需要基于点创建轨迹。

我读了很多书,但是我无法按应有的方式完成这项工作。

这是我到目前为止所尝试的:

MATCH (p:Point)
WHERE NOT (:Trajectory)-[:CONTAINS]->(p)
WITH p.cameraSid AS cameraSid, p.trajectoryId AS trajectoryId, p.classType AS classType, p.classQual AS classQual, COLLECT(p) AS points
UNWIND points AS point
MERGE (trajectory:Trajectory{trajectoryId:point.trajectoryId, cameraSid: point.cameraSid, classType: point.classType, classQual: point.classQual, date: date(datetime(point.at))})
MERGE (trajectory)-[:CONTAINS{at:point.at}]->(point)

我不知道如何在 MERGE 子句中创建这种条件(1 小时或更少)。

这是用于创建一些数据的 neo4j 查询

// Create points
LOAD CSV FROM 'https://uca54485eb4c5d2a6869053af475.dl.dropboxusercontent.com/cd/0/get/AmR2pn0hC0c-CQW_mSS-TDqHQyi7MNVjPvqffQHhSIyMP37D7UMtfODdHDkNWi6-HqzQdp4ob2Q3326g6imEd26F3sdNJyJuAeNa8wJA2o_E6A/file?dl=1#' AS line
CREATE (:Point{trajectoryId: line[0],at: line[1],cameraSid: line[2],activity: line[3],x: line[4],atEpochMilli: line[5],y: line[6],control: line[7],classQual: line[8],classType: line[9],uniqueIdentifier: line[10]})

// Create Trajectory based on Points
MATCH (p:Point)
WHERE NOT (:Trajectory)-[:CONTAINS]->(p)
WITH p.cameraSid AS cameraSid, p.trajectoryId AS trajectoryId, p.classType AS classType, p.classQual AS classQual, COLLECT(p) AS points
UNWIND points AS point
MERGE (trajectory:Trajectory{trajectoryId:point.trajectoryId, cameraSid: point.cameraSid, classType: point.classType, classQual: point.classQual, date: date(datetime(point.at))})
MERGE (trajectory)-[:CONTAINS{at:point.at}]->(point)

如果指向 CSV 文件的链接不起作用,这里有一个替代方法,在这种情况下,您必须下载该文件,然后从 Neo4j 实例本地导入它。

标签: databasegraphneo4jcypher

解决方案


我认为这是其中之一,因为您可以在一个 Cypher 语句中执行此操作并不意味着您应该这样做,而且您几乎肯定会发现在应用程序代码中执行此操作更容易。

无论如何,它可以使用 APOC 并通过instanceId在您的 Trajectory 节点上引入一个独特的属性来完成。

可能的解决方案

这几乎肯定不会扩展,并且您需要索引(稍后根据有根据的猜测进行讨论)。

首先,我们需要更改您的导入脚本,以确保该at属性是 adatetime而不仅仅是一个字符串(否则我们最终会在查询中添加datetime()调用:

LOAD CSV FROM 'file:///export.csv' AS line
CREATE (:Point{trajectoryId: line[0], at: datetime(line[1]), cameraSid: line[2], activity: line[3],x: line[4], atEpochMilli: line[5], y: line[6], control: line[7], classQual: line[8], classType: line[9], uniqueIdentifier: line[10]})

然后,以下内容似乎会获取您的样本数据集并根据您的要求添加轨迹(并且可以在添加新点时运行)。

CALL apoc.periodic.iterate( 
'
MATCH (p: Point) 
WHERE NOT (:Trajectory)-[:CONTAINS]->(p)
RETURN p
ORDER BY p.at
',
'
OPTIONAL MATCH (t: Trajectory { trajectoryId: p.trajectoryId, cameraSid: p.cameraSid, classQual: p.classQual, classType: p.classType })-[:CONTAINS]-(trajPoint:Point)
WITH p, t, max(trajPoint.at) as maxAt, min(trajPoint.at) as minAt
WITH p, max(case when t is not null AND (
      (p.at <= datetime(maxAt) + duration({ hours: 1 })) 
      AND 
      (p.at >= datetime(minAt) - duration({ hours: 1 }))
  ) 
  THEN t.instanceId ELSE NULL END) as instanceId
MERGE (tActual: Trajectory { trajectoryId: p.trajectoryId, cameraSid: p.cameraSid, classQual: p.classQual, classType: p.classType, instanceId: COALESCE(instanceId, randomUUID()) })
ON CREATE SET tActual.date = date(datetime(p.at))
MERGE (tActual)-[:CONTAINS]->(p)
RETURN instanceId
',
{ parallel: false, batchSize: 1 })

解释

提出的问题很棘手,因为关于是否创建新轨迹或将点添加到现有轨迹的决定完全取决于我们如何处理所有先前的点。这意味着两件事:

  • 我们需要处理这些点以确保我们创建可靠的轨迹 - 我们从最早的开始,然后工作
  • 我们需要对轨迹的每次创建或修改都立即可见,以便处理下一个点——也就是说,我们需要单独处理每个点,就好像它是一个小型交易一样

我们将使用apoc.periodic.iteratewith a batchSizeof 1 来提供我们需要的行为。

第一个参数构建要处理的节点集——所有那些当前不属于轨迹的点,按它们的时间戳排序。

的第二个参数apoc.periodic.iterate是魔术发生的地方,所以让我们分解一下 - 给定一个到目前为止p不属于轨迹的点:

OPTIONAL MATCH (t: Trajectory { trajectoryId: p.trajectoryId, cameraSid: p.cameraSid, classQual: p.classQual, classType: p.classType })-[:CONTAINS]-(trajPoint:Point)
WITH p, t, max(trajPoint.at) as maxAt, min(trajPoint.at) as minAt
WITH p, max(case when t is not null AND (
      (p.at <= datetime(maxAt) + duration({ hours: 1 })) 
      AND 
      (p.at >= datetime(minAt) - duration({ hours: 1 }))
  ) 
  THEN t.instanceId ELSE NULL END) as instanceId
  • 查找与关键字段匹配的任何轨迹,并且包含一个在传入点一小时内的点pinstanceId如果我们找到合适的匹配项(或者如果有多个匹配项,则选择我们找到的最大的匹配项 - 我们只是想确保此时有零行或一行)
  • 我们将在一分钟内看到instanceId所有内容,但将其视为给定的唯一标识符Trajectory
MERGE (tActual: Trajectory { trajectoryId: p.trajectoryId, cameraSid: p.cameraSid, classQual: p.classQual, classType: p.classType, instanceId: COALESCE(instanceId, randomUUID()) })
ON CREATE SET tActual.date = date(datetime(p.at))
  • 确保有一个与传入点的关键字段匹配的轨迹 - 如果前面的代码找到匹配的轨迹,则MERGE没有工作要做。否则创建新的 Trajectory -instanceId如果我们之前没有匹配 Trajectory,我们将向新的随机 UUID 添加一个属性以强制MERGE创建一个新节点(因为该 UUID 不会存在其他节点,即使一个匹配所有其他关键领域)
MERGE (tActual)-[:CONTAINS]->(p)
RETURN instanceId
  • tActual现在是传入点p应该属于的轨迹 - 创建:CONTAINS关系

第三个参数至关重要:

{ parallel: false, batchSize: 1 })
  • 重要提示:为此,“内部”Cypher 语句的每次迭代都必须完全按顺序发生,因此我们强制 abatchSize为 1 并禁用并行性,以防止APOC以一次一个以外的任何方式调度批次.

索引

我认为随着导入规模的增长和轨迹数量的增加,上述性能会迅速下降。至少我认为你会想要一个复合索引

  • :Trajectory(trajectoryId, cameraSid, classQual, classType)- 以便快速找到给定点的候选轨迹的初始匹配
  • :Trajectory(trajectoryId, cameraSid, classQual, classType, instanceId)- 以便MERGE在最后找到现有的轨迹以快速添加(如果存在)

但是 - 这是通过观察查询的猜测,不幸的是,您无法正确查看查询以判断执行计划是什么,因为我们正在使用apoc.periodic.iterate- EXPLAINorPROFILE只会告诉您有一个过程调用需要 1 db hit,哪个是真的,但没有帮助。


推荐阅读