본문으로 바로가기

자바, 인텔리제이 환경에서 아파치 카프카(Apache Kafka) 프로듀서 구현하여 간단히 이벤트를 날려보는 예제입니다. 

카프카 설치 및 콘솔 컨슈머 테스트를 위한 CLI 세팅는 이전 포스팅을 참조해 주세요.

 

[Kafka] 카프카 설치 및 실행, 이벤트 읽고 쓰기(CLI)

1. 카프카 설치 및 실행하기 설치에 앞서 아파치 카프카(Apache Kafka)에 대한 개요가 궁금하신 분들은 아래 포스팅을 참고해 주세요. [Kafka] 카프카란? 주요개념 및 용어 소개 카프카(Kafka)란? Apache Kaf

ifuwanna.tistory.com

프로듀서(Producer) 구현 

1. 먼저 프로젝트를 생성해 주세요. 이번 예제에서 저는 Empty 프로젝트를 생성하여 하위에 모듈로 추가하도록 하겠습니다.   

 

2. New Module에서 Gradle > Java를 선택해 주세요.  Maven 프로젝트가 편하신 분들은 메이븐으로 선택하셔도 됩니다.

3. 모듈(프로젝트)가 생성되었으면 build.gradle(maven:pom.xml)파일을 열어 카프카 클라이언트 라이브러리 의존성을 추가해 줍니다. 저는 로그 확인을 위해 slf4j 모듈도 함께 추가해 줬습니다. 

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.1.0'

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.1.0</version>
</dependency>

 

4. 실행할 메인 클래스(ProducerMain)와 메인 메서드를 만들어 Producer Client를 구현해 줍니다. 

4-1. 먼저 프로듀서 초기화를 위한 Properties 를 세팅해 줍니다.

  • bootstrap.servers : 카프카 클러스터에 연결하기 위한 브로커 목록
  • key.serializer : 메시지 키 직렬화에 사용되는 클래스
  • value.serializer : 메시지 값을 직렬화 하는데 사용되는 클래스

4-2. Properties설정으로  KafkaProducer 인스턴스를 생성해 줍니다. 

4-3. topic partition key data 값을 지정 ProducerRecord 인스턴스를 생성해 줍니다.

이번 예제에서는 간단하게 topic이름(test)와 data만 지정하여 전송해 보도록 하겠습니다. 

4-4. producer.send(record) 메서드를 통해 작성한 레코드를 카프카클러스트에 전송합니다.

본 예제에서는 1초에 한 번씩 레코드를 전송하겠습니다.

package com.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerMain {

    public static void main(String[] args) {

        try {
            // set kafka properties
            Properties configs = new Properties();
            configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // kafka cluster
            configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// KEY_SERIALIZER
            configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // VALUE_SERIALIZER

            // init KafkaProducer
            KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

            int idx = 0;
            while(true){

                // set ProducerRecord
                String topic = "test";           // topic name
                Integer partition = 0;           // partition number (default: Round Robin)
                String key = "key-" + idx;       // key  (default: null)
                String data = "record-"+ idx;    // data

                ProducerRecord<String, String> record = new ProducerRecord<>(topic, data);
                ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, key, data);
                ProducerRecord<String, String> record3 = new ProducerRecord<>(topic, partition, key, data);

                // send record
                producer.send(record);

                System.out.println("producer.send() >> [topic:" + topic + "][data:" + data + "]");
                Thread.sleep(1000);
                idx++;
            }
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}

 

5. 카프카 클러스트 서버에서 Consumer Client(CLI) 를 실행 시켜 놓은 뒤  예제로 만든 프로듀서 Application을 실행해 보도록 하겠습니다. 

먼저 설정한 값과 기본값들로 세팅되어 프로듀서가 초기화 된 뒤 1초에 한 번씩 이벤트를 쓰는 것을 확인할 수 있습니다.

그리고 실행시켜놨던 컨슈머 클라이언트에서는 test 토픽을 구독하여 프로듀서에서 쓴 레코드를 읽어 출력하는 것을 확인하실 수 있습니다.

Producer 

[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: 
    acks = -1
    batch.size = 16384
    bootstrap.servers = [localhost:9092]
    buffer.memory = 33554432
    client.dns.lookup = use_all_dns_ips
    client.id = producer-1
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = true
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.connect.timeout.ms = null
    sasl.login.read.timeout.ms = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.login.retry.backoff.max.ms = 10000
    sasl.login.retry.backoff.ms = 100
    sasl.mechanism = GSSAPI
    sasl.oauthbearer.clock.skew.seconds = 30
    sasl.oauthbearer.expected.audience = null
    sasl.oauthbearer.expected.issuer = null
    sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
    sasl.oauthbearer.jwks.endpoint.url = null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url = null
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 37edeed0777bacb3
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1649226688710
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition test-0 to 0 since the associated topicId changed from null to o7t58QhsSWiBIVbfAN6cSw
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: Z-axTywFSZ6BoLzqAiIAtg
producer.send() >> [topic:test][data:record-0]
producer.send() >> [topic:test][data:record-1]
producer.send() >> [topic:test][data:record-2]
producer.send() >> [topic:test][data:record-3]
producer.send() >> [topic:test][data:record-4]
producer.send() >> [topic:test][data:record-5]
producer.send() >> [topic:test][data:record-6]
producer.send() >> [topic:test][data:record-7]
producer.send() >> [topic:test][data:record-8]
producer.send() >> [topic:test][data:record-9]
producer.send() >> [topic:test][data:record-10]
producer.send() >> [topic:test][data:record-11]
producer.send() >> [topic:test][data:record-12]
producer.send() >> [topic:test][data:record-13]
producer.send() >> [topic:test][data:record-14]

Consumer 

➜  bin ./kafka-console-consumer.sh --bootstrap-server localhost:9092 —from-beginning --topic test
record-0
record-1
record-2
record-3
record-4
record-5
record-6
record-7
record-8
record-9
record-10
record-11
record-12
record-13
record-14
….

CLI가 아닌 Java로 컨슈머 클라이언트 구현을 해보고 싶으시면 아래 포스팅을 참고해 주셔요 :)

 

[Kafka] 컨슈머(Consumer) 구현 - Java,IntelliJ

자바, 인텔리제이 환경에서 아파치 카프카 컨슈머를 구현하여 토픽을 구독하고 이벤트를 읽어 들여보는 예제입니다. 카프카 설치가 로컬에 안되어 있으신 분은 아래 포스팅을 참고해 주세요.

ifuwanna.tistory.com