博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka java api消费者
阅读量:5999 次
发布时间:2019-06-20

本文共 1873 字,大约阅读时间需要 6 分钟。

import java.util.HashMap;

import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsumer {

private final ConsumerConnector consumer;
private KafkaConsumer() {
Properties props = new Properties();
// zookeeper 配置
props.put("zookeeper.connect", "192.168.170.185:2181");
// 消费者所在组
props.put("group.id", "testgroup");
// zk连接超时
props.put("zookeeper.session.timeout.ms", "4000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "smallest");
// 序列化类
props.put("serializer.class", "kafka.serializer.StringEncoder");
ConsumerConfig config = new ConsumerConfig(props);
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
}
void consume() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(KafkaProducer.TOPIC, new Integer(1));
StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
Map<String, List<KafkaStream<String, String>>> consumerMap =
consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
KafkaStream<String, String> stream = consumerMap.get(KafkaProducer.TOPIC).get(0);
ConsumerIterator<String, String> it = stream.iterator();
int messageCount = 0;
while (it.hasNext()){
System.out.println(it.next().message());
messageCount++;
if(messageCount == 100){
System.out.println("Consumer端一共消费了" + messageCount + "条消息!");
}
}
}
public static void main(String[] args) {
new KafkaConsumer().consume();
}
}

转载于:https://www.cnblogs.com/wangjing666/p/6860751.html

你可能感兴趣的文章
为什么要用java重写logstash
查看>>
Dart基础-类
查看>>
Git远程02:git clone都做了什么
查看>>
BSD vi/vim 命令大全(下)[转]
查看>>
SQL Server 2008开启sa用户名和远程连接
查看>>
css3中变形与动画(一)
查看>>
Linux 系统查看命令
查看>>
Spring中我们用到的功能实现:基于注解的Ioc自动装配
查看>>
mac book新手入门-快捷键
查看>>
蚂蚁金服安全产品技术资深总监冯春培:用生态的力量解决安全生态的问题
查看>>
SQLite的使用二
查看>>
log4net的配置与使用
查看>>
C# 视频监控系列(2):客户端——封装API
查看>>
angularjs $apply 数据绑定
查看>>
IOS Core Image之二
查看>>
[XMove-自主设计的体感解决方案] 系统综述
查看>>
设计模式 ( 十五 ) 中介者模式Mediator(对象行为型)
查看>>
【LINUX学习】磁盘分割之建立primary和logical 分区
查看>>
【YUM】第三方yum源rpmforge
查看>>
Qt之模型/视图(自定义进度条)
查看>>