카테고리 없음

카프카 파티션과 컨슈머 인스턴스

jw-backend 2024. 7. 1. 19:48
반응형

Q  컨슈머 인스턴스 안에 여러 파티션이잇는건가?  컨슈머 인스턴스 여러개는 어떻게 만들까? 

 

A

 

Kafka에서 컨슈머 인스턴스와 파티션의 개념을 이해하는 것이 중요합니다. 각 토픽은 여러 개의 파티션으로 나뉘어져 있으며, 각 파티션은 순서가 보장된 메시지 스트림을 나타냅니다. 컨슈머 그룹 내에서 각 컨슈머 인스턴스는 하나 이상의 파티션을 소비합니다.

컨슈머 인스턴스 내 여러 파티션

하나의 컨슈머 인스턴스는 여러 파티션으로부터 메시지를 소비할 수 있습니다. 이는 Kafka가 자동으로 관리해 주는 기능입니다. 예를 들어, 만약 하나의 컨슈머 인스턴스가 있고, 하나의 토픽이 3개의 파티션을 가지고 있다면, 그 컨슈머 인스턴스는 3개의 파티션으로부터 메시지를 소비하게 됩니다.

여러 컨슈머 인스턴스 생성

여러 컨슈머 인스턴스를 생성하고 컨슈머 그룹으로 관리하면, 각 컨슈머 인스턴스는 파티션을 나누어 소비하게 됩니다. 만약 컨슈머 인스턴스의 수가 파티션의 수보다 적다면 일부 컨슈머 인스턴스는 여러 파티션을 소비하게 되고, 반대로 컨슈머 인스턴스의 수가 파티션의 수보다 많다면 일부 컨슈머 인스턴스는 아무런 파티션도 할당받지 못하게 됩니다.

Spring Kafka를 사용하여 여러 컨슈머 인스턴스 생성하기

다음은 Spring Kafka를 사용하여 여러 컨슈머 인스턴스를 생성하는 예제입니다.

1. Kafka 설정

java

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

2. Kafka 리스너 설정

여러 컨슈머 인스턴스를 생성하려면 @KafkaListener를 사용하여 리스너를 정의하면 됩니다. Spring Kafka는 각 리스너 인스턴스를 별도의 스레드에서 실행합니다.

java

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "your-topic", groupId = "my-group-id")
    public void consume(String message) {
        System.out.println("Consumed message: " + message);
    }
}

3. 애플리케이션 실행

Spring Boot 애플리케이션을 실행하면, Kafka 컨슈머 인스턴스가 여러 개 생성되어 각 파티션을 소비하게 됩니다. 컨슈머 인스턴스의 개수는 @KafkaListener에 설정된 컨슈머 그룹 ID에 따라 자동으로 조정됩니다.

참고 사항

  • 컨슈머 그룹 ID가 같은 컨슈머 인스턴스는 동일한 그룹으로 묶여 파티션을 나누어 소비합니다.
  • 동일한 애플리케이션 내에서 여러 컨슈머 인스턴스를 실행하거나, 여러 애플리케이션 인스턴스를 실행하여 컨슈머 인스턴스를 확장할 수 있습니다.

이렇게 하면 Kafka의 스케일링 기능을 통해 효율적으로 메시지를 소비할 수 있게 됩니다.