본문 바로가기
카프카

카프카 프로듀서 파티셔닝: 왜 라운드 로빈이 아니지?

by mjjang 2023. 9. 4.

카프카 프로듀서는 브로커에 토픽을 전송할 때 키가 있으면 해시 방식으로 보내고 키가 없으면 라운드 로빈 방식으로 전송한다고 알고 있었지만, 카프카 클라이언트 2.4.0 이후부터는 키가 없을 때 라운드 로빈 방식으로 동작하지 않다는 것을 알게 되었습니다.

문제 인식 과정

Spring Kafka(spring boot 3.1.2, kafka clients 3.4.1)를 이용하여 개발을 진행하던 중 @KafkaListener의 concurrency를 테스트하고 있었습니다.

    @KafkaListener(topics = ["sample"], groupId = "first", concurrency = "3")

파티션이 3개인 sample 토픽을 생성하고 위에 코드를 작성했습니다. concurrency를 3으로 설정하면 3개의 컨슈머 쓰레드가 생성되고 각 컨슈머는 파티션과 1대1 매핑이 됩니다.
sample 토픽을 카프로 전송하면 라운드 로빈 방식이기 때문에 3개의 컨슈머 쓰레드가 번갈아 가며 레코드를 수신할 거라 예상했지만 하나의 쓰레드에서만 레코드를 수신했습니다.
카프카 클라이언트가 내부적으로 어떻게 동작하는지 자세하게 알지 못했기 때문에 해당 동작이 이해되지 않았고 어디가 문제인지 어떤 부분을 확인해야 할지 감이 잡히지 않았었습니다.

문제 해결 과정

일단 알고 있는 부분 먼저 검증하기 시작했습니다. 하나의 컨슈머 쓰레드에서만 레코드를 수신한다는 것은 하나의 파티션에만 레코드가 전송되고 있다고 생각해서 컨슈머 그룹인 first를 조회한 결과 파티션이 하나로 몰리는 것을 확인했습니다.
그 후에 파티션 정보는 어디서 가져오는지? 어떻게 전송이 되는지 확인하기 위해 KafkaProducer 클래스를 살펴보았습니다.

다행히, KafkaProducer.doSend 메서드에 명확하게 partition을 가져오는 코드가 있었는데

    int partition = partition(record, serializedKey, serializedValue, cluster);

내부를 자세히 들여보니,,, 1. 토픽에 파티션을 지정했거나 2. 커스텀 파티셔너가 있거나 3. 토픽에 키 값을 주었거나
위 3가지에 해당하지 않으면 RecordMetadata.UNKNOWN_PARTITION(-1)을 리턴했습니다.
친절하게도 주석에 RecordMetadata.UNKNOWN_PARTITION을 리턴한다면 RecordAccumulator에서 각 파티션에 생성된 데이터의 양 등 브로커 부하를 고려하여 파티션을 선택한다고 나와 있었습니다.

RecordAccumulator 내부로

카프카 프로듀서는 레코드를 바로 브로커로 전송하는 것이 아닌 성능 향상을 위해 레코드를 모아서 배치 단위로 전송합니다. 해당 내용은 이 글에 자세히 설명되어 있습니다.

RecordAccumulator.append 메서드를 확인하니

    if (partition == RecordMetadata.UNKNOWN_PARTITION) {
        partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster);
        effectivePartition = partitionInfo.partition();
    } else {
        partitionInfo = null;
        effectivePartition = partition;
    }

UNKNOWN_PARTITION 일 경우에는 BuildInPartitioner에서 파티션 정보를 가져옵니다.
BuiltInPartitioner.peekCurrentPartitionInfo 메서드에서는 파티션 정보를 캐시해서 캐시에 값이 있으면 해당 파티션을 반환합니다.
이 때문에 라운드 로빈 방식이 아닌 계속 동일한 파티션에 레코드를 저장하게 된 것입니다.

파티션 정보 캐시의 갱신

캐시 정보가 갱신되어야 파티션이 변경될 수 있기 때문에 언제 파티션 정보 캐시가 갱신되는지 추적했습니다.
해당 로직은 BuiltInPartitioner.updatePartitionInfo 에서 확인할 수 있었습니다.

    int producedBytes = partitionInfo.producedBytes.addAndGet(appendedBytes);

    if (producedBytes >= stickyBatchSize && enableSwitch || producedBytes >= stickyBatchSize * 2) {
        // We've produced enough to this partition, switch to next.
        StickyPartitionInfo newPartitionInfo = new StickyPartitionInfo(nextPartition(cluster));
        stickyPartitionInfo.set(newPartitionInfo);
    }

현재 파티션 정보에 추가할 레코드에 바이트를 누적하고 이 값(producedBytes)이 batchSize보다 큰지 비교하여 파티션 정보를 갱신하고 있습니다.

테스트

카프카 프로듀서 내부 코드를 확인하여 왜 concurrency가 하나의 쓰레드에서만 동작하였는지 이해했습니다. 테스트를 작성하여 이해한 대로 동작하는지 검증해 보겠습니다.

  1. 배치 사이즈가 넘어가면 파티션이 바뀌는지 검증
    var firstPartition = -1
    var secondPartition = -1

    var count = 0
    var max = 1000

    for (i in 0 until max) {
        when (count++) {
            0 -> {
                val send = kafkaTemplate.send("sample", "a")
                firstPartition = send.get().recordMetadata.partition()
            }
            max - 1 -> {
                val send = kafkaTemplate.send("sample", "a")
                secondPartition = send.get().recordMetadata.partition()
            }
            else -> {
                kafkaTemplate.send("sample", "a")
            }
        }
    }

    firstPartition shouldNotBe secondPartition

max 값을 조절하면서 디버깅을 한 결과 기본 batch size 16KB를 넘으면 파티션이 변경되는 것을 확인할 수 있었습니다.

  1. 파티션이 바뀌었을 때 컨슈머 쓰레드가 바뀌는지 검증
    @KafkaListener(topics = ["sample"], groupId = "first", concurrency = "3")
    fun consume(message: String) {
        println("thread: ${Thread.currentThread().id}")
    }

출력문을 통해 간단하게 파티션이 바뀌었을 경우 쓰레드가 바뀌는 것을 확인했습니다.

마지막으로 라운드 로빈 방식으로 동작하는 2.4.0 이전 버전에서 처음 예상했던 동작(컨슈머 스레드가 번갈아 가며 호출)대로 동작하는지 검증했습니다.

또 다른 궁금증

언제 라운드 로빈 방식에서 지금과 같은 방식으로 변경이 됐고 어떤 이유로 바뀌었는지 궁금해졌습니다.
해당 궁금증은 DefaultPartitioner 에서 확인할 수 있었습니다.

    /**
     * NOTE this partitioner is deprecated and shouldn't be used.  To use default partitioning logic
     * remove partitioner.class configuration setting.  See KIP-794 for more info.
     *
     * The default partitioning strategy:
     * <ul>
     * <li>If a partition is specified in the record, use it
     * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
     * <li>If no partition or key is present choose the sticky partition that changes when the batch is full.
     * 
     * See KIP-480 for details about sticky partitioning.
     */
    @Deprecated
    public class DefaultPartitioner implements Partitioner {

디폴트 파티셔너는 Deprecated 되어 있었고 자세한 정보가 담겨 있는 카프카 컨플루언스 정보가 있었습니다.
라운드 로빈에서 스티키 방식으로 변경된 이유
스티키 방식에서 한 차례 성능을 더 높인 방법

그리고 디폴트 파티셔너 히스토리를 파악하기 위해 카프카 깃허브를 찾아보았습니다.

DefaultPartitioner

히스토리를 과거에서 현재 순으로 요약하자면

  1. DefaultPartitioner는 라운드 로빈 방식으로 동작
  2. RoundRobinPartitioner를 추가
  3. DefaultPartitioner 로직을 sticky 방식으로 변경
  4. DefaultPartitioner를 Deprecated하고 BuiltInPartitioner를 사용하도록 변경

결론

많은 블로그에도 적혀있듯이, 막연하게 프로듀서에서 키가 없으면 라운드 로빈 방식, 키가 있으면 해시 방식으로 대충 이해하고 넘어갔지만 concurrency 동작을 확인하는 과정에서 그렇지 않다는 것을 알게 되고 카프카 프로듀서 내부에 대해서 조금이라도 알게 되었다.

본인이 사용하는 카프카 클라이언트 버전에 따라 파티셔닝 전략이 어떻게 되는지 파악하고 사용하면 카프카로 개발하는 데 조금이라도 도움이 될 것 같습니다.

다음에는 이 글에서 깊게 다루지 않은 스티키 파티셔닝에 대해 알아보겠습니다.

댓글