카프카 설치 및 실습
카프카 브로커 클러스터 구성
docker compose를 사용하여 간단히 로컬에 카프카 브로커 클러스터를 구성할 수 있다.
Kafka 브로커 실행을 위해서는 여러 속성을 설정해주어야 한다.
카프카에서 나타내는 속성 이름과 docker compose에서 나타내는 속성 이름이 상이하지만 여기서는 카프카에서 나타내는 속성 이름을 기준으로 설명한다.
broker.id
: 실행하는 카프카 브로커의 번호를 직접 부여해주어야 한다. 브로커들을 구분하기 위해 부여하는 것이므로 다른 브로커와 겹치면 안된다.listeners
: 카프카 브로커의 통신을 위해 열어둘 인터페이스 IP, port, 프로토콜을 설정할 수 있다.advertised.listeners
: 카프카 클라이언트 또는 카프카 커맨드 라인 툴에서 접속 시 사용하는 IP, port 정보listener.security.protocol.map
: 보안 설정 시 사용할 프로토콜 매핑 지정num.network.threads
: 네트워크 처리를 위한 스레드 개수num.io.threads
: 카프카 브로커 내부에서 사용할 스레드 개수log.dirs
: 데이터를 저장할 디렉토리 위치num.partitions
: 토픽 당 파티션 개수를 지정할 수 있으며 파티션이 많아질수록 데이터 소비를 병렬로 많이 처리할 수 있게 된다.log.retention.ms
: 카프카 브로커가 저장한 파일이 삭제되기 까지 걸리는 시간 지정, -1로 설정하면 영원히 삭제되지 않는다.log.segment.bytes
: 카프카 브로커가 저장할 파일의 최대 크기log.retention.check.interval.ms
: 카프카 브로커가 저장한 파일을 삭제하기 위해 체크하는 간격(ms)
networks:
kafka_network:
volumes:
Kafka00:
driver: local
Kafka01:
driver: local
Kafka02:
driver: local
services:
Kafka00Service:
image: bitnami/kafka:3.5.1-debian-11-r44
restart: unless-stopped
container_name: Kafka00Container
ports:
- '10000:9094'
environment:
- KAFKA_CFG_BROKER_ID=0
- KAFKA_CFG_NODE_ID=0
- KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka00Service:9092,EXTERNAL://127.0.0.1:10000
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
networks:
- kafka_network
volumes:
- "Kafka00:/bitnami/kafka"
Kafka01Service:
image: bitnami/kafka:3.5.1-debian-11-r44
restart: always
container_name: Kafka01Container
ports:
- '10001:9094'
environment:
- KAFKA_CFG_BROKER_ID=1
- KAFKA_CFG_NODE_ID=1
- KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka01Service:9092,EXTERNAL://127.0.0.1:10001
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
networks:
- kafka_network
volumes:
- "Kafka01:/bitnami/kafka"
Kafka02Service:
image: bitnami/kafka:3.5.1-debian-11-r44
restart: always
container_name: Kafka02Container
ports:
- '10002:9094'
environment:
- KAFKA_CFG_BROKER_ID=2
- KAFKA_CFG_NODE_ID=2
- KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka02Service:9092,EXTERNAL://127.0.0.1:10002
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
networks:
- kafka_network
volumes:
- "Kafka02:/bitnami/kafka"
KafkaWebUiService:
image: provectuslabs/kafka-ui:latest
restart: always
container_name: KafkaWebUiContainer
ports:
- '8080:8080'
environment:
- KAFKA_CLUSTERS_0_NAME=Local-Kraft-Cluster
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092
- DYNAMIC_CONFIG_ENABLED=true
- KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED=true
- KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED=true
#- KAFKA_CLUSTERS_0_METRICS_PORT=9999
depends_on:
- Kafka00Service
- Kafka01Service
- Kafka02Service
networks:
- kafka_network
8080 포트에 브라우저로 접근하면 WebUi 서비스에 접근할 수 있다.

토픽 생성
도커 컨테이너에 exec으로 접근하여
/opt/bitnami/kafka/bin
디렉토리에서 아래와 같은 명령을 수행하면 토픽이 생성된다.
./kafka-topics.sh --create --bootstrap-server localhost:9092 --topic hello.kafka
파티션 개수와 복제 개수, 토픽 데이터 유지 기간 옵션을 지정하여 토픽을 생성할 수도 있다.
partitions
: 최소 개수는 1개이며, 옵션을 사용하지 않으면 카프카 브로커 구동 시 설정했던num.partitions
옵션대로 설정된다.replication-factor
: 토픽의 파티션을 복제할 개수를 지정하며 1으로 지정하면 복제를 하지 않는다는 의미이고 2로 지정하면 복제본 1개를 사용하겠다는 의미이다.config
: kafka-topics.sh 명령에 포함되지 않은 추가적인 설정을 할 수 있다. retention.ms의 경우 토픽의 데이터를 유지하는 기간을 의미한다.
./kafka-topics.sh --create --bootstrap-server localhost:9092 \
--partitions 3 --replication-factor 1 --config retention.ms=172800000 \
--topic hello.kafka
토픽 조회
--list
인자를 주어 카프카 브로커 클러스터가 가지는 토픽들의 이름을 조회할 수 있다.
./kafka-topics.sh --bootstrap-server localhost:9092 --list

자세한 내용을 조회하려면
--describe --topic <토픽이름>
을 인자로 주면 된다.
./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic hello.kafka

토픽 옵션 수정
파티션 개수 변경을 위해서는 아래 명령어를 수행하면 된다. describe를 통해 확인해보면 실제로 파티션 개수가 변경된 것을 확인할 수 있다.
./kafka-topics.sh --bootstrap-server localhost:9092 --topic hello.kafka --alter --partitions 4

토픽 삭제를 위한 리텐션 기간 변경을 위해서는 아래 명령어를 수행하면 된다.
./kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name hello.kafka --alter --add-config retention.ms=86400000

토픽에 데이터 넣기
토픽에 넣는 데이터는 레코드라고 부르며 key-value 형태로 이루어진다. 아래와 같이 키가 없는 레코드를 입력할 수도 있고 키와 값의 구분자를 입력해 key-value 형태의 레코드를 입력할 수도 있다.
$ ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic hello.kafka
> hello
> kafka
$ ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic hello.kafka \
--property "parse.key=true" --property "key.separator=:"
> key1:no1
> key2:no2
이렇게 전달된 레코드는 토픽의 파티션에 저장된다.
기본 파티셔너를 사용하는 경우, 프로듀서가 파티션으로 레코드를 전달할 때 key가 존재한다면 해시 값에 따라 적절한 파티션으로 이동하고, key가 존재하지 않는다면 라운드 로빈으로 파티션 중 하나에 할당된다. 이러한 파티션 할당 방식은 커스텀하게 파티셔너를 구현해 변경할 수 있다.
토픽의 데이터 받기
아래 명령을 사용해 컨슈머를 통해 토픽의 데이터를 읽어올 수 있다.
--from-beginning
옵션을 통해 토픽에 들어온 레코드를 처음부터 조회할 수 있다.
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello.kafka \
--from-beginning
property
옵션을 통해 키 출력, 키 구분자 여부도 설정할 수 있다.--group 옵션을 통해 컨슈머 그룹을 설정할 수 있다. 컨슈머 그룹은 1개 이상의 컨슈머로 구성되며 조회한 토픽의 메시지에 대해 커밋을 한다. 커밋이란 컨슈머가 특정 레코드까지 처리를 완료했음을 알리기 위해 레코드의 오프셋 번호를 카프카 브로커에 저장하는 것이다.
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello.kafka \
--property print.key=true --property key.separator="-" --group hello-group --from-beginning
컨슈머 그룹 확인
아래 명령을 사용해 존재하는 컨슈머 그룹들을 확인할 수 있다.
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
아래 명령을 사용해 특정 컨슈머 그룹에 대한 정보를 확인할 수 있다.
CURRENT-OFFSET
: 컨슈머 그룹이 받고 있는 토픽의 파티션에 존재하는 가장 최신 오프셋이 몇 번인지 나타낸다.LOG-END-OFFSET
: 컨슈머 그룹의 컨슈머가 어느 오프셋까지 커밋했는지 알 수 있다.LAG
: 컨슈머 그룹이 토픽의 파티션에 있는 데이터를 가져가는 데에 얼마나 지연이 발생했는지 확인하기 위한 지표로, 컨슈머 그룹이 커밋한 오프셋과 파티션의 최신 오프셋 간의 차이를 나타낸다.
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group hello-group --describe

간편 테스트
카프카 클러스터 설치 후 잘 동작하는지 확인해보기 위한 메시지를 자동으로 produce하고 consume하기 위해 아래 명령을 사용할 수 있다.
--max-message
옵션을 사용해 테스트할 메시지의 개수를 정할 수 있으며, 수행 시 가장 마지막줄에는 수행 결과의 통계값이 출력된다.
./kafka-verifiable-producer.sh --bootstrap-server localhost:9092 --max-message 10 --topic verify-test
producer에서 발행한 메시지를 조회할 수 있다. 여기서는 consume한 메시지 개수 등의 정보가 출력된다.
./kafka-verifiable-consumer.sh --bootstrap-server localhost:9092 --topic verify-test \
--group-id test-group
토픽 데이터 제거
delete.json 파일을 먼저 구성하고, 해당 json 파일을 기반으로 명령을 보내 토픽의 특정 데이터를 제거할 수 있다.
{
"partitions": [
{
"topic": "test,
"partition": 0,
"offset": 50
}
],
"version": 1
}
./kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file delete.json
Last updated