카프카 컨슈머에 대한 개념과 컨슈머 API의 사용법에 대해 알아보자.
컨슈머와 컨슈머 그룹
컨슈머는 컨슈머 그룹의 일부로 동작하며 컨슈머 그룹에 컨슈머만 추가하면 메시지의 처리 속도를 높일 수 있다. 그러나 파티션 수보다 컨슈머의 수가 커지게 되면 파티션에 대응되지 못한 컨슈머는 놀게 되어 적절한 파티션 수를 설정해주는 것이 중요하다.
토픽 하나에 여러 개의 컨슈머 그룹을 할당할 수도 있으며, 각각의 컨슈머 그룹은 토픽의 메시지를 독립적으로 소비한다. (메시지 1~100 이 있으면 그룹1도 1~100, 그룹2도 1~100 메시지를 모두 받을 수 있다.)
컨슈머 그룹과 파티션 리밸런스
컨슈머 그룹은 동일한 메시지를 중복해서 컨슈머들이 소비하지 않도록, 컨슈머가 새로 추가되더라도 읽고 있던 파티션의 오프셋을 받아 그 이후부터 읽는다. 컨슈머가 종료되거나 크래시가 난 경우에도 남은 컨슈머가 나간 컨슈머의 오프셋을 받아 이어서 파티션을 읽게 되는데, 이러한 과정을 리밸런스라고한다. 리밸런스는 컨슈머 그룹만의 변경이 아닌, 토픽이 변경되었을때(ex. 토픽에 새 파티션이 추가된 경우)도 동일하게 발생한다.
리밸런스는 컨슈머 그룹이 사용하는 파티션을 재할당하는 것인데, 이 전략으로는 2가지가 있다.
리밸런스 작업은 해당 컨슈머 그룹의 그룹 코디네이터 역할을 맡은 카프카 브로커와 함께 이루어진다. 컨슈머들은 그룹 코디네이터에게 하트비트를 백그라운드 스레드로 전송하며, 하트비트가 일정 시간 오지 않으면 그룹 코디네이터는 해당 컨슈머가 죽었다고 판단하여 리밸런스를 진행한다. 컨슈머를 닫을때는 즉시 리밸런스가 진행된다.
조급한 리밸런스(eager rebalance)
리밸런스가 실행되게 되면, 모든 컨슈머는 읽기 작업을 중단하고 자신에게 할당된 파티션의 소유권을 모두 포기한다.
이후 다시 컨슈머들이 컨슈머 그룹에 참여하여 완전히 새로운 파티션을 할당받는다.
작업이 중단되는 시간의 길이는 컨슈머 그룹의 크기는 물론 여러 설정값들의 영향을 받는다.
협력적 리밸런스(cooperative rebalance 혹은 점진적 리밸런스incremental rebalance)
한 컨슈머에게 할당되어 있던 파티션만을 다른 컨슈머에게 재할당한다.
이런 과정이 여러번 반복되어 리밸런스가 진행되지만, 조급한 리밸런스와 달리 읽기 작업이 전체가 중단되는 stop the world 현상이 발생하지 않아 리밸런싱 작업에 시간이 많이 소요되는 특히 컨슈머의 수가 많은 경우에 용이하다.
3.1 버전 부터는 협력적 리밸런스 방식을 기본값으로 사용하고 있으며, 조급한 리밸런스 방식은 추후에 삭제될 예정이다.
정적 그룹 멤버십
위에서 하트비트나 컨슈머의 종료 요청을 통해 그룹 코디네이터가 리밸런싱 작업을 수행한다고 했는데, 정적 그룹 멤버십을 갖는 컨슈머의 경우에는 약간 다르다.
정적 그룹 멤버십을 갖는 컨슈머란 groud.instance.id 값을 고유하게 지정한 컨슈머다.
정적 그룹 멤버십을 갖는 컨슈머는 최초 할당 시 해당 그룹의 파티션 할당 전략에 따라 할당되는 것은 동일하나, 해당 컨슈머가 꺼질 경우 session.timeout.ms로 설정한 시간을 넘기지 않는 이상 리밸런스를 수행하지 않는다.
session.timeout.ms이내에 해당 컨슈머가 재시작되면, 그룹 코디네이터에 있던 캐시를 통해 해당 컨슈머에게 읽고 있던 파티션을 그대로 다시 할당해준다.
정적 그룹 멤버십은 애플리케이션이 파티션의 내용물을 통해 로컬 상태나 캐시를 유지해야할 때 동일한 파티션을 다시 할당받을 수 있고, 리밸런싱 작업도 일어나지 않기 때문에 용이하다. 그러나 재시작된 컨슈머가 계속 쓰여지고있는 파티션의 최신 메시지까지 따라 잡을 수 있는지 확인할 필요가 있다.
카프카 컨슈머 생성하기
package org.example;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Properties;
public class Consumer {
public static void main(String[] args) {
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "localhost:9092");
kafkaProps.put("group.id", "CountryCounter");
kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
}
}
컨슈머 생성 방법은 이전에 살펴본 프로듀서 생성 방법과 유사하며, 시리얼라이저가 아니라 디시리얼라이저를 지정해주어여 한다.
bootstrap.servers, key.deserializer, value.deserializer 는 필수 속성값이고, group.id 는 컨슈머 그룹의 id 를 나타내는데 컨슈머가 컨슈머 그룹 없이 동작하는 것은 실제 운영에서는 비정상적인 일이므로 추가로 지정한다.
토픽 구독하기
consumer.subscribe(Collections.singletonList("customerCountries"));
생성한 컨슈머에 구독할 토픽을 subscribe 메서드의 파라미터로 넘겨준다.
여러 개의 토픽을 구독할 경우 여러 개의 토픽 이름을 갖는 리스트를 파라미터로 넘길 수 있으며, 정규식을 통해서도 구독할 수 있다.
폴링 루프
카프카 컨슈머 API의 핵심은 서버에 추가 데이터가 들어왔는지 지속적으로 폴링하는 단순한 루프이다.
Duration timout = Duration.ofMillis(100);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(timout);
for (ConsumerRecord<String, String> record : records) {
...
}
}
위 코드는 컨슈머 API를 사용하는 애플리케이션 코드의 폴링 루프 예시 코드이다.
timeout 값으로 설정한 100ms 동안 consumer 는 데이터가 들어올때까지 대기하고, 100ms가 넘도록 데이터를 받아오지 못하면 즉시 리턴된다.
while(true) 를 통해 계속해서 폴링하는데, 폴링이 일어나지 않으면 해당 컨슈머는 죽은 것으로 간주되어 읽어오던 파티션들이 다른 컨슈머에게 넘어간다. 브로커가 컨슈머가 죽었다고 판단하는 폴링 주기는 max.poll.interval.ms 설정값으로 설정할 수 있다.
poll 을 호출하는 것은 단순히 데이터만 가져오는 것 외에도, 처음 호출 시 그룹 코디네이터를 찾아 컨슈머 그룹에 참가하고, 파티션을 할당받는 일도 한다. 이 과정에서 리밸런스도 일어나는데 그 과정의 예외도 poll 메서드에서 예외의 형태로 발생하게 된다.
스레드 안정성
하나의 컨슈머는 하나의 스레드를 사용해야 한다. 컨슈머는 스레드 안전하지 않다.
하나의 애플리케이션에서 다수의 컨슈머를 사용하고 싶다면, 컨슈머 로직을 ExecutorService 를 사용해 다수의 스레드로 다수의 컨슈머를 다루는 것이 좋다.
또 다른 방법으로는 컨슈머에서는 이벤트를 받아서 큐에 넣고, 여러 개의 워커 스레드가 큐에서 이벤트를 꺼내서 처리하는 방식도 있다.
fetch.min.bytes
컨슈머가 브로커로부터 레코드를 얻어올 때 받는 데이터의 최소량. 기본값 1byte
이 값을 올려 잡으면 오가는 데이터가 적을 때 데이터를 묶어서 보내 브로커와 컨슈머의 부하를 줄여줄 수 있다.
하지만 이 값을 올려 잡으면 바로 레코드를 처리하지 못해 지연시간이 증가될 수 있다.
fetch.max.wait.ms
fetch.min.bytes 값을 올려 잡아 레코드를 대기시킬 경우 최대 대기시키는 시간. 기본값 500ms
fetch.min.bytes 이상 데이터가 모이거나, fetch.max.wait.ms 시간을 넘길 경우에 데이터가 전송된다.
fetch.max.bytes
컨슈머가 브로커로부터 받는 레코드 배치의 최대 크기. 기본값 50MB
컨슈머의 메모리의 양을 제한하기 위해 사용된다.
첫 레코드 배치는 특별하게 설정한 최대 크기를 넘기더라도 컨슈머에게 전송된다. 첫 배치가 너무 크다고 최대 크기만큼만 보내버리면 이후의 레코드들을 읽어올때도 최대 크기를 넘어가 계속해서 컨슈머가 읽어올 수 없어지기 때문이다.
max.poll.records
컨슈머가 poll 메서드를 호출할 때마다 최대 리턴되는 레코드의 수.
레코드 개수를 제어할 필요가 있다면 설정하면 된다.
max.partition.fetch.bytes
파티션별로 리턴하는 최대 바이트 수.
파티션별로 리턴하는 바이트 수를 제한하여 컨슈머의 메모리를 관리할 수는 있으나, 한 번에 얼마나 많은 파티션의 응답이 올 지 알 수 없기 때문에 파티션별로 비슷한 양의 데이터를 받아와야 한다는 등 특별한 이유가 아니고는 fetch.max.bytes 설정을 사용할 것을 강력히 권장한다.
session.timeout.ms, heartbeat.interval.ms
컨슈머가 하트비트를 받지 못해 죽었다고 판단하는 기준 시간이 session.timeout.ms 이며, 기본값은 10초다. (3.0 부터는 45초)
heartbeat.interval.ms 는 하트비트를 보내는 주기에 대한 설정값이며, 대개 session.timout.ms 시간의 1/3 으로 잡는다. (3.0 이후로는 지켜지지 않고 있긴 하다.)
session.timeout.ms 를 짧게 잡으면 더 빨리 죽은 컨슈머를 찾아내 리밸런스 할 수 있지만, 원하지 않는 리밸런스가 발생할 수 있고, 길게 잡으면 리밸런스가 자주 일어나지 않지만 죽은 컨슈머를 찾아내는 시간이 더 오래걸린다.
max.poll.interval.ms
컨슈머가 폴링을 하지 않고도 죽은 것으로 판정되지 않을 수 있는 최대 시간. 기본값은 5분
세션 타임아웃과 비슷해보일 수 있으나, 하트비트는 백그라운드 스레드에서 전송하므로 메인 스레드 폴링을 블록되었지만 하트비트는 잘 전송하고 있을 수도 있다. 따라서 별도의 설정 값이다.
폴링을 요청하는 주기를 통해 컨슈머를 관리하는 것은 사실상 데이터를 받아 처리하는 시간이 길어질 수도 있어 완벽한 예방책은 아니지만, 정상적인 컨슈머가 매우 드물게 도달할 정도로 충분히 크게, 그럼에도 정지한 컨슈머로 인해 서비스에 영향이 가지 않도록 충분히 작게 설정되어야 한다.
이 타임아웃이 지나면 백그라운드 스레드는 하트비트를 전송하지 않고, leave group 요청을 보내 리밸런스가 일어나도록 한다.
default.api.timeout.ms
모든 컨슈머 API 호출에 적용되는 타임아웃 값. 기본값은 1분
request.timeout.ms 의 기본값 30초보다 크기 때문에 이 시간 안에 재시도를 할 수 있다.
poll 메서드는 별도의 타임아웃을 지정하므로 이 설정이 무시된다.
request.timeout.ms
컨슈머가 브로커로부터 응답을 기다릴 수 있는 최대시간. 기본값은 30초
타임아웃 경과 시 브로커가 응답하지 않을 것이라 간주하고 연결을 닫고 재연결을 시도한다.
이미 과부화 상태인 브로커에 다시 요청을 보낸다고 얻는게 없고, 브로커에 요청을 처리할 충분할 시간을 주는 것이 중요하므로, 30초보다 내리지 않는 걸 권장한다.
auto.offset.reset
컨슈머가 오프셋을 커밋한 적 없거나, 커밋된 오프셋이 유효하지 않을 때(ex. 너무 오래동안 메시지를 읽지 않았을 때) 파티션을 읽으려 할 때의 작동에 대한 정의
기본값은 latest 이며, 컨슈머가 동작하고 난 뒤부터 쓰여진 레코드부터 읽기 시작한다.
다른 값으로는 earliest 가 있는데, 파티션의 맨 처음부터 모든 데이터를 읽는 방식이다.
enable.auto.commit
컨슈머가 자동으로 오프셋을 커밋할 지 여부. 기본값 true
true 인 경우 auto.commit.interval.ms 설정값을 통해 커밋할 주기를 설정할 수 있다.
중복을 최소화하고, 유실되는 데이터를 방지하기 위해 언제 커밋할 지 직접 정의하고 싶다면 false 로 설정후 직접 커밋하면 된다.
partition.assignment.strategy
컨슈머 그룹에 속한 컨슈머의 수와 구독할 토픽, 파티션의 수가 정해졌을 때 각 파티션을 컨슈머에게 어떻게 할당할 지에 대한 설정값
컨슈머 C1, C2가 파티션이 각 3개인 T1, T2 토픽을 구독중이라고 하자.
Range
컨슈머 C1 는 T1, T2의 0번, 1번 파티션을, 컨슈머 C2 는 T1, T2의 2번 파티션을 할당받는다.
파티션을 연속적으로 끊어서 할당하므로 파티션의 개수가 컨슈머의 개수로 깔끔하게 나누어 떨어지지 않을 때 앞의 컨슈머가 뒤의 컨슈머보다 더 많은 파티션을 할당받는 상황이 자주 발생한다.
RoundRobin
컨슈머 C1는 T1의 0, 2번과 T2의 1번 파티션을, 컨슈머 C2 는 T1의 1번과 T2의 0, 2번 파티션을 할당받는다.
컨슈머들이 모든 파티션을 가져다 순차적으로 하나씩 컨슈머에 할당하여 컨슈머 간 할당받는 파티션 수의 차이는 최대 1개이다.
Sticky
파티션들을 최대한 균등하게 할당하며, 리밸런스가 발생했을 때 가능한 많은 파티션들이 같은 컨슈머에 할당되게 하여 리밸런스 오버헤드를 최소화하는 목표를 갖고 있는 할당방식이다.
같은 컨슈머 그룹의 컨슈머들이 같은 토픽을 구독할 경우 round robin 방식과 균등하게 분배되는 정도에 차이가 없지만, 컨슈머들이 다른 토픽을 구독할 경우 sticky 방식이 더 균형있게 분배한다.
Cooperative Sticky
Sticky 와 동일한 할당 방식을 사용하지만, 협력적 리밸런스 기능을 지원하여 Sticky 방식을 stop the world 현상 없이 가능하도록 해준다.
client.id
브로커가 요청을 보낸 클라이언트를 인식하는 데 사용한다.
모니터링, 로깅, 쿼터 등에 사용된다.
client.rack
다수의 데이터센터, 다수의 클라우드 가용 영역을 사용하는 경우 무조건 리더 레플리카에서 읽어오는 것은 네트워크 지연을 늘릴 수 있다. 따라서 동일한 데이터센터, 동일한 가용 영역의 레플리카에서 데이터를 읽어오면 네트워크 속도를 높일 수 있다. 이런 기능을 사용하기 위해서는 client 의 위치를 알아야 하므로 이 설정값을 통해 가까운 레플리카와 매칭한다.
이 설정값을 설정하고, replica.selector.class 의 설정값을 org.apache.kafka.common.replica.RackAwareReplicaSelector로 잡아주면 렉 위치를 통해 가까운 렉에서 데이터를 읽어온다.
selector 를 커스텀하여 직접 읽어올 레플리카를 선택하고 싶다면 ReplicaSelector 인터페이스를 구현하는 클래스 구현하여 replica.selector.class 의 설정값으로 넣어주면 된다.
groud.instance.id
정적 그룹 멤버십 기능을 적용하기 위해 사용되는 설정이다.
어떤 고유한 문자열도 사용 가능하다.
receive.buffer.bytes, send.buffer.bytes
TCP 수신, 송신 버퍼 크기
offsets.retention.minutes
커밋된 오프셋을 보존하는 기간. 기본값 7일
커밋된 오프셋이 보존 기간을 넘겨 삭제되면, 이후에 동일한 컨슈머 그룹이 동작을 시작하더라도 완전히 새로운 컨슈머 그룹인 것처럼 작동한다.
오프셋 커밋
오프셋 커밋이란 컨슈머가 파티션에서의 현재 위치를 업데이트 하는 작업을 말한다.
컨슈머는 성공적으로 처리한 마지막 메세지를 커밋하는 방식으로 그 앞의 메세지들도 성공적으로 처리 됐음을 보장한다.
오프셋 커밋 방식
카프카 특수 토픽인 __consumer_offsets 토픽에 각 파티션별 커밋된 오프셋을 업데이트하는 방식으로 동작한다.
기본적으로 오프셋 커밋은 poll 메서드를 통해 읽어온 메시지의 다음 위치를 가리키는 오프셋을 일정 시간마다 자동으로 커밋한다.
그러나 일정 시간마다 커밋하는 방식에서 컨슈머에서 크래시가 발생해 마지막 커밋 지점으로 돌아가 다른 컨슈머가 작업을 이어받는 과정에서, 메시지를 받아 처리하다가 오프셋 커밋을 하기 전에 죽어버렸다면 중복 처리되는 메시지가 발생할 것이고, 커밋을 먼저하고 메시지들을 처리 하는 중에 죽었다면 유실되는 메시지가 발생할 것이다.
위 문제를 해결하기 위해 KafkaConsumer API 는 오프셋을 커밋하는 다양한 방법을 지원한다.
자동 커밋
enable.auto.commit 설정값을 true 로 잡아주면 5초에 한번 poll 메서드를 통해 받은 메시지 중 마지막 메시지의 다음 오프셋을 커밋한다.
커밋 이후 3초 동안 동작하다가 크래시가 난 컨슈머를 생각해보면, 다른 컨슈머가 크래시난 컨슈머의 파티션을 읽을 때 크래시난 컨슈머가 3초 동안 처리한 메시지들을 중복해서 처리하게 된다. 5초라는 interval 을 줄인다고 하더라도 중복을 완전히 막을 수는 없다.
자동 커밋은 편리하나 중복을 방지 할 수 없다.
현재 오프셋 커밋하기
메시지 유실의 가능성을 제거하기 위해 컨슈머 API는 애플리케이션 개발자가 원하는 시간에 현재 오프셋을 커밋할 수 있도록 하는 옵션을 제공한다.
commitSync 메서드를 호출하면 poll 메서드로 받아온 메시지의 다음 위치 오프셋을 커밋한다.
메시지를 처리하고 애플리케이션 코드에서 commitSync 메서드를 호출하는 방식으로 많이 사용하겠지만, 메시지 처리 중 크래시가 난다면 역시 중복 처리를 피할 수 없다.
commitSync 는 해결할 수 없는 에러가 발생하지 않는 한 커밋을 재시도한다.
package org.example;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ConsumerSyncCommit {
public static void main(String[] args) {
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "localhost:9092");
kafkaProps.put("group.id", "CountryCounter");
kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
consumer.subscribe(Collections.singletonList("customerCountries"));
Duration timout = Duration.ofMillis(100);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(timout);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
}
try {
consumer.commitSync();
} catch (CommitFailedException e) {
e.printStackTrace();
}
}
}
}
비동기적 커밋
앞에서 살펴본 수동 커밋의 단점은 브로커의 커밋 응답 메시지를 받을 때까지 애플리케이션이 블록된다는 점이다.
while (true) {
ConsumerRecords<String, String> records = consumer.poll(timout);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
}
consumer.commitAsync();
}
수동 커밋에서 commitSync 를 호출하는 부분을 commitAsync 를 호출하도록 수정했다.
비동기 커밋은 에러가 발생하면 재시도를 하지 않는다. 따라서 try catch 블록도 없는 것
비동기 커밋이 재시도를 하지 않는 이유는 응답을 받지 않기 때문에 비동기 커밋 간 순서가 보장되지 않아 이후의 커밋이 앞선 커밋보다 늦게 처리되어 오프셋이 의도치 않게 동작하는 에러가 발생할 수 있기 때문이다. 따라서 비동기 커밋을 재시도하고 싶다면 별도의 콜백에서 비동기 커밋에 단조증가하는 번호를 붙여 번호를 비교하여 최신 커밋만 재시도하도록 제어할 수 있다.
동기적 커밋과 비동기적 커밋 함께 사용하기
비동기적 커밋을 재시도 없이 사용하는 것은 가끔 일시적인 에러로 인해 커밋이 실패한다고 해도 이후의 커밋이 성공하면 되므로 큰 문제가 되지는 않는다. 그러나, 컨슈머를 닫기 전, 리밸런스 전 마지막 커밋같은 경우에는 커밋이 보장되는 것이 필요할 수 있다.
...
while (!closing) {
ConsumerRecords<String, String> records = consumer.poll(timout);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
}
consumer.commitAsync();
}
try {
consumer.commitSync();
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
}
닫히기 직전에 동기 커밋 요청을 통해 확실하게 커밋하도록 하고 닫는다.
앞서 말한 리밸런스 상황에는 이후에 살펴볼 리밸런스 리스너를 통해 이벤트를 감지하고 동기 커밋 메서드를 호출한다.
특정 오프셋 커밋하기
위에서 살펴본 commitSync, commitAsync 는 메서드 호출만 하면 poll 메서드로 받아온 메시지의 다음 메시지 오프셋을 자동으로 커밋한다. 하지만, 커밋 메서드들에 파라미터로 오프셋을 전달하면 특정 오프셋을 커밋하도록 할 수 있다.
레코드 배치의 크기가 커, 중간에 별도 커밋을 지정하고 싶거나 리밸런스를 감지하여 처리한 부분까지만 커밋하고 싶을 경우 유용하게 사용할 수 있다.
리밸런스 리스너는 ConsumerRebalanceListener 를 구현하여 사용하며, 메서드는 3가지가 존재한다.
onPartitionsAssigned
파티션이 컨슈머에게 재할당된 후, 컨슈머가 메시지를 읽기 전에 호출
onPartitionsRevoked
컨슈머의 파티션이 할당 해제될 때 호출
조급한 리밸런스를 사용할 경우 컨슈머가 메시지 읽기를 멈추고 리밸런스가 시작되기 전에 호출
협력적 리밸런스를 사용할 경우 리밸런스가 완료될 때, 리밸런스 대상 파티션에 대해서만 호출
이 메서드에서 오프셋을 커밋해야함.
onPartitionsLost
협력적 리밸런스 중 파티션이 할당 해제되기 전에 다른 컨슈머에게 해당 파티션이 할당되는 예외적인 경우에만 호출
구현하지 않을 시 onPartitionsRevoked가 대신 호출
특정 오프셋의 레코드 읽어오기
지금까지는 poll 메서드에 파라미터 없이 호출하여 커밋된 마지막 오프셋을 기준으로 순차적으로 읽어오는 방식으로만 사용했지만, consumer.seek 메서드를 통해 특정 오프셋으로 오프셋을 설정하여 읽어올 수 있다.
폴링 루프를 벗어나는 방법
다른 스레드에서 메인 스레드에서 수행중인 poll 메서드가 WakeupException 예외를 발생시키도록 하여 폴링 루프를 벗어날 수 있다.
다른 스레드에서 consumer.wakeup 메서드를 호출하면 메인 스레드의 poll 메서드에서 예외가 발생하며, poll 메서드 실행 중이 아니라면 다음 실행에서 바로 예외가 발생한다.
메인 스레드에서는 WakeupException 예외를 캐치하여 오프셋을 커밋하고, 파일을 닫는 등의 작업을 해준 뒤 consumer.close 메서드를 호출하여 컨슈머를 닫고 종료할 수 있다. close 메서드를 호출하면 그룹 코디네이터에게 해당 컨슈머가 그룹을 떠난다는 메시지가 전달되어 즉시 리밸런스가 발생한다.
디시리얼라이저
시리얼라이저와 마찬가지로 기본적으로 제공하는 디시리얼라이저를 사용하거나, 직접 커스텀하거나, Avro 와 같은 라이브러리를 통해 커스텀하는 방법으로 사용할 수 있다.
Avro 를 사용하지 않을 경우 시리얼라이저에 대응되어 디시리얼라이저를 직접 관리해주어야 하는데, 많아질 경우 관리가 매우 어렵다.
Avro 를 사용할 경우 스키마 레지스트리에 별도로 스키마들을 저장하고 있으므로 url 만 지정해주면 디시리얼라이저를 편리하게 꺼내 쓸 수 있다.
독립 실행 컨슈머
지금까지는 컨슈머가 컨슈머 그룹의 일원으로 동작하여 리밸런스, 파티션 할당 등에 대해 알아보았다. 하지만 토픽에 대한 모든 파티션의 데이터를 읽어와 하나의 컨슈머에서 처리한다면 리밸런스나 파티션 자동할당 같은 기능이 필요하지 않기 때문에 독립적으로 컨슈머를 실행하여 리소스를 절약할 수 있다.
사용방법
consumer.partitionsFor('topicName') 메서드를 통해 특정 토픽에 대한 파티션들을 가져오고, consumer.assign(partitions) 메서드로 파티션을 수동 할당한다.
파티션이 할당되었으므로 기존에 읽어오던 방식대로 메시지를 읽어올 수 있다.