首页 > 解决方案 > Streamsets:Neo4j 查询非常慢

问题描述

我正在使用 Streamsets 管道从远程上传 .csv 文件的活动文件目录中读取数据,并将这些数据放入 neo4j 数据库中。我使用的步骤是-

现在我正在使用 jdbc 查询执行所有这些操作,并且使用的密码查询是

MERGE (m:OBSERVATION{
  SerialNumber: "${record:value('/SerialNumber')}",
  Test_Stage: "${record:value('/Test_Stage')}",
  CUR: "${record:value('/CUR')}",
  VOLT: "${record:value('/VOLT')}",
  Rel_Lot: "${record:value('/Rel_Lot')}",
  TimestampINT: "${record:value('/TimestampINT')}",     
  Temp: "${record:value('/Temp')}",
  LP: "${record:value('/LP')}",
  MON: "${record:value('/MON')}"
})       
MERGE (t:CSV{
       SerialNumber: "${record:value('/SerialNumber')}",
       Test_Stage: "${record:value('/Test_Stage')}",
       TimestampINT: "${record:value('/TimestampINT')}"
})  
WITH m
MATCH (t:CSV) where t.SerialNumber=m.SerialNumber and t.Test_Stage=m.Test_Stage and t.TimestampINT=m.TimestampINT MERGE (m)-[:PART_OF]->(t)

WITH t, t.TimestampINT AS TimestampINT
MATCH (rl:Burn_In_Test) where rl.SerialNumber=t.SerialNumber and rl.Test_Stage=t.Test_Stage and rl.TimestampINT<TimestampINT
SET rl.TimestampINT=TimestampINT     
WITH t 
MATCH (rl:Burn_In_Test) where rl.SerialNumber=t.SerialNumber and rl.Test_Stage=t.Test_Stage 
MERGE (t)-[:POINTS_TO]->(rl)
WITH rl
MATCH (t:CSV)-[r:POINTS_TO]->(rl) WHERE t.TimestampINT<rl.TimestampINT
DELETE r

现在这个过程非常缓慢,10 条记录大约需要 15 分钟的时间。这可以进一步优化吗?

标签: neo4jcypherstreamsets

解决方案


使用时的最佳实践MERGE是合并单个属性,然后使用SET添加其他属性。

如果我假设序列号属性对于每个节点都是唯一的(可能不是),它看起来像:

MERGE (m:OBSERVATION{SerialNumber: "${record:value('/SerialNumber')}"})
SET m.Test_Stage = "${record:value('/Test_Stage')}",
    m.CUR= "${record:value('/CUR')}",
    m.VOLT= "${record:value('/VOLT')}",
    m.Rel_Lot= "${record:value('/Rel_Lot')}",
    m.TimestampINT = "${record:value('/TimestampINT')}",     
    m.Temp= "${record:value('/Temp')}",
    m.LP= "${record:value('/LP')}",
    m.MON= "${record:value('/MON')}"       
MERGE (t:CSV{
       SerialNumber: "${record:value('/SerialNumber')}"       
})
SET t.Test_Stage = "${record:value('/Test_Stage')}",
    t.TimestampINT = "${record:value('/TimestampINT')}"  
WITH m
MATCH (t:CSV) where t.SerialNumber=m.SerialNumber and t.Test_Stage=m.Test_Stage and t.TimestampINT=m.TimestampINT MERGE (m)-[:PART_OF]->(t)

WITH t, t.TimestampINT AS TimestampINT
MATCH (rl:Burn_In_Test) where rl.SerialNumber=t.SerialNumber and rl.Test_Stage=t.Test_Stage and rl.TimestampINT<TimestampINT
SET rl.TimestampINT=TimestampINT     
WITH t 
MATCH (rl:Burn_In_Test) where rl.SerialNumber=t.SerialNumber and rl.Test_Stage=t.Test_Stage 
MERGE (t)-[:POINTS_TO]->(rl)
WITH rl
MATCH (t:CSV)-[r:POINTS_TO]->(rl) WHERE t.TimestampINT<rl.TimestampINT
DELETE r

要补充的另一件事是,我可能会将其拆分为两个查询。第一个是导入部分,第二个是删除关系。尽可能添加唯一的约束和索引。


推荐阅读