首页 > 技术文章 > kafka之间的转发

shuaidong 2020-10-14 19:49 原文

近期要实现一个业务,将一个环境kafka中的数据转发到另一个环境kafka中~ 

话不多说,干活!!emmm····

--------------------------------------------------------华丽丽的代码风格线----------------------------------------------------------------

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;


import java.util.Arrays;
import java.util.Properties;

/**
 * Forwards records from one Kafka topic to another.
 *
 * <p>The instance subscribes to a source topic, parses every record value as a
 * JSON object, extracts its {@code "vin"} field and re-publishes the unchanged
 * value to the target topic using the VIN as the record key.
 *
 * <p>Neither the consumer nor the lazily created producer is guarded by a lock
 * here; an instance is meant to be driven by the single thread that executes
 * {@link #run()}.
 *
 * @author wanghd
 * @version 1.0
 * @date 2020/10/14 9:55
 */
public class Consumer implements Runnable {
    private static final String GROUPID = "groupA";
    /** Broker list; the same address is used for the consuming and the producing side. */
    private static final String BOOTSTRAP_SERVERS = "172.16.12.14:9092";
    /** Topic that forwarded records are written to. */
    private static final String TARGET_TOPIC = "sglkafkaTopic";
    /** Maximum time a single poll blocks when no records are available. */
    private static final long POLL_TIMEOUT_MS = 1000L;

    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    /** Created on first use and reused for every record; closed when {@link #run()} ends. */
    private KafkaProducer<String, String> producer;
    private String produceTopic;

    /**
     * Creates the consumer and subscribes it to the given topic.
     *
     * @param topicName name of the source topic to read from
     */
    public Consumer(String topicName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("group.id", GROUPID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<String, String>(props);
        this.topic = topicName;
        this.consumer.subscribe(Arrays.asList(topic));
    }

    /**
     * Polls the source topic in a loop and forwards every usable record until the
     * executing thread is interrupted, then releases the producer and the consumer.
     */
    @Override
    public void run() {
        System.out.println("---------开始消费---------");
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // poll() itself blocks for up to POLL_TIMEOUT_MS, so no extra sleep is needed.
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
                if (records == null || records.count() == 0) {
                    continue;
                }
                for (ConsumerRecord<String, String> record : records) {
                    forward(record);
                }
                // Push out buffered sends before the next poll(), which is where
                // auto-commit may record this batch's offsets as consumed.
                if (producer != null) {
                    producer.flush();
                }
            }
        } finally {
            try {
                closeProducer();
            } finally {
                consumer.close();
            }
        }
    }

    /**
     * Creates the producer if necessary, remembers the target topic and sends one record.
     *
     * @param topic target topic
     * @param vin   record key
     * @param value record value
     */
    public void sendToSgl(String topic, String vin, String value) {
        this.produceTopic = topic;
        startSend(vin, value);
    }

    /**
     * Sends one record to the topic most recently passed to
     * {@link #sendToSgl(String, String, String)}. Failures raised by the send call
     * are printed and do not propagate; the producer stays open for later records.
     *
     * @param vin   record key
     * @param value record value
     */
    public void startSend(String vin, String value) {
        try {
            producerInstance().send(new ProducerRecord<String, String>(produceTopic, vin, value));
            System.out.println("发送的信息:" + value);
        } catch (Exception e) {
            // Best effort, as before: report the failure and keep the forwarder alive.
            e.printStackTrace();
        }
    }

    /** Extracts the VIN from one consumed record and forwards it; unusable records are skipped. */
    private void forward(ConsumerRecord<String, String> record) {
        String value = record.value();
        if (value == null || value.isEmpty()) {
            return;
        }
        Object vin;
        try {
            JSONObject jsonObject = JSON.parseObject(value);
            vin = (jsonObject == null) ? null : jsonObject.get("vin");
        } catch (RuntimeException e) {
            // The parser reports malformed input with an unchecked exception; one bad
            // record must not terminate the whole forwarding loop.
            System.err.println("Skipping record with unparsable JSON at offset " + record.offset());
            e.printStackTrace();
            return;
        }
        if (vin == null) {
            System.err.println("Skipping record without a vin field at offset " + record.offset());
            return;
        }
        System.out.println("vin" + vin);
        // The VIN is used as the key of the forwarded record.
        sendToSgl(TARGET_TOPIC, vin.toString(), value);
    }

    /** Returns the shared producer, creating it with producer-only settings on first use. */
    private KafkaProducer<String, String> producerInstance() {
        if (producer == null) {
            Properties props = new Properties();
            props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());
            producer = new KafkaProducer<String, String>(props);
        }
        return producer;
    }

    /** Closes the shared producer if one was created. */
    private void closeProducer() {
        if (producer != null) {
            producer.close();
            producer = null;
        }
    }

    public static void main(String[] args) {
        Consumer test1 = new Consumer("KAFKA_TEST");
        Thread thread1 = new Thread(test1);
        thread1.start();
    }
}

--------------------------------------------------------华丽丽的代码风格线----------------------------------------------------------------

上述代码的意思是:将要执行的类放入线程中~ 让它嗡嗡嗡地一直跑...就是这样跑!hhhh~ 首先它会执行run方法,在这里,让run方法先消费数据,在消费到数据之后再进行发送...嘟嘟嘟地发送!

 

想法就是这么个想法,我依旧很菜~.!

--------------------------------------------------------华丽丽的pom风格线----------------------------------------------------------------

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the sendToSGL Kafka forwarding example. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<!-- Coordinates of this project. -->
<groupId>org.example</groupId>
<artifactId>sendToSGL</artifactId>
<version>1.0-SNAPSHOT</version>

<!-- Inherits build configuration from the Spring Boot 2.0.4.RELEASE parent POM. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.4.RELEASE</version>
</parent>

<dependencies>


<!-- Declared with scope "provided", so it is on the compile classpath but not packaged. -->
<!-- NOTE(review): the Consumer class in this article imports only org.apache.kafka.clients
     and org.apache.kafka.common classes; confirm whether this artifact is needed at all. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>1.0.0</version>
<scope>provided</scope>
</dependency>

<!-- Kafka client library, pinned explicitly to 1.0.0. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>

<!-- NOTE(review): no kafka-streams class is imported by the Consumer class in this article;
     confirm whether this dependency is required. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.0.0</version>
</dependency>
<!-- JSON library; the Consumer class imports com.alibaba.fastjson.JSON and JSONObject. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.70</version>
</dependency>


</dependencies>

<build>
<plugins>
<!-- Spring Boot packaging plugin with its "executable" option switched on. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<executable>true</executable>
</configuration>
</plugin>
</plugins>
</build>
</project>

--------------------------------------------------------华丽丽的pom风格线----------------------------------------------------------------

 

推荐阅读