Note: this example uses Spring Boot; the raw kafka-clients consumer is wired up as Spring beans.
config
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.Properties;

@Component
public class KFKconfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String brokers;

    @Value("${spring.kafka.consumer.group-id:id}")
    private String group;

    @Value("${enable.auto.commit}")
    private boolean autocommit;

    @Value("${auto.commit.interval.ms}")
    private String autocommitInterval;

    @Value("${session.timeout.ms}")
    private String sessionTimeOut;

    @Value("${max.poll.records}")
    private String maxPollRecords;

    @Bean
    public KafkaConsumer<String, String> consumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("group.id", group);
        // auto-commit is disabled in application.properties; offsets are
        // committed manually after each message is processed
        props.put("enable.auto.commit", autocommit);
        props.put("auto.commit.interval.ms", autocommitInterval);
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", sessionTimeOut);
        props.put("max.poll.records", maxPollRecords);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(props);
    }
}
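As a side note, kafka-clients also ships ConsumerConfig constants for all of these string keys, which moves typos from runtime to compile time. A minimal sketch of the same configuration using them (values hard-coded from application.properties below for brevity; the ConsumerProps class name is made up):

import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class ConsumerProps {
    // Same settings as KFKconfig, but with compile-checked keys.
    public static Properties build(String brokers, String group) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "200");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "25000");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "200");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}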
consumer
import com.seif.common.utils.RedisUtil;
import com.seif.consumer.kafka.KFKconfig;
import com.seif.consumer.kafka.management.AllMsgManagement;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import redis.clients.jedis.JedisCommands;

import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

@Component
public class KFKconsumer {

    Logger logger = LoggerFactory.getLogger(KFKconsumer.class);

    @Autowired
    KFKconfig consumer;

    @Autowired
    AllMsgManagement msgma;

    @Autowired
    @Qualifier("DcpRedisUtil")
    RedisUtil redisUtil;

    @Value("${topic.name}")
    private String topic;

    @Value("${consumer.poll.time.out}")
    private long timeout;

    public void receive() {
        KafkaConsumer<String, String> cous = consumer.consumer();
        // InitializeOffset(cous);
        cous.subscribe(Arrays.asList(topic)); // subscribe once, not on every loop iteration
        while (true) {
            ConsumerRecords<String, String> records = cous.poll(timeout);
            for (ConsumerRecord<String, String> record : records) {
                String message = record.value();
                String recordTopic = record.topic();
                logger.info(recordTopic + " app_log -- consumed message: " + message);
                long offset = record.offset();
                int partition = record.partition();
                System.out.println(record + "***" + recordTopic + "***" + partition + "***" + offset + "***" + message);
                // The commented-out block below does monitoring and persists the
                // committed offset; `record` is the message we fetched, and saving
                // the offset ourselves is optional. Each committed offset is written
                // to Redis, and when the consumer restarts, the last saved offset is
                // read back from Redis and re-committed, so the consumer resumes
                // reading from that point. That way an abnormal shutdown cannot lose
                // messages that were fetched but not yet fully processed.
                // if (!msgma.sendMessage(message, offset, partition, recordTopic)) {
                //     String format = DateUtil.simpleDateFormate.get().get(DateUtil.format_second).format(new Date());
                //     JedisCommands jedis = redisUtil.getJedis();
                //     Map<String, String> map = new HashMap<>();
                //     map.put(recordTopic + "-error@" + partition, offset + "@" + format);
                //     jedis.hmset("monitor:dcp", map);
                //     redisUtil.returnJedis(jedis);
                //     return;
                // } else {
                //     JedisCommands jedis = redisUtil.getJedis();
                //     Map<String, String> map = new HashMap<>();
                //     map.put(String.valueOf(partition), String.valueOf(offset));
                //     jedis.hmset("offset-" + recordTopic, map);
                //     redisUtil.returnJedis(jedis);
                //     cous.commitAsync();
                // }
            }
        }
    }

    // public void InitializeOffset(KafkaConsumer<String, String> cous) {
    //     JedisCommands jedis = redisUtil.getJedis();
    //     Map<String, String> map = jedis.hgetAll("offset-" + topic);
    //     redisUtil.returnJedis(jedis);
    //     if (map != null) {
    //         Map<TopicPartition, OffsetAndMetadata> hashMaps = new HashMap<>();
    //         for (Map.Entry<String, String> entry : map.entrySet()) {
    //             hashMaps.put(new TopicPartition(topic, Integer.parseInt(entry.getKey())),
    //                     new OffsetAndMetadata(Long.parseLong(entry.getValue())));
    //         }
    //         cous.commitSync(hashMaps);
    //     }
    // }
}
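The commented-out InitializeOffset above resumes by re-committing the offsets saved in Redis. An alternative that avoids the extra commit is to seek() directly to the saved positions. A minimal sketch, assuming the same Redis hash layout the loop above writes (offset-<topic>, mapping partition number to last processed offset); the seekToSavedOffsets name is hypothetical, the method slots into KFKconsumer (same fields and imports), and it must run after partitions are assigned, e.g. after the first poll() or from ConsumerRebalanceListener.onPartitionsAssigned:

    // Position each assigned partition just past the last offset saved in
    // the "offset-<topic>" Redis hash, instead of re-committing it.
    private void seekToSavedOffsets(KafkaConsumer<String, String> cous) {
        JedisCommands jedis = redisUtil.getJedis();
        Map<String, String> saved = jedis.hgetAll("offset-" + topic);
        redisUtil.returnJedis(jedis);
        if (saved == null || saved.isEmpty()) {
            return; // nothing stored yet; fall back to auto.offset.reset
        }
        for (TopicPartition tp : cous.assignment()) {
            String offset = saved.get(String.valueOf(tp.partition()));
            if (offset != null) {
                // the saved offset was already processed, so resume one past it
                cous.seek(tp, Long.parseLong(offset) + 1);
            }
        }
    }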
main
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.transaction.annotation.EnableTransactionManagement;

// adjust to the actual package of KFKconsumer
import com.seif.consumer.kafka.KFKconsumer;

@SpringBootApplication // already implies @EnableAutoConfiguration
@EnableTransactionManagement
public class DcpMsgConsumerApplication {

    static final Logger logger = LoggerFactory.getLogger(DcpMsgConsumerApplication.class);

    public static void main(String[] args) {
        ConfigurableApplicationContext ac = SpringApplication.run(DcpMsgConsumerApplication.class, args);
        KFKconsumer consumer = ac.getBean(KFKconsumer.class);
        consumer.receive(); // blocks the main thread in the poll loop
    }
}
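One caveat with this main class: receive() never returns, so the consumer is never closed on shutdown. A minimal sketch, not from the original post, of an interruptible variant: a JVM shutdown hook calls wakeup(), which makes the blocked poll() throw WakeupException so the loop can close the consumer cleanly (wakeup() is the one KafkaConsumer method that is safe to call from another thread).

import java.util.Arrays;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class GracefulReceive {
    // Same poll loop as KFKconsumer.receive(), but it exits cleanly:
    // the shutdown hook interrupts poll() via wakeup().
    public static void receive(KafkaConsumer<String, String> cous, String topic, long timeout) {
        Runtime.getRuntime().addShutdownHook(new Thread(cous::wakeup));
        cous.subscribe(Arrays.asList(topic));
        try {
            while (true) {
                ConsumerRecords<String, String> records = cous.poll(timeout);
                records.forEach(r -> System.out.println(r.value())); // process records here
            }
        } catch (WakeupException e) {
            // expected on shutdown; fall through to close()
        } finally {
            cous.close();
        }
    }
}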
properties
application.properties
#kfk
topic.name=test
spring.kafka.bootstrap-servers=localhost:9095
consumer.poll.time.out=1000
enable.auto.commit=false
auto.commit.interval.ms=200
session.timeout.ms=25000
max.poll.records=200
#redis
redis.dcp.ip=192.168.80.110
redis.dcp.port=8090
redis.dcp.timout=2000
redis.dcp.password=1234
redis.dcp.database=7
jedis.pool.config.maxTotal=1000
jedis.pool.config.maxIdle=10
jedis.pool.config.minIdle=1
jedis.pool.config.maxWaitMillis=30000
jedis.pool.config.testOnBorrow=true
jedis.pool.config.testOnReturn=true
jedis.pool.config.testWhileIdle=true
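The post does not show RedisUtil, so for illustration only, here is a hypothetical binding of the redis.dcp.* and jedis.pool.config.* keys to a Jedis connection pool (assuming Jedis 2.x/3.x; the JedisConfig class name is made up, and the misspelled redis.dcp.timout key is kept exactly as it appears above):

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

@Configuration
public class JedisConfig {
    // Hypothetical wiring: where each property above would typically land.
    @Bean
    public JedisPool jedisPool(
            @Value("${redis.dcp.ip}") String host,
            @Value("${redis.dcp.port}") int port,
            @Value("${redis.dcp.timout}") int timeout, // key kept as spelled in the properties file
            @Value("${redis.dcp.password}") String password,
            @Value("${redis.dcp.database}") int database,
            @Value("${jedis.pool.config.maxTotal}") int maxTotal,
            @Value("${jedis.pool.config.maxIdle}") int maxIdle,
            @Value("${jedis.pool.config.minIdle}") int minIdle,
            @Value("${jedis.pool.config.maxWaitMillis}") long maxWaitMillis,
            @Value("${jedis.pool.config.testOnBorrow}") boolean testOnBorrow,
            @Value("${jedis.pool.config.testOnReturn}") boolean testOnReturn,
            @Value("${jedis.pool.config.testWhileIdle}") boolean testWhileIdle) {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(maxTotal);
        config.setMaxIdle(maxIdle);
        config.setMinIdle(minIdle);
        config.setMaxWaitMillis(maxWaitMillis);
        config.setTestOnBorrow(testOnBorrow);
        config.setTestOnReturn(testOnReturn);
        config.setTestWhileIdle(testWhileIdle);
        return new JedisPool(config, host, port, timeout, password, database);
    }
}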