Consumer
개념
프로듀서가 브로커에 전송한 데이터들을 가져와 사용하기 위해 브로커에 요청을 보내는 클라이언트이다.
1개 이상의 컨슈머로 이뤄진 컨슈머 그룹을 운영하거나 토픽의 특정 파티션만 구독하는 컨슈머를 운영할 수 있다.
컨슈머 그룹
컨슈머 그룹으로 묶인 컨슈머들은 토픽의 여러 파티션들에 할당되어 데이터를 가져갈 수 있다. 그리고 1개의 파티션은 최대 1개의 컨슈머에만 할당이 가능하다.
따라서 컨슈머 그룹의 컨슈머 개수는 토픽의 파티션 개수보다 같거나 작아야 한다. 예를 들어 3개의 파티션을 갖는 토픽을 효과적으로 처리하려면 3개 이하로 이뤄진 컨슈머 그룹으로 운영해야 한다.
현재 운영하고 있는 토픽의 데이터가 어디에 적재되는지, 어떻게 처리되는지 파악하여 컨슈머 그룹을 최대한 나누는 것이 좋다.
예를 들어 elastic search와 hadoop에 데이터를 동기로 적재하던 파이프라인을 카프카를 통해 각각의 저장소에 비동기로 적재하도록 변경할 수 있다. 이 때 컨슈머 그룹을 분리하지 않으면 한쪽 저장소에 장애가 발생했을 때 적재에 대해 지연이 발생할 수 있다.
리밸런싱
컨슈머가 추가/제거되는 상황에 리밸런싱이 발생한다.
컨슈머 그룹의 컨슈머 중 일부에 장애가 발생(ex. 연결 불가)하면 해당 컨슈머에 할당되어 있던 파티션은 장애가 발생하지 않은 컨슈머에 소유권이 넘어간다.
언제든지 발생할 수 있기 때문에 데이터 처리 중 발생한 리밸런싱에 대응하는 코드를 컨슈머 클라이언트에 작성해두어야 한다.
파티션의 소유권을 재할당할 때 다른 컨슈머들이 토픽의 데이터를 읽을 수 없으므로 리밸런싱은 자주 발생하면 안된다.
카프카 브로커 중 한 프로세스가 그룹 코디네이터가 되어 컨슈머 그룹의 컨슈머 추가/삭제를 감지하고 리밸런싱을 발동시킨다.
커밋
컨슈머가 브로커로부터 어느 데이터까지 가져갔는지 커밋을 통해 기록한다.
카프카 브로커의 내부 토픽인
__consumer_offsets
에 특정 토픽의 파티션을 어떤 컨슈머 그룹이 어느 레코드까지 읽었는지에 대한 정보가 기록된다. 만약 이 토픽에 오프셋 커밋이 기록되지 않는다면 데이터를 중복해서 처리할 수 있기 때문에 컨슈머 애플리케이션은 오프셋 커밋이 정상적으로 처리되었는지 검증해야 한다.비명시 오프셋 커밋
일정 간격마다 오프셋을 커밋하도록 하는 것이다.
poll 메서드가 호출되었을 때
auto.commit.interval.ms
만큼 주기가 지났다면 해당 시점까지 읽은 레코드의 오프셋을 커밋한다.이 방식은 편리하지만 poll 메서드 호출 이후에 리밸런싱 또는 컨슈머 강제 종료 발생 시 컨슈머가 처리하는 데이터가 중복되거나 유실될 수 있다.
명시 오프셋 커밋
poll 메서드 호출 이후 반환받은 데이터의 처리가 완료되면 commitSync/commitAsync 메서드를 호출하여 가져온 레코드의 가장 마지막 오프셋을 기준으로 커밋을 수행하도록 한다.
commitSync의 경우 커밋 요청 및 응답에 시간이 오래 걸릴 수 있으며, commitAsync의 경우 커밋 요청이 실패했을 때 현재 처리 중인 데이터 순서를 보장하지 못해 데이터 중복 처리가 발생할 수 있다.
Kafka Client로 컨슈머 구현하기
자바를 통해 컨슈머를 구현하기 위해서는 Kafka Client의 클래스들을 사용할 수 있다.
컨슈머 그룹을 지정하여 컨슈머의 목적을 구분할 수 있다. 컨슈머 그룹을 기준으로 컨슈머 오프셋을 관리하기 때문에, subscribe 메서드를 통해 토픽을 구독하는 경우 컨슈머 그룹을 선언하여 프로그램이 재시작되더라도 해당 오프셋 이후의 데이터부터 처리되도록 해야 한다.
프로듀서에서 지정한 직렬화 타입으로 역직렬화해야 한다.
subscribe 메서드는 컨슈머에게 토픽을 할당한다.
poll 메서드는 데이터를 브로커로부터 가져와 ConsumerRecord 타입을 반환한다. 이 때 Duration 타입을 인자로 받아 컨슈머 버퍼에 데이터를 기다리기 위한 타임아웃 간격을 설정한다.
poll 메서드를 호출하는 시점에 데이터를 브로커로부터 가져오는 것이 아니라 컨슈머 애플리케이션 실행 시 생성되는 Fetcher 인스턴스에 의해 미리 레코드들을 내부 큐에 저장해두고 사용자가 poll 메서드를 호출하면 내부 큐에 있는 레코드를 반환한다.
명시적 커밋
동기 커밋
아래와 같이 동기 방식의 커밋 메서드를 호출할 수 있다.
개별 레코드 단위로 매번 오프셋을 커밋하고 싶다면 동기 방식의 커밋 메서드에 파티션객체와 오프셋 객체 맵을 입력하면 된다.
이 때 현재 처리한 오프셋에 1을 더한 값을 오프셋으로 입력해야 한다. 왜냐하면 컨슈머가 poll 메서드를 호출할 때 마지막 커밋 오프셋부터 읽기 시작하기 때문이다.
비동기 커밋
비동기 방식의 커밋 메서드에는 콜백 메서드를 가진 객체를 넣어 호출할 수 있다.
리밸런스 처리
리밸런스 발생 시 현재까지 처리된 데이터를 기준으로 커밋을 시도해야 한다.
onPartitionAssigned 메서드는 리밸런스 완료 후 파티션이 할당되면 호출되는 메서드이고, onPartitionRevoked 메서드는 리밸런스가 시작되기 직전에 호출되는 메서드이다.
파티션 할당
subscribe 메서드 대신 assign 메서드를 사용해 특정 파티션을 컨슈머에 직접 할당시킬 수 있다.
컨슈머에 할당된 파티션을 확인하려면 assignment 메서드를 사용하면 된다.
안전한 종료
셧다운 훅을 구현해 wakeup 메서드를 호출하도록 하고, poll 메서드에서 WakeupException이 발생한 후에 close 메서드를 호출해 리소스가 정상적으로 종료되도록 한다.
어드민 API 활용
AdminClient 클래스를 통해 내부 옵션들을 설정하거나 조회할 수 있다.
브로커 정보, 토픽 리스트, 컨슈머 그룹 등을 조회할 수 있고, 신규 토픽을 생성하거나 파티션 개수를 변경하거나 접근 제어 규칙(ACL) 등을 설정할 수 있다.
컨슈머나 프로듀서는 이 옵션을 사용해 상황에 맞게 적절한 처리를 할 수 있다.
주요 옵션
필수 옵션
bootstrap.servers
프로듀서가 데이터를 전송한 카프카 브로커 클러스터의 주소
2개 이상의 주소를 입력하여 일부 브로커에 이슈가 발생해도 접속에 문제가 없도록 하는 게 좋다.
key.deserializer
레코드의 메시지 키 역직렬화 클래스 지정
value.deserializer
레코드의 메시지 값 역직렬화 클래스 지정
선택 옵션
group.id
컨슈머 그룹 아이디
subsecribe() 메서드로 토픽을 구독할 경우 반드시 선언해주어야 한다.
기본값은 null이다.
auto.offse.reset
저장된 오프셋이 없는 경우 어디서부터 읽을지 선택한다.
다음과 같은 타입이 있다.
latest : 가장 최근에 넣은 높은 번호의 오프셋부터 읽기
earliest: 가장 오래전에 넣은 낮은 번호의 오프셋부터 읽기
none : 컨슈머 그룹이 커밋한 기록을 확인해 없는 경우 에러를 반환하고, 있으면 기존 커밋 기록 이후의 오프셋부터 읽기
기본값은 latest이다.
enable.auto.commit
자동 커밋 동작 여부를 설정한다.
기본값은 true이다.
auto.commit.interval.ms
자동 커밋일 경우의 커밋 간격을 설정한다.
기본값은 5000이다.
max.poll.records
poll() 호출 시 반환되는 레코드 개수를 지정한다.
기본값은 500이다.
session.timout.ms
컨슈머와 브로커의 연결이 끊겨도 유지하는 최대시간을 지정한다.
이 시간 내에 하트비트를 브로커에 보내지 않으면 브로커는 컨슈머에 이상이 생겼다고 간주하고 리밸런싱을 시작한다.
보통 하트비트 시간 간격의 3배로 지정한다.
기본값은 10000ms이다.
heartbeat.interval.ms
하트비트를 전송하는 간격을 지정한다.
max.poll.interval.ms
poll 메서드 호출 간격의 최대 시간을 지정한다.
poll 메서드 호출 이후 데이터 처리에 시간이 너무 많이 걸리는 경우 비정상으로 판단하여 리밸런싱을 시작한다.
기본값은 300000ms(5분)이다.
isolation.level
트랜잭션 프로듀서가 레코드를 트랜잭션 단위로 보낼 경우 고립 수준을 지정한다.
read_commited: 커밋 완료된 레코드만 읽기
read_uncommited: 커밋 여부에 상관없이 레코드를 읽기
기본값은 read_commited이다.
출처
아파치 카프카 애플리케이션 프로그래밍
Last updated