Kafka consumer demo
Published: 2019-06-19



Note: this demo uses Spring Boot.

config

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.Properties;

@Component
public class KFKconfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String brokers;

    @Value("${spring.kafka.consumer.group-id:id}")
    private String group;

    @Value("${enable.auto.commit}")
    private boolean autocommit;

    @Value("${auto.commit.interval.ms}")
    private String autocommitInterval;

    @Value("${session.timeout.ms}")
    private String sessionTimeOut;

    @Value("${max.poll.records}")
    private String maxPollRecords;

    @Bean
    public KafkaConsumer<String, String> consumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("group.id", group);
        props.put("enable.auto.commit", autocommit);
        props.put("auto.commit.interval.ms", autocommitInterval);
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", sessionTimeOut);
        props.put("max.poll.records", maxPollRecords);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(props);
    }
}

consumer

import com.seif.common.utils.RedisUtil;
import com.seif.consumer.kafka.KFKconfig;
import com.seif.consumer.kafka.management.AllMsgManagement;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import redis.clients.jedis.JedisCommands;

import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

@Component
public class KFKconsumer {

    Logger logger = LoggerFactory.getLogger(KFKconsumer.class);

    @Autowired
    KFKconfig consumer;

    @Autowired
    AllMsgManagement msgma;

    @Autowired
    @Qualifier("DcpRedisUtil")
    RedisUtil redisUtil;

    @Value("${topic.name}")
    private String topic;

    @Value("${consumer.poll.time.out}")
    private long timeout;

    public void receive() {
        KafkaConsumer<String, String> cous = consumer.consumer();
        // InitializeOffset(cous);
        cous.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, String> records = cous.poll(timeout);
            for (ConsumerRecord<String, String> record : records) {
                String message = record.value();
                String topic = record.topic();
                logger.info(topic + " app_log--consumed message: " + message);
                long offset = record.offset();
                int partition = record.partition();
                System.out.println(record + "***" + topic + "***" + partition + "***" + offset + "***" + message);

                // The block below does monitoring and persists the committed offset; `record` is the
                // message we fetched. It can be ignored for a minimal demo.
                // The committed offset is written to Redis; when the consumer is restarted, the last
                // saved offset is read back from Redis and re-committed, so the consumer resumes
                // reading from that offset and no data is lost if the process stopped before a
                // message was fully processed.
//                if (!msgma.sendMessage(message, offset, partition, topic)) {
//                    String format = DateUtil.simpleDateFormate.get().get(DateUtil.format_second).format(new Date());
//                    JedisCommands jedis = redisUtil.getJedis();
//                    Map<String, String> map = new HashMap<>();
//                    map.put(topic + "-error@" + partition, offset + "@" + format);
//                    jedis.hmset("monitor:dcp", map);
//                    redisUtil.returnJedis(jedis);
//                    return;
//                } else {
//                    JedisCommands jedis = redisUtil.getJedis();
//                    Map<String, String> map = new HashMap<>();
//                    map.put(String.valueOf(partition), String.valueOf(offset));
//                    jedis.hmset("offset-" + topic, map);
//                    cous.commitAsync();
//                }
            }
        }
    }

//    public void InitializeOffset(KafkaConsumer<String, String> cous) {
//        JedisCommands jedis = redisUtil.getJedis();
//        Map<String, String> map = jedis.hgetAll("offset-" + topic);
//        if (map != null) {
//            Map<TopicPartition, OffsetAndMetadata> hashMaps = new HashMap<>();
//            for (Map.Entry<String, String> entry : map.entrySet()) {
//                hashMaps.put(new TopicPartition(topic, Integer.parseInt(entry.getKey())),
//                        new OffsetAndMetadata(Long.parseLong(entry.getValue())));
//            }
//            cous.commitSync(hashMaps);
//        }
//    }
}
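The commented-out code above persists the last committed offset per partition in the Redis hash offset-<topic>, and InitializeOffset re-commits those offsets at startup so the consumer resumes where it left off. An alternative approach (not from the original post, only a sketch built on the same hash layout and the RedisUtil helper shown above) is to seek to the saved offsets once partitions are assigned:

// Hypothetical helper, not part of the original code: subscribe and seek to the offsets
// saved in Redis when partitions are assigned. Assumes the "offset-<topic>" hash layout
// used above; requires org.apache.kafka.clients.consumer.ConsumerRebalanceListener and
// java.util.Collection imports.
private void subscribeFromSavedOffsets(final KafkaConsumer<String, String> cous) {
    cous.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // nothing to do on revoke in this sketch
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            JedisCommands jedis = redisUtil.getJedis();
            Map<String, String> saved = jedis.hgetAll("offset-" + topic);
            redisUtil.returnJedis(jedis);
            for (TopicPartition tp : partitions) {
                String offset = (saved == null) ? null : saved.get(String.valueOf(tp.partition()));
                if (offset != null) {
                    // the stored value is the last processed offset, so resume one past it
                    cous.seek(tp, Long.parseLong(offset) + 1);
                }
            }
        }
    });
}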

main

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.transaction.annotation.EnableTransactionManagement;

@SpringBootApplication
@EnableTransactionManagement
@EnableAutoConfiguration
public class DcpMsgConsumerApplication {

    static final Logger logger = LoggerFactory.getLogger(DcpMsgConsumerApplication.class);

    public static void main(String[] args) {
        ConfigurableApplicationContext ac = SpringApplication.run(DcpMsgConsumerApplication.class, args);
        KFKconsumer consumer = ac.getBean(KFKconsumer.class);
        consumer.receive();
    }
}

properties

application.properties

#kfk
topic.name=test
spring.kafka.bootstrap-servers=localhost:9095
consumer.poll.time.out=1000
enable.auto.commit=false
auto.commit.interval.ms=200
session.timeout.ms=25000
max.poll.records=200

#redis
redis.dcp.ip = 192.168.80.110
redis.dcp.port = 8090
redis.dcp.timout = 2000
redis.dcp.password = 1234
redis.dcp.database = 7
jedis.pool.config.maxTotal = 1000
jedis.pool.config.maxIdle = 10
jedis.pool.config.minIdle = 1
jedis.pool.config.maxWaitMillis = 30000
jedis.pool.config.testOnBorrow = true
jedis.pool.config.testOnReturn = true
jedis.pool.config.testWhileIdle = true
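The redis.dcp.* and jedis.pool.config.* keys are consumed by the RedisUtil / DcpRedisUtil bean, which the post does not include. A minimal sketch of how a JedisPool could be wired from these keys (the class below is an assumption for illustration, not the project's actual RedisUtil):

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

// Hypothetical sketch only: illustrates how the redis.dcp.* / jedis.pool.config.* keys
// above could back a JedisPool that a RedisUtil-style helper would borrow connections from.
@Configuration
public class RedisPoolConfig {

    @Value("${redis.dcp.ip}")
    private String host;
    @Value("${redis.dcp.port}")
    private int port;
    @Value("${redis.dcp.timout}")
    private int timeout;
    @Value("${redis.dcp.password}")
    private String password;
    @Value("${redis.dcp.database}")
    private int database;
    @Value("${jedis.pool.config.maxTotal}")
    private int maxTotal;
    @Value("${jedis.pool.config.maxIdle}")
    private int maxIdle;
    @Value("${jedis.pool.config.minIdle}")
    private int minIdle;
    @Value("${jedis.pool.config.maxWaitMillis}")
    private long maxWaitMillis;

    @Bean
    public JedisPool jedisPool() {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(maxTotal);
        config.setMaxIdle(maxIdle);
        config.setMinIdle(minIdle);
        config.setMaxWaitMillis(maxWaitMillis);
        config.setTestOnBorrow(true);
        config.setTestOnReturn(true);
        config.setTestWhileIdle(true);
        return new JedisPool(config, host, port, timeout, password, database);
    }
}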

Reprinted from: https://my.oschina.net/ToFlySeif/blog/1802762
