kafka0.8.2消费者实例
maven
org.apache.kafka
kafka_2.10
0.8.2.2
高级api实例
public class NativeConsumer {
private static final String TOPIC = "kafkatopic";
public void exec() throws UnsupportedEncodingException {
Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("auto.offset.reset","smallest");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(props);
ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
Map topicCountMap = new HashMap();
int localConsumerCount = 1;
topicCountMap.put(TOPIC, localConsumerCount);
Map>> consumerMap = consumerConnector
.createMessageStreams(topicCountMap);
List> streams = consumerMap.get(TOPIC);
streams.stream().forEach(stream -> {
ConsumerIterator it = stream.iterator();
while (it.hasNext()) {
System.out.println(new String(it.next().message()));
}
});
}
}
关键字:kafka
版权声明
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处。如若内容有涉嫌抄袭侵权/违法违规/事实不符,请点击 举报 进行投诉反馈!