NATS

NATS Message Queue 에 대한 간단한 설명&사용법

NATS

오픈 소스 메시징 시스템. Go 로 작성되었다!

Server-Client 간의 Message 전달을 위해 사용한다.

단순 전달을 넘어, Request - Response 까지 지원한다!

NATS.io

Basic Usage

기본은 Kafka 와 유사한 개념으로, Publisher - Broker - Subscriber 로 구성되어 있다

Kafka 에서는 Topic 을 중심으로 서버와 클라이언트가 통신하는데,

NATS 에서는 Subject 를 중심으로 통신한다.

Request - Response 의 형태이나, 아무나(?) Subject 를 바라볼 수 있다! (1:N) 으로!

NATS

출처 : https://medium.com/microservices-learning/building-a-microservices-architecture-with-nats-59fc8a4f331e

NATS in Spring Boot

gradle 기준

build.gradle 파일에 아래 코드를 추가한다.

// https://mvnrepository.com/artifact/io.nats/jnats
implementation group: 'io.nats', name: 'jnats', version: '2.16.0'

Publisher

@Component
public class NatsPublisher {
    //single URL
    Connection nc = io.nats.client.Nats.connect("nats://localhost:4222");

    public NatsPublisher() throws IOException, InterruptedException {
    }

    public String sendNatsMessage(String subject , String message) throws InterruptedException, ExecutionException {
        Message replyMessage = nc.request(subject, message.getBytes(StandardCharsets.UTF_8), Duration.ofSeconds(1));
        return new String(replyMessage.getData(), StandardCharsets.UTF_8);
    }
}

nc.publish 혹은 nc.request 를 통해 메시지를 발행(요청) 한다.

위의 코드에서는 Message 객체로 응답까지 받는다!

Subscriber

@Component
public class NatsConsumer {

    @Bean
    CommandLineRunner commandLineRunner()  {
        return args -> {
            Connection nc = Nats.connect("nats://localhost:4222");
            Dispatcher dispatcher = nc.createDispatcher(message->{
                try {
                    String data = new String(message.getData());
                    if( data.equals("초딩지현")){
                        nc.publish(message.getReplyTo(), createReply(message, "Reply"));
                        System.out.println("Reply");
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            dispatcher.subscribe("findName");
        };
    }

    private byte[] createReply(Message msg, String reply) throws IOException {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        outputStream.write(msg.getData());
        outputStream.write(reply.getBytes());
        byte[] result = outputStream.toByteArray();
        return result;
    }
}

“findName” 이라는 subject 에 대한 요청에 응답하는 수신쪽 코드이다.

@Bean
CommandLineRunner commandLineRunner2() {
    return args -> {
        Connection nc = Nats.connect("nats://localhost:4222");
        Dispatcher dispatcher = nc.createDispatcher(message->{
            try {
                String data = new String(message.getData());
                if( data.equals("초딩지현2")){
                    nc.publish(message.getReplyTo(), createReply(message, "Reply2"));
                    System.out.println("Reply2");
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        dispatcher.subscribe("findName");
    };
}

같은 subject 를 구독하는 subscriber 이다.

위에서 data.equals 부분의 if 문을 없애면, commandLineRunner 와 commandLineRunner2 중

먼저 응답을 보낸 내용으로 publisher.request 는 응답받는다!

불편한 부분이라면 (내가 모르는 거겠지만) kafka 처럼 Listener annotation 이 없어서, subject 에 대한 subscribe & reply(publish) 을 직접 작성해줘야 한다.

NATS in MSA

Kafka 도 Message 를 주고 받는 형태로 사용할 수 있다.

하지만, Topic 에 대한 관리 (+ partition, broker 에 대한 정책) 등에 대한 문제와 비용(OSS 임에도 Confluent 와 같은 엔터프라이즈 서비스를 제공받는 다면) 에 대한 장벽이 존재한다.

때문에 Rest 요청&응답은 느리고, Kafka 의 위와 같은 단점이 존재하는 상황에서 대안이 될 수 있으리라 본다!

NATS 를 중심으로 많은 서비스들의 메시지 요청과 응답을 제어할 수 있다. (+ NATS Jetstream 같은 경우에는 kafka 와 유사하게 메시지 내역을 저장하고, replay 도 가능하다고 한다!)

NATS

출처 : https://medium.com/microservices-learning/building-a-microservices-architecture-with-nats-59fc8a4f331e