Spring AMQP in Reactive Applications을 번역해봅시다.

2018. 8. 8. 17:44Java/Spring Boot

아주아주 오래간만에 글을 올립니다.
제가 직접 작성한 글은 아니구 역시나 번역을 목적으로 하기 보단, 실습을 위주로 정리한 글입니다.

실제 출처는  http://www.baeldung.com/spring-amqp-reactive 입니다.
직접 가서 보시면 보다 자세한 설명을 맞이할수 있습니다.


1. Overview

간단한 Spring Boot Reactive Application을 만드는데,  HTTP-AMQP Gateway입니다.
여기선 point-to-point 와 publish-subscribe 시나리오 두가지를 다루게 될것입니다.

기본적으로는 AMQP, RabbitMQ, Spring Boot 등은 당연히 알아야 하며,
그 안에 Exchanges, Queue, Topics 등도 당근 알아야 겠죠.
이에 대해서 좀더 알고 싶다면, 아래 링크를 참고해주시면 감사하겠습니다.


2. RabbitMQ Server Setup

로컬에서 RabbitMQ를 셋업 할 수 있지만,  개발 장비에 그런 환경을 시뮬레이션 하기 위해서,
Docker에 올려놓고 사용해봅시다.
다음 명령은 단독 RabbitMQ 서버를 시작할수 있습니다.

$ docker run -d --name rabbitmq -p55672:5672 rabbitmq:3

$sudo docker run -d --name rabbitmq -p 5672:5672 -p 5672:15672 --restart=unless-stopped -e RABBITMQ_DEFAULT_USER=ㅎㅁㅎㅁ -e RABBITMQ_DEFAULT_PASS=ㅎㅁㅎㅁ rabbitmq:management
어떤 영속적인 볼륨을 선언하지 않았습니다. 

docker logs 명령으로 서버 로그를 체크 할 수 있습니다. 
$ sudo docker logs -f  77f3e283383a
2018-08-06 04:45:42.908 [info] <0.33.0> Application lager started on node rabbit@77f3e283383a
2018-08-06 04:45:43.141 [info] <0.33.0> Application jsx started on node rabbit@77f3e283383a
2018-08-06 04:45:43.219 [info] <0.33.0> Application mnesia started on node rabbit@77f3e283383a
2018-08-06 04:45:43.219 [info] <0.33.0> Application crypto started on node rabbit@77f3e283383a
2018-08-06 04:45:43.219 [info] <0.33.0> Application recon started on node rabbit@77f3e283383a
2018-08-06 04:45:43.219 [info] <0.33.0> Application cowlib started on node rabbit@77f3e283383a
2018-08-06 04:45:43.223 [info] <0.33.0> Application os_mon started on node rabbit@77f3e283383a
2018-08-06 04:45:43.223 [info] <0.33.0> Application xmerl started on node rabbit@77f3e283383a
2018-08-06 04:45:43.271 [info] <0.33.0> Application inets started on node rabbit@77f3e283383a
2018-08-06 04:45:43.271 [info] <0.33.0> Application asn1 started on node rabbit@77f3e283383a
2018-08-06 04:45:43.271 [info] <0.33.0> Application public_key started on node rabbit@77f3e283383a
2018-08-06 04:45:43.314 [info] <0.33.0> Application ssl started on node rabbit@77f3e283383a
2018-08-06 04:45:43.318 [info] <0.33.0> Application ranch started on node rabbit@77f3e283383a
2018-08-06 04:45:43.318 [info] <0.33.0> Application ranch_proxy_protocol started on node rabbit@77f3e283383a
2018-08-06 04:45:43.320 [info] <0.33.0> Application cowboy started on node rabbit@77f3e283383a
2018-08-06 04:45:43.320 [info] <0.33.0> Application rabbit_common started on node rabbit@77f3e283383a
2018-08-06 04:45:43.327 [info] <0.197.0>
Starting RabbitMQ 3.7.7 on Erlang 20.3.8.3
Copyright (C) 2007-2018 Pivotal Software, Inc.
Licensed under the MPL.  See http://www.rabbitmq.com/

  ##  ##
  ##  ##      RabbitMQ 3.7.7. Copyright (C) 2007-2018 Pivotal Software, Inc.
  ##########  Licensed under the MPL.  See http://www.rabbitmq.com/
  ######  ##
  ##########  Logs: <stdout>

              Starting broker...

서버 상태 정보를 아래 명령을 통해 가져올수 있습니다.

$ sudo docker exec rabbitmq rabbitmqctl status
Status of node rabbit@77f3e283383a ...
[{pid,385},
{running_applications,
     [{rabbitmq_management,"RabbitMQ Management Console","3.7.7"},
      {rabbitmq_management_agent,"RabbitMQ Management Agent","3.7.7"},
      {amqp_client,"RabbitMQ AMQP Client","3.7.7"},
      {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.7.7"},
      {rabbit,"RabbitMQ","3.7.7"},
      {mnesia,"MNESIA  CXC 138 12","4.15.3"},
      {rabbit_common,
          "Modules shared by rabbitmq-server and rabbitmq-erlang-client",
          "3.7.7"},
      {cowboy,"Small, fast, modern HTTP server.","2.2.2"},
      {ranch_proxy_protocol,"Ranch Proxy Protocol Transport","1.5.0"},
      {ranch,"Socket acceptor pool for TCP protocols.","1.5.0"},
      {ssl,"Erlang/OTP SSL application","8.2.6.1"},
      {public_key,"Public key infrastructure","1.5.2"},
      {asn1,"The Erlang ASN1 compiler version 5.0.5","5.0.5"},
      {inets,"INETS  CXC 138 49","6.5.2.2"},
      {xmerl,"XML parser","1.3.16"},
      {os_mon,"CPO  CXC 138 46","2.4.4"},
      {cowlib,"Support library for manipulating Web protocols.","2.1.0"},
      {recon,"Diagnostic tools for production use","2.3.2"},
      {crypto,"CRYPTO","4.2.2"},
      {jsx,"a streaming, evented json parsing toolkit","2.8.2"},
      {lager,"Erlang logging framework","3.6.3"},
      {goldrush,"Erlang event stream processor","0.1.9"},
      {compiler,"ERTS  CXC 138 10","7.1.5"},
      {syntax_tools,"Syntax tools","2.1.4.1"},
      {syslog,"An RFC 3164 and RFC 5424 compliant logging framework.","3.4.2"},
      {sasl,"SASL  CXC 138 11","3.1.2"},
      {stdlib,"ERTS  CXC 138 10","3.4.5"},
      {kernel,"ERTS  CXC 138 10","5.4.3.2"}]},
{os,{unix,linux}},
{erlang_version,
     "Erlang/OTP 20 [erts-9.3.3.2] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:64] [hipe] [kernel-poll:true]\n"},
{memory,
     [{connection_readers,0},
      {connection_writers,0},
      {connection_channels,0},
      {connection_other,24536},
      {queue_procs,0},
      {queue_slave_procs,0},
      {plugins,1344280},
      {other_proc,20970848},
      {metrics,195528},
      {mgmt_db,277192},
      {mnesia,79944},
      {other_ets,2241568},
      {binary,112752},
      {msg_index,58208},
      {code,28565132},
      {atom,1131721},
      {other_system,11411995},
      {allocated_unused,16104312},
      {reserved_unallocated,0},
      {strategy,rss},
      {total,[{erlang,66413704},{rss,73068544},{allocated,82518016}]}]},
{alarms,[]},
{listeners,[{clustering,25672,"::"},{amqp,5672,"::"},{http,15672,"::"}]},
{vm_memory_calculation_strategy,rss},
{vm_memory_high_watermark,0.4},
{vm_memory_limit,3227592294},
{disk_free_limit,50000000},
{disk_free,37339201536},
{file_descriptors,
     [{total_limit,65436},
      {total_used,4},
      {sockets_limit,58890},
      {sockets_used,0}]},
{processes,[{limit,1048576},{used,384}]},
{run_queue,0},
{uptime,955},
{kernel,{net_ticktime,60}}]
다른 유용한 명령들을 소개합니다.

  • list_exchanges : 선언된 모든 Exchange 목록
  • list_queues : 선언된 모든 Queue 목록, 읽지 않은 메세지들의 수도 포함합니다..
  • list_bindings : exchange와 queue간에 라우팅 키들을 포함하여, 선언된 모든 바인딩들 

3. Spring AMQP Project Setup

Spring AMQP 모듈Spring Boot starter를 이용하여 메시징 서버에 메세지를 게시하거나 수신하도록 합니다.
amqp 와 webflux를 추가합시다. 


dependencies {
  compile('org.springframework.boot:spring-boot-starter-webflux')
  compile('org.springframework.boot:spring-boot-starter-amqp')
}
spring-boot-amqpAMQP 관련된 모든 것을 가져오는데 반해(whereas),
spring-boot-start-webflux반응형 REST 서버를 구현하는데 사용되는 핵심 Dependency입니다.

4.Scenario 1 : Point-to-Point Messaging

클라이언트로부터 메시지를 받는 broker에 논리적 엔티티인 Direct Exchange를 사용할 것입니다.
Direct Exchange는 모든 유입 메시지를 클라이언트에 의해 소비하는 하나의 대기열로 라우팅 합니다.
다수의 클라이언트는 같은 큐를 구독하는데, 그중 하나의 클라이언트만 주어진 메시지를 받을 수 있습니다.

4.1. Exchange and Queues Setup

시나리오에서, DestinationInfo 객체를 사용합니다.
DestinationInfo 객체exchange 이름라우팅 key를 캡슐화하고 있습니다.
Destination name이 key인 map은 모든 가능한 destination들을 저장하는데 사용됩니다. 

아래 @PostConstruct 메소드는 해당 초기화 셋업에 대해 책임을 지고 있습니다.

@Bean
public CommandLineRunner setupQueueDestinations(AmqpAdmin amqpAdmin, DestinationsConfig destinationsConfig){
     return (args) ->{
         log.info("[I48 Creating Destinations...");

         destinationsConfig.getQueues()
                  .forEach((key,destination) -> {
                           log.info("[I54 Creating directExchange: key={},name={},routingKey={}"
                                    key
                                    ,destination.getExchange()
                                    ,destination.getRoutingKey());

                           Exchange ex = ExchangeBuilder
                                                      .directExchange(destination.getExchange())
                                                      .durable(true)
                                                      .build();
                           amqpAdmin.declareExchange(ex);

                           Queue q = QueueBuilder
                                             .durable(destination.getRoutingKey())
                                             .build();
                           amqpAdmin.declareQueue(q);

                           Binding b = BindingBuilder
                                             .bind(q)
                                             .to(ex)
                                             .with(destination.getRoutingKey())
                                             .noargs();
                           amqpAdmin.declareBinding(b);

                           log.info("[I70] Binding successfully created.");
         });
};
setupQueueDestinations() 메소드는 Exchange, Queues을 선언하고, 
이들을 제공된 라우팅 키와 함께 바인딩하기 위해 Spring에 의해 생성된 adminAmqp 빈즈를 사용합니다.

모든 destination들은 우리 예제에서 사용된 @ConfigurationProperties 클래스인 
DestinationsConfig bean으로 부터 오게됩니다.

해당 클래스는 application.yml 설정 파일로 부터 읽어 매핑에서 작성된 
DestinationInfo 객체로 채워진(be populated with) 속성을 가지고 있습니다.

** DestinationsConfig 클래스

@Data
@ConfigurationProperties("destinations")
public class DestinationsConfig {

    private Map<String,DestinationInfo> queues = new HashMap<>();

    private Map<String,DestinationInfo> topics = new HashMap<>();

   // Destination stores the Exchange name and routing key used
   // by our producers when posting message
   @Data
   public static class DestinationInfo{

      private String exchange;
      private String routingKey;

   }
}

** Application.yml 파일

spring:
  rabbitmq:
    host: 192.168.1.159
    port: 5672
    username: ㅎㅁㅎㅁ
    password: ㅎㅁㅎㅁ

destinations:
  queues:
    NYSE:
      exchange: nyse
      routing-key: NYSE



4.2. Producer Endpoint

Producers는 "/queue/[name] 에 HTTP POST을 보내는 것"에 의해 메시지를 보냅니다.
이게 reactive endpoint 라서, 간단한 ack(acknowledgement)를 리턴하는 Mono를 사용합니다.

@SpringBootApplication
@EnableConfigurationProperties(DestinationsConfig.class)
@RestController
public class Application {

    //Specifies a basic set of AMQP operations.
    @Autowired
    private AmqpTemplate amqpTemplate;

    //Specifies a basic set of portable AMQP administrative operations for AMQP &gt; 0.9.
    @Autowired
    private AmqpAdmin amqpAdmin;

    @Autowired
    private DestinationsConfig destinationsConfig;

    @PostMapping(value = "/queue/{name}")
    public Mono<ResponseEntity<?>> sendMessageToQueue(@PathVariable String name, @RequestBody String payload) {

    // Lookup exchange details
    final DestinationInfo d = destinationsConfig
        .getQueues()
        .get(name);

    if (d == null) {
    // Destination not found
        return Mono.just(ResponseEntity.notFound().build());
    }

    return Mono.fromCallable(() -> {

    log.info("[I51] sendMessageToQueue: queue={}, routingKey={}"
        , d.getExchange()
        , d.getRoutingKey());

    amqpTemplate.convertAndSend(
        d.getExchange(),
        d.getRoutingKey(),
        payload);

    return ResponseEntity.accepted().build();
    });
}
이름 파라미터가 유효한 종착점에 부합하는지 여부를 우선 체크하고, 만약 그렇다면, 
autowired amqpTemplate 인스턴스를  실제  단순한 String 메세지인 payload를 MQ에  보내는데 사용합니다.


4.3. MessageListenerContainer Factory

비동기적으로 메시지를 받기 위해서, Spring AMQP는 MessageContainerListener 추상 클래스를 사용합니다.  
이 클래스는 AMQP Queue들과 어플리케이션에 의해 제공된 listener들에서의 정보 흐름을 중재합니다.  

메시지 리너스들을 추가하기 위해서 이 클래스들을 구체적으로 구현할 필요해야 하므로,
실제 구현에서 컨트롤러 코드를 독립시키는 팩토리를 정의합니다.

이 경우에는,  createMessageListenerContainer 메소드를 호출할 때 마다
팩토리 메소드는 new SimpleMessageContainerListener를 리턴합니다. 

@Component
public class MessageListenerContainerFactory {

     @Autowired
     private ConnectionFactory connectionFactory;

     public MessageListenerContainerFactory() {}

     public MessageListenerContainer createMessageListnerContainer(String queueName){
          SimpleMessageListenerContainer messageListenerContainer =
               new SimpleMessageListenerContainer(connectionFactory);
          messageListenerContainer.addQueueNames(queueName);
          messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);

          return messageListenerContainer;
     }
}


4.4. Consumer Endpoint

Consumers는 메시지를 받기위해 producer(/queue/[name])가 사용한 동일 endpoint address에 접근합니다. 
엔드포인트는 각각의 이벤트들이 받은 메시지에 일치하는 이벤트들의 Flux를 리턴합니다. 


@GetMapping(value="/queue/{name}", produces= MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessageFromQueue(@PathVariable String name){

     final DestinationInfo d = destinationsConfig.getQueues().get(name);

     if(d == null){
          return Flux.just(ResponseEntity.notFound().build());
     }

     MessageListenerContainer mlc =                     
          messageListenerContainerFactory.createMessageListnerContainer(d.getRoutingKey());

     Flux<String> f = Flux.<String> create(emitter ->{
          mlc.setupMessageListener((MessageListener) m ->{
          String payload = new String(m.getBody());
          emitter.next(payload);
          });
          emitter.onRequest(v ->{
               mlc.start();
          });
          emitter.onDispose(()->{
               mlc.stop();
          });
     });

     return Flux.interval(Duration.ofSeconds(5))
          .map(v -> "No news is good news")
          .mergeWith(f);
}

Those dummy messages play an important function in our case:
without them, we’d only detect a client disconnection upon receiving a message and failing to send it,
which can take a long time depending on your particular use case.

4.5. Testing

Queue들은 destinations.queues.<name>를 사용해서 정의하는데,
<name>은 destination name으로 사용됩니다.
여기 단독 destination name으로 된 "NYSE" 라우팅 키와 RabbitMQ에 "nyse" exchange에 메시지를 보낼
"NYSE" 을 정의 했습니다.

한번 command line 또는 IDE에서 서버를 시작하면, 우리는 sending과  receiving 메시지를 시작할수 잇씁니다.
우리는 curl 유틸리티를 사용할 것입니다. 이건 공통 유틸리티입니다. Windows, Mac, Linux OS등에서 다 사용가능합니다.

아래 리스팅은 우리 destination에 메세지와 서버로부터의 예상된 응답을 어떻게 보내는지 보여줍니다. 

$ curl -v -d "Test message" http://192.168.11.62:8080/queue/NYSE
* About to connect() to 192.168.11.62 port 8080 (#0)
*   Trying 192.168.11.62...
* Connected to 192.168.11.62 (192.168.11.62) port 8080 (#0)
> POST /queue/NYSE HTTP/1.1
> User-Agent: curl/7.29.0
> Host: 192.168.11.62:8080
> Accept: */*
> Content-Length: 12
> Content-Type: application/x-www-form-urlencoded
>
* upload completely sent off: 12 out of 12 bytes
< HTTP/1.1 202 Accepted
< content-length: 0
<
* Connection #0 to host 192.168.11.62 left intact
위 명령을 실행후에, 우리는 RabbitMQ에서 메시지를 수신되었고,
다음 명령을 발행(issuing)할 준비가 되었음을 검증 할 수 있습니다.

***"HTTP/1.1 202 Accepted"
The HyperText Transfer Protocol(HTTP) 202 Accepted  response status code indicates that
the request has been received but noy yet acted upon.
It is non-committal, meaning that there is no way for the HTTP to later send an asynchronous response 
indicating the outcome of processing the request.
It is intended for cases where another process or server handles the request, or for batch processing.

요청은 받았으나, 아직 실행되지 않았음을 나타냄.
비공식(non-committal)이고, 의미하는 것은  HTTP가 나중에 요청을 처리한 결과를 
나타내는 비동기 응답을 나중에 보낼 수 있는 방법을 의미함.
다른 프로세스 또는 서버가 요청을 처리하는 일 처리를 위해 사용됨.


$ sudo docker exec rabbitmq rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
NYSE    1
IBOV    0

이제, 우리는 다음 명령으로 메시지를 읽을수 있죠.

$ sudo curl -v http://192.168.11.62:8080/queue/NYSE
* About to connect() to 192.168.11.62 port 8080 (#0)
*   Trying 192.168.11.62...
* Connected to 192.168.11.62 (192.168.11.62) port 8080 (#0)
> GET /queue/NYSE HTTP/1.1
> User-Agent: curl/7.29.0
> Host: 192.168.11.62:8080
> Accept: */*
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream;charset=UTF-8
<
data:Test message

data:No news is good news

data:No news is good news

data:No news is good news

... same message repeating every 5 secs
여러분이 보시기에도 말이죠.
사전에 저장된 메시지를 받고, 그 다음에 우리는 5초마다 더미 데이터를 받기 시작합니다.

만약 우리가  queue 리스팅하는 명령을 다시 해보면, 저장된 메시지가 없음을 알수 있습니다.



5. Scenario 2 : Publish-Subscribe


 또 다른 시나리오는 단일 메시지를 다수의 Consumers에게 보내져야 하는 Publish-Subscribe 패턴입니다.
RabbitMQ는 이러한 종류의 어플리케이션을 지원하는 두가지 타입의 exchange을 제공합니다. : 
Fan-out 과 Topic 입니다.

이들 두 종류간의 주요 차이점은 Topic은  등록시에 제공되는 라우팅 키 패턴(e.g. "alarm.mailserver.*")을 기반으로
받을 메시지들을 필터링 할 수 있다는 것입니다.
반면, 전자(Fan-out)는 단순히 유입되는 메시지를 모든 바운드 대기열에 복제합니다.

RabbitMQ는 또한 Header Exchanges를 지원합니다.
Header Exchanges는 보다 복잡한 메시지를 필터링하지만, 그것은 사용에 대해서는 본 아티클밖입니다. 

5.1. Destinations Setup

Pub/Sub destinations을 기동시(startup time)에 다른 @PostContruct 메소드와 함께 정의합니다.

@PostConstruct
public void setupTopicDestinations(){
     destinationsConfig.getTopics()
          .forEach((key, destination) -> {
               Exchange ex = ExchangeBuilder
               .topicExchange(destination.getExchange())
               .durable(true)
               .build();
          amqpAdmin.declareExchange(ex);
     });
}

5.2. Publisher Endpoint

모든 연결된 클라이언트들에게 보낼 메시지들을 포스팅 하기위해서
클라이언트들은 /topic/[name]에서 사용할수 있는 publisher endpoint를 사용할 것입니다.

앞선 시나리오에서와 같이, 우리는 @PostMapping을 사용합니다.
이것은 메세지를 보낸 후에 상태와 함께 Mono를 리턴합니다.


@PostMapping(value = "/topic/{name}")
public Mono<ResponseEntity<?>> sendMessageToTopic(@PathVariable String name, @RequestBody String payload){

      DestinationInfo d = destinationsConfig
     .getTopics()
     .get(name);

     if (d == null){
          return Mono.just( ResponseEntity.notFound().build() );
     }

     return Mono.fromCallable(() -> {
               amqpTemplate.convertAndSend(
                    d.getExchange()
                    , d.getRoutingKey()
                    ,payload
                 );

               return ResponseEntity.accepted().build();
     });
}
5.3. Subscriber Endpoint

Subscriber endpoint는 /topic/[name]에 위치해 있고, 연결된 클라이언트들에 대한 Flux 메시지들을 제공할 것입니다.
이들 메세지들은 받은 메시지들과 5초마다 생성된 더미 메시지들을 포함합니다.

@GetMapping(value = "/topic/{name}",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessagesFromTopic(@PathVariable String name) {
      DestinationInfo d = destinationsConfig
          .getTopics()
          .get(name);

     if (d == null) {
          return Flux.just(ResponseEntity.notFound().build());
     }

     Queue topicQueue = createTopicQueue(d);
     String qname = topicQueue.getName();
     MessageListenerContainer mlc 
          =messageListenerContainerFactory.createMessageListenerContainer(qname);
     Flux<String> f = Flux.<String> create(emitter -> {
          mlc.setupMessageListener((MessageListener) m -> {
               String payload = new String(m.getBody());
               emitter.next(payload);
          });
          emitter.onRequest(v -> {
               mlc.start();
          });
          emitter.onDispose(() -> {
               amqpAdmin.deleteQueue(qname);
               mlc.stop();
          });
     });

     return Flux.interval(Duration.ofSeconds(5))
          .map(v -> "No news is good news")
          .mergeWith(f);
}

이 코드들은 기본적으로 앞선 예제와 거의 동일하나, 아래와 같이 다른점을 가지고 있는데 
첫번째는, 새로운 큐를 모든 새로운 subscriber에 대해서 생성합니다.
독점적이고, 비영속성(non-durable) queue를 생성하기 위한 DestinationInfo 인스턴스의 정보를 사용하여,  
createTopicQueue() 메소드를 호출하는 것으로 실행합니다.
설정된 라우팅 키를 사용한 그때 Exchange에 바인딩합니다.


private Queue createTopicQueue(DestinationInfo destination) {

      Exchange ex = ExchangeBuilder
          .topicExchange(destination.getExchange())
          .durable(true)
          .build();
     amqpAdmin.declareExchange(ex);

     Queue q = QueueBuilder
          .nonDurable()
          .build();
     amqpAdmin.declareQueue(q);

     Binding b = BindingBuilder
          .bind(q)
          .to(ex)
          .with(destination.getRoutingKey())
          .noargs();
     amqpAdmin.declareBinding(b);

     return q;
}


두번째 다른점은 subscriber가 연결이 끊어지면 이번에는 Queue를 지우는 것을  
onDispose() 메소드에 전달하는  람다식에 있습니다.


5.4. Testing

위 시나리오를 테스트 하기 위해서는 topic destination을 우선 application.yml에 정의해야 합니다.


destinations:
queues:
NYSE:
exchange: nyse
routing-key: NYSE
IBOV:
exchange: ibov
routing-key: IBOV

topics:
weather:
exchange: alerts
routing-key: WEATHER
여기  /topic/weather 에서 이용할 topic endpoint를 정의 해놧습니다.
이 endpoint는 WEATHER 라우팅 키를 가지고 RabbitMQ에 alerts Exchange에 메시지를 포스팅하는데 사용됩니다.

서버를 켜시고, rabbitmqctl 명령을 사용해, exchange가 생성되었는지 확인할수 있습니다.

$ sudo docker exec rabbitmq rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
amq.topic       topic
amq.fanout      fanout
amq.headers     headers
amq.match       headers
amq.direct      direct
        direct
amq.rabbitmq.trace      topic
alerts  topic
우리 destination을 구독하는 몇몇 subscriber들을 command shell을 열고, 다음 명령을 출력해서 시작해봅시다.

* About to connect() to 192.168.11.62 port 7088 (#0)
*   Trying 192.168.11.62...
* Connected to 192.168.11.62 (192.168.11.62) port 7088 (#0)
> GET /topic/weather HTTP/1.1
> User-Agent: curl/7.29.0
> Host: 192.168.11.62:7088
> Accept: */*
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream;charset=UTF-8
<
data:No news is good news...

data:No news is good news...

마지막으로 아래 메세지를 publish 해봅시다.

$ curl -v -d "Hurricane approaching" http://192.168.11.62:7088/topic/weather
* About to connect() to 192.168.11.62 port 7088 (#0)
*   Trying 192.168.11.62...
* Connected to 192.168.11.62 (192.168.11.62) port 7088 (#0)
> POST /topic/weather HTTP/1.1
> User-Agent: curl/7.29.0
> Host: 192.168.11.62:7088
> Accept: */*
> Content-Length: 21
> Content-Type: application/x-www-form-urlencoded
>
* upload completely sent off: 21 out of 21 bytes
< HTTP/1.1 202 Accepted
< content-length: 0
<
* Connection #0 to host 192.168.11.62 left intact
그럼 아래와 같이 

$ sudo docker exec rabbitmq rabbitmqctl list_bindings
Listing bindings for vhost /...
        exchange        IBOV    queue   IBOV    []
        exchange        MCC     queue   MCC     []
        exchange        NYSE    queue   NYSE    []
        exchange        spring.gen-d7gS4docTGawJpW5X2mUJQ       queue   spring.gen-d7gS4docTGawJpW5X2mUJQ       []
        exchange        spring.gen-kRxAD8N1QwWsJPke3YLj3Q       queue   spring.gen-kRxAD8N1QwWsJPke3YLj3Q       []
alerts  exchange        spring.gen-d7gS4docTGawJpW5X2mUJQ       queue   WEATHER []
alerts  exchange        spring.gen-kRxAD8N1QwWsJPke3YLj3Q       queue   WEATHER []
ibov    exchange        IBOV    queue   IBOV    []
nyse    exchange        NYSE    queue   NYSE    []

Ctrl-C 를 Subscriber 쉘에서 치면, 게이트웨이는 클라이언트가 비연결되었다는 것을 감지하게 될것 이고,
이들의 바인딩은 모두 제거 될것 입니다.

6. Conclusion

이번 기사에서는, spring-amqp 모듈을 사용하여 RabbitMQ와 반응하는 
간단한 반응형 어플리케이션을 어떻게 만드는지에 대해서 설명했습니다.
코드 몇라인으로, Point-to-Point 와 Publish-Subscribe integration patterns을 
지원하는 HTTP-to-AMQP 게이트웨이를 만들수 있었습니다.
표준 Spring 기능들의 추가로 보안과 같은 추가적인 기능을 쉽게 확장할수 있습니다.

코드는 Github에서 확인 바랍니다.


음냐 여기까지 입니다.
날이 덥네요... 더워...