카프카에는 많은 CLI, GUI 툴들이 있지만 애플리케이션에서 직접 관리 명령을 통해 카프카를 관리해야할 수 있다. 이럴 때 사용할 수 있는 AdminClient 에 대해 알아보자.
비동기적이고 최종적 일관성을 가지는 API
AdminClient 에서 제공하는 API 를 통해 토픽 설정, 브로커 설정 등을 제어할 수 있지만, 카프카 클러스터는 내부적으로 브로커간 메타데이터 설정 등에서 비동기적으로 작동하기 때문에 일시적으로 방금전에 추가한 토픽이 브로커에서 보이지 않거나 하는 등의 일관성이 깨지는 일이 발생할 수 있다.
이렇게 일시적으로 일관성이 깨질 수 있지만, 설정이 모두 전파되고 난 뒤에는 일관성이 유지되는데 이런 속성을 최종적 일관성이라 하며 AdminClient API 는 이 특징을 갖고 있다.
옵션
AdminClient 의 메서드들은 각각의 Options 객체를 인수로 받아 서로 다른 설정들을 통해 메서드를 사용할 수 있다.
AdminClient 의 메서드들이 공통적으로 갖고 있는 인수는 timeoutMs 로 해당 API 의 타임아웃을 설정할 수 있다.
수평 구조
모든 어드민 작업은 KafkaAdminClient 에 구현되어 있는 아파치 카프카 프로토콜을 통해 이루어진다.
KafkaAdminClient 에는 객체 간 의존관계나 네임스페이스 없이 하나의 큰 인터페이스로 설계되어있어 논란이 있기는 하지만, 쉽게 메서드를 찾아보고 사용할 수 있다는 장점이 있다.
추가 참고 사항
클러스터의 상태를 변경하는 모든 작업은 컨트롤러에 의해 수행된다.
클러스터의 상태를 읽기만하는 작업은 부하가 가장 적은 브로커가 담당하여 수행한다.
AdminClient 를 사용하는 방법은 간단하다. AdminClient 객체를 생성하고 메서드를 사용한 뒤 닫아주는 것이다.
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient adminClient = AdminClient.create(properties);
adminClient.close();
생성할 때 필수적으로 설정해야 하는 값은 브로커의 주소뿐이다.
이후 AdminClient 의 메서드를 통해 어드민 기능을 사용하고 close 메서드를 통해 닫아준다.
이 외에도 여러 설정 값이 있지만 컨슈머, 프로듀서에 비해 단순하고 설정할 것이 별로 많지 않다.
client.dns.lookup
위에서 BOOTSTRAP_SERVERS_CONFIG 값을 기준으로 호스트를 설정하는데, 두가지 맹점이 있다.
1. DNS 별칭을 사용하는 경우
별칭을 사용할 경우 카프카에서 인증을 위해 SASL을 사용하는데, 실제 브로커의 호스트명과 별칭이 일치하지 않아 중간자 공격으로 받아들여 연결을 거부할 수 있다.
client.dns.lookup 설정 값으로 resolve_canonical_bootstrap_servers_only 를 넣어주면, DNS 별칭에 포함된 모든 브로커의 이름을 일일이 BOOTSTRAP_SERVERS_CONFIG에 설정해준 것과 동일하게 작동한다.
2. 다수의 IP 주소로 연결되는 DNS 이름을 사용하는 경우
하나의 호스트 이름으로 다수의 IP 가 연결되는 것은 로드밸런서, 프록시 등을 통해 매우 흔하게 있는 일이다.
카프카는 첫번째 해석된 호스트명으로 연결을 시도할 뿐이므로, 연결된 로드 밸런서에 문제가 생기면 브로커는 동작하고 있더라도 시스템에 장애가 발생하게 된다.
client.dns.lookup 설정 값으로 use_all_dns_ips 를 넣어 이러한 문제를 방지하는 것이 강력히 권장된다.
request.timeout.ms
AdminClient 의 응답을 기다리는 시간. 기본값은 120초
클라이언트는 응답을 받기까지 재시도하는 시간을 포함해서 최대 request.timeout.ms 만큼 응답을 기다린다.
위에서 생성한 AdminClient 객체를 통해 무엇을 할 수 있는지 살펴보자.
토픽 목록 조회
ListTopicsResult topics = adminClient.listTopics();
topics.names().get().forEach(System.out::println);
> Task :MyAdminClient.main()
CustomerCountry
test
new-topic
listTopics 메서드가 리턴하는 ListTopicsResult 객체는 Future 객체를 감싸는 Result 객체이다.
topics.names 메서드를 통해 Future 객체가 반환되고, get 메서드를 호출하면 실제로 서버가 토픽 이름 집합을 리턴할때까지 기다리거나, 타임아웃 예외를 발생시킨다.
토픽 생성
String topicName = "admin-test";
int numOfPartitions = 3;
short numOfReplicas = 1;
CreateTopicsResult testTopic = adminClient.createTopics(
Collections.singletonList(new NewTopic(topicName, numOfPartitions, numOfReplicas))
);
System.out.println("topic partitions: " + testTopic.numPartitions(topicName).get());
ListTopicsResult topics = adminClient.listTopics();
topics.names().get().forEach(System.out::println);
> Task :MyAdminClient.main()
topic partitions: 3
CustomerCountry
admin-test
test
new-topic
파티션을 생성할때는 파티션의 수, 레플리카의 수를 설정해주어야 하며 설정하지 않을 경우 브로커의 설정값으로 자동으로 설정된다.
CreateTopicsResult 객체를 통해 생성 결과를 얻을 수 있다.
토픽 삭제하기
String topicName = "admin-test";
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singletonList(topicName));
ListTopicsResult topics = adminClient.listTopics();
topics.names().get().forEach(System.out::println);
> Task :MyAdminClient.main()
CustomerCountry
test
new-topic
deleteTopics 메서드를 통해 토픽을 삭제하고, 삭제한 결과를 가져올 수 있다.
토픽 삭제는 휴지통과 같은 개념이 존재하지 않아 한번 삭제할 경우 되돌릴 수 없으며, AdminClient 를 이용할 경우 주의 표시도 해주지 않기 때문에 조심히 다루어야 한다.
설정 관리
AdminClient 를 사용해 설정값도 제어하고, 조회할 수 있다.
애플리케이션에서 설정값을 관리하지 말고 다른 카프카 관리 툴을 사용하는게 보통이지만, 이 방식 역시 흔하게 사용된다.
String topicName = "test";
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
DescribeConfigsResult configsResult = adminClient.describeConfigs(Collections.singleton(configResource));
Config configs = configsResult.all().get().get(configResource);
configs.entries().forEach(System.out::println);
> Task :MyAdminClient.main()
ConfigEntry(name=compression.type, value=producer, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[], type=STRING, documentation=null)
ConfigEntry(name=leader.replication.throttled.replicas, value=, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[], type=LIST, documentation=null)
...
test 토픽의 설정값을 출력한다.
이 외에도 컨슈머 그룹 관리, 토픽에 파티션 추가, 리더 선출, 레코드 삭제, 레플리카 재할당 등 애플리케이션 상에서 다양한 기능을 수행할 수 있다.
MockAdminClient 를 사용하여 AdminClient 를 mocking 하여 테스트도 돌려볼 수 있다. (실제로는 브로커가 하나도 뜨지 않는다.)