일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- spring oauth
- JPA
- java
- Coroutine
- 백준 15685
- 프로래머스
- springboot
- 백준 파이썬
- 백준
- 백준 16719
- with recursive
- spring security
- Spring
- MSA
- MySQL
- 백준 16236
- 백준 17626
- Kotlin
- 백준 19238
- 파이썬
- re.split
- 백준 16235
- java 기술면접
- 프로그래머스
- sql 기술면접
- spring cloud
- 웹어플리케이션 서버
- 백준 17779
- JVM
- Spring Boot
- Today
- Total
시작이 반
[MSA] Spring Cloud ( 데이터 동기화 Kafka 활용 ) 본문
서비스의 시나리오를 확인해보자.
user-service에서 회원가입, 로그인을하고
user는 catalog-service에서 상품목록을 보고
order-service를 통해 해당 상품을 주문한다.
상품을 주문하면 catalog-service의 상품개수가 줄어들어야 한다.
이러한 데이터를 동기화 하기위해 Kafka를 사용해보자.
order-service에서 Kafka Topic으로 메시지 전송 -> Producer
catalog-service에서 Kafka Topic에 전송 된 메시지 취득 -> Consumer
catalog-service에 Kafka적용 ( Consumer )
우선 kafka관련 디펜던시를 추가해준다.
Kafka config와 consumer를 작성해야한다.
Kafka config관련 코드 작성
package com.example.catalogservice.messagequeue;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() { //접속하고자 하는 정보 topic
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId"); //컨슈머 그루핑되어있으면 지금은 1개임
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean //접속 정보를 가지고 리스너 생성
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
접속하느 카프카의 정보를 작성
- method : ConsumerFactory
카프카 서버주소, group_id를 작성해준다. group_id는 카프카에서 토픽에 쌓여있는 consumer를 그루핑할 수 있다. 여러개의 consumer가 데이터를 가져갈때 특정한 group을 만들어서 사용할 수 있다.
또한 카프카에 json형식으로 보낼 것이기 때문에 key, value의 deserializer를 설정해준다.
(접속정보 작성완료)
이제 접속정보를 이용해서 리스너를 생성하자
- method : ConcurrentKafkaListenerContainerFactory
리스너를 생성후 위에서 작성한 설정 정보를 등록한다.
Kafka Consumer 코드 작성
package com.example.catalogservice.messagequeue;
import com.example.catalogservice.jpa.CatalogEntity;
import com.example.catalogservice.jpa.CatalogRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
@Service
@Slf4j
public class KafkaConsumer {
CatalogRepository repository;
@Autowired
public KafkaConsumer(CatalogRepository repository) {
this.repository = repository;
}
@KafkaListener(topics = "order-catalog-topic")
public void updateQty(String kafkaMessage){ //토픽에서 메세지 가져옴
log.info("Kafka Message: ->" + kafkaMessage);
Map<Object, Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try {
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {}); // Json 형태의 string -> json
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
CatalogEntity entity = repository.findByProductId((String) map.get("productId"));
if (entity != null) {
entity.setStock(entity.getStock() - (Integer) map.get("qty"));
repository.save(entity);
}
}
}
@KafkaListener어노테이션을 사용하여 topic에 값이 들어오면 감지를 하도록한다.
값을 통해 CatalogRepository (DB)를 업데이트해준다.
order-service에 Kafka적용 ( Producer )
마찬가지로 Kafka관련 디펜던시를 추가한다.
Kafka config와 producer를 작성한다.
package com.example.orderservice.messagequeue;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() { //접속하고자 하는 정보 topic
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
}
Consumer와 같이 config정보를 작성해준다.- method : ProducerFactory
다음으로는 데이터를 전달하기위한 인스턴스인 KafkaTemplate가 필요하다.- method : KafkaTemplate
Kafka Producer 코드 작성
데이터를 전달하기위한 Producer코드를 작성하자.
package com.example.orderservice.messagequeue;
import com.example.orderservice.dto.OrderDto;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class KafkaProducer {
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public OrderDto send(String topic, OrderDto orderDto){
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(orderDto); // java object -> json string
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("Kafka Producer sent data from the Order microservice: " + orderDto);
return orderDto;
}
}
KafkaTemplate를 주입받고 이를 이용하여 메시지를 topic에 보낸다.kafkaTemplate.send(토픽이름, Json형태의 String)
이제 메시지를 전달하기 위해 이러한 로직이 필요한 Controller에 추가해준다.
우선 Contoller에서 만들어놓은 KafkaProducer를 주입받는다.
order-service에서 주문을 할때 topic에 메시지를 보내야 한다.
POST : order-service/{userId}/orders 에서 메시지를 보내는 작업이 필요하다.
코드 작업은 끝났다.
이제 동기화가 잘되는지 확인해보자.우선 주키퍼와 카프카 서버를 작동시킨다. 또한 discovery서버, config서버, api-gateway, order-serivce, catalog-service를 작동식켜준다.
현재 로직을 테스트함에 있어서는 user-service가 필요없다.
consumer 리스너의 이름 정보로 토픽이 생긴다.
상품목록을 확인해보자.
3개의 상품목록이 있다.
이제 catalog-001을 20개 주문해보자
주문이 완료되었다.
그러면 주문을 하는 로직에 topic에 메시지를 보내는 코드가 있으므로 topic의 내용을 확인해 보자.
json형태로 메시지가 잘 간것을 볼 수 있다.
다시 상품목록을 확인해보면
catalog-001의 상품이 100 -> 80개로 바뀐것을 볼 수 있다.
'Programming > MSA' 카테고리의 다른 글
[MSA] Spring Cloud ( Kafka connect - MariaDB 연결 ) (5) | 2021.08.16 |
---|---|
[MSA] Spring Cloud ( Kafka 사용법 ) (0) | 2021.08.09 |
[MSA] Spring Cloud ( Microservice간 통신 - feignClient) (0) | 2021.08.08 |
[MSA] Spring Cloud ( Config, Cloud bus ) (0) | 2021.08.07 |
[MSA] Spring Cloud ( Config, actuator refresh ) (0) | 2021.08.07 |