시작이 반

[MSA] Spring Cloud ( Kafka connect - MariaDB 연결 ) 본문

Programming/MSA

[MSA] Spring Cloud ( Kafka connect - MariaDB 연결 )

G_Gi 2021. 8. 16. 23:01
SMALL

https://happycloud-lee.tistory.com/207

 

https://tmdrl5779.tistory.com/188?category=875847 

 

[MSA] Spring Cloud ( Kafka 데이터 동기화 )

이전에는 FeignClient를 사용하여 user-service에서 http통신을 통해 order-service API를 불러왔다. 현재는 서비스마다 각각의 DB를 사용하고 있다. 하지만 order-service가 여러개 실행된다면 어떻게 될까?? 하..

tmdrl5779.tistory.com

이전 포스팅에서 설명했던 order-service가 여러개 켜져있을 경우이다.만약 각 서비스당 하나의 DB를 가지고 있다면 같은 api를 호출해도 각각 다른 DB에 분산되어 저장되기 때문에 일괄적으로 데이터를 가져올때 어려움이 있다.이 때문에 DB를 하나만 사용하면 되는데 여기에 Kafka Connector를 같이 사용하여 Microservice에서 직접 DB에 대한 커넥션과 처리작업을 하지 않고 관련 작업은 Kafka에 일임하도록 해보자.

 


우선 기존에 사용하던 H2를 사용하지 않고 order-service와 MariaDB를 연결하자

 

order-service MariaDB 연결

디펜던시 추가

 

또한 application.yml 파일에서 기존 h2정보를 수정한다.

 

주문 하나를 하여 order-service의 db에 잘 저장되나 확인하자

저장이 성공적으로 됐으며 기존에 order-service와 catalog-service의 동기화 또한 잘 작동하고 있다.
20개주문 : 100 -> 80으로 바뀜

 

 


Kafka Connector 사용

Kafka를 사용하여 DB저장을 일임하자

https://tmdrl5779.tistory.com/188?category=875847 

 

[MSA] Spring Cloud ( Kafka 데이터 동기화 )

이전에는 FeignClient를 사용하여 user-service에서 http통신을 통해 order-service API를 불러왔다. 현재는 서비스마다 각각의 DB를 사용하고 있다. 하지만 order-service가 여러개 실행된다면 어떻게 될까?? 하..

tmdrl5779.tistory.com

이전 포스팅에서 커넥터를 사용하여 어떤형태의 데이터가 들어가는지 파악해보았었다.

토픽에는 위의 형태의 데이터가 저장되고 이러한 데이터는 생성한 Sinck Connector에 의해 불려지고 DB에 저장된다. 만약 이러한 포맷으로 데이터가 들어오지 않는다면 DB에 저장되지 않는다.

 

우선 정해진 포맷과 같은 형태의 dto를 만들자

 

이러한 형태를 커넥터와 연걸된 토픽에 넣어야 한다.

즉, 데이터를 토픽에 보내야 하니까 서비스는 Producer가 있어야한다.

 

Producer를 만들자.

package com.example.orderservice.messagequeue;

import com.example.orderservice.dto.*;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.util.Arrays;
import java.util.List;

@Service
@Slf4j
public class OrderProducer {
    private KafkaTemplate<String, String> kafkaTemplate;

    List<Field> fields = Arrays.asList(new Field("string", true, "order_id"),
            new Field("string", true, "user_id"),
            new Field("string", true, "product_id"),
            new Field("int32", true, "qty"),
            new Field("int32", true, "unit_price"),
            new Field("int32", true, "total_price"));
    Schema schema = Schema.builder()
            .type("struct")
            .fields(fields)
            .optional(false)
            .name("orders")
            .build();
   ///////////////////////////////해당 포맷에 맞는 Schema 생성//////////////////////////

    @Autowired
    public OrderProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public OrderDto send(String topic, OrderDto orderDto){
        Payload payload = Payload.builder()
                .order_id(orderDto.getOrderId())
                .user_id(orderDto.getUserId())
                .product_id(orderDto.getProductId())
                .qty(orderDto.getQty())
                .unit_price(orderDto.getUnitPrice())
                .total_price(orderDto.getTotalPrice())
                .build();
	////////////////////////////////해당 포맷에 맞는 Payload 생성///////////////////////
        KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);

        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";

        try {
            jsonInString = mapper.writeValueAsString(kafkaOrderDto); // java object -> json string
        } catch (JsonProcessingException ex) {
            ex.printStackTrace();
        }

        kafkaTemplate.send(topic, jsonInString);
        log.info("Kafka Producer sent data from the Order microservice: " + kafkaOrderDto);

        return orderDto;
    }
}

topic이 매개변수로 넘어오면 해당 topic의 이름에 생성한 데이터를 String 형태로 보낸다.

( kafkaTemplate.send(topic, jsonInString); )

 

이제 위에 만든 send함수를 controller에서 호출하여 db에 값을 저장시킨다.

기존에 jpa를 사용하여 DB와 연결하여 저장하는 코드는 삭제하고 kafka를 통해 일임 하는 코드를 넣어주었다.

Topic 의 이름은 orders로 할 것이다.

 

코드를 다 작성한 상태에서 우선 kafka connect 의 목록을 확인해보자.

( 당연히  kafka connect 서버는 켜있어야함 )

이전 포스팅에서 사용한 2개의 커넥터가 있다. 

 

우리는 orders토픽과 MariaDB에 연결된 Sink Connector를 새로 만들것이다.

connector의 이름은 my-order-sink-connect로 해줬다.

( JdbcSinkConnector를 사용하기 위해서는 우선 plugin이 등록이 되어있어야한다. )

class와 url를 보면 db와 연결되게 해줬으며

orders 토픽과 연결되게 하였다.

 

카프카 커넥터의 plugin을 확인해보고 싶다면

GET: /connector-plugins 를 사용하여 확인해보면된다.

 

 

커텍터 등록 확인

또한 orders란 topic도 새로 생겼다.

 

 


TEST

사용자가 하나의 주문을 한다면 orders라는 topic에 데이터가 들어올 것이며 이러한 데이터를 DB에 저장 시킬 것이다.

 

order-service를 2개 작동시켜 해당 데이터가 같은 db에 저장되는지 확인

 

주문을 2개 해보자

주문을 2번하게되면

한번은 order-service1을 호출하고 또한번은 order-service2를 호출한다.

 

consumer를 통해 topic에 값이 들어갔는지 확인

2개의 데이터가 들어갔다.

 

DB에 저장확인

 

 

 

<참고 : Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)>

LIST