자바, 인텔리제이 환경에서 아파치 카프카(Apache Kafka) 프로듀서 구현하여 간단히 이벤트를 날려보는 예제입니다.
카프카 설치 및 콘솔 컨슈머 테스트를 위한 CLI 세팅는 이전 포스팅을 참조해 주세요.
프로듀서(Producer) 구현
1. 먼저 프로젝트를 생성해 주세요. 이번 예제에서 저는 Empty 프로젝트를 생성하여 하위에 모듈로 추가하도록 하겠습니다.
2. New Module에서 Gradle > Java를 선택해 주세요. Maven 프로젝트가 편하신 분들은 메이븐으로 선택하셔도 됩니다.
3. 모듈(프로젝트)가 생성되었으면 build.gradle(maven:pom.xml)파일을 열어 카프카 클라이언트 라이브러리 의존성을 추가해 줍니다. 저는 로그 확인을 위해 slf4j 모듈도 함께 추가해 줬습니다.
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.1.0'
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.1.0</version>
</dependency>
4. 실행할 메인 클래스(ProducerMain)와 메인 메서드를 만들어 Producer Client를 구현해 줍니다.
4-1. 먼저 프로듀서 초기화를 위한 Properties 를 세팅해 줍니다.
- bootstrap.servers : 카프카 클러스터에 연결하기 위한 브로커 목록
- key.serializer : 메시지 키 직렬화에 사용되는 클래스
- value.serializer : 메시지 값을 직렬화 하는데 사용되는 클래스
4-2. Properties설정으로 KafkaProducer 인스턴스를 생성해 줍니다.
4-3. topic partition key data 값을 지정 ProducerRecord 인스턴스를 생성해 줍니다.
이번 예제에서는 간단하게 topic이름(test)와 data만 지정하여 전송해 보도록 하겠습니다.
4-4. producer.send(record) 메서드를 통해 작성한 레코드를 카프카클러스트에 전송합니다.
본 예제에서는 1초에 한 번씩 레코드를 전송하겠습니다.
package com.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class ProducerMain {
public static void main(String[] args) {
try {
// set kafka properties
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // kafka cluster
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// KEY_SERIALIZER
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // VALUE_SERIALIZER
// init KafkaProducer
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
int idx = 0;
while(true){
// set ProducerRecord
String topic = "test"; // topic name
Integer partition = 0; // partition number (default: Round Robin)
String key = "key-" + idx; // key (default: null)
String data = "record-"+ idx; // data
ProducerRecord<String, String> record = new ProducerRecord<>(topic, data);
ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, key, data);
ProducerRecord<String, String> record3 = new ProducerRecord<>(topic, partition, key, data);
// send record
producer.send(record);
System.out.println("producer.send() >> [topic:" + topic + "][data:" + data + "]");
Thread.sleep(1000);
idx++;
}
} catch (Exception e) {
System.out.println(e);
}
}
}
5. 카프카 클러스트 서버에서 Consumer Client(CLI) 를 실행 시켜 놓은 뒤 예제로 만든 프로듀서 Application을 실행해 보도록 하겠습니다.
먼저 설정한 값과 기본값들로 세팅되어 프로듀서가 초기화 된 뒤 1초에 한 번씩 이벤트를 쓰는 것을 확인할 수 있습니다.
그리고 실행시켜놨던 컨슈머 클라이언트에서는 test 토픽을 구독하여 프로듀서에서 쓴 레코드를 읽어 출력하는 것을 확인하실 수 있습니다.
Producer
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
acks = -1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 37edeed0777bacb3
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1649226688710
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition test-0 to 0 since the associated topicId changed from null to o7t58QhsSWiBIVbfAN6cSw
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: Z-axTywFSZ6BoLzqAiIAtg
producer.send() >> [topic:test][data:record-0]
producer.send() >> [topic:test][data:record-1]
producer.send() >> [topic:test][data:record-2]
producer.send() >> [topic:test][data:record-3]
producer.send() >> [topic:test][data:record-4]
producer.send() >> [topic:test][data:record-5]
producer.send() >> [topic:test][data:record-6]
producer.send() >> [topic:test][data:record-7]
producer.send() >> [topic:test][data:record-8]
producer.send() >> [topic:test][data:record-9]
producer.send() >> [topic:test][data:record-10]
producer.send() >> [topic:test][data:record-11]
producer.send() >> [topic:test][data:record-12]
producer.send() >> [topic:test][data:record-13]
producer.send() >> [topic:test][data:record-14]
Consumer
➜ bin ./kafka-console-consumer.sh --bootstrap-server localhost:9092 —from-beginning --topic test
record-0
record-1
record-2
record-3
record-4
record-5
record-6
record-7
record-8
record-9
record-10
record-11
record-12
record-13
record-14
….
CLI가 아닌 Java로 컨슈머 클라이언트 구현을 해보고 싶으시면 아래 포스팅을 참고해 주셔요 :)
'Program > Kafka' 카테고리의 다른 글
[Kafka] 쥬키퍼 실행시 exiting abnormally 오류 해결 (0) | 2022.05.11 |
---|---|
[Kafka] 컨슈머(Consumer) 구현 - Java,IntelliJ (0) | 2022.05.11 |
[Kafka] 카프카 설치 및 실행, 이벤트 읽고 쓰기(CLI) (0) | 2022.04.02 |
[Kafka] 카프카란? 주요개념 및 용어 소개 (1) | 2022.04.01 |