본문으로 바로가기

[Kafka] 컨슈머(Consumer) 구현 - Java,IntelliJ

category Program/Kafka 2022. 5. 11. 15:01
 

자바, 인텔리제이 환경에서 아파치 카프카 컨슈머를 구현하여 토픽을 구독하고 이벤트를 읽어 들여보는 예제입니다.

카프카 설치가 로컬에 안되어 있으신 분은 아래 포스팅을 참고해 주세요.

 

[Kafka] 카프카 설치 및 실행, 이벤트 읽고 쓰기(CLI)

1. 카프카 설치 및 실행하기 설치에 앞서 아파치 카프카(Apache Kafka)에 대한 개요가 궁금하신 분들은 아래 포스팅을 참고해 주세요. [Kafka] 카프카란? 주요개념 및 용어 소개 카프카(Kafka)란? Apache Kaf

ifuwanna.tistory.com

컨슈머(Consumer) 구현 

 

1. New Module에서 Gradle > Java를 선택해 주세요.  Maven 프로젝트가 편하신 분들은 메이븐으로 선택하셔도 됩니다.

2. 모듈(프로젝트)가 생성되었으면 build.gradle(maven:pom.xml)파일을 열어 카프카 클라이언트 라이브러리 의존성을 추가해 줍니다. 저는 로그 확인을 위해 slf4j 모듈도 함께 추가해 줬습니다. 

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.1.0'

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.1.0</version>
</dependency>

 

3. 실행할 메인 클래스(ConsumerMain)와 메인 메서드를 만들어 Consumer Client를 구현 후 실행시켜 줍니다. 

* 코드 설명 

4-1. 먼저 컨슈머 초기화를 위한 Properties 를 세팅해 줍니다.

  • bootstrap.servers : 카프카 클러스터에 연결하기 위한 브로커 목록
  • key.deserializer : 메시지 키 역직렬화에 사용되는 클래스
  • value.deserializer : 메시지 값을 역직렬화 하는데 사용되는 클래스
  • group.id : 컨슈머 그룹 id
  • enable.auto.commit : 자동 커밋 여부 (default : true)

4-2. Properties설정으로  KafkaConsumer 인스턴스를 생성해 줍니다. 

4-3. 구독할 topic 목록(test)과 최소 배치 사이즈(minBatchSize) 를 지정해 줍니다.  

4-4. 반복하며 consumer.poll() 메서드를 통해 설정한 토픽의 ConsumerRecords를 읽어옵니다. 

4-5. ConsumerRecord를 반복하며 메시지를 읽고 수동 커밋(commitSync)해 줍니다. 

public class ConsumerMain {

    public static void main(String[] args) {

        try {
            // set kafka properties
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // kafka cluster
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// KEY_SERIALIZER
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // VALUE_SERIALIZER
            props.put(ConsumerConfig.GROUP_ID_CONFIG,"testgroup");
            //props.setProperty("enable.auto.commit", "false");

            // init KafkaConsumer
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

            consumer.subscribe(Arrays.asList("test")); // topic list
            final int minBatchSize = 200;

            List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));  // polling interval
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                    buffer.add(record);
                }
                if (buffer.size() >= minBatchSize) {
                    System.out.println(buffer);
                    consumer.commitSync();
                    buffer.clear();
                }
            }
        } catch (Exception e) {
            System.out.println(e);
        }

    }
}

 

5.  구현한 컨슈머가 정상적으로 test토픽에서 이벤트를 읽어 오는지 확인하기 위해 먼저 이전에 예제로 만든 프로듀서 Application을 실행해 보도록 하겠습니다.  프로듀서 구현예제는 아래 포스팅을 참고해 주세요.

 

[Kafka] 프로듀서(Producer) 구현 - Java,IntelliJ

자바, 인텔리제이 환경에서 아파치 카프카(Apache Kafka) 프로듀서 구현하여 간단히 이벤트를 날려보는 예제입니다. 카프카 설치 및 콘솔 컨슈머 테스트를 위한 CLI 세팅는 이전 포스팅을 참조해 주

ifuwanna.tistory.com

프로듀서가 실행되어 초기화 된 뒤 1초에 한 번씩 이벤트를 쓰는 것을 확인할 수 있습니다.

6.  실행시켜놨던 컨슈머 클라이언트에서는 test 토픽을 구독하여 프로듀서에서 쓴 레코드를 읽어 출력하는 것을 확인하실 수 있습니다.