RabbitMQ Tutorial - 5. Topics

2017. 5. 17. 11:13DB&NoSQL/RabbitMQ

Topics : Receiving messages based on a pattern (topics)

이전 장에서는 우리 로깅 시스템을 발전시켰습니다.
오직 더미 방송하는 능력의 fanout Exchange를 사용하는 대신에
direct Exchange를 사용했고, 로그를 선택적으로 받는 가능성을 얻었습니다.

우리 시스템을 발전시키기 위해 direct Exchange를 사용하는 것 역시, 제한을 가지고 있습니다.
- 여러가지 조건(multiple criteria)에 기반한 라우팅을 할수 없습니다.

우리 로깅 시스템에서  오직 severity에 기반한것 외에도, 로그가 발행된 소스 상의 로그들도 구독하길 원할수도 있습니다. severity(info/warn/crit...) 와 facility(auth/cron/kern..) 둘에 기반으로 로그를 라우팅하는 syslog unit tool로 부터 이 컨셉을 알수도 있다.

그건 우리에 많은 유연성을 가져다 준다.
- 우리는 cron 에서 오는  critical error들 뿐만 아니라 'kern'에서 부터 오는 모든 로그들을 리스닝하길 원할수도 있습니다.


1. Topic exchange

topic Exchange에게 보낸 Message들은 임의의 routing_key를 가질수 없습니다.
- 마침표들로 구분되는  단어들의 리스트여야 합니다.
단어들은 어떤것도 가능하나, 일반적으로 메시지에 연결되는 어떤 특징을 명시합니다.
몇개의 유효한 라우팅 키 예제 : 'stock.usd.nyse' , 'nyse.vmw', 'quick.oragne.rabbit' 
여러분이 좋아하는 만큰 라우팅 키로 많은 단어가 있을수 있는데, 최대 255바이트 입니다.

바인딩 키 역시 같은 형식이어야 합니다.
topic Exchange의 로직은 direct Exchange와 비슷합니다.
-특정 라우팅 키와 갖고 보낸  메시지는 바인딩 키와 일치하는 모든 큐들에게 전달되어 질겁니다.  
하지만 바인딩 키를 위한 2개의 중요하고 특수한 경우가 있습니다.

 * (start)는  정확히 1개의 단어를 치환할 수 있습니다.
# (hash)는 0개나 여러개의 단어를 치환 할 수 있습니다.



이 예제에서 동물들을 설명하는 모든 메세지들을 보낼겁니다.
세개의 단어(두개의 마침표)로 구성한 라우팅 키를 포함해서 메세지를 보낼것입니다.
라우팅 키의 첫번째 단어는 속도를 묘사하고, 두번째는 색상을 , 세번째는 종을 명시할겁니다.
"<speed>.<colour>.<species>"

세개의 바인딩을 생성했습니다.
Q1 는 "*.orange.*" 바인딩 키로 바인딩 합니다.
Q2 는 "*.*.rabbit" 과 "lazy.#" 으로 바인딩 합니다.

이들 바인딩들은 아래와 같이 요약되어집니다.

Q1는 모든 orange색의 동물들에 관련있습니다.
Q2는 rabbits에 관련된 모든것과 게으른 동물들에 대한 모든것에 대해 듣고 싶습니다.

"quick.oragne.rabbit"으로 라우팅 키를 갖는 메세지는 두개의 큐에 전달될겁니다.
"lazy.oragne.elephant" 메시지 역시 두개의 큐에 전달될것입니다.
다른 한편 "quick.orange.fox"는 첫번째 큐에 가고, "lazy.brown.fox" 는 두번째 큐에만 갑니다.
"lazy.pink.rabbit"은 두번째큐에만 한번 갑니다. 비록 두개의 바인딩에 매칭되었어도 말이죠
"quick.brown.fox"은 어떤 바인딩과도 매칭되지 않았기에, 버려질겁니다.

만약 우리 계약을 깨고, 하나 또는 네개의 단어들을 갖는 메시지을 보내면 어떻게 될까?
"orange"  나 "quick.orange.male.rabbit" 과 같이?
글쎄, 이 메세지들을 어떤 바인딩과도 매칭이 안되어 있으니, 버려질 겁니다.

다른 한편으로 "lazy.orange.male.rabbit" 은 비록 4개의 단어를 갖었어도, 마지막 바인딩에 매칭되기 때문에,
두번째 큐에 전달 될겁니다.


Topic exchange

Topic Exchange는 강력하고 , 다른 exchange들과 같이 행동할수 있습니다.
큐가 "#"(hash) 바인딩 키를 바인딩하면, 라우팅 키를 무시하고, 모든 메시지를 받을 겁니다. - 마치 fanout Exchange  같이.
특수문자 "*"(star) 와 "#"(hash) 을 바인딩에 사용하지 않았을땐, topic Exchange는 하나의 direct Exchange와 같이 행동 할겁니다.



putting it all together

우리는 이제 앞선 로깅 시스템에 topic Exchange를 사용해보도록 합시다.
로그들의 라우팅 키가  "<facility>.<severity>" 두개의 단어를 갖다는 가정으로 동작하게 할겁니다.

EmitLogTopic


public class EmitLogTopic {

    private static final String EXCHANGE_NAME "topic_logs";

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;

        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAMEBuiltinExchangeType.TOPIC);

            String routingKey = getRouting(args);
            String message = getMessage(args);

            channel.basicPublish(EXCHANGE_NAMEroutingKey, null, message.getBytes("UTF-8"));
            System.out.println("[x] Sent '" + routingKey + "':'" + message + "'");
       catch (Exception e) {
            e.printStackTrace();
       finally {
            if (channel != null) {
                try {
                    channel.close();
               catch (Exception ignore) {
                }
            }

            if (connection != null) {
                try {
                    connection.close();
               catch (Exception ignore) {
                }
            }

        }
    }

    private static String getMessage(String[] strings) {
        if(strings.length 2){
            return "Hello World!";
        }
        return joinStrings(strings," " 1);
    }

    private static String joinStrings(String[] stringsString delimiter, int startIndex) {
        int length = strings.length;
        if(length == 0return "";
        if(length < startIndex) return "";
        StringBuilder words = new StringBuilder(strings[startIndex]);
        for(int i = startIndex+i < lengthi++){
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }

    private static String getRouting(String[] strings) {
        if(strings.length 1)
            return "anonymous.info";

        return strings[0];
    }


}



ReceiveLogTopic


public class ReceiveLogsTopic {

    private static final String EXCHANGE_NAME "topic_logs";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAMEBuiltinExchangeType.TOPIC);
        String queueName = channel.queueDeclare().getQueue();

        if(args.length 1){
            System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
            System.exit(1);
        }

        for(String bindingKey : args){
            channel.queueBind(queueName,EXCHANGE_NAME,bindingKey);
        }

        System.out.println("[*] Waiting for message . To exit press Ctrl + C");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTagEnvelope envelopeAMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"UTF-8");
                System.out.println(" [x] Received ' " + envelope.getRoutingKey() + "':'"+message+"'");
            }
        };
        channel.basicConsume(queueName,true,consumer);
    }
}


ReceiveLogsTopic 의 인자값으로 라우팅 키를 넣어주고,
해당 라우팅 키에 맞게 EmitLogsTopic 을 넣어서 실행 시키면, 각각 라우팅 키와 일치되는 것만 선별적으로 출력되는 것을 확인할수 있다.