首页 > 解决方案 > 使用 neo4j python 驱动程序的简单示例?

问题描述

是否有使用 neo4j python 驱动程序的简单示例?如何将密码查询传递给驱动程序以运行并返回游标?

例如,如果我正在阅读这个,那么演示似乎有一个类包装器,我将一个私有成员函数传递给 session.write,

session.write_transaction(self._create_and_return_greeting, ...

然后用事务作为第一个参数调用它......

def _create_and_return_greeting(tx, message):

反过来运行密码

result = tx.run("CREATE (a:Greeting) "

这似乎比需要的复杂 10 倍。

我只是尝试了一个更简单的方法:

def raw_query(query, **kwargs):
    neodriver = neo_connect()  # cached dbconn
    with neodriver.session() as session:
        try:
            result = session.run(query, **kwargs)
            return result.data()

但这会导致查询出现套接字错误,可能是因为会话超出范围?

[dfcx/__init__] ERROR | Underlying socket connection gone (_ssl.c:2396)

[dfcx/__init__] ERROR | Failed to write data to connection IPv4Address(('neo4j-core-8afc8558-3.production-orch-0042.neo4j.io', 7687)) (IPv4Address(('34.82.120.138', 7687)))

我也不能返回游标/迭代器,只是data() 当会话超出范围时,查询结果似乎随之消失。

如果我手动打开和关闭会话,那么我会遇到同样的问题吗?

Python 一定是这个 DB 使用的最流行的语言,每个人都使用不同的驱动程序吗?Py2neo 看起来很可爱,但是对于大多数 cypher 语言功能来说完全缺乏 ORM 包装器功能,所以无论如何你都必须使用原始密码。而且我不确定它是否**kwargs以相同的方式支持参数插值。

我想大加薪应该有助于解决一些问题:D

尝试获得工作数据库包装器的稍长版本:

def neo_connect() -> Union[neo4j.BoltDriver, neo4j.Neo4jDriver]:

    global raw_driver
    if raw_driver:
        # print('reuse driver')
        return raw_driver

    neoconfig = NEOCONFIG
    raw_driver = neo4j.GraphDatabase.driver(
        neoconfig['url'], auth=(
            neoconfig['user'], neoconfig['pass']))
    if raw_driver is None:
        raise BaseException("cannot connect to neo4j")
    else:
        return raw_driver


def raw_query(query, **kwargs):
    # just get data, no cursor
    neodriver = neo_connect()
    session = neodriver.session()
    # logging.info('neoquery %s', query)
    # with neodriver.session() as session:
    try:
        result = session.run(query, **kwargs)
        data = result.data()
        return data

    except neo4j.exceptions.CypherSyntaxError as err:
        logging.error('neo error %s', err)
        logging.error('failed query: %s', query)
        raise err
    # finally:
    #     logging.info('close session')
    #     session.close()

更新:有人向我指出了这个例子,这是使用 tx 包装器的另一种方式。

https://github.com/neo4j-graph-examples/northwind/blob/main/code/python/example.py#L16-L21

标签: pythonneo4jpy2neo

解决方案


def raw_query(query, **kwargs):
    neodriver = neo_connect()  # cached dbconn
    with neodriver.session() as session:
        try:
            result = session.run(query, **kwargs)
            return result.data()

这很好,可以按我的预期工作。

您看到的错误表明存在连接问题。因此,服务器和驱动程序之间肯定发生了一些超出其影响的事情。

另外,请注意,运行查询的所有这些方式之间存在差异:

  • with driver.session():
        result = session.run("<SOME CYPHER>")
    
  • def work(tx):
        result = tx.run("<SOME CYPHER>") 
    
    with driver.session():
        session.write_transaction(work)
    

后者可能要长 3 行,并且负责驱动程序的团队收集了一些关于此的反馈。但是,这里还有更多需要考虑的事情。首先,更改 API 表面是需要仔细计划的事情,并且不能在补丁版本中完成。其次,有技术障碍需要克服。无论如何,这是语义:

  • 自动提交事务。仅将该查询作为一个工作单元运行。如果您在同一会话中运行新的自动提交事务,则先前的结果将为您缓冲所有可用记录(取决于查询,这将消耗大量内存)。这可以通过调用来避免result.consume()。但是,如果会话超出范围,则会自动使用结果。这意味着您无法从中提取更多记录。最后,将引发任何错误并需要在应用程序代码中进行处理。
  • 托管交易。在该函数中运行您想要的任何工作单元。围绕函数隐式启动和提交事务(除非您显式回滚)。如果事务结束(函数结束或回滚),结果将被消耗并变为无效。在此之前,您必须提取所需的所有记录。
    这是使用驱动程序的推荐方式,因为它不会引发所有错误,但会在内部处理一些错误(在适当的情况下)并重试该work功能(例如,如果服务器只是暂时不可用)。由于该函数可能会多次执行,因此您必须确保它是幂等的。

结束语:
请记住,stackoverlfow 会尽最大努力进行监控,可能会被视为草率评论的内容可能会妨碍您获得有用的问题答案


推荐阅读