Gremlin pagination for large dataset queries

Question

I am using Gremlin Server with a large dataset, and I am paginating through it with Gremlin. Here is an example of the queries:

def execute_query(query):
    """Submit the query to Gremlin Server and return the results."""

query = """g.V().both().both().count()"""
data = execute_query(query)
for x in range(0, int(data[0] / 10000) + 1):
    print(x * 10000, " - ", (x + 1) * 10000)
    query = """g.V().both().both().range({0}, {1})""".format(x * 10000, (x + 1) * 10000)
    data = execute_query(query)

The queries above work fine, but to paginate I have to know the range at which to stop issuing queries. To get that range I first have to run a count() query and feed the result into the for loop. Is there any other way to do pagination with Gremlin?

-- Pagination is needed because fetching 100k records in a single query fails, e.g. g.V().both().both().count()

Without pagination, I get the following error:

ERROR:tornado.application:Uncaught exception, closing connection.
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 554, in wrapper
    return callback(*args)
  File "/usr/local/lib/python3.5/dist-packages/tornado/stack_context.py", line 343, in wrapped
    raise_exc_info(exc)
  File "<string>", line 3, in raise_exc_info
  File "/usr/local/lib/python3.5/dist-packages/tornado/stack_context.py", line 314, in wrapped
    ret = fn(*args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/tornado/websocket.py", line 807, in _on_frame_data
    self._receive_frame()
  File "/usr/local/lib/python3.5/dist-packages/tornado/websocket.py", line 697, in _receive_frame
    self.stream.read_bytes(2, self._on_frame_start)
  File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 312, in read_bytes
    assert isinstance(num_bytes, numbers.Integral)
  File "/usr/lib/python3.5/abc.py", line 182, in __instancecheck__
    if subclass in cls._abc_cache:
  File "/usr/lib/python3.5/_weakrefset.py", line 75, in __contains__
    return wr in self.data
RecursionError: maximum recursion depth exceeded in comparison
ERROR:tornado.application:Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x7f3e1c409ae8>)
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/dist-packages/tornado/ioloop.py", line 604, in _run_callback
    ret = callback()
  File "/usr/local/lib/python3.5/dist-packages/tornado/stack_context.py", line 275, in null_wrapper
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 554, in wrapper
    return callback(*args)
  File "/usr/local/lib/python3.5/dist-packages/tornado/stack_context.py", line 343, in wrapped
    raise_exc_info(exc)
  File "<string>", line 3, in raise_exc_info
  File "/usr/local/lib/python3.5/dist-packages/tornado/stack_context.py", line 314, in wrapped
    ret = fn(*args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/tornado/websocket.py", line 807, in _on_frame_data
    self._receive_frame()
  File "/usr/local/lib/python3.5/dist-packages/tornado/websocket.py", line 697, in _receive_frame
    self.stream.read_bytes(2, self._on_frame_start)
  File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 312, in read_bytes
    assert isinstance(num_bytes, numbers.Integral)
  File "/usr/lib/python3.5/abc.py", line 182, in __instancecheck__
    if subclass in cls._abc_cache:
  File "/usr/lib/python3.5/_weakrefset.py", line 75, in __contains__
    return wr in self.data
RecursionError: maximum recursion depth exceeded in comparison
Traceback (most recent call last):
  File "/home/rgupta/Documents/BitBucket/ecodrone/ecodrone/test2.py", line 59, in <module>
    data = execute_query(query)
  File "/home/rgupta/Documents/BitBucket/ecodrone/ecodrone/test2.py", line 53, in execute_query
    results = future_results.result()
  File "/usr/lib/python3.5/concurrent/futures/_base.py", line 405, in result
    return self.__get_result()
  File "/usr/lib/python3.5/concurrent/futures/_base.py", line 357, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.5/dist-packages/gremlin_python/driver/resultset.py", line 81, in cb
    f.result()
  File "/usr/lib/python3.5/concurrent/futures/_base.py", line 398, in result
    return self.__get_result()
  File "/usr/lib/python3.5/concurrent/futures/_base.py", line 357, in __get_result
    raise self._exception
  File "/usr/lib/python3.5/concurrent/futures/thread.py", line 55, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.5/dist-packages/gremlin_python/driver/connection.py", line 77, in _receive
    self._protocol.data_received(data, self._results)
  File "/usr/local/lib/python3.5/dist-packages/gremlin_python/driver/protocol.py", line 100, in data_received
    self.data_received(data, results_dict)
  File "/usr/local/lib/python3.5/dist-packages/gremlin_python/driver/protocol.py", line 100, in data_received
    self.data_received(data, results_dict)
  File "/usr/local/lib/python3.5/dist-packages/gremlin_python/driver/protocol.py", line 100, in data_received
    self.data_received(data, results_dict)
  File "/usr/local/lib/python3.5/dist-packages/gremlin_python/driver/protocol.py", line 100, in data_received

(the File "/usr/local/lib/python3.5/dist-packages/gremlin_python/driver/protocol.py", line 100, in data_received frame repeats 100 times)

Tags: python-3.x, neo4j, cypher, gremlin, gremlin-server

Answer


This question is largely answered elsewhere, but I will add a few more comments.

Your approach to pagination is very expensive, because I am not aware of any graph that optimizes that particular traversal, and you end up iterating over the same data many times. You execute count() once, then to get the first 10,000 you iterate the first 10,000; for the second 10,000 you iterate the first 10,000 again and then the second; for the third 10,000 you iterate the first 20,000 and then the third 10,000; and so on...
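To make that cost concrete, here is a small back-of-the-envelope sketch (plain Python, not part of the original question) of how many items the server ends up iterating with range()-based paging:

```python
def items_touched(total, page_size):
    """Items the server iterates when paging with range():
    to serve page x it must first skip over all earlier pages,
    so that page touches (x + 1) * page_size items."""
    pages = (total + page_size - 1) // page_size
    return sum((x + 1) * page_size for x in range(pages))

# 100,000 results in pages of 10,000: ~550,000 items are iterated
# instead of 100,000 -- the cost grows quadratically with page count.
print(items_touched(100000, 10000))  # → 550000
```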

I am not sure whether there is more to your logic, but what you have looks like a form of "batching" to get smaller result sets. There is no need to do that, because Gremlin Server already does it for you internally. If you simply send g.V().both().both(), Gremlin Server will stream the results back in batches governed by the resultIterationBatchSize configuration option.
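For reference, that setting is a top-level key in the server's YAML configuration (a sketch of a gremlin-server.yaml fragment; 64 is the shipped default):

```yaml
# gremlin-server.yaml (fragment) -- how many results are written back
# per response frame when Gremlin Server streams a result set
resultIterationBatchSize: 64
```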

In any case, beyond what is explained in the other question I mentioned, I do not know of a better way to make pagination work.

