1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| @Configuration public class KafkaProducerConfig { @Bean public ProducerFactory<String, String> producerFactory() { Map<String, Object> config = new HashMap<>(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); config.put(ProducerConfig.ACKS_CONFIG, "all"); config.put(ProducerConfig.RETRIES_CONFIG, 3); config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); config.put(ProducerConfig.LINGER_MS_CONFIG, 10); config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); return new DefaultKafkaProducerFactory<>(config); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory()); template.setProducerListener(new ProducerListener<String, String>() { @Override public void onSuccess(ProducerRecord<String, String> record, RecordMetadata metadata) { log.info("发送成功:topic={}, partition={}, offset={}", metadata.topic(), metadata.partition(), metadata.offset()); } @Override public void onError(ProducerRecord<String, String> record, RecordMetadata metadata, Exception exception) { log.error("发送失败", exception); } }); return template; } }
|