python - 向 JanusGraph 中导入数据的多线程 Python 程序
问题描述
我用 Python 编写了一个多线程程序,用于将数据添加到 JanusGraph。
由于数据文件中的各行相互独立,我可以按想要运行的线程数,把文件拆分成相应数量的小文件。
在各行相互独立的情况下,这种做法通常可行吗?
import csv
import os
import sys
import threading
import time

from gremlin_python import statics
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.process.graph_traversal import __
from gremlin_python.structure.graph import Graph
# Row counter shared by every task1 call; task1 prints it and then bumps it
# once per CSV row it reads.
ct = 0

# Remote connection to the server at ws://localhost:8182/gremlin; 'g' is the
# second argument handed to DriverRemoteConnection (presumably the name of the
# server-side traversal source -- confirm against the server configuration).
connection = DriverRemoteConnection('ws://localhost:8182/gremlin', 'g')

# Traversal source used by task1 for all reads and writes; every traversal
# spawned from it goes through the connection above.
graph = Graph()
g = graph.traversal().withRemote(connection)
def task1(file):
    """Load one CSV file of relations into the graph.

    Every row must provide at least three fields, in this order: the code of
    the first node, the code of the second node, and the edge label
    (relation). For each row both vertices are created when they are not yet
    present, and an edge is added between them unless some edge (of any
    label, in either direction) already connects them.

    The module-level counter ``ct`` is printed and incremented once per
    processed row.

    NOTE(review): each "check, then insert" pair below is two separate
    traversals, so concurrent callers handling the same node codes can still
    create duplicates -- confirm the input files do not share nodes, or
    enforce uniqueness in the graph schema.

    Args:
        file: Path of the CSV file to read.
    """
    global ct
    print(file)
    # newline='' is the mode the csv module asks for when given a file object.
    with open(file, 'rt', newline='') as f:
        for row in csv.reader(f):
            # csv.reader already yields a list of fields (no split needed);
            # blank or truncated lines cannot describe an edge, so skip them.
            if len(row) < 3:
                continue
            node1, node2, relation = row[0], row[1], row[2]

            print(ct)
            # This read-modify-write on a module global is not synchronized,
            # so the printed value is only approximate under concurrent calls.
            ct = ct + 1

            # Insert each endpoint only if it is missing. The vertex is stored
            # under the same "code" key that every lookup in this function
            # uses, so later lookups can actually find it.
            for code in (node1, node2):
                if not g.V().hasLabel("label").has("code", code).hasNext():
                    g.addV("label").property("code", code).next()

            # bothE() follows edges in either direction, so one query starting
            # from node1 covers node1->node2 as well as node2->node1.
            already_linked = (
                g.V().has("code", node1)
                .bothE()
                .where(__.otherV().has("code", node2))
                .hasNext()
            )
            if not already_linked:
                # Child traversals are spawned anonymously from __, not from g.
                g.V().has("code", node1).addE(relation).to(
                    __.V().has("code", node2)
                ).next()
if __name__ == "__main__":
    # Folder holding the pre-split CSV files (one worker thread per file).
    path = 'path of folder where all splitted csv files are stored'

    # Collect every file below `path`, skipping names containing '.DS'
    # (e.g. macOS .DS_Store entries).
    files = [
        os.path.join(root, name)
        for root, _dirs, names in os.walk(path)
        for name in names
        if '.DS' not in name
    ]

    # Start one thread per file that was actually found (instead of a fixed
    # count that overruns or under-uses the list), and only wait once all of
    # them are running so the files are loaded concurrently.
    workers = []
    try:
        for csv_path in files:
            worker = threading.Thread(target=task1, args=(csv_path,))
            worker.start()
            workers.append(worker)
    finally:
        # Wait for every started worker before releasing the shared remote
        # connection, even if starting a later thread failed.
        for worker in workers:
            worker.join()
        connection.close()