apache-kafka - 创建扁平流时数据重复
问题描述
我有一个来自包含 271 条消息的主题的流,该流还包含 271 条消息,但是当我从前一个流创建另一个流以将其展平时,我得到的总消息为 542 =(271 * 2)。
这是从主题派生的流
Name : TRANSACTIONSPURE
Type : STREAM
Key field :
Key format : STRING
Timestamp field : Not set - using <ROWTIME>
Value format : JSON
Kafka topic : mongo_conn.digi.transactions (partitions: 1,
replication: 1)
Field | Type
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
PAYLOAD | STRUCT<SENDER VARCHAR(STRING), RECEIVER VARCHAR(STRING),
RECEIVERWALLETID VARCHAR(STRING), STATUS VARCHAR(STRING), TYPE
VARCHAR(STRING), AMOUNT DOUBLE, TOTALFEE DOUBLE, CREATEDAT
VARCHAR(STRING), UPDATEDAT VARCHAR(STRING), ID VARCHAR(STRING),
ORDERID
VARCHAR(STRING), __V VARCHAR(STRING), TXID VARCHAR(STRING),
SENDERWALLETID VARCHAR(STRING)>
Local runtime statistics
------------------------
consumer-messages-per-sec: 0 consumer-total-bytes: 361356
consumer-total-messages: 271 last-message:
2019-09-02T10:44:14.003Z
这是我从前一个流派生的扁平流
Name : TRANSACTIONSRAW
Type : STREAM
Key field :
Key format : STRING
Timestamp field : Not set - using <ROWTIME>
Value format : JSON
Kafka topic : TRANSACTIONSRAW (partitions: 4, replication: 1)
Field | Type
----------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
SENDER | VARCHAR(STRING)
RECEIVER | VARCHAR(STRING)
RECEIVERWALLETID | VARCHAR(STRING)
STATUS | VARCHAR(STRING)
TYPE | VARCHAR(STRING)
AMOUNT | DOUBLE
TOTALFEE | DOUBLE
CREATEDAT | VARCHAR(STRING)
UPDATEDAT | VARCHAR(STRING)
ID | VARCHAR(STRING)
ORDERID | VARCHAR(STRING)
__V | VARCHAR(STRING)
TXID | VARCHAR(STRING)
SENDERWALLETID | VARCHAR(STRING)
----------------------------------------------
Queries that write into this STREAM
-----------------------------------
CSAS_TRANSACTIONSRAW_10 : CREATE STREAM transactionsraw
with(value_format='JSON') as SELECT payload->sender as sender,
payload->receiver as receiver, payload->receiverWalletId as
receiverWalletId, payload->status as status, payload->type as type,
payload->amount as amount, payload->totalFee as totalFee,
payload->createdAt as createdAt, payload->updatedAt as updatedAt,
payload->id as id, payload->orderId as orderId , payload-> __v as __v,
payload->txId as txId, payload->senderWalletId as senderWalletId from
transactionspure;
For query topology and execution plan please run: EXPLAIN <QueryId>
Local runtime statistics
------------------------
consumer-messages-per-sec: 0 consumer-total-bytes: 315500
consumer-total-messages: 542 messages-per-sec: 0 total-
messages: 271 last-message: 2019-09-02T10:44:15.493Z
解决方案
推荐阅读
- python - 根据条件将字典中的值分配给新列
- css - 如何避免在页面加载时使用 CSS grid-template-columns 进行布局转换?
- c++ - Visual Studio C++ 在 main 函数结束时触发断点
- github - 找不到从 GitHub webhook 响应创建 SHA256 的格式
- android - 错误:java.lang.ArrayIndexOutOfBoundsException:长度=5;指数=5
- sql - 如何获得人口数量最多的国家/地区人口最多的城市
- php - php 和 Ajax:当我选择主类别时自动显示子类别
- reactjs - 反应表单复选框删除或添加,第一次交互时不更新
- java - Hikari 数据源打开连接超过最大池大小限制
- xcode - SwiftUI:.fileExporter 仅导出 1 个文档