docker 를 이용해 카프카와 주키퍼 서버를 띄우고
스프링 부트 애플리케이션에서 이벤트(메시지)를 발행하고 읽는 과정을 담았다.
도커 설치 이후부터 설명한다.
docker-compose
OS 에 상관없이 개발 환경을 맞출 수 있고 간단한 스크립트로 쉽게 컨테이너를 띄울 수 있다.
version: '3.8' services: zookeeper: image: wurstmeister/zookeeper container_name: zookeeper ports: - "2181:2181" kafka: image: wurstmeister/kafka container_name: kafka ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 volumes: - /var/run/docker.sock:/var/run/docker.sock
실행 후 docker desktop 에서 확인하기
정상적으로 실행되었다. cli를 통해 확인할 수 있지만 cli 대신 spring-boot 로 바로 연결한다.
kafka
카프카의 구성요소를 살짝 알아본다.
- zookeeper: 카프카 클러스터를 관리한다. 카프카 클러스터와 관련된 정보 기록 및 관리를 담당한다.
- Kafka cluster: 카프카에서 사용할 메시지 저장소다. 하나의 카프카 클러스터는 여러 개의 브로커로 구성된다. 브로커는 각각의 서버이며 메시지를 나눠서 저장하거나 이중화 처리 장애 대응 등의 역할을 수행한다.
- Kafka producer: 카프카 클러스터에 메시지를 보내는 역할을 수행한다.
- Kafka consumer: 카프카 클러스터에 메시지를 읽어오는 역할을 수행한다.
spring-boot
spring-boot application에서 kafka 를 사용하기 위한 설정 방법이다.
의존성
implementation 'org.springframework.kafka:spring-kafka' testImplementation 'org.springframework.kafka:spring-kafka-test'
spring boot application 에 카프카 의존성을 추가해준다.
설정 클래스
producer config
@EnableKafka @Configuration public class KafkaProducerConfig { @Bean public ProducerFactory<String, String> producerFactory() { final HashMap<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(props); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
"localhost:9092"
: docker 로 띄운 카프카 서버의 호스트와 포트를 지정한다.
"StringSerializer.class"
: 이벤트(메시지)를 보낼 때 key 와 value 를 각각 String 타입으로 직렬화해서 보낸다. (필요에 따라 다른 데이터 타입이나 json 등으로 직렬화 할 수 있다.)consumer config
@EnableKafka @Configuration public class KafkaConsumerConfig { @Bean public ConsumerFactory<String, String> consumerFactory() { final HashMap<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { final ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
"localhost:9092"
: 마찬가지로 docker 로 띄운 카프카 서버의 호스트와 포트를 지정한다.
"StringSerializer.class"
: 이벤트(메시지)를 가져올 때 key 와 value 를 각각 String 타입으로 역직렬화해서 읽어온다. (필요에 따라 다른 데이터 타입이나 json 등으로 역직렬화 할 수 있다.)실행해보기
SpringBootApp1
은 port 8100, SpringBootApp2
은 port 8200 으로 띄운 후SpringBootApp2:8200
--- 메시지 발행 --> SpringBootApp1:8100
소비하도록 해보자.producer code (port:8200)
@GetMapping("/kafka") public String getOrderWithKafka() { kafka.send("order-topic", "order 가 보낸 이벤트!"); return "getOrderWithKafka"; }
order-topic
으로 이벤트를 발행하고 문자열을 리턴한다.consumer code (port:8100)
@KafkaListener(topics = "order-topic", groupId = "order-group1") public String getItemWithKafka(String event) { System.out.println("ItemApi.getItemWithKafka: " + event); return "getItemWithKafka"; }