Data duplication when creating a flattened stream

Problem description

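I have a stream built on a topic that contains 271 messages, and the stream also contains 271 messages. However, when I create another stream from that stream in order to flatten it, I get 542 messages in total (= 271 * 2).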
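A `CREATE STREAM ... AS SELECT` that only projects struct fields (no `WHERE` filter and no join) is expected to emit exactly one output row per input row, so 271 messages in should give 271 messages out. The Python sketch below models that projection to make the expected 1:1 count explicit. It is only a model, not how ksqlDB is implemented: `flatten` and `flatten_stream` are hypothetical helpers, the column names are copied from the TRANSACTIONSRAW schema in this question, and the upper-case payload keys are an assumption (the raw JSON may well use camelCase keys).

```python
from typing import Any

# Column order of TRANSACTIONSRAW, as listed by DESCRIBE in this question.
FLAT_COLUMNS = [
    "SENDER", "RECEIVER", "RECEIVERWALLETID", "STATUS", "TYPE", "AMOUNT",
    "TOTALFEE", "CREATEDAT", "UPDATEDAT", "ID", "ORDERID", "__V", "TXID",
    "SENDERWALLETID",
]


def flatten(record: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical model of `payload->field AS field` for every column.

    Fields missing from PAYLOAD become None (standing in for SQL NULL).
    Assumes upper-case keys inside PAYLOAD.
    """
    payload = record.get("PAYLOAD") or {}
    return {col: payload.get(col) for col in FLAT_COLUMNS}


def flatten_stream(records: list[dict[str, Any]]) -> list[dict[str, Any]]:
    # Pure projection: no filter, no join, so one output row per input row.
    return [flatten(r) for r in records]


if __name__ == "__main__":
    # Made-up input: 271 records carrying only ID and AMOUNT.
    source = [{"PAYLOAD": {"ID": str(i), "AMOUNT": float(i)}} for i in range(271)]
    flat = flatten_stream(source)
    print(len(source), len(flat))  # 271 271
```

If the flattened topic really held 542 records, something other than the projection (for example a second writer to the same topic) would have to be producing them.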

This is the stream derived from the topic:

 Name                 : TRANSACTIONSPURE
 Type                 : STREAM
 Key field            : 
 Key format           : STRING
 Timestamp field      : Not set - using <ROWTIME>
 Value format         : JSON
 Kafka topic          : mongo_conn.digi.transactions (partitions: 1, replication: 1)

  Field   | Type
 ---------------------------------------
  ROWTIME | BIGINT           (system)
  ROWKEY  | VARCHAR(STRING)  (system)
  PAYLOAD | STRUCT<SENDER VARCHAR(STRING), RECEIVER VARCHAR(STRING), RECEIVERWALLETID VARCHAR(STRING), STATUS VARCHAR(STRING), TYPE VARCHAR(STRING), AMOUNT DOUBLE, TOTALFEE DOUBLE, CREATEDAT VARCHAR(STRING), UPDATEDAT VARCHAR(STRING), ID VARCHAR(STRING), ORDERID VARCHAR(STRING), __V VARCHAR(STRING), TXID VARCHAR(STRING), SENDERWALLETID VARCHAR(STRING)>
 ---------------------------------------

 Local runtime statistics
 ------------------------
 consumer-messages-per-sec:         0 consumer-total-bytes:    361356 consumer-total-messages:       271     last-message: 2019-09-02T10:44:14.003Z

And this is the flattened stream I derived from the previous stream:

 Name                 : TRANSACTIONSRAW
 Type                 : STREAM
 Key field            : 
 Key format           : STRING
 Timestamp field      : Not set - using <ROWTIME>
 Value format         : JSON
 Kafka topic          : TRANSACTIONSRAW (partitions: 4, replication: 1)

  Field            | Type                      
 ----------------------------------------------
  ROWTIME          | BIGINT           (system) 
  ROWKEY           | VARCHAR(STRING)  (system) 
  SENDER           | VARCHAR(STRING)           
  RECEIVER         | VARCHAR(STRING)           
  RECEIVERWALLETID | VARCHAR(STRING)           
  STATUS           | VARCHAR(STRING)           
  TYPE             | VARCHAR(STRING)           
  AMOUNT           | DOUBLE                    
  TOTALFEE         | DOUBLE                    
  CREATEDAT        | VARCHAR(STRING)           
  UPDATEDAT        | VARCHAR(STRING)           
  ID               | VARCHAR(STRING)           
  ORDERID          | VARCHAR(STRING)           
  __V              | VARCHAR(STRING)           
  TXID             | VARCHAR(STRING)           
  SENDERWALLETID   | VARCHAR(STRING)           
 ----------------------------------------------

 Queries that write into this STREAM
 -----------------------------------
 CSAS_TRANSACTIONSRAW_10 : CREATE STREAM transactionsraw with(value_format='JSON') as
   SELECT payload->sender as sender, payload->receiver as receiver,
          payload->receiverWalletId as receiverWalletId, payload->status as status,
          payload->type as type, payload->amount as amount, payload->totalFee as totalFee,
          payload->createdAt as createdAt, payload->updatedAt as updatedAt,
          payload->id as id, payload->orderId as orderId, payload-> __v as __v,
          payload->txId as txId, payload->senderWalletId as senderWalletId
   from transactionspure;

 For query topology and execution plan please run: EXPLAIN <QueryId>

 Local runtime statistics
 ------------------------
 consumer-messages-per-sec:         0 consumer-total-bytes:    315500 consumer-total-messages:       542 messages-per-sec:         0   total-messages:       271     last-message: 2019-09-02T10:44:15.493Z
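These statistics contain two different counters: the `consumer-*` metrics describe what the persistent query read, while `messages-per-sec` and `total-messages` appear to describe what it wrote to TRANSACTIONSRAW. The Python sketch below parses the statistics text copied from the DESCRIBE output and compares the two, showing 542 consumed against 271 produced. `parse_stats` is a hypothetical helper, not part of ksqlDB, and the reading of the metric names is an interpretation. The sketch only restates the counters and does not explain why the consumer count is doubled.

```python
import re

# "Local runtime statistics" of TRANSACTIONSRAW, copied from the output above.
STATS = """\
consumer-messages-per-sec:         0 consumer-total-bytes:    315500
consumer-total-messages:       542 messages-per-sec:         0   total-messages:       271     last-message: 2019-09-02T10:44:15.493Z
"""

# A metric name (letter first, then letters/digits/underscores/hyphens),
# a colon, whitespace, then a single non-whitespace value token.
METRIC = re.compile(r"([A-Za-z][\w-]*):\s+(\S+)")


def parse_stats(text: str) -> dict[str, str]:
    """Hypothetical helper: map each metric name to its raw string value."""
    return dict(METRIC.findall(text))


if __name__ == "__main__":
    stats = parse_stats(STATS)
    consumed = int(stats["consumer-total-messages"])  # read by the query
    produced = int(stats["total-messages"])           # written to the topic
    print(consumed, produced, consumed / produced)  # 542 271 2.0
```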

Tags: apache-kafka, ksqldb, confluent-platform
