首页 > 解决方案 > 在 Python 中维护来自 Kafka 更新消息流的表的最佳数据结构

问题描述

假设我有一个固定维度 (N x M) 的表格数据集。我收到来自 Kafka 更新此表中条目的更新流。最终,我想要一个带有最新版本表的 pandas 数据框,我正在考虑这样做的几个选项:

  1. 将其作为表/数据框保存在内存中。我在这里担心的是,我不知道是否可以避免使用多线程,因为一个进程将永远处于接收消息的 for 循环中。

  2. 将其维护在外部结构中,并有一个独立的进程从中独立读取。外部数据存储的选择: a) SQLite - 可能存在并发问题,并且任意行的更新可能有点混乱。b) Redis - 易于维护,但很难一次查询/读取整个表(这是我通常访问数据的方式)。

我有点卡夫卡初学者,所以这里的任何建议都将不胜感激。你会如何处理这个问题?谢谢!

编辑:我想我也可以将它保存在内存中,然后将整个内容推送到 SQLite?

标签: pythonapache-kafkaredisproducer-consumer

解决方案


我最初的方法是问:我可以创建一个“足够好”的解决方案,然后在需要时对其进行优化?

除非您需要担心非常敏感的信息(如医疗保健或财务数据),或者肯定会很快扩大规模的数据,否则我建议您先尝试一个简单的解决方案,然后看看您是否遇到任何问题。你不可以!

最终,我可能会从 SQLite 解决方案开始,因为它设置起来相对简单,并且非常适合用例(即“事务”情况)。

以下是我会考虑的一些注意事项:

单一流程的优缺点

除非您的数据是高速/大容量的,否则您在同一过程中使用和处理数据的建议可能很好。在本地处理数据比通过网络接收数据要快得多(假设您的 Kafka 提要不在本地计算机上),因此您从 Kafka 摄取的数据可能会成为瓶颈。

但是,让 Python 进程无限期地旋转可能会很昂贵,并且您需要确保将数据存储到文件或数据库中,以防止进程关闭时丢失。

关系数据库(例如 SQLite)

使用像 SQLite 这样的关系数据库可能是您最好的选择,这再次取决于您接收数据的速度。但是关系数据库一直用于事务目的(实际上这是它们的主要预期目的之一),这意味着写入量和写入速度都很高——因此将数据保存在 SQLite 中并在那里进行更新肯定是有意义的. 如果有意义的话,您可以看到将数据分成单独的表格(例如第三范式),或者如果更合适,您可以将它们全部保存在一个表格中。

维护内存中的表

您也可以按照您的建议将表保存在内存中,只要您在更新后以某种方式(CSV、SQLite 等)将其保存到磁盘。例如,您可以:

  1. 将您的副本保存在内存中。
  2. 当您获得更新时,请对您的内存表进行更新。
  3. 将表写入磁盘。
  4. 如果您的进程停止或重新启动,请从内存中读取表以启动。

但是,Pandas 访问和更新行中的单个值可能会更慢,因此将表作为字典或其他东西保存在内存中并在不使用 pandas 的情况下将其写入磁盘实际上可能更有意义。但是,如果您可以在 pandas 中完成所有操作(重新:速度和体积),那也可能是一个很好的开始方式。


推荐阅读