首页 > 技术文章 > kafka中常用API的简单JAVA代码

skyfeng 2017-04-16 20:42 原文

  通过之前《kafka分布式消息队列介绍以及集群安装》的介绍,对kafka有了初步的了解。本文主要讲述java代码中常用的操作。

准备:增加kafka依赖

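<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>
<!-- 注意:下文示例用到的 kafka.admin、kafka.javaapi、kafka.producer、kafka.consumer 等类位于 Kafka 核心包中,而不在 kafka-clients 里,因此还需要引入下面的依赖 -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.2.0</version>
</dependency>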

一、kafka中对topic的操作

package org.kafka;

import kafka.admin.DeleteTopicCommand;
import kafka.admin.TopicCommand;

/**
 * Kafka topic administration examples.
 *
 * <p>Every method builds the argument vector that {@code bin/kafka-topics.sh}
 * would receive and hands it to {@code kafka.admin.TopicCommand.main}. All
 * methods target the same ZooKeeper address and the same demo topic.
 *
 * <p>NOTE(review): in recent Kafka releases {@code TopicCommand.main} appears to
 * terminate the JVM via {@code System.exit} once the command finishes, so only
 * one of these methods may be usable per process - confirm against the Kafka
 * version in use.
 */
public class TopicDemo {
	/** ZooKeeper connect string used by every command in this class. */
	private static final String ZOOKEEPER = "192.168.2.100:2181";

	/** Name of the demo topic that is created, described, altered and deleted. */
	private static final String TOPIC = "topictest0416";

	/**
	 * Creates the demo topic with replication factor 3 and a single partition.
	 * Shell equivalent:
	 * bin/kafka-topics.sh --create --zookeeper 192.168.2.100:2181 --replication-factor 3 --partitions 1 --topic topictest0416
	 */
	public static void createTopic() {
		String[] options = new String[] {
				"--create",
				"--zookeeper",
				ZOOKEEPER,
				"--replication-factor",
				"3",
				"--partitions",
				"1",
				"--topic",
				TOPIC };
		TopicCommand.main(options);
	}

	/**
	 * Lists all topics.
	 * Shell equivalent: bin/kafka-topics.sh --list --zookeeper 192.168.2.100:2181
	 */
	public static void queryTopic() {
		String[] options = new String[] {
				"--list",
				"--zookeeper",
				ZOOKEEPER };
		TopicCommand.main(options);
	}

	/**
	 * Describes the partition and replica state of the demo topic.
	 * Shell equivalent:
	 * bin/kafka-topics.sh --describe --zookeeper 192.168.2.100:2181 --topic topictest0416
	 */
	public static void queryTopicByName() {
		String[] options = new String[] {
				"--describe",
				"--zookeeper",
				ZOOKEEPER,
				"--topic",
				TOPIC };
		TopicCommand.main(options);
	}

	/**
	 * Alters the demo topic so that it has 3 partitions.
	 * Shell equivalent:
	 * bin/kafka-topics.sh --zookeeper 192.168.2.100:2181 --alter --topic topictest0416 --partitions 3
	 */
	public static void alterTopic() {
		String[] options = new String[] {
				"--alter",
				"--zookeeper",
				ZOOKEEPER,
				"--topic",
				TOPIC,
				"--partitions",
				"3" };
		TopicCommand.main(options);
	}

	/**
	 * Deletes the demo topic.
	 * Shell equivalent:
	 * bin/kafka-topics.sh --delete --zookeeper 192.168.2.100:2181 --topic topictest0416
	 *
	 * <p>The request goes through {@code TopicCommand} with {@code --delete}, the
	 * same path the shell script uses, so the standalone
	 * {@code kafka.admin.DeleteTopicCommand} class is not needed by this class.
	 * Per the Kafka operations documentation the brokers must be started with
	 * {@code delete.topic.enable=true} for the deletion to actually take effect.
	 */
	public static void delTopic() {
		String[] options = new String[] {
				"--delete",
				"--zookeeper",
				ZOOKEEPER,
				"--topic",
				TOPIC };
		TopicCommand.main(options);
	}

}

二、Producer代码

package org.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Sends ten string messages to the topic {@code topictest0416} using the
 * legacy {@code kafka.javaapi.producer.Producer} API, pausing 500 ms before
 * each send.
 */
public class ProducerDemo {
	/**
	 * Configures a producer, sends the demo messages and closes the producer.
	 *
	 * @param args unused
	 * @throws InterruptedException if the thread is interrupted while sleeping
	 *         between sends
	 */
	public static void main(String[] args) throws InterruptedException {
		Properties props = new Properties();
		// Brokers the producer contacts to fetch topic metadata. No ZooKeeper
		// address is configured: this producer API bootstraps from
		// metadata.broker.list and has no "zk.connect" setting.
		props.put("metadata.broker.list", "hadoop1-1:9092,hadoop1-2:9092,hadoop1-3:9092");
		// Encoder class used to serialize message payloads.
		props.put("serializer.class", "kafka.serializer.StringEncoder");

		ProducerConfig config = new ProducerConfig(props);
		// Build the producer from the configuration.
		Producer<String, String> producer = new Producer<String, String>(config);

		try {
			// Send the business messages; a real application would read them
			// from a file or an in-memory store instead of generating them.
			for (int i = 0; i < 10; i++) {
				Thread.sleep(500);
				KeyedMessage<String, String> km = new KeyedMessage<String, String>("topictest0416", "I am a producer " + i + " hello!");
				producer.send(km);
			}
		} finally {
			// Release the producer's connections and threads even when a send
			// fails or the sleep is interrupted.
			producer.close();
		}
	}
}

三、Consumer代码

package org.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/**
 * Consumes the topic {@code topictest0416} with the legacy ZooKeeper-based
 * high-level consumer and prints every message payload to standard output.
 */
public class ConsumerDemo {
	/** Topic to consume. */
	private static final String TOPIC = "topictest0416";

	/** Number of streams (and therefore consumer threads) requested for the topic. */
	private static final int THREADS = 1;

	/**
	 * Connects to ZooKeeper, opens the message streams for {@link #TOPIC} and
	 * starts one printing thread per stream. The threads block on their stream,
	 * so the process keeps running until it is terminated.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		Properties props = new Properties();
		// ZooKeeper ensemble the consumer connects to.
		props.put("zookeeper.connect", "hadoop1-1:2181,hadoop1-2:2181,hadoop1-3:2181");
		// Consumer group id.
		props.put("group.id", "001");
		// Offset reset policy; "smallest" points the consumer at the smallest offset.
		props.put("auto.offset.reset", "smallest");
		// Wrap the properties in a consumer configuration object.
		ConsumerConfig config = new ConsumerConfig(props);
		final ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);

		// Shut the connector down when the JVM exits so that it is not simply
		// abandoned while the worker threads are still blocked on their streams.
		Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
			@Override
			public void run() {
				consumer.shutdown();
			}
		}));

		// Key: topic to consume; value: number of streams to create for it.
		Map<String, Integer> topicMap = new HashMap<>();
		topicMap.put(TOPIC, THREADS);

		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicMap);

		List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);

		for (final KafkaStream<byte[], byte[]> kafkaStream : streams) {
			new Thread(new Runnable() {
				@Override
				public void run() {
					for (MessageAndMetadata<byte[], byte[]> mm : kafkaStream) {
						// Decode explicitly as UTF-8 rather than with the platform
						// default charset, so the output does not depend on the host.
						System.out.println(new String(mm.message(), java.nio.charset.StandardCharsets.UTF_8));
					}
				}
			}).start();
		}
	}

}

四、测试

  先启动Consumer,再启动Producer

  测试结果:

  

 

推荐阅读