首页 > 技术文章 > kafka的使用

super-chao 2018-07-13 13:59 原文

kafka基于zookeeper。

需要安装kafka、zookeeper。

安装方法参考:http://tzz6.iteye.com/blog/2401197

启动zookeeper:点击zkServer.cmd启动zookeeper。

启动kafka:在kafka安装目录下打开命令行,执行 .\bin\windows\kafka-server-start.bat .\config\server.properties

如果启动kafka的时候报如下错误:

ERROR Error while deleting the clean shutdown file in dir E:\kafka_2.11-1.0.0\tmp\kafka-logs (kafka.server.LogDirFailureChannel)
java.nio.file.FileSystemException: E:\kafka_2.11-1.0.0\tmp\kafka-logs\__consumer_offsets-9\00000000000000000000.timeindex: 另一个程序正在使用此文件,进程无法访问。

解决办法:

删除日志:

日志路径由kafka的配置文件server.properties中的log.dirs指定,本例为log.dirs=/tmp/kafka-logs。删除tmp文件夹下的kafka-logs文件夹,然后重启kafka即可。注意:这会清空本机已保存的消息数据,仅适用于开发/测试环境。

kafka配置:

kafka-beans.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Spring bean definitions for the Kafka demo.

    Declares one producer bean and one consumer bean, each configured through
    a java.util.Properties object injected by setter. Besides the regular Kafka
    client settings, both property sets carry a custom "topic" entry: it is not
    a Kafka client option, the demo classes read it themselves to decide which
    topic to publish to / subscribe to.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd">

    <!-- Scan the packages the demo classes actually live in
         (the bean classes below are under com.test.www.unionpay). -->
    <context:component-scan base-package="com.test.www.unionpay.producer"></context:component-scan>
    <context:component-scan base-package="com.test.www.unionpay.consumer"></context:component-scan>

    <!-- Producer: injected through KafkaProducerDemo.setProperties(...) -->
    <bean id="kafkaProducerDemo" class="com.test.www.unionpay.producer.KafkaProducerDemo">
        <property name="properties">
            <props>
                <!-- Custom key read by KafkaProducerDemo.sendMessage(...) -->
                <prop key="topic">my-replicated-topic</prop>
                <prop key="bootstrap.servers">127.0.0.1:9092</prop>
                <prop key="acks">all</prop>
                <prop key="key.serializer">org.apache.kafka.common.serialization.StringSerializer</prop>
                <prop key="value.serializer">org.apache.kafka.common.serialization.StringSerializer</prop>
                <prop key="buffer.memory">33554432</prop>
            </props>
        </property>
    </bean>

    <!-- Consumer: injected through KafkaConsumerDemo.setProps(...) -->
    <bean id="kafkaConsumerDemo" class="com.test.www.unionpay.consumer.KafkaConsumerDemo">
        <property name="props">
            <props>
                <!-- Custom key read by KafkaConsumerDemo.receive() -->
                <prop key="topic">my-replicated-topic</prop>
                <prop key="bootstrap.servers">127.0.0.1:9092</prop>
                <prop key="group.id">group1</prop>
                <prop key="enable.auto.commit">true</prop>
                <prop key="auto.commit.interval.ms">1000</prop>
                <prop key="session.timeout.ms">30000</prop>
                <prop key="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</prop>
                <prop key="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</prop>
            </props>
        </property>
    </bean>
</beans>

 redis.properties:

# Controls how many jedis instances a pool can allocate
redis.pool.maxTotal=1000
# Controls the maximum number of jedis instances in the idle state that a pool may hold
redis.pool.maxIdle=200
# Maximum time to wait when borrowing a jedis instance; if the wait is exceeded, a JedisConnectionException is thrown directly
redis.pool.maxWaitMillis=2000
# Whether to validate a jedis instance before it is borrowed; if true, every jedis instance obtained is usable
redis.pool.testOnBorrow=true
# redis standalone
# standalone host
jedis.host=127.0.0.1
# standalone port
jedis.port=6379

KafkaProducerDemo.java:

package com.test.www.unionpay.producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Minimal Kafka producer wrapper configured through a {@link Properties} object.
 *
 * <p>The properties are handed unchanged to {@link KafkaProducer}. In addition
 * they must contain a {@code topic} entry, which this class reads itself to
 * choose the destination topic.
 */
public class KafkaProducerDemo {
    /** Producer configuration plus the custom {@code topic} entry. */
    Properties properties;

    /** Creates an unconfigured instance; call {@link #setProperties(Properties)} before sending. */
    public KafkaProducerDemo() {

    }

    /**
     * Creates an instance with the given configuration.
     *
     * @param properties producer configuration including the {@code topic} entry
     */
    public KafkaProducerDemo(Properties properties) {
        super();
        this.properties = properties;
    }

    /** @return the configuration currently used by this instance */
    public Properties getProperties() {
        return properties;
    }

    /** @param properties producer configuration including the {@code topic} entry */
    public void setProperties(Properties properties) {
        this.properties = properties;
    }

    /**
     * Sends one message (without a key) to the configured topic.
     *
     * <p>A new producer is created for every call and closed before the method
     * returns. The producer is managed by try-with-resources so that it is
     * released even when {@code send} throws.
     *
     * @param msg the record value to publish
     */
    public void sendMessage(String msg) {
        try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties)) {
            String topic = properties.getProperty("topic");
            producer.send(new ProducerRecord<String, String>(topic, msg));
        }
    }

}

KafkaConsumerDemo.java:

package com.test.www.unionpay.consumer;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Minimal Kafka consumer wrapper configured through a {@link Properties} object.
 *
 * <p>The properties are handed unchanged to {@link KafkaConsumer}. In addition
 * they must contain a {@code topic} entry, which this class reads itself to
 * choose the topic to subscribe to.
 */
public class KafkaConsumerDemo {
    /** Consumer configuration plus the custom {@code topic} entry. */
    private Properties props;

    /** Creates an unconfigured instance; call {@link #setProps(Properties)} before receiving. */
    public KafkaConsumerDemo() {
    }

    /**
     * Creates an instance with the given configuration.
     *
     * @param props consumer configuration including the {@code topic} entry
     */
    public KafkaConsumerDemo(Properties props) {
        super();
        this.props = props;
    }

    /** @return the configuration currently used by this instance */
    public Properties getProps() {
        return props;
    }

    /** @param props consumer configuration including the {@code topic} entry */
    public void setProps(Properties props) {
        this.props = props;
    }

    /**
     * Performs a single poll on the configured topic and returns the values of
     * all records obtained, concatenated in iteration order.
     *
     * <p>A new consumer is created for every call and closed before the method
     * returns. The consumer is managed by try-with-resources so that it is
     * released even when {@code subscribe} or {@code poll} throws.
     *
     * @return the concatenated record values, or an empty string when the poll
     *         yielded no records
     */
    public String receive() {
        StringBuilder received = new StringBuilder();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)) {
            consumer.subscribe(Arrays.asList(props.getProperty("topic")));

            // Exactly one poll with a 100 ms timeout, as before; the former
            // while(true) wrapper returned on its first pass and never looped.
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                received.append(record.value());
            }
        }
        return received.toString();
    }

}

KafkaController.java:

package com.test.www.web.controller;

import java.text.SimpleDateFormat;
import java.util.Date;

import javax.annotation.Resource;

import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.ModelAndView;

import com.test.www.unionpay.consumer.KafkaConsumerDemo;
import com.test.www.unionpay.producer.KafkaProducerDemo;

/**
 * Web endpoints of the Kafka demo: a landing page, a form for composing a
 * message, the form handler that publishes it, and a page that shows whatever
 * the consumer bean returns.
 */
@Controller
public class KafkaController {

    /** Producer bean looked up by the name {@code kafkaProducerDemo}. */
    @Resource(name = "kafkaProducerDemo")
    KafkaProducerDemo producer;

    /** Consumer bean looked up by the name {@code kafkaConsumerDemo}. */
    @Resource(name = "kafkaConsumerDemo")
    KafkaConsumerDemo consumer;

    /**
     * Shows the landing page.
     *
     * @return the {@code welcome} view with an empty model
     */
    @RequestMapping(value = "/welcome")
    public ModelAndView welcome() {
        System.out.println("--------welcome--------");
        return new ModelAndView("welcome");
    }

    /**
     * Shows the message form, exposing the current time as model attribute
     * {@code time}.
     *
     * @return the {@code kafka_send} view
     */
    @RequestMapping(value = "/sendmessage", method = RequestMethod.GET)
    public ModelAndView sendMessage() {
        System.out.println("--------sendmessage--------");
        String timestamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        return new ModelAndView("kafka_send", "time", timestamp);
    }

    /**
     * Publishes the submitted text through the producer bean.
     *
     * @param msg value of the {@code message} request parameter
     * @return the {@code welcome} view
     */
    @RequestMapping(value = "/onsend", method = RequestMethod.POST)
    public ModelAndView onsend(@RequestParam("message") String msg) {
        System.out.println("--------onsend--------");
        producer.sendMessage(msg);
        return new ModelAndView("welcome");
    }

    /**
     * Asks the consumer bean for messages and exposes the result as model
     * attribute {@code msg}.
     *
     * @return the {@code kafka_receive} view
     */
    @RequestMapping(value = "/receive")
    public ModelAndView receive() {
        System.out.println("--------receive--------");
        String received = consumer.receive();
        return new ModelAndView("kafka_receive", "msg", received);
    }

}

页面:

welcome.jsp:

<%-- Landing page of the Kafka demo: links to the send form and to the receive page. --%>
<%@ page language="java" contentType="text/html; charset=UTF-8"
    pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>welcome</title>
</head>
<body>
    <h1>Welcome</h1>
    <%-- NOTE(review): the .html suffix presumably comes from the servlet mapping, so these
         links reach the /sendmessage and /receive controller mappings; confirm in web.xml. --%>
    <h2><a href="sendmessage.html">Send a Message</a></h2>
    <h2><a href="receive.html">Get a Message</a></h2>
</body>
</html>

kafka_send.jsp:

<%-- Form page for composing a Kafka message; expects model attribute "time". --%>
<%@ page language="java" contentType="text/html; charset=UTF-8"
    pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>kafka_send</title>
</head>
<body>
    <h1>Send a Message</h1>
    <%-- Posts the field "message", which the controller's onsend handler reads as a request parameter. --%>
    <form action="onsend.html" method="post">
        <%-- The textarea is pre-filled with the "time" model attribute as default message text. --%>
        MessageText:<textarea name="message">${time}</textarea>
        <br>
        <input type="submit" value="Submit">
    </form>
    
    <h2><a href="welcome.html">RETURN HOME</a></h2>
</body>
</html>

kafka_receive.jsp:

<%-- Result page of the Kafka demo: displays the text stored in model attribute "msg". --%>
<%@ page language="java" contentType="text/html; charset=UTF-8"
    pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>kafka_receive</title>
</head>
<body>

    <h1>Kafka_Receive!!!</h1>
    <%-- NOTE(review): ${msg} is written without HTML escaping, and its content originates
         from text typed into the send form. Escape it (for example with a JSTL c:out tag,
         if JSTL is available in this project) before using this page outside a demo. --%>
    <h2>Receive Message : ${msg}</h2>
    <h2><a href="welcome.html">RETURN HOME</a></h2>
</body>
</html>

效果图:

如图,kafka发送消息、接收消息运行成功。

 

推荐阅读