자바, 인텔리제이 환경에서 아파치 카프카 컨슈머를 구현하여 토픽을 구독하고 이벤트를 읽어 들여보는 예제입니다.
카프카 설치가 로컬에 안되어 있으신 분은 아래 포스팅을 참고해 주세요.
컨슈머(Consumer) 구현
1. New Module에서 Gradle > Java를 선택해 주세요. Maven 프로젝트가 편하신 분들은 메이븐으로 선택하셔도 됩니다.
2. 모듈(프로젝트)가 생성되었으면 build.gradle(maven:pom.xml)파일을 열어 카프카 클라이언트 라이브러리 의존성을 추가해 줍니다. 저는 로그 확인을 위해 slf4j 모듈도 함께 추가해 줬습니다.
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.1.0'
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.1.0</version>
</dependency>
3. 실행할 메인 클래스(ConsumerMain)와 메인 메서드를 만들어 Consumer Client를 구현 후 실행시켜 줍니다.
* 코드 설명
4-1. 먼저 컨슈머 초기화를 위한 Properties 를 세팅해 줍니다.
- bootstrap.servers : 카프카 클러스터에 연결하기 위한 브로커 목록
- key.deserializer : 메시지 키 역직렬화에 사용되는 클래스
- value.deserializer : 메시지 값을 역직렬화 하는데 사용되는 클래스
- group.id : 컨슈머 그룹 id
- enable.auto.commit : 자동 커밋 여부 (default : true)
4-2. Properties설정으로 KafkaConsumer 인스턴스를 생성해 줍니다.
4-3. 구독할 topic 목록(test)과 최소 배치 사이즈(minBatchSize) 를 지정해 줍니다.
4-4. 반복하며 consumer.poll() 메서드를 통해 설정한 토픽의 ConsumerRecords를 읽어옵니다.
4-5. ConsumerRecord를 반복하며 메시지를 읽고 수동 커밋(commitSync)해 줍니다.
public class ConsumerMain {
public static void main(String[] args) {
try {
// set kafka properties
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // kafka cluster
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// KEY_SERIALIZER
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // VALUE_SERIALIZER
props.put(ConsumerConfig.GROUP_ID_CONFIG,"testgroup");
//props.setProperty("enable.auto.commit", "false");
// init KafkaConsumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test")); // topic list
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // polling interval
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
System.out.println(buffer);
consumer.commitSync();
buffer.clear();
}
}
} catch (Exception e) {
System.out.println(e);
}
}
}
5. 구현한 컨슈머가 정상적으로 test토픽에서 이벤트를 읽어 오는지 확인하기 위해 먼저 이전에 예제로 만든 프로듀서 Application을 실행해 보도록 하겠습니다. 프로듀서 구현예제는 아래 포스팅을 참고해 주세요.
프로듀서가 실행되어 초기화 된 뒤 1초에 한 번씩 이벤트를 쓰는 것을 확인할 수 있습니다.
6. 실행시켜놨던 컨슈머 클라이언트에서는 test 토픽을 구독하여 프로듀서에서 쓴 레코드를 읽어 출력하는 것을 확인하실 수 있습니다.
'Program > Kafka' 카테고리의 다른 글
[Kafka] 쥬키퍼 실행시 exiting abnormally 오류 해결 (0) | 2022.05.11 |
---|---|
[Kafka] 프로듀서(Producer) 구현 - Java,IntelliJ (0) | 2022.04.06 |
[Kafka] 카프카 설치 및 실행, 이벤트 읽고 쓰기(CLI) (0) | 2022.04.02 |
[Kafka] 카프카란? 주요개념 및 용어 소개 (1) | 2022.04.01 |