RabbitMQ Tutorial -3. Publish / Subscribe

2017. 5. 16. 15:23DB&NoSQL/RabbitMQ

요약

메시지가 특정 큐에 추가되어야만 하나? 메시지가 많은 큐들에 추가되어야만 하나?
아니면 메시지가 버려져야 하나? 이것에 대한 규칙은 exchange type에 의해 정의된다.
queueDelcare()에 파라미터를 넣지 않았을때, 비영속적이고, 베타적이며, 자동 삭제되는 큐를 생성해준다. 
exchange와 큐와의 관계를 binding이라고 합니다.

앞서선, work queue를 만들었죠.
work queue은 정확하게 하나의 작업자에게 배달 된다는 것이다라는 것이 work queu의 뒤에 가정입니다.
이번에는 하나의 메시지를 여러개의 작업자에게 배달하는 것이다.
해당 패턴이 publish / subscribe 로 알려졌다.

해당 패턴을 설명하기 위해서, 간단한 로깅 시스템을 만들어 본다.
이것은 두개의 프로그램으로 구성되어 있는데, 첫번째는 로그 메시지를 방출하고,
두번째는 그것을 받아서 출력할 것이다.

이 로깅 시스템에서 매번 동작하는 receiver 프로그램의 복사본은 메시지를 받을 껀데,
하나의 receiver는 그 로그를 디스크에 지시하고, 동시에 다른 receiver는 스크린에 출력한다.

필수적으로, 게시된 로그 메시지는 모든 receiver들에게  방송 될 것이다. 



1. Exchanges

앞에서는 메시지를 큐로부터 주고 받았지만, Rabbit의 전체 메시징 모델을 소개 할 때가 되었다.

빠르게 이전 튜토리얼에서 봤던걸 보면,
producer는 메세지를 보내는 어플리케이션이다.
queue는 메시지를 저장하는 buffer 이다.
consumer는 메세지를 받는 사용자 어플리케이션이다.

RabbitMQ에서 메시징 모델의 중심생각은 Producer는 결코 어떤 메시지라도 큐에 직접 전송하지 않는다는 것이다.
실제 Producer는 메시지가 어떤 큐에 전달되었는지 여부를 꽤 자주 알지 못한다.
대신 Producer는 오직 메시지를 exchange에게 전송할 뿐이다.exchange는 매우 간단하다.
한 측면에서 exchange는 producer로 부터 메시지를 받고, 다른 측면에서 그 메시지를 queue에 건네준다.
exchange 는 전달받은 메시지를 가지고 무엇을 할지 정확하게 알아야만 한다.
메시지가 특정 큐에 추가되어야만 하나? 메시지가 많은 큐들에 추가되어야만 하나?
아니면 메시지가 버려져야 하나? 이것에 대한 규칙은 exchange type에 의해 정의된다.

사용가능한 exchange type들이 있답니다 :  direct  , topic , headers ,fanout.
우리는 마지막 fanout 에 집중 할 것이다.

channel.ExchangeDeclare("logs", "fanout");

fanout exchange 은 매우  간단하다. 이름에서 유추해 낼수 있다시피, 모든 메시지들을 알고 있는 모든 큐에게 전달한다는 뜻이다.
이것은 정확하게 우리의 로깅 시스템이 필요로 하는 것이다.


Listring exchanges 

서버상에 exchange들을 리스트업하기 위한 명령어는 :  rabbitmqctl list_exchanges
amq.* exchanges 와 default(unnamed) exchange가 기본으로 생성될것입니다. 이 시점에 이걸 사용할 필요가 없다.

Nameless exchange

이전장에서는 exchanges에 대해서는 아무것도 알지 못했지만, queue까진 메세지를 보낼수 있었다.
빈 문자열에 의해 구분되는 기본 exchange 를 사용하고 있기 때문에 가능했습니다. 

channel.basicPublish ("" , "hello" , null , message.getBytes());

첫번째 파라미터가  exchange 이름이다.


이제 우리는 우리의 이름있는 exchange에 게시 할 수 있게 되었다.

channel.basicPulish("logs","",null,message.getBytes());

2.Temporary queues

우리는 이전에 특정 이름을 갖는 큐를 사용했는데,
큐가 이름을 갖는 것은 아주 중요한데, 작업자에게 동일한 큐를 지정해야 하기 때문이였다.

하지만 우리의 로거의 경우에는 그렇지 않다.우리 로거의 부분이 아닌 모든 메시지를 듣기 원하지, 단지 메시지들의 교집합만 원하는게 아니다.
우리는 또한 오래된 로그가 아닌 현재 나오고 있는 로그에 관심이 있다.
이것을 해결하기 위해 우리는 2가지가 필요하다.

먼저 Rabbit에 연결할 때 마다 우리는 새롭고 빈 큐가 필요하다.
이렇게 하기 위해서 우리는 임의의 이름을 가지는 , 즉 서버가 우리를 위해 임의의 이름을 골라주는 큐를 생성할 것이다.
두번째로, 우리가 consumer의 연결을 종료하면 큐는 자동적으로 삭제되어야 한다.

자바 클라이언트에서는 우리가 queueDelcare()에 파라미터를 넣지 않았을때, 비영속적이고, 베타적이며, 자동 삭제되는 큐를 생성해준다. 

String queueName = channel.queueDeclare().getQueue()

이 시점에 queueName은 랜덤 큐 이름을 포함한다.
예를 들면, amp.gen-JzTY20BRgKO-HjmUJj0wLg 와 같이 보일 꺼랍니다.

3.Bindings 


이미 fanout exchange 와 큐를 생성했다.
우리 큐에게 메세지들을 전달하라고 Exchang에게 지시해야 합니다.
exchange와 큐와의 관계를 binding이라고 합니다.

channel.queueBind(queueName, "logs","");

logs 라는 Exchange는 우리 큐에게 메시지를 추가할 것입니다.

Listing bindings
 
사용하고 있는 바인딩 리스트를 보려면 : rabbitmqctl list_bindings



4.Putting it all together


로그 메세지들을 발생하는 producer 프로그램은 이전 튜토리얼과 크게 달라보이진 않습니다.
가장 큰 변화는 이름없는 exchange 대신에 우리 logs exchange에 메시지들을 게시하는 것입니다.
발송할때, routingKey를 적용할 필요가 있지만, fanout exchange인 경우에는 무시해도 됩니다.

EmitLog.class 

public class EmitLog {

    private static final String EXCHANGE_NAME"logs";

    public static void main(String[] args) throws Exception{
        ConnectionFactory  factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAMEBuiltinExchangeType.FANOUT);

        String message = getMessage(args);

        channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));
        System.out.println("[x] Sent '"+ message +"'");

        channel.close();
        connection.close();
    }

    private static String getMessage(String[] strings) {
        if(strings.length 1)
            return "info: Hello World!";

        return joinStrings(strings" ");
    }

    private static String joinStrings(String[] stringsString delimiter) {
        int length = strings.length;
        if(length == 0return "";
        StringBuilder words = new StringBuilder(strings[0]);
        for(int i = i < length i++){
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }

}


ReceiveLogs.class

public class EmitLog {

    private static final String EXCHANGE_NAME"logs";

    public static void main(String[] args) throws Exception{
        ConnectionFactory  factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAMEBuiltinExchangeType.FANOUT);

        String message = getMessage(args);

        channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));
        System.out.println("[x] Sent '"+ message +"'");

        channel.close();
        connection.close();
    }

    private static String getMessage(String[] strings) {
        if(strings.length 1)
            return "info: Hello World!";

        return joinStrings(strings" ");
    }

    private static String joinStrings(String[] stringsString delimiter) {
        int length = strings.length;
        if(length == 0return "";
        StringBuilder words = new StringBuilder(strings[0]);
        for(int i = i < length i++){
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }

}