首页 > 技术文章 > Kafka.net使用编程入门(四)

Wulex 2017-06-06 20:04 原文

新建一个cmd窗口,zkServer命令启动zookeeper
打开另一个cmd窗口,输入:

cd D:\Worksoftware\Apachekafka2.11\bin\windows

kafka-server-start  D:\Worksoftware\Apachekafka2.11\config\server.properties
删除主题:E:\WorkSoftWare\kafka2.11\bin\windows>kafka-run-class.bat kafka.admin.TopicCommand --delete --topic TestSiso --zookeeper localhost:2181

kafka 删除topic 提示marked for deletion
并没有真正删除,如果要真正删除

在每一台机器中的kafka_2.10/config/server.properties 文件加入 delete.topic.enable=true

最后所有机器重新启动kafka

启动kafka成功后,就可以运行项目了

引用了kafka-net.dll

Program.cs

 internal class Program
    {
        private static void Main(string[] args)
        {
            string header = "kafka测试";

            Console.Title = header;
            Console.WriteLine(header);
            ConsoleColor color = Console.ForegroundColor;

            var pub = new KafkaHelper("Test", true);

            var sub = new KafkaHelper("Test", false);

            Task.Run(() =>
            {
                while (true)
                {
                    string msg = string.Format("{0}这是一条测试消息", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"));
                    pub.Pub(new List<string> {msg});

                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.WriteLine("发送消息:" + msg);
                    //Console.ForegroundColor = color;
                    Thread.Sleep(2000);
                }
            });

            Task.Run(() => sub.Sub(msg =>
            {
                Console.ForegroundColor = ConsoleColor.Green;
                Console.WriteLine("收到消息:{0}", msg);
                //Console.ForegroundColor = color;
            }));

            Console.ReadLine();
        }
    }

KafkaHelper.cs代码:


    /// <summary>
    ///  kafka辅助类
    /// </summary>
    public sealed class KafkaHelper
    {
        private readonly KafkaConfig _config;

        private readonly ConsumerHelper _consumerHelper;
        private readonly bool _isProducer = true;
        private readonly ProduceHelper _produceHelper;
        private BrokerHelper _brokerHelper;

        /// <summary>
        ///     kafka辅助类构造方法
        /// </summary>
        /// <param name="sectionName">config中配置节点名称</param>
        /// <param name="isProducer"></param>
        public KafkaHelper(string sectionName, bool isProducer = true)
        {
            _isProducer = isProducer;
            _config = KafkaConfig.GetConfig(sectionName);
            _brokerHelper = new BrokerHelper(_config.Broker);
            if (isProducer)
                _produceHelper = new ProduceHelper(_brokerHelper);
            else
                _consumerHelper = new ConsumerHelper(_brokerHelper);
        }

        /// <summary>
        /// 是否是生产者模式
        /// </summary>
        public bool IsProducer
        {
            get { return _isProducer; }
        }


        /// <summary>
        ///     发送消息到队列
        /// </summary>
        /// <param name="topic"></param>
        /// <param name="datas"></param>
        /// <param name="acks"></param>
        /// <param name="timeout"></param>
        public void Pub(List<string> datas, short acks = 1, TimeSpan? timeout = default(TimeSpan?))
        {
            _produceHelper.Pub(_config.Topic, datas, acks, timeout, MessageCodec.CodecNone);
        }

        /// <summary>
        ///     订阅消息
        /// </summary>
        /// <param name="onMsg"></param>
        public void Sub(Action<string> onMsg)
        {
            _consumerHelper.Sub(_config.Topic, onMsg);
        }

        /// <summary>
        ///     取消订阅
        /// </summary>
        public void UnSub()
        {
            _consumerHelper.UnSub();
        }
    }

KafkaConfig.cs代码:

 /// <summary>
    ///  kafka配置类
    /// </summary>
    public class KafkaConfig : ConfigurationSection
    {
        /// <summary>
        ///     当前配置名称
        ///     此属性为必须
        /// </summary>
        public string SectionName { get; set; }

        /// <summary>
        ///     代理
        /// </summary>
        [ConfigurationProperty("broker", IsRequired = true)]
        public string Broker
        {
            get { return (string) base["broker"]; }
            set { base["broker"] = value; }
        }

        /// <summary>
        ///     主题
        /// </summary>
        [ConfigurationProperty("topic", IsRequired = true)]
        public string Topic
        {
            get { return (string) base["topic"]; }
            set { base["topic"] = value; }
        }

        #region 从配置文件中创建kafka配置类

        /// <summary>
        ///     获取默认kafka配置类
        /// </summary>
        /// <returns></returns>
        public static KafkaConfig GetConfig()
        {
            return (KafkaConfig) ConfigurationManager.GetSection("kafkaConfig");
        }

        /// <summary>
        ///     获取指定的kafka配置类
        /// </summary>
        /// <param name="sectionName"></param>
        /// <returns></returns>
        public static KafkaConfig GetConfig(string sectionName)
        {
            var section = (KafkaConfig) ConfigurationManager.GetSection(sectionName);
            //  跟默认配置相同的,可以省略
            if (section == null)
                section = GetConfig();
            if (section == null)
                throw new ConfigurationErrorsException("kafkacofng节点 " + sectionName + " 未配置.");
            section.SectionName = sectionName;
            return section;
        }

        /// <summary>
        ///     从指定位置读取配置
        /// </summary>
        /// <param name="fileName"></param>
        /// <param name="sectionName"></param>
        /// <returns></returns>
        public static KafkaConfig GetConfig(string fileName, string sectionName)
        {
            return GetConfig(ConfigurationManager.OpenMappedMachineConfiguration(new ConfigurationFileMap(fileName)),
                sectionName);
        }

        /// <summary>
        ///     从指定Configuration中读取配置
        /// </summary>
        /// <param name="config"></param>
        /// <param name="sectionName"></param>
        /// <returns></returns>
        public static KafkaConfig GetConfig(Configuration config, string sectionName)
        {
            if (config == null)
                throw new ConfigurationErrorsException("传入的配置不能为空");
            var section = (KafkaConfig) config.GetSection(sectionName);
            if (section == null)
                throw new ConfigurationErrorsException("kafkacofng节点 " + sectionName + " 未配置.");
            section.SectionName = sectionName;
            return section;
        }

        #endregion
    }

BrokerHelper.cs代码:

 /// <summary>
    /// 代理人辅助类
    /// </summary>
    internal class BrokerHelper
    {
        private readonly string _broker;

        public BrokerHelper(string broker)
        {
            _broker = broker;
        }

        /// <summary>
        ///     获取代理的路由对象
        /// </summary>
        /// <returns></returns>
        public BrokerRouter GetBroker()
        {
            var options = new KafkaOptions(new Uri(string.Format("http://{0}", _broker)));
            return new BrokerRouter(options);
        }
    }

ConsumerHelper.cs代码:

 /// <summary>
    ///  消费者辅助类
    /// </summary>
    internal class ConsumerHelper
    {
        private readonly BrokerHelper _brokerHelper;

        private Consumer _consumer;

        private bool _unSub;

        public ConsumerHelper(BrokerHelper brokerHelper)
        {
            _brokerHelper = brokerHelper;
        }

        public void Sub(string topic, Action<string> onMsg)
        {
            _unSub = false;

            var opiton = new ConsumerOptions(topic, _brokerHelper.GetBroker());

            _consumer = new Consumer(opiton);

            Task.Run(() =>
            {
                while (!_unSub)
                {
                    IEnumerable<Message> msgs = _consumer.Consume();
                    Parallel.ForEach(msgs, msg => onMsg(Encoding.UTF8.GetString(msg.Value)));
                }
            });
        }


        public void UnSub()
        {
            _unSub = true;
        }
    }

ProduceHelper.cs代码:

/// <summary>
    /// 生产者辅助类
    /// </summary>
    internal class ProduceHelper : IDisposable
    {
        private readonly Producer _producer;
        private BrokerHelper _brokerHelper;

        public ProduceHelper(BrokerHelper brokerHelper)
        {
            _brokerHelper = brokerHelper;

            _producer = new Producer(_brokerHelper.GetBroker());
        }


        public void Dispose()
        {
            if (_producer != null)
                _producer.Dispose();
        }

        /// <summary>
        ///  发送消息到队列
        /// </summary>
        /// <param name="topic"></param>
        /// <param name="datas"></param>
        /// <param name="acks"></param>
        /// <param name="timeout"></param>
        /// <param name="codec"></param>
        public void Pub(string topic, List<string> datas, short acks = 1, TimeSpan? timeout = default(TimeSpan?), MessageCodec codec = MessageCodec.CodecNone)
        {
            //var msgs = new List<Message>();
            //foreach (string item in datas)
            //{
            //    msgs.Add(new Message(item));
            //}
            var msgs = datas.Select(item => new Message(item)).ToList();

            _producer.SendMessageAsync(topic, msgs, acks, timeout, codec);
        }
    }

App.config

<?xml version="1.0" encoding="utf-8"?>

<configuration>
  <configSections>
    <section name="Test" type="xxxxx.sssss.KafkaConfig, xxxxx.sssss" />
  </configSections>
  <Test broker="127.0.0.1:9092" topic="TestSiso" />
  <startup>
    <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5" />
  </startup>
</configuration>

运行结果如图:

这里写图片描述

推荐阅读