database - Neo4j 基于具有某些属性的其他标签创建新标签
问题描述
我需要基于点创建轨迹。
一个轨迹可以包含满足特定标准的任意数量的点。标准是:cameraSid、trajectoryId、classType 和 classQual 应该相等。每个点的时间差 (at) 必须小于或等于 1 小时。
为了创建轨迹,我们至少需要一个点。
为了将新点关联到现有轨迹,轨迹的最新关联点必须不早于新点并且与新点相比 1 小时。如果新点具有完全相同的属性但超过 1 小时,则需要创建新的轨迹。
我读了很多书,但是我无法按应有的方式完成这项工作。
这是我到目前为止所尝试的:
MATCH (p:Point)
WHERE NOT (:Trajectory)-[:CONTAINS]->(p)
WITH p.cameraSid AS cameraSid, p.trajectoryId AS trajectoryId, p.classType AS classType, p.classQual AS classQual, COLLECT(p) AS points
UNWIND points AS point
MERGE (trajectory:Trajectory{trajectoryId:point.trajectoryId, cameraSid: point.cameraSid, classType: point.classType, classQual: point.classQual, date: date(datetime(point.at))})
MERGE (trajectory)-[:CONTAINS{at:point.at}]->(point)
我不知道如何在 MERGE 子句中创建这种条件(1 小时或更少)。
这是用于创建一些数据的 neo4j 查询
// Create points
LOAD CSV FROM 'https://uca54485eb4c5d2a6869053af475.dl.dropboxusercontent.com/cd/0/get/AmR2pn0hC0c-CQW_mSS-TDqHQyi7MNVjPvqffQHhSIyMP37D7UMtfODdHDkNWi6-HqzQdp4ob2Q3326g6imEd26F3sdNJyJuAeNa8wJA2o_E6A/file?dl=1#' AS line
CREATE (:Point{trajectoryId: line[0],at: line[1],cameraSid: line[2],activity: line[3],x: line[4],atEpochMilli: line[5],y: line[6],control: line[7],classQual: line[8],classType: line[9],uniqueIdentifier: line[10]})
// Create Trajectory based on Points
MATCH (p:Point)
WHERE NOT (:Trajectory)-[:CONTAINS]->(p)
WITH p.cameraSid AS cameraSid, p.trajectoryId AS trajectoryId, p.classType AS classType, p.classQual AS classQual, COLLECT(p) AS points
UNWIND points AS point
MERGE (trajectory:Trajectory{trajectoryId:point.trajectoryId, cameraSid: point.cameraSid, classType: point.classType, classQual: point.classQual, date: date(datetime(point.at))})
MERGE (trajectory)-[:CONTAINS{at:point.at}]->(point)
如果指向 CSV 文件的链接不起作用,这里有一个替代方法,在这种情况下,您必须下载该文件,然后从 Neo4j 实例本地导入它。
解决方案
我认为这是其中之一,因为您可以在一个 Cypher 语句中执行此操作并不意味着您应该这样做,而且您几乎肯定会发现在应用程序代码中执行此操作更容易。
无论如何,它可以使用 APOC 并通过instanceId
在您的 Trajectory 节点上引入一个独特的属性来完成。
可能的解决方案
这几乎肯定不会扩展,并且您需要索引(稍后根据有根据的猜测进行讨论)。
首先,我们需要更改您的导入脚本,以确保该at
属性是 adatetime
而不仅仅是一个字符串(否则我们最终会在查询中添加datetime()
调用:
LOAD CSV FROM 'file:///export.csv' AS line
CREATE (:Point{trajectoryId: line[0], at: datetime(line[1]), cameraSid: line[2], activity: line[3],x: line[4], atEpochMilli: line[5], y: line[6], control: line[7], classQual: line[8], classType: line[9], uniqueIdentifier: line[10]})
然后,以下内容似乎会获取您的样本数据集并根据您的要求添加轨迹(并且可以在添加新点时运行)。
CALL apoc.periodic.iterate(
'
MATCH (p: Point)
WHERE NOT (:Trajectory)-[:CONTAINS]->(p)
RETURN p
ORDER BY p.at
',
'
OPTIONAL MATCH (t: Trajectory { trajectoryId: p.trajectoryId, cameraSid: p.cameraSid, classQual: p.classQual, classType: p.classType })-[:CONTAINS]-(trajPoint:Point)
WITH p, t, max(trajPoint.at) as maxAt, min(trajPoint.at) as minAt
WITH p, max(case when t is not null AND (
(p.at <= datetime(maxAt) + duration({ hours: 1 }))
AND
(p.at >= datetime(minAt) - duration({ hours: 1 }))
)
THEN t.instanceId ELSE NULL END) as instanceId
MERGE (tActual: Trajectory { trajectoryId: p.trajectoryId, cameraSid: p.cameraSid, classQual: p.classQual, classType: p.classType, instanceId: COALESCE(instanceId, randomUUID()) })
ON CREATE SET tActual.date = date(datetime(p.at))
MERGE (tActual)-[:CONTAINS]->(p)
RETURN instanceId
',
{ parallel: false, batchSize: 1 })
解释
提出的问题很棘手,因为关于是否创建新轨迹或将点添加到现有轨迹的决定完全取决于我们如何处理所有先前的点。这意味着两件事:
- 我们需要处理这些点以确保我们创建可靠的轨迹 - 我们从最早的开始,然后工作
- 我们需要对轨迹的每次创建或修改都立即可见,以便处理下一个点——也就是说,我们需要单独处理每个点,就好像它是一个小型交易一样
我们将使用apoc.periodic.iterate
with a batchSize
of 1 来提供我们需要的行为。
第一个参数构建要处理的节点集——所有那些当前不属于轨迹的点,按它们的时间戳排序。
的第二个参数apoc.periodic.iterate
是魔术发生的地方,所以让我们分解一下 - 给定一个到目前为止p
不属于轨迹的点:
OPTIONAL MATCH (t: Trajectory { trajectoryId: p.trajectoryId, cameraSid: p.cameraSid, classQual: p.classQual, classType: p.classType })-[:CONTAINS]-(trajPoint:Point)
WITH p, t, max(trajPoint.at) as maxAt, min(trajPoint.at) as minAt
WITH p, max(case when t is not null AND (
(p.at <= datetime(maxAt) + duration({ hours: 1 }))
AND
(p.at >= datetime(minAt) - duration({ hours: 1 }))
)
THEN t.instanceId ELSE NULL END) as instanceId
- 查找与关键字段匹配的任何轨迹,并且包含一个在传入点一小时内的点
p
,instanceId
如果我们找到合适的匹配项(或者如果有多个匹配项,则选择我们找到的最大的匹配项 - 我们只是想确保此时有零行或一行) - 我们将在一分钟内看到
instanceId
所有内容,但将其视为给定的唯一标识符Trajectory
MERGE (tActual: Trajectory { trajectoryId: p.trajectoryId, cameraSid: p.cameraSid, classQual: p.classQual, classType: p.classType, instanceId: COALESCE(instanceId, randomUUID()) })
ON CREATE SET tActual.date = date(datetime(p.at))
- 确保有一个与传入点的关键字段匹配的轨迹 - 如果前面的代码找到匹配的轨迹,则
MERGE
没有工作要做。否则创建新的 Trajectory -instanceId
如果我们之前没有匹配 Trajectory,我们将向新的随机 UUID 添加一个属性以强制MERGE
创建一个新节点(因为该 UUID 不会存在其他节点,即使一个匹配所有其他关键领域)
MERGE (tActual)-[:CONTAINS]->(p)
RETURN instanceId
tActual
现在是传入点p
应该属于的轨迹 - 创建:CONTAINS
关系
第三个参数至关重要:
{ parallel: false, batchSize: 1 })
- 重要提示:为此,“内部”Cypher 语句的每次迭代都必须完全按顺序发生,因此我们强制 a
batchSize
为 1 并禁用并行性,以防止APOC
以一次一个以外的任何方式调度批次.
索引
我认为随着导入规模的增长和轨迹数量的增加,上述性能会迅速下降。至少我认为你会想要一个复合索引
:Trajectory(trajectoryId, cameraSid, classQual, classType)
- 以便快速找到给定点的候选轨迹的初始匹配:Trajectory(trajectoryId, cameraSid, classQual, classType, instanceId)
- 以便MERGE
在最后找到现有的轨迹以快速添加(如果存在)
但是 - 这是通过观察查询的猜测,不幸的是,您无法正确查看查询以判断执行计划是什么,因为我们正在使用apoc.periodic.iterate
- EXPLAIN
orPROFILE
只会告诉您有一个过程调用需要 1 db hit,哪个是真的,但没有帮助。
推荐阅读
- recursion - 我尝试使用关于递归的答案来绘制每棵树 3 和 3 个分支的深度,但答案很奇怪?
- spring - Spring 注解 Transactional 内部如何工作?
- python - 如何将目录中的所有文件和子目录上传到共享点
- c++ - 是否可以将 lambda 表达式放入 C++ 中的映射或列表中?
- javascript - 无法使用 lightsail 虚拟服务器显示我的应用程序
- node.js - 为什么我在 AWS EC2 实例上的 nodejs 应用程序的大文件上传失败并显示状态 413?
- java - 有人可以向我解释这段代码最后一部分的逻辑吗?
- java - Java Mongo模板中同一查询中的Mongo DB日期差异和查询条件
- c++ - 一个函数中的非 const 字段很好,但在另一个函数中被识别为 const
- ant-media-server - 如何从 ubuntu 19.10 卸载 Ant Media 服务器?