python - 使用来自 KAFKA 主题的数据并从中提取字段并使用 python 存储在 MySQL 中
问题描述
我想使用以下命令使用来自 Kafka 主题的数据,如下所示:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTestTopic --from-beginning
然后这将输出以下内容(只是粘贴前 2 行输出,但它会是很多行......):
&time=1561768216000&gameCategory=PINPOINT&category=ONE&uniqueId=2518Z-0892A-0030O-16H70&transactionType=CRD&familyId=000-222-115-11119&realTs=1561768319000&sortId=1&msg=SET-UP+PRAYER+%26+intercession+begins+in+just+30+minutes.&remoteIpAddress=127.0.0.1&userAgent=HTTP&
&uniqueId=872541806296826880&time=1571988786000&gameCategory=NOTIFY&category=TWO&transactionType=CRD&familyId=401-222-115-89387&sortId=1&realTs=1571988989000&msg=This-is+a+reminder.&remoteIpAddress=127.0.0.1&userAgent=HTTPS&
我想从输出中使用以下内容:
真实的
家庭编号
味精
唯一身份
您可以看到每个元素都由一个 & 符号 ('&') 分隔。它们并不总是在同一个索引/位置,所以我不确定我是否需要正则表达式?最终,当我在本地运行的 MySQL 上进行查询时,我会看到:
描述测试表;
+----------+--------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+----------+--------------+------+-----+---------+-------+
| realTs | bigint(20) | YES | | NULL | |
| familyId | varchar(255) | YES | | NULL | |
| msg | text | YES | | NULL | |
| uniqueId | varchar(255) | YES | | NULL | |
+----------+--------------+------+-----+---------+-------+
4 rows in set (0.00 sec)
选择 * 从测试表;
+---------------+-------------------+-----------------------------------------------------------+-------------------------+
| realTs | familyId | msg | uniqueId |
+---------------+-------------------+-----------------------------------------------------------+-------------------------+
| 1561768319000 | 000-222-115-11119 | SET-UP+PRAYER+%26+intercession+begins+in+just+30+minutes. | 2518Z-0892A-0030O-16H70 |
| 1571988989000 | 401-222-115-89387 | This-is+a+reminder. | 872541806296826880 |
+---------------+-------------------+-----------------------------------------------------------+-------------------------+
到目前为止我有什么?我有一个带有python的mysql-connector,我可以连接到本地mysql等,但我正在努力解析它并插入它......
解决方案
使用 Python,您可以使用urllib.parse.parse_qs
Python 字典中检索 URL 查询字符串组件,稍后您可以迭代这些组件以在 MySQL 数据库中插入数据。
例如:
from urllib.parse import parse_qs
line = "&time=1561768216000&gameCategory=PINPOINT&category=ONE&uniqueId=2518Z-0892A-0030O-16H70&transactionType=CRD&familyId=000-222-115-11119&realTs=1561768319000&sortId=1&msg=SET-UP+PRAYER+%26+intercession+begins+in+just+30+minutes.&remoteIpAddress=127.0.0.1&userAgent=HTTP&uniqueId=872541806296826880&time=1571988786000&gameCategory=NOTIFY&category=TWO&transactionType=CRD&familyId=401-222-115-89387&sortId=1&realTs=1571988989000&msg=This-is+a+reminder.&remoteIpAddress=127.0.0.1&userAgent=HTTPS&"
o = parse_qs(line)
print(o)
结果:
{'time': ['1561768216000', '1571988786000'], 'gameCategory': ['PINPOINT', 'NOTIFY'], 'category': ['ONE', 'TWO'], 'uniqueId': ['2518Z-0892A-0030O-16H70', '872541806296826880'], 'transactionType': ['CRD', 'CRD'], 'familyId': ['000-222-115-11119', '401-222-115-89387'], 'realTs': ['1561768319000', '1571988989000'], 'sortId': ['1', '1'], 'msg': ['SET-UP PRAYER & intercession begins in just 30 minutes.', 'This-is a reminder.'], 'remoteIpAddress': ['127.0.0.1', '127.0.0.1'], 'userAgent': ['HTTP', 'HTTPS']}
推荐阅读
- javascript - 使用 child_process.spawn() 在原始模式下打开 VIM 会导致它冻结
- python - Python 在日志记录中向基于 structlog 的格式化程序添加额外字段
- elasticsearch - elasticsearch中如何正确翻转索引
- python - jupyter lab 上未显示的绘图和小部件
- powershell - 要求用户填充字段(Powershell)
- layout - qiskit 的转译器中的 DenseLayout 是如何工作的?
- android - 我无法解决此错误 [android]
- laravel - Laravel vue:我无法获取与模型之间的关系相关的对象
- amazon-web-services - 如果我在 AWS 组织中创建 AWS 账户,我可以从拥有该组织的根账户中删除它吗?
- mysql - 获取 id 最高的行