python - ModuleNotFoundError("'kafka' 不是一个有效的名称。你的意思是 aiokafka,kafka 之一吗?")
问题描述
我正在使用 Celery 和 Kafka 运行一些作业,以便将数据推送到 Kafka。我还使用浮士德来连接工人。faust -A project.streams.app worker -l info
但不幸的是,为了运行管道,我在运行后遇到了错误。我想知道是否有人可以帮助我。
/home/admin/.local/lib/python3.6/site-packages/faust/fixups/django.py:71: UserWarning: Using settings.DEBUG leads to a memory leak, never
use this setting in production environments!
warnings.warn(WARN_DEBUG_ENABLED)
Command raised exception: ModuleNotFoundError("'kafka' is not a valid name. Did you mean one of aiokafka, kafka?",)
File "/home/admin/.local/lib/python3.6/site-packages/mode/worker.py", line 67, in exiting
yield
File "/home/admin/.local/lib/python3.6/site-packages/faust/cli/base.py", line 528, in _inner
cmd()
File "/home/admin/.local/lib/python3.6/site-packages/faust/cli/base.py", line 611, in __call__
self.run_using_worker(*args, **kwargs)
File "/home/admin/.local/lib/python3.6/site-packages/faust/cli/base.py", line 620, in run_using_worker
self.on_worker_created(worker)
File "/home/admin/.local/lib/python3.6/site-packages/faust/cli/worker.py", line 57, in on_worker_created
self.say(self.banner(worker))
File "/home/admin/.local/lib/python3.6/site-packages/faust/cli/worker.py", line 97, in banner
self._banner_data(worker))
File "/home/admin/.local/lib/python3.6/site-packages/faust/cli/worker.py", line 127, in _banner_data
(' transport', app.transport.driver_version),
File "/home/admin/.local/lib/python3.6/site-packages/faust/app/base.py", line 1831, in transport
self._transport = self._new_transport()
File "/home/admin/.local/lib/python3.6/site-packages/faust/app/base.py", line 1686, in _new_transport
return transport.by_url(self.conf.broker_consumer[0])(
File "/home/admin/.local/lib/python3.6/site-packages/mode/utils/imports.py", line 101, in by_url
return self.by_name(URL(url).scheme)
File "/home/admin/.local/lib/python3.6/site-packages/mode/utils/imports.py", line 115, in by_name
f'{name!r} is not a valid name. {alt}') from exc
解决方案
我不知道浮士德出了什么问题,但我pip install faust
碰巧跑了,它解决了问题。
推荐阅读
- clojure - 如何根据 uri 在 reframe 中提供正确的面板?
- snowflake-cloud-data-platform - Snowflake 中的值对:变体还是对象?
- flutter - 在 Flutter 中合并流
- google-apps-script - 如何使用归档脚本将 Importrange 单元格转换为硬值,而无需将源设置为公共共享
- r - Shiny 中的可下载表格
- ruby-on-rails - Selenium 在 Heroku 上不起作用(Selenium::WebDriver::Error::SessionNotCreatedError)
- python - 将熊猫数据框中的两列相乘并创建一个包含解决方案的新列
- ffmpeg - 如何使用 ffmpeg 实时流式传输输入批次的单个帧,实时“附加”到 m3u8 播放列表?
- java - 如何将我的类的实例注入另一个类?
- java - 如何使用“-Xlog:heap*=debug”“-Xlog:gc*=debug”从 GC 正确读取日志的某些部分