博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
开发简单的Kafka应用
阅读量:4550 次
发布时间:2019-06-08

本文共 5601 字,大约阅读时间需要 18 分钟。

  之前基于集群和单机安装过kafka,现在利用kafka提供的API构建一个简单的生产者消费者的项目示例,来跑通kafka的流程,具体过程如下:

  首先使用eclipse for javaee建立一个maven项目,然后在pom.xml添加如下依赖配置:

org.apache.kafka
kafka_2.9.2
0.8.2.2

  这里kafka版本是kafka_2.9.2-0.8.2.2,保存之后maven会自动下载依赖,注意要关闭windows防火墙,尽量专用网络和外网都要关闭,否则下载的很慢,下载好之后就可以编写项目代码了,这里的pom.xml所有配置如下:

1 
3
4.0.0
4 5
kafkatest
6
kafkatest
7
0.0.1-SNAPSHOT
8
jar
9 10
kafkatest
11
http://maven.apache.org
12 13
14
UTF-8
15
16 17
18
19
junit
20
junit
21
3.8.1
22
test
23
24
25
org.apache.kafka
26
kafka_2.9.2
27
0.8.2.2
28
29
30

  然后,我们建立一个简单生产者类SimpleProducer,代码如下:

1 package test; 2  3 import java.util.Properties; 4  5 import kafka.javaapi.producer.Producer; 6 import kafka.producer.KeyedMessage; 7 import kafka.producer.ProducerConfig; 8  9 public class SimpleProducer {10         private static Producer
producer;11 private final Properties props=new Properties();12 public SimpleProducer(){13 //定义连接的broker list14 props.put("metadata.broker.list", "192.168.1.216:9092");15 //定义序列化类 Java中对象传输之前要序列化16 props.put("serializer.class", "kafka.serializer.StringEncoder");17 producer = new Producer
(new ProducerConfig(props));18 }19 public static void main(String[] args) {20 SimpleProducer sp=new SimpleProducer();21 //定义topic22 String topic="mytopic";23 24 //定义要发送给topic的消息25 String messageStr = "This is a message";26 27 //构建消息对象28 KeyedMessage
data = new KeyedMessage
(topic, messageStr);29 30 //推送消息到broker31 producer.send(data);32 producer.close();33 }34 }

  类的代码很简单,我这里是kafka单机环境端口就是kafka broker端口9092,这里定义topic为mytopic当然可以自己随便定义不用考虑服务器是否创建,对于发送消息的话上面代码是简单的单条发送,如果发送数据量很大的话send方法多次推送会耗费时间,所以建议把data数据按一定量分组放到List中,最后send一下AarrayList即可,这样速度会大幅度提高

  接下来写一个简单的消费者类SimpleHLConsumer,代码如下:

1 package test; 2  3 import java.util.HashMap; 4 import java.util.List; 5 import java.util.Map; 6 import java.util.Properties; 7  8 import kafka.consumer.Consumer; 9 import kafka.consumer.ConsumerConfig;10 import kafka.consumer.ConsumerIterator;11 import kafka.consumer.KafkaStream;12 import kafka.javaapi.consumer.ConsumerConnector;13 14 public class SimpleHLConsumer {15         private final ConsumerConnector consumer;16         private final String topic;17 18         public SimpleHLConsumer(String zookeeper, String groupId, String topic) {19                 Properties props = new Properties();20                 //定义连接zookeeper信息21                 props.put("zookeeper.connect", zookeeper);22                 //定义Consumer所有的groupID23                 props.put("group.id", groupId);24                 props.put("zookeeper.session.timeout.ms", "500");25                 props.put("zookeeper.sync.time.ms", "250");26                 props.put("auto.commit.interval.ms", "1000");27                 consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));28                 this.topic = topic;29         }30 31         public void testConsumer() {32                 Map
topicCount = new HashMap
();33 //定义订阅topic数量34 topicCount.put(topic, new Integer(1));35 //返回的是所有topic的Map36 Map
>> consumerStreams = consumer.createMessageStreams(topicCount);37 //取出我们要需要的topic中的消息流38 List
> streams = consumerStreams.get(topic);39 for (final KafkaStream stream : streams) {40 ConsumerIterator
consumerIte = stream.iterator();41 while (consumerIte.hasNext())42 System.out.println("Message from Topic :" + new String(consumerIte.next().message()));43 }44 if (consumer != null)45 consumer.shutdown();46 }47 48 public static void main(String[] args) {49 String topic = "mytopic";50 SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("192.168.1.216:2181/kafka", "testgroup", topic);51 simpleHLConsumer.testConsumer();52 }53 54 }

  消费者代码主要逻辑就是对生产者发送过来的数据做简单处理和输出,注意这里的地址是zookeeper的地址并且包括节点/kafka,topic名称要一致

  上面两个类已经可以实现消息的生产和消费了,但是现在服务器需要做一定的配置才可以,否则会抛出异常,就是在之前配置的server.properties基础之上进行修改,进入kafka安装目录下,使用命令 vim config/server.properties 打开配置文件,找到host.name这个配置,首先去掉前面的#注释,然后把默认的localhost改成IP地址192.168.1.216,因为eclipse远程运行代码时读取到localhost再执行时就是提交到本地了,所以会抛出异常,当然把代码打成jar包在服务器运行就不会出现这样的问题了,这里要注意:

  

  修改之后保存并退出,然后确保zookeeper的正常运行

  如果之前kafka正在运行,那么就执行 bin/kafka-server-stop.sh  停止kafka服务,然后再执行

   nohup bin/kafka-server-start.sh config/server.properties >> /dev/null & 启动服务,如果原来就是停止的,那么直接启动即可

  启动之后先运行启动消费者,消费者处于运行等待

  

  然后启动生产者发送消息,生产者发送完成立即关闭,消费者消费输出如下:

  

  到这里,就完成了kafka从生产到消费简单示例的开发,消息队列可以跑通了

 

转载于:https://www.cnblogs.com/freeweb/p/5291134.html

你可能感兴趣的文章
使用最快的方法计算2的16次方是多少?
查看>>
urllib 中的异常处理
查看>>
【SQL Server高可用性】高可用性概述
查看>>
通过SQL Server的扩展事件来跟踪SQL语句在运行时,时间都消耗到哪儿了?
查看>>
SQL优化:重新编译存储过程和表
查看>>
PCB“有铅”工艺将何去何从?
查看>>
Solr环境搭建
查看>>
IE兼容性的一些。。
查看>>
第二章-递归与分治策略
查看>>
快速排查SQL服务器阻塞语句
查看>>
推荐系统常用数据集
查看>>
stack
查看>>
spring-boot+nginx+tomcat+ssl配置笔记
查看>>
查找轮廓(cv2.findCountours函数)
查看>>
动态规划:插头DP
查看>>
离线下载解决Nuget程序包及其依赖包的方法
查看>>
react中的refs
查看>>
使用cvCanny方法边缘检测出现的错误
查看>>
redhat6.5安装oracle 11g
查看>>
Using View and Data API with Meteor
查看>>