python - 为什么数据不使用 python 发送到 KAFKA 主题?
问题描述
我不知道有什么问题。我从 MYSQL 数据库中读取数据并想将其发送到 Kafka。但是脚本运行没有问题并停留在发送时刻
def parse(d,param): # распарс
r={}
if str(type(d)) == "<class 'dict'>":
# print(d)
return (d)
return -1
s_con = pymysql.connect(host="xxx", user="xxx", password="xxx", port=3306,
db="xxx", cursorclass=pymysql.cursors.SSCursor)
s_cur = s_con.cursor()
s_cur.execute(s_sql)
producer = KafkaProducer(bootstrap_servers=param["BOOTSTRAP_SERVERS"],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),)
print(producer)
while True:
r = [dict((s_cur.description[i][0], value) \
for i, value in enumerate(row)) for row in s_cur.fetchmany(
1000)]
for d in r:
j = parse(d, param)
print(j)
print(param["TOPIC"])
producer.send(param["TOPIC"], value=j)
sleep(5)
结果如下
解决方案
推荐阅读
- javascript - 在 ng-bootstrap 模式中构建数据表的问题
- javascript - 在 index.html 中使用环境变量
- vega - Vega-Lite - 处理大量数据的最佳方式是什么?
- sql-server - 为什么下面的 t-sql 事务不能按预期工作?
- r - R:如果日期差小于 7,则按组从前一行添加值
- java - 在 setItems() 之后无法在表格视图中查看单元格中的数据
- codeigniter - 错误链接到带有 href 的另一个页面(我的 localhost url 中的 %E2%80%8B 代码出错)
- c# - 可以从应用程序中删除“政策后参考”吗?
- r - 如何在 R 的系统命令中运行“conda ***”
- jpa - Spring JPA DDL 文件生成 - 如何在生成之前删除或清理文件