首页 > 解决方案 > python cassandra 在生成器中获得 select * 的大结果(在 ram 中没有存储结果)

问题描述

我想获取 cassandra 表“用户”中的所有数据

我有 840000 个用户,我不想让所有用户都出现在 python 列表中。我希望以 100 个用户为一组获得用户

在 cassandra doc https://datastax.github.io/python-driver/query_paging.html 我看到我可以使用 fetch_size,但在我的 python 代码中我有包含所有 cql 指令的数据库对象

from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement

class Database:
   def __init__(self, name, salary):
        self.cluster = Cluster(['192.168.1.1', '192.168.1.2'])
        self.session = cluster.connect()

   def get_users(self):
        users_list = []
        query = "SELECT * FROM users"
        statement = SimpleStatement(query, fetch_size=10)
        for user_row in session.execute(statement):
            users_list.append(user_row.name)
        return users_list

实际上 get_users 返回非常大的用户名列表,但我想将返回 get_users 转换为“生成器”

我不想在 1 个列表和 1 个函数 get_users 调用中获取所有用户名,但我希望有很多调用 get_users 并返回列表,每个调用函数最多只有 100 个用户

例如:list1 = database.get_users() list2 = database.get_users() ... listn = database.get_users()

list1 在查询中包含 100 个第一个用户 list2 在查询中包含 100 个“第二个”用户 listn 包含查询中的最新元素 (<=100)

这可能吗 ?感谢您提前回答

标签: pythoncassandra

解决方案


根据分页大查询

每当当前页面中没有更多行时,将透明地获取下一页。

所以,如果你像这样执行你的代码,你仍然会得到整个结果集,但这是以透明的方式分页的。

为了实现你需要使用回调。您还可以在上面的链接中找到一些代码示例。

我在完整代码下方添加以供参考。

from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement
from threading import Event

class PagedResultHandler(object):

    def __init__(self, future):
        self.error = None
        self.finished_event = Event()
        self.future = future
        self.future.add_callbacks(
            callback=self.handle_page,
            errback=self.handle_error)

    def handle_page(self, rows):
        for row in rows:
            process_row(row)

        if self.future.has_more_pages:
            self.future.start_fetching_next_page()
        else:
            self.finished_event.set()
    def handle_error(self, exc):
        self.error = exc
        self.finished_event.set()

def process_row(user_row):
    print user_row.name, user_row.age, user_row.email

cluster = Cluster()
session = cluster.connect()

query = "SELECT * FROM myschema.users"
statement = SimpleStatement(query, fetch_size=5)

future = session.execute_async(statement)
handler = PagedResultHandler(future)
handler.finished_event.wait()
if handler.error:
    raise handler.error
cluster.shutdown()

移动到下一页是在调用handle_page时完成的。start_fetching_next_page

如果您将 if 语句替换为self.finished_event.set()您将看到迭代在前 5 行之后停止,如fetch_size


推荐阅读