메시지를 쓰는 프로듀서는 다음과 같은 순서로 메시지를 쓰게 된다.
ProducerRecord 객체를 생성한다.
메시지가 들어갈 토픽과 메시지 벨류를 지정한다.
파티션, 키 값은 필수는 아니지만 지정할 수 있다.
ProducerRecord 객체가 네트워크 상에서 전송될 수 있도록 시리얼라이저를 통해 바이트 배열로 변환된다.
파티션이 지정되지 않았다면 파티셔너에게 보내 ProducerRecord 객체의 키 값을 통해 파티션을 결정한다.
해당 메시지를 레코드 배치에 추가한다.
별도의 스레드가 레코드 배치를 적절한 카프카 브로커에게 전송한다.
메시지를 성공적으로 저장했다면 토픽, 파티션, 오프셋 등의 정보를 담은 RecordMetadata 객체를 리턴한다.
메시지 저장에 실패했을 경우 에러가 리턴되며, 에러가 리턴되기 전까지 몇 번 더 재전송을 시도할 수 있다.
카프카 프로듀서 필수 속성값
bootstrap.servers
프로듀서가 사용할 브로커의 host:port 목록
모든 브로커를 지정할 필요는 없다. 첫 연결 후 추가 정보를 가져오기 때문. 그러나 브로커 중 하나가 정지하는 경우에도 프로듀서가 연결할 수 있도록 최소 2개 이상을 지정하는 것이 좋다.
key.serializer
key 값은 기본적으로 바이트 배열로 이루어져 있는데, 가독성을 위해 임의의 자바 객체를 키로 사용할 수 있다. 이럴 경우 직렬화 방식을 해당 설정을 통해 지정해주어야 한다.
org.apache.kafka.common.serialization.Serializer 인터페이스를 구현하는 클래스 이름으로 지정한다.
value.serializer
key 시리얼라이저와 마찬가지로 밸류 값을 어떻게 직렬화할 지 지정해준다.
카프카는 메시지 전송 방식으로 크게 3가지 방법이 존재한다.
파이어 앤 포겟 Fire and forget
메시지를 서버에 전송만 하고 성공, 실패 여부는 신경 쓰지 않는다.
대부분의 경우 카프카는 가용성이 높고 자동으로 실패한 메시지를 재전송 시도하기 때문에 성공적으로 전달되지만, 에러가 발생하면 알 수 없어 데이터가 유실된다.
동기적 전송 Synchronous send
카프카 프로듀서는 언제나 비동기적으로 작동한다.
따라서, 메시지를 보내면 Future 객체를 리턴받는데, 이 객체의 get() 메서드를 다음 메시지를 전송하기 전에 호출하여 실제 작업이 완료되고 성공 여부를 확인한 뒤에 다음 메시지를 보내는 방식으로 동기적으로 전송할 수 있다.
비동기적 전송 Asyncronous send
콜백 함수와 함께 send() 메서드를 호출하면 응답을 받을 때 콜백 함수가 호출된다.
먼저 간단하게 메시지를 전달해보자.
프로젝트를 gradle 로 새로 만들고 다음과 같이 설치한다.
dependencies {
implementation 'org.apache.kafka:kafka-clients:3.8.0'
}
package org.example;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class Main {
public static void main(String[] args) {
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "localhost:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
}
}
앞서 살펴본 설정대로 서버 주소를 설정하고, key와 value 모두 string을 사용하므로 StringSerializer 를 지정해준다.
ProducerRecord 객체를 생성하여 producer 를 통해 전송한다.
ProducerRecord 생성자에 넣은 값은 순서대로 topic, key, value 이다.
위 코드로 전송한 메시지를 확인해보자.
/opt/homebrew/Cellar/kafka/3.8.0/libexec/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic CustomerCountry --from-beginning
France
동기적으로 메시지 전송하기
package org.example;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class Main {
public static void main(String[] args) {
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "localhost:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}
}
}
기존 코드에서 변경된 부분은 producer.send(record) 에서 get 메서드를 호출하는 부분 뿐이다.
producer.send(record) 메서드의 반환값은 Future 객체로 비동기적인 처리를 위해 사용된다. get 메서드를 호출할 경우 실제로 kafka 에서 온 응답 메시지를 기다리다가 메시지를 전달 받으면 RecordMetadata 메시지의 메타데이터를 담은 객체를 반환한다.
send 메서드를 여러번 실행할 경우 매 실행마다 카프카 브로커의 응답을 기다리므로 성능에 좋지 않다.
비동기적으로 메시지 전송하기
package org.example;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
public class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
}
}
}
package org.example;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class Main {
public static void main(String[] args) {
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "localhost:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
producer.send(record, new DemoProducerCallback());
} catch (Exception e) {
e.printStackTrace();
}
}
}
기존 코드에서 send 메서드에 DemoProducerCallback 객체를 인자로 넘기는 부분만 달라졌다.
send 의 응답을 기다리지 않아도 브로커에서 응답이 올 때 지정한 callback 이 실행되는 방식으로 비동기 처리 된다.
callback 를 정의하기 위해서는 org.apache.kafka.clients.producer.Callback 인터페이스의 onCompletion 메서드를 구현해야 한다.
콜백은 동기적으로 메인 스레드에서 실행되므로 블로킹 작업을 수행하는 것은 권장하지 않는다.
위에서 살펴본 필수적인 설정 외에도 프로듀서에 다양한 설정을 주는 것이 가능하다. 대부분의 설정값은 합리적으로 설정되어 건드릴 필요 없지만 몇몇 설정값들은 성능, 신뢰성에 영향을 미친다. 그런 설정들을 알아보자.
client.id
프로듀서와 그것을 사용하는 애플리케이션의 식별자
설정해주면 트러블슈팅에 용이하다.
acks
쓰기 작업에 대해 몇 개의 레플리카가 해당 레코드를 받아야 성공했다고 판단할지 지정한다.
acks=0 이면, 쓰기 작업이 성공했다고 가정하여 성능은 좋겠지만 에러가 발생하면 데이터가 그대로 유실된다.
acks=1 이면, 리더 레플리카만 메시지를 받으면 쓰기 작업을 성공했다고 판단한다. 리더 레플리카에 쓰기 작업이 성공했어도 메시지 복제 작업이 끝나기 전에 리더가 죽으면 데이터는 유실된다.
acks=all 이면, 모든 레플리카가 메시지를 전송받아야 성공했다고 판단한다.
acks 설정값을 낮추면 쓰기 작업의 응답시간은 빨라지지만, 데이터의 신뢰성이 떨어져 트레이드오프 관계가 있다. 그러나 컨슈머의 입장에서는 쓰기 응답이 언제오든 읽기 응답만 기다리기 때문에 acks 옵션이 달라져도 응답시간에 차이가 없다. 따라서 응답을 빨리 받아야할 특별할 이유가 없다면 신뢰성이 높은 acks=all 설정을 사용하는 것이 좋다. (기본값도 acks=all 이다.)
메시지 전달 시간
카프카에서는 메시지 전달 시간을 구간을 나누어 timeout 을 설정해줄 수 있다.
max.block.ms
send 메서드가 실행되고 Future 객체를 받기까지 걸리는 시간
프로듀서의 전송 버퍼가 가득차거나, 메타데이터가 사용가능하지 않을 때 블록되면 max.block.ms를 초과해 에러가 발생할 수 있다.
delivery.timeout.ms
send 메서드가 문제없이 리턴된 후부터 콜백이 호출되기까지 걸리는 시간
카프카가 쓰기 작업에서 문제가 생겨 재시도를 하거나 레코드 배치에서 전송을 기다리는 등 여러 이유로 시간이 초과할 수 있다.
사용자 입장에서 메시지가 전송되기까지 기다릴 수 있는 만큼 delivery.timout.ms를 단순하게 설정해주면 된다.
request.timeout.ms
서버에서 응답을 받기위해 얼마나 기다릴 것인지
재시도 시간, 실제 전송 이전에 소요되는 시간(레코드 배치에 있는 시간, 네트워크를 타는 시간)은 포함하지 않고 실제로 응답을 하는데 얼마나 걸리는 지에 대한 타임아웃을 정의한다.
retries, retry.backoff.ms
retries는 전송 실패시 재시도 횟수를, retry.backoff.ms는 재시도 간 대기하는 시간을 의미한다.
delivery.timeout.ms 설정을 통해 전체 대기 시간만 설정하고 이 설정은 건드리는 것을 권장하지 않는다.
애초에 재전송해도 의미가 없는 에러(메시지가 지나치게 큰 경우 등)는 알아서 재전송을 하지 않기 때문에 재전송에 대한 설정을 해주기 보다는 에러가 발생했을 때 대처를 해주는 것에만 집중하면 된다.
레코드 배치를 전송하기 전까지 대기하는 시간
KafkaProducer 는 레코드 배치가 꽉 차거나, linger.ms 시간이 다 되면 배치를 전송한다.
기본적으로 0으로 설정되어 배치를 전송할 수 있는 스레드가 있을때마다 바로 전송하도록 되어 있지만, 전송이 조금 지연되어도 상관없다면 linger.ms 를 0보다 큰 값으로 설정해 배치에 많은 메시지가 담겨 한 번에 전송될 수 있도록 할 수 있다.
buffer.memory
메시지를 전송하기 전에 메시지를 대기시키는 버퍼의 크기
메시지 전송 속도보다 애플리케이션에서 더 빠르게 메시지를 전송하려 하면 버퍼 메모리가 가득 차 max.block.ms 시간만큼 block 되면 타임아웃 에러가 발생하게 된다.
compression.type
메시지 압축 방식, snappy, gzip, lz4, zstd 등의 압축 방식을 선택할 수 있다.
Snappy 압축 알고리즘은 CPU 부하가 작고 성능이 좋으며 압축율이 꽤 좋다.
Gzip 압축 알고리즘은 CPU 와 시간을 더 많이 사용하지만 압축율이 좋아 네트워크 대역폭이 제한적일때 좋을 수 있다.
batch.size
각각의 레코드 배치에 사용될 메모리의 양(byte)
batch.size 만큼 배치에 메시지가 차게 되면 linger.ms 시간이 지나지 않아도 배치를 전송한다.
max.in.flight.requests.per.connection
프로듀서가 서버로부터 응답을 받지 못한 상태에서 전송할 수 있는 최대 메시지 수
이 값을 높게 잡으면 처리량이 증가하지만, 재전송이 발생할 경우 메시지의 순서가 보장되지 않을 수 있다.
enable.idempotence 설정을 true 로 주면 기본값인 5번까지의 in-flight 요청을 허용하면서도 순서를 보장할 수 있다.
max.request.size
프로듀서가 전송하는 쓰기 요청의 크기
여러 개의 배치를 전송할 때, 전체 쓰기 요청의 크기를 제어하는 설정으로 브로커가 한 번에 받을 수 있는 최대 메시지 크기 message.max.bytes 설정값과 동일하게 맞춰줌으로써 브로커가 받아들일 수 있는 사이즈의 메시지만 프로듀서가 전송하도록 할 수 있다.
receive.buffer.bytes, send.buffer.bytes
데이터를 읽고 쓸 때 사용할 TCP 버퍼 사이즈
-1 을 설정할 경우 운영체제 기본값을 사용함
데이터센터가 다른 브로커와 통신해야할 경우 네트워크 대역폭은 낮고 지연은 길어지는 것이 보통이므로 이 값들을 올려잡아주는 것이 좋다.
enable.idempotence
메시지가 정확히 한 번 저장될 수 있도록 하는 설정. True 로 설정 시 메시지가 정확히 한 번만 저장된다.
프로듀서에서 쓰기 요청을 보낸 메시지가 리더 브로커에 저장되고, 다른 브로커에도 저장된 뒤 리더 브로커에서 프로듀서에게 응답을 하려다가 크래시가 발생하였다면, 브로커가 재전송을 자동으로 시도하여 새 리더 브로커에게 해당 메시지 쓰기 요청을 재전송하게 되고, 이럴 경우 새 리더 브로커에는 기존 메시지와 재전송된 메시지가 중복되어 저장되어 동일한 두 개의 메시지를 갖게 될 수 있다.
해당 옵션을 True 로 설정하면 레코드마다 순차적인 번호를 붙여서 같은 번호가 두 개 이상 저장되지 않도록 하여 중복을 방지한다.
위 프로듀서 예제에서 String 시리얼라이저를 살펴보았다. 이 외에 기본적으로 제공하는 시리얼라이저로는 모든 유형의 데이터를 직렬화 할 수 없다. 사용자 나름의 시리얼라이저를 정의하는 방법을 알아보자.
커스텀 시리얼라이저를 정의하는 방법
아파치 Avro, Thrift, Protobuf 등의 범용 직렬화 라이브러리를 사용한다.
사용하고 있는 객체를 직렬화하기 위한 커스텀 로직을 작성한다.
2번 방식은 직접 정의하는게 번거롭기도 하지만, 객체에 담을 정보가 변경되면 변경점이 너무 많아져 관리가 어렵다는 큰 단점이 있어 1번 방법이 권장된다.
아파치 에이브로로 직렬화하기
아파치 에이브로로 직렬화할 경우 보통 JSON 형식으로 스키마를 정의한다.
스키마에 따라 이진 파일 형태로 직렬화 결과물이 나오게 된다.
에이브로 파일을 쓰는 과정에 스키마를 쓰는 과정이 포함되어 역직렬화 시 스키마에 자유롭게 접근할 수 있다.
스키마가 변경될 경우에도 구 버전 스키마를 갖고 있는 데이터와 호환될 수 있다.
ex.) id, name, faxNumber 를 갖고 있는 데이터가 id, name, email 를 갖도록 스키마를 변경하였다면, 단순히 새롭게 생성되는 데이터는 email 값을 갖고 faxNumber 값을 갖지 않아, getEmail 은 유효한 값을, getFaxNumber 는 null 을 반환할 뿐 에러나 예외가 발생하지 않는다.
카프카에서 에이브로 레코드 사용하기
에이브로는 스키마를 결과물에 직접 넣어 약간의 오버헤드를 감수하는 정도였으나, 카프카에서는 레코드별로 스키마가 정의되어야 하는데 이 때문에 레코드 사이즈가 2배 이상이 되어 오버헤드가 너무 커진다.
이를 위해 카프카는 스키마 레지스트리라 불리는 아키텍쳐 패턴을 사용한다.
프로듀서를 생성할 때 key, value 타입을 지정해주면 해당 타입의 시리얼라이저가 알아서 사용되며, 직접 정의한 객체를 사용할 경우 POJO 클래스가 아니라 에이브로의 코드 생성 기능을 사용한 스키마로부터 생성된 에이브로 특화 객체를 사용해야 한다.
스키마 레지스트리
아파치 카프카의 일부가 아니고 여러 오픈소스 구현체 중 하나를 선택해서 사용하면 된다.
스키마를 레코드별로 다 저장하고 있는게 아니라, 스키마 레지스트리에 스키마를 저장하고 스키마의 고유 식별자만 레코드에 넣어주는 방식으로 관리한다.
스키마 레지스트리에 스키마를 저장하고, 가져오는 작업은 각각 시리얼라이저와 디시리얼라이저 내부에서 실행되므로 개발자는 별다른 작업 없이 아파치 에이브로 시리얼라이저를 사용하면 된다.
기본 파티셔너
앞에서 살펴본대로 파티셔너는 키 값을 통해 어느 파티션에 해당 레코드를 넣을지 판단하여 저장하고 사용한다.
기본 파티셔너를 사용 중에 키 값을 null로 아무것도 지정하지 않을 경우 접착성을 고려해 라운드 로빈 방식으로 메시지를 저장한다.
여기서 접착성을 고려한 라운드 로빈 방식이란, 파티션이 지정된(키값이 주어진) 레코드에 아무 파티션이나 들어가도 되는 레코드를 붙여 같은 수의 메시지도 적은 요청으로 전송할 수 있도록 하는 방식이다.
기본 파티셔너를 사용 중에 키 값이 주어진 경우에는 hash 방식을 사용하여 메시지를 저장할 파티션을 특정한다.
특정 키값에 대응되는 파티션은 파티션 수가 변하지 않는 한 변하지 않는다.
커스텀 파티셔너
기본 파티셔너는 파티션간 고르게 메시지가 분포되고, 키 값의 해시를 통해서만 파티션을 결정한다. 그러나 키 값의 해시 말고 다른 방식을 사용하고 싶다거나, 특정 데이터들만 따로 파티션을 관리하고 싶다면 커스텀 파티셔너를 사용하는 것이 좋을 수 있다.
package org.example;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
import java.util.Map;
public class BananaPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
int numPartitions = partitionInfos.size();
if (keyBytes == null || !(key instanceof String))
throw new InvalidRecordException("all messages have customer name as key");
if (((String) key).equals("Banana"))
return numPartitions - 1;
return Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1);
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
위와 같이 Partitioner 인터페이스의 파티션 메서드를 구현하면 커스텀 파티셔너를 사용할 수 있다.
위의 커스텀 파티셔너는 키 값이 'Banana' 일 경우 numPartitions - 1 번 파티션으로 보내고, 나머지 데이터는 그 외 파티션에 저장된다.
레코드는 키, 밸류 값 외에도 헤더를 포함할 수 있다.
헤더의 주된 역할은 메시지의 전달 내역을 기록하는 것이다.
메시지를 직접 까볼 필요 없이 헤더만 보고 메시지를 라우팅하거나 출처를 추적할 수 있다.
클라이언트의 코드를 수정하지 않으면서 애플리케이션의 동일한 작동을 집어넣는다거나 하는 작업이 필요할 수 있는데 이때 인터셉터를 사용할 수 있다.
ProducerInterceptor 인터페이스를 구현하여 사용할 수 있으며, onSend 메서드는 메시지를 직렬화 하기 전 동작을 정의할 수 있고, onAcknowledgement 메서드는 응답을 클라이언트가 받았을 때 호출되어 메시지를 변경할 수는 없으나 그 안의 정보를 읽을 수는 있다.
인터셉터의 사용 사례로는 모니터링, 정보 추적, 표준 헤더 삽입 등이 있다.
쿼터
쓰기/읽기 속도를 제한할 수 있는 기능. 3가지의 쿼터 타입에 대해 한도를 설정할 수 있다.
쓰기 쿼터 produce quota
읽기 쿼터 consume quota
요청 쿼터 request quota
쓰기/읽기 쿼터는 초당 바이트 수 단위로 제한을 둘 수 있고, 요청 쿼터는 브로커가 요청을 처리하는 시간 비율 단위로 제한을 둘 수 있다.
스로틀링
클라이언트가 할당량을 다 채웠을 경우 브로커는 클라이언트 요청에 대해 스로틀링을 시작한다.
대부분의 클라이언트는 max.in.flight.requests.per.connection 설정 값에 따라 대기하는 요청의 수가 제한되어 자동으로 요청 속도를 줄여 할당량을 지킨다.
오작동하는 클라이언트는 설정을 무시하고 요청을 계속 브로커에게 보낼 수 있으나, 브로커가 해당 클라이언트와의 커뮤니케이션 채널을 일시적으로 무시함으로써 할당량을 맞추고 브로커를 보호한다.