首页 > 解决方案 > 向 JanusGraph 导入数据的多线程 Python 程序

问题描述

我编写了一个使用 Python 多线程将数据添加到 JanusGraph 的程序。

由于数据文件中的各行相互独立，我可以按照要运行的线程数，把文件拆分成相同数量的子文件。

对于各行相互独立的数据，这种做法通常可行吗？

import csv
import os
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor

from gremlin_python import statics
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.process.graph_traversal import __
from gremlin_python.structure.graph import Graph

# Running count of CSV rows processed so far; task1 prints and increments it.
ct = 0

# Remote Gremlin Server connection. Traversals started from `g` are sent to
# the server through this connection rather than evaluated locally.
connection = DriverRemoteConnection('ws://localhost:8182/gremlin', 'g')
graph = Graph()
g = graph.traversal().withRemote(connection)

# Guards the module-level row counter ``ct``: ``ct = ct + 1`` is a
# read-modify-write, so unsynchronised worker threads can lose increments.
_ct_lock = threading.Lock()


def _get_or_create_vertex(code):
    """Return a ``label`` vertex whose ``code`` property equals *code*, creating it if absent.

    Lookup and insert are sent as a single traversal (fold/coalesce
    "get or create") instead of two separate requests.
    NOTE(review): this is presumably still not atomic across concurrent
    writers; guaranteeing one vertex per code would need a unique index on
    ``code`` (with locking) in the JanusGraph schema -- confirm.
    """
    return (
        g.V().hasLabel("label").has("code", code)
        .fold()
        .coalesce(
            __.unfold(),
            # Lookups key on "code"; "name" is kept because earlier
            # versions of this loader wrote only "name".
            __.addV("label").property("code", code).property("name", code),
        )
        .next()
    )


def task1(file):
    """Load one CSV file of ``node1,node2,relation`` rows into the graph.

    For each row this ensures a ``label`` vertex exists for both endpoints
    (keyed by the ``code`` property). It then adds a ``relation`` edge from
    node1 to node2 unless the two vertices are already joined by an edge in
    either direction. Rows with fewer than three fields are skipped.

    Meant to run on several worker threads, one file per call. Rows in
    different files are independent, but vertices are not: the same node
    code may appear in several files, so concurrent calls contend on the
    same vertices.

    :param file: path of the CSV file to ingest.
    """
    global ct
    print(file)
    # newline='' is what the csv module expects for files it parses.
    with open(file, 'rt', newline='') as f:
        for row in csv.reader(f):
            # csv.reader already yields a list of fields per line.
            if len(row) < 3:
                continue
            node1, node2, relation = row[0], row[1], row[2]

            with _ct_lock:
                print(ct)
                ct = ct + 1

            v1 = _get_or_create_vertex(node1)
            v2 = _get_or_create_vertex(node2)

            # bothE() covers both edge directions, so a single check suffices.
            if not g.V(v1).bothE().where(__.otherV().hasId(v2.id)).hasNext():
                # Child traversals must be anonymous (__), not spawned from g.
                g.V(v1).addE(relation).to(__.V(v2)).next()


if __name__ == "__main__":

    path = 'path of folder where all splitted csv files are stored'

    # Upper bound on concurrently running workers; each worker handles one
    # file at a time, and every file found is processed regardless of count.
    max_threads = 19

    files = []
    for root, _dirs, names in os.walk(path):
        for name in names:
            # Skip e.g. macOS ".DS_Store" metadata files.
            if '.DS' not in name:
                files.append(os.path.join(root, name))

    try:
        if files:
            workers = min(max_threads, len(files))
            with ThreadPoolExecutor(max_workers=workers) as pool:
                futures = [pool.submit(task1, file) for file in files]
                # Wait for every worker; result() re-raises a worker's
                # exception here instead of losing it in a background thread.
                for future in futures:
                    future.result()
    finally:
        connection.close()

标签：python janusgraph

解决方案


推荐阅读