본문으로 바로가기

1. 카프카 설치 및 실행하기

설치에 앞서 아파치 카프카(Apache Kafka)에 대한 개요가 궁금하신 분들은 아래 포스팅을 참고해 주세요. 

 

[Kafka] 카프카란? 주요개념 및 용어 소개

카프카(Kafka)란? Apache Kafka 는 고성능 데이터 파이프라인, 스트리밍 분석, 데이터 통합 및 미션 크리티컬 애플리케이션을 위해 오픈 소스 분산 이벤트 스트리밍 플랫폼(distributed event streaming platform

ifuwanna.tistory.com

 

1. 아래 사이트에서 카프카 최신 릴리즈(kafka_2.13-3.1.0.tgz)를 다운로드 받은 뒤 압축 해제후 폴더로 이동해 주세요.

https://www.apache.org/dyn/closer.cgi?path=/kafka/3.1.0/kafka_2.13-3.1.0.tgz

$ tar -xzf kafka_2.13-3.1.0.tgz
$ cd kafka_2.13-3.1.0

2. config/server.properties 파일을 열어 Kafka 설정 을 변경해 주세요. (서버 정보 변경 필요시)

클라우드나 별도 서버를 통해 띄울 경우 listeners advertised.listeners 옵션의 주석을 해제하고 서버정보를 기재해주시면 됩니다. 저는 로컬에 서버를 띄울 것이므로 수정하지 않겠습니다. ( Default : localhost:9092)

$ vi config/server.properties 
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

3. 카프카 Broker를 띄우기 전에 먼저 ZooKeeper를 실행해 줍니다.

ZooKeeper카프카 클러스트의 메타데이터(브로커, 컨트롤러 ID 등)를 저장하는 서버로 현재는 Broker를 실행하려면 필수로 떠있어야 하는 서비스이나 향후 카프카 클러스트에 통합시킬 예정이라고 하니 참고해 주세요.

# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties

[2022-04-01 21:19:56,351] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
..... 실행 로그

4. 쥬키퍼가 정상적으로 떴으면 Kafka 서버를 실행해 줍니다.

# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties

[2022-04-01 21:26:03,452] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2022-04-01 21:26:03,732] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
[2022-04-01 21:26:03,813] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2022-04-01 21:26:03,816] INFO starting (kafka.server.KafkaServer)
..... 실행 로그 

5. 이후jps명령어를 통해 쥬키퍼, 카프카가 정상적으로 실행 됐는지 확인해보시면 됩니다. 

$ jps
90737 Jps
71585
89360 QuorumPeerMain
22913
89685 Kafka
22935 MicroProfileServerLauncher
22936 camel-lsp-server-1.1.0-SNAPSHOT.jar
22937 BootLanguagServerBootApp
87949 RemoteMavenServer36
65564

참고 - Kafka CLI(Command Line Interface - ShellScript)

설치한 카프카의 bin폴더에 카프카를 편리하게 사용할 수 있도록 다양한 쉘스크립트를 제공하고 있습니다. 이를 통해 터미널의 CLI 환경에서도 카프카에 이벤트를 쓰고 읽고 쓸 수가 있으며 자주 사용되는 쉘은 아래와 같습니다.

  • kafka-topics.sh : 토픽 생성, 조회, 수정 등 역할
  • kafka-console-consumer.sh토픽의 레코드 즉시 조회
  • kafka-console-producer.sh토픽의 레코드를 전달(String)
  • kafka-consumer-groups.sh : 컨슈머그룹 조회, 컨슈머 오프셋 확인, 수정

connect-distributed.sh
connect-mirror-maker.sh
connect-standalone.sh
kafka-acls.sh
kafka-broker-api-versions.sh
kafka-cluster.sh
kafka-configs.sh
kafka-console-consumer.sh
kafka-console-producer.sh
kafka-consumer-groups.sh
kafka-consumer-perf-test.sh
kafka-delegation-tokens.sh
kafka-delete-records.sh
kafka-dump-log.sh
kafka-features.sh
kafka-get-offsets.sh
kafka-leader-election.sh
kafka-log-dirs.sh
kafka-metadata-shell.sh
kafka-mirror-maker.sh
kafka-producer-perf-test.sh
kafka-reassign-partitions.sh
kafka-replica-verification.sh
kafka-run-class.sh
kafka-server-start.sh
kafka-server-stop.sh
kafka-storage.sh
kafka-streams-application-reset.sh
kafka-topics.sh
kafka-transactions.sh
kafka-verifiable-consumer.sh
kafka-verifiable-producer.sh
trogdor.sh
zookeeper-security-migration.sh
zookeeper-server-start.sh
zookeeper-server-stop.sh
zookeeper-shell.sh

2. 토픽 생성 및 조회

 토픽(Topic)이란 메시지를 구분하는 단위입니다. 먼저 위에서 실행한 카프카 브로커에   토픽(Topic)을 생성해 보도록 하겠습니다.

생성될 토픽 정보는 아래와 같습니다. 

  • 토픽이름 : test
  • replication-factor : 복제 대상 파티션 개수 1개 (없음)
  • partitions : 파티션 개수 3개
$ ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic test
Created topic test.

# kafka_2.13-3.1.0/logs/controller.log
[2022-04-02 01:27:19,822] INFO [Controller id=0] New topics: [Set(test)], deleted topics: [HashSet()], new partition replica assignment [Set(TopicIdReplicaAssignment(test,Some(txyHqn4fSoqvUk4KrW39hg),Map(test-0 -> ReplicaAssignment(replicas=0, addingReplicas=, removingReplicas=), test-1 -> ReplicaAssignment(replicas=0, addingReplicas=, removingReplicas=), test-2 -> ReplicaAssignment(replicas=0, addingReplicas=, removingReplicas=))))] (kafka.controller.KafkaController)
[2022-04-02 01:27:19,822] INFO [Controller id=0] New partition creation callback for test-0,test-1,test-2 (kafka.controller.KafkaController)

참고로 ..logs/controller.log 에서 카프카 로그를 확인해 보실 수 있습니다.

그럼 토픽이 정상적으로 생성되었는지 —describe 옵션을 통해 조회해 보겠습니다.

$ bin ./kafka-topics.sh --describe --bootstrap-server localhost:9092
Topic: test    TopicId: txyHqn4fSoqvUk4KrW39hg    PartitionCount: 3    ReplicationFactor: 1    Configs: segment.bytes=1073741824
    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
    Topic: test    Partition: 1    Leader: 0    Replicas: 0    Isr: 0
    Topic: test    Partition: 2    Leader: 0    Replicas: 0    Isr: 0

정상적으로 test 토픽이 파티션 3개와 각각의 복제본(Replicas)은 없는 상태로 생성된 것을 확인하실 수 있습니다.

3. 이벤트 쓰기 (Producer - write)

Kafka 클라이언트는 이벤트 쓰기(+읽기)를 위해 네트워크를 통해 Kafka 브로커와 통신하며 브로커는 이벤트를 수신하면 설정에 따라 필요한 기간 동안 이벤트를 저장합니다.(파일)

kafka-console-producer.sh 를 이용하여 생성했던 브로커내 test 토픽에 hello ifuwanna kafka 총 3개의 이벤트(레코드)를 발행하여 test 토픽(파티션)에 저장해 보겠습니다.

$ ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
>hello
>ifuwanna
>kafka
#producer client를 종료하려면 Ctrl+C

4-1. 이벤트 읽기 (Consumer - read)

 kafka-console-producer.sh 를 통해 test 토픽에 저장했던 이벤트(레코드)들을 읽어와 보도록 하겠습니다. 별도의 컨슈머 그룹 없이 --from-beginning 옵션을 통해 최초 시작지점의 오프셋부터 데이터를 가져오는 예제입니다.

$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test
hello
kafka
ifuwanna
  • 컨슈머가 데이터를 polling()해서 Producer가 넣은 이벤트 3개 hello ifuwanna kafka 모두 조회해 온 것을 확인 하실 수 있습니다.
  • 데이터가 순서대로 출력되지 않는 이유는 test토픽의 파티션이 3개이기 때문입니다. 컨슈머는 모든 파티션에 할당이 되었을 뿐이지 가져오는 것에는 순서가 없습니다. 순서를 유지하려면 파티션 하나를 지정하여 메시지를 관리해야 합니다.
  • 이벤트는 Kafka에 영구적으로 저장되기 때문에 원하는 만큼 많은 소비자가 이벤트를 읽을 수 있습니다. 또 다른 터미널 세션을 열고 이전 명령을 다시 실행하여 이를 쉽게 확인할 수 있습니다.

4-2. 이벤트 읽기 (Consumer group)

여러개의 컨슈머의 논리그룹인 Consumer group은 이벤트를 처리할때 해당 그룹의 컨슈머들이 이벤트를 어디까지 처리했는지Consumer Offset을 통해 기억하고 그 이후 오프셋의 이벤트부터 처리해 나갑니다.

—group 옵션을 추가하여 아래 예제로 컨슈머 그룹이 어떻게 동작하는지 확인해 보겠습니다.

 

1. testgroup 이라는 컨슈머그룹을 지정하여 test 토픽에 들어있는 기존 이벤트 3개를 모두 읽어 들임

2. 다시 프로듀서를 통해 test 토픽에 1,2,3 이라는 이벤트를 신규로 추가

# 1.test 토픽의 기존 이벤트 읽기
$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 -group testgroup  --from-beginning --topic test
hello
kafka
ifuwanna

# 2. test 토픽에 1,2,3 이벤트 추가
$ ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
>1
>2
>3

현재 test 토픽에 총 6개의 이벤트를 넣었고 (hello ifuwanna kafka 1 2 3) testgroup 컨슈머 그룹에서 이중 3개를 읽어들인 상태입니다. 이상태에서 --describe 옵션을 통해 testgroup 그룹의 상태를 확인해 보겠습니다.

# 3. testgroup 컨슈머그룹의 현재 상태 확인 
$ bin ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testgroup --describe
Consumer group 'testgroup' has no active members.
GROUP           TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID  HOST CLIENT-ID
testgroup       test       0          1               1            0      -           -      -
testgroup       test       1          2               2            0      -           -      -
testgroup       test       2          0               3            3      -           -      -
  • CURRENT-OFFSET LOG-END-OFFSET 모두 Partition#0은 1, Partition#1은 2인 것으로 보아 먼저 들어갔던 3개의 데이터는 파티션0,1에 할당되어 처리된 것을 알 수 있습니다.
  • Partition#2의 CURRENT-OFFSET은 0이고 LOG-END-OFFSET 은 3인걸로 보아 추가 데이터(1,2,3)은 모두 Partition#2에 할당되었고 아직 해당 그룹에서 읽어 들이지 않았습니다.

해당 컨슈머그룹에서 위에서 추가로 넣었던 이벤트 1,2,3을 읽어 온 뒤 상태를 다시 확인해 보도록 하겠습니다.

# 4. test토픽의 나머지 1,2,3 이벤트 읽어들임
$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 -group testgroup  --from-beginning --topic test
1
2
3
$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testgroup --describe
# 5. testgroup 컨슈머그룹의 현재 상태 확인 
GROUP           TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID  HOST CLIENT-ID
testgroup       test       0          1               1            0      -           -      -
testgroup       test       1          2               2            0      -           -      -
testgroup       test       2          3               3            0      -           -      -
  • 해당 컨슈머그룹에서 기존에 읽었던 3개의 이벤트(hello ifuwanna kafka)는 제외하고 그 이후 오프셋의 이벤트부터 처리하여 (1,2,3) 된 것을 확인 하실 수 있습니다.

4-3. Consumer group 오프셋 재설정

컨슈머 그룹에서 구독중인 토픽(파티션)의 offset을 변경하고 싶은 경우 --reset-offsets 을 사용해여 offset을 재설정 할 수 있습니다.

# testgroup이 구독하는 test토픽의 모든 파티션의 오프셋을 가장 빠른 offset 으로 재설정
$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testgroup --topic test --reset-offsets --to-earliest --execute
GROUP                          TOPIC                          PARTITION  NEW-OFFSET
testgroup                      test                           0          0
testgroup                      test                           1          0
testgroup                      test                           2          0

# testgroup이 구독하는 test 토픽 1번 파티션의 오프셋을 1로 재설정 
$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testgroup --topic test:1 --reset-offsets --to-offset 1 --execute
GROUP                          TOPIC                          PARTITION  NEW-OFFSET
testgroup                      test                           1          1

5. KAFKA 환경 종료

카프카를 종료하려면 콘솔에서 Ctrl-C 로 아래 순서대로 종료해 주시면됩니다.

  1. 생산자 및 소비자 클라이언트 중지
  2. Kafka 브로커를 중지
  3. ZooKeeper 서버 중지

정리

지금까지 간단하게 콘솔을 통해 카프카를 실행하고 토픽을 생성한 뒤 Producer Client를 이용하여 이벤트(레코드)를 브로커의 토픽(파티션)에 저장하고 Consumer Client로 이벤트를 읽어와 봤습니다.

 

 실제 운영환경이라면 카프카 서버(브로커)는 각각 여러대의 별도 서버(클라우드, 온디맨드, 도커 등)으로 구성되어 있을 것이고 쉘스크립트로 실행한 Prodecer,Consumer도 아키텍쳐에 맞게 별도의 서비스로 분리가 되어 유기적으로 구성이 되어야 합니다.

Reference

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org