시작이 반

[MSA] Spring Cloud ( 데이터 동기화 Kafka 활용 ) 본문

Programming/MSA

[MSA] Spring Cloud ( 데이터 동기화 Kafka 활용 )

G_Gi 2021. 8. 11. 14:47
SMALL

https://happycloud-lee.tistory.com/207

서비스의 시나리오를 확인해보자.

user-service에서 회원가입, 로그인을하고 

user는 catalog-service에서 상품목록을 보고

order-service를 통해 해당 상품을 주문한다.

상품을 주문하면 catalog-service의 상품개수가 줄어들어야 한다.

이러한 데이터를 동기화 하기위해 Kafka를 사용해보자.

 

order-service에서 Kafka Topic으로 메시지 전송 -> Producer

catalog-service에서 Kafka Topic에 전송 된 메시지 취득 -> Consumer

 


catalog-service에 Kafka적용 ( Consumer )

우선 kafka관련 디펜던시를 추가해준다.

 

Kafka config와 consumer를 작성해야한다.

 

Kafka config관련 코드 작성

package com.example.catalogservice.messagequeue;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory() { //접속하고자 하는 정보 topic
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId"); //컨슈머 그루핑되어있으면 지금은 1개임
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean //접속 정보를 가지고 리스너 생성
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
                = new ConcurrentKafkaListenerContainerFactory<>();
        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());

        return kafkaListenerContainerFactory;
    }

}

접속하느 카프카의 정보를 작성 

- method : ConsumerFactory

카프카 서버주소, group_id를 작성해준다. group_id는 카프카에서 토픽에 쌓여있는 consumer를 그루핑할 수 있다. 여러개의 consumer가 데이터를 가져갈때 특정한 group을 만들어서 사용할 수 있다.

또한 카프카에 json형식으로 보낼 것이기 때문에 key, value의 deserializer를 설정해준다.

(접속정보 작성완료)

 

이제 접속정보를 이용해서 리스너를 생성하자

- method : ConcurrentKafkaListenerContainerFactory

리스너를 생성후 위에서 작성한 설정 정보를 등록한다.

 

 

Kafka Consumer 코드 작성

package com.example.catalogservice.messagequeue;

import com.example.catalogservice.jpa.CatalogEntity;
import com.example.catalogservice.jpa.CatalogRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

@Service
@Slf4j
public class KafkaConsumer {

    CatalogRepository repository;

    @Autowired
    public KafkaConsumer(CatalogRepository repository) {
        this.repository = repository;
    }

    @KafkaListener(topics = "order-catalog-topic")
    public void updateQty(String kafkaMessage){ //토픽에서 메세지 가져옴
        log.info("Kafka Message: ->" + kafkaMessage);

        Map<Object, Object> map = new HashMap<>();
        ObjectMapper mapper = new ObjectMapper();
        try {
            map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {}); // Json 형태의 string -> json
        } catch (JsonProcessingException ex) {
            ex.printStackTrace();
        }

        CatalogEntity entity = repository.findByProductId((String) map.get("productId"));
        if (entity != null) {
            entity.setStock(entity.getStock() - (Integer) map.get("qty"));
            repository.save(entity);
        }

    }
}

@KafkaListener어노테이션을 사용하여 topic에 값이 들어오면 감지를 하도록한다.

값을 통해 CatalogRepository (DB)를 업데이트해준다.

 


order-service에 Kafka적용 ( Producer )

마찬가지로 Kafka관련 디펜던시를 추가한다.

 

Kafka config와 producer를 작성한다.

package com.example.orderservice.messagequeue;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaProducerConfig {
    @Bean
    public ProducerFactory<String, String> producerFactory() { //접속하고자 하는 정보 topic
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }

}

Consumer와 같이 config정보를 작성해준다.- method : ProducerFactory

 

다음으로는 데이터를 전달하기위한 인스턴스인 KafkaTemplate가 필요하다.- method : KafkaTemplate

 

 

Kafka Producer 코드 작성

데이터를 전달하기위한 Producer코드를 작성하자.

package com.example.orderservice.messagequeue;

import com.example.orderservice.dto.OrderDto;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class KafkaProducer {
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public OrderDto send(String topic, OrderDto orderDto){
        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";

        try {
            jsonInString = mapper.writeValueAsString(orderDto); // java object -> json string
        } catch (JsonProcessingException ex) {
            ex.printStackTrace();
        }

        kafkaTemplate.send(topic, jsonInString);
        log.info("Kafka Producer sent data from the Order microservice: " + orderDto);

        return orderDto;
    }
}

KafkaTemplate를 주입받고 이를 이용하여 메시지를 topic에 보낸다.kafkaTemplate.send(토픽이름, Json형태의 String)

 

이제 메시지를 전달하기 위해 이러한 로직이 필요한 Controller에 추가해준다.

우선 Contoller에서 만들어놓은 KafkaProducer를 주입받는다.

 

order-service에서 주문을 할때 topic에 메시지를 보내야 한다.

POST : order-service/{userId}/orders  에서 메시지를 보내는 작업이 필요하다.


코드 작업은 끝났다.

 

이제 동기화가 잘되는지 확인해보자.우선 주키퍼와 카프카 서버를 작동시킨다. 또한 discovery서버, config서버, api-gateway, order-serivce, catalog-service를 작동식켜준다.

 

현재 로직을 테스트함에 있어서는 user-service가 필요없다.

 

consumer 리스너의 이름 정보로 토픽이 생긴다.

 

 

상품목록을 확인해보자.

3개의 상품목록이 있다.

 

이제 catalog-001을 20개 주문해보자

주문이 완료되었다.

 

그러면 주문을 하는 로직에 topic에 메시지를 보내는 코드가 있으므로 topic의 내용을 확인해 보자.

json형태로 메시지가 잘 간것을 볼 수 있다.

 

다시 상품목록을 확인해보면

catalog-001의 상품이 100 -> 80개로 바뀐것을 볼 수 있다. 

 

 

 

 

<참고 : Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)>

LIST