kafka0.8.2消费者实例

maven


    org.apache.kafka
    kafka_2.10
    0.8.2.2

高级api实例

public class NativeConsumer {

    private static final String TOPIC = "kafkatopic";

    public void exec() throws UnsupportedEncodingException {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("auto.offset.reset","smallest");
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", "true");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200"); 
        props.put("auto.commit.interval.ms", "1000");
        ConsumerConfig consumerConfig =  new kafka.consumer.ConsumerConfig(props);
        ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

        Map topicCountMap = new HashMap();
        int localConsumerCount = 1;
        topicCountMap.put(TOPIC, localConsumerCount);
        Map>> consumerMap = consumerConnector
                .createMessageStreams(topicCountMap);
        List> streams = consumerMap.get(TOPIC);
        streams.stream().forEach(stream -> {
            ConsumerIterator it = stream.iterator();
            while (it.hasNext()) {
                System.out.println(new String(it.next().message()));
            }
        });
    }
}

关键字:kafka

版权声明

本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处。如若内容有涉嫌抄袭侵权/违法违规/事实不符,请点击 举报 进行投诉反馈!

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部