RabbitMQ Tutorial -4. Routing

2017. 5. 16. 17:27DB&NoSQL/RabbitMQ

Routing : Receiving messages selectly

앞선 튜토리얼에서는 간단한 로깅 시스템을 만들었죠.
많은 수신자들에게 로그 메시지들을 방송할 수 있었습니다.

이 튜토리얼에서, 거기에 기능 한개를 추가하도록 할께요.
오직 메세지들의 부분집합(subset).. 특정 메시지들만 구독 가능하게 만들겁니다.
예를 들어, 로그 파일(디스크 영역에 저장하기 위해)에 치명적인 에러 메시지만 전달할수 있게 하고, 
반면, 모든 로그 메시지는 콘솔에 출력할께요.

1.Bidings

앞서서 바인딩은 만들었고, 여러분은 재호출 할 코드는 아래와 같습니다. 

channel.queueBind(queueName, EXCHANGE_NAME, "");

바인딩은 exchange 와 queue 간의 관계죠.
이건 "이 큐는 이 exchange로부터 온 메시지에 관심이 있답니다."라고 간단히 해석할 수 있겠습니다.

바인딩은 추가로 routingKey 파라미터를 갖습니다.
basic_publish 파라미터와 혼동을 피하기 위해, 우리는 binding key를 호출 할것입니다.
이것은 하나의 키를 갖는 바인딩을 생성하는 법입니다.

channel.queueBind(queueName, EXCHANGE_NAME, "black");

바인딩 키의 뜻은 exchange 타입에 따라 달라지겠습니다. 
우리가 앞서 사용한 fanout exchanges은 이 값을 무시합니다.

2. Direct exchange

이전 튜토리얼에서 로깅시스템은 모든 consumers들에게 모든 메시지를 방송합니다.
severity에 기반해서 메시지들이 필터링되도록하는 것을  확장하는 것이 필요합니다.
예를 들면, 치명적인 에러들만 받아 디스크에 로그 메세지를 작성하는 프로그램을 원할겁니다.
구지 경고 또는 정보 로그 메시지들로 디스크 공간을 낭비하고 싶지 않습니다.

우리는 많은 유연함을 주지 않은  fanout exchange를 사용했었지요 - 오로지 무관심하게 방송하는 능력만 있습니다.

대신 direct exchange을 사용할것입니다.
direct exchange 뒤에 라운팅 알고리즘은 간단합니다.
- 메시지는 큐의 binding key 와 메시지의 routing key가 정확하게 일치하는 큐로 이동합니다.
그걸 설명하기 위해 ,  아래 구성을 생각해보세요.


이 구성에서, direct Exchange 인 x 는 두개의 queue 와 바운드 되어 있는 걸 볼수 있습니다.
첫번째 큐는 바인딩키가 orange인 바운드 입니다.
그리고 두번째는 두개의 바인딩을 가지고 있는데, black 바인딩 키 와 green 바인딩 키 하나씩입니다.

이 구성에서, orange 라우팅키를 갖고 exchange에 게시한 메세지는 Q1인 큐에 라우팅 했을것입니다.
black 과 green 키를 라운팅키로 갖는 메시지들은 Q2로 갈겁니다.
그외 모든 다른 메시지들은 버렸을겁니다. 


3.Multiple bindings



같은 바인딩 키를 갖는 다수의 큐들에 바인드되는 건  완전 옳습니다. 
예제에서 우리는 X와 black 바인딩 키를 갖는 Q1 사이에 하나의 바인딩을 추가할수 있습니다. 
이 경우 direct exchange는 fanout과 같이 행동하고 모든 매칭하는 큐에게 메시지를 방송할것입니다.
black 라우딩 키를 갖는 메시지는 Q1 과 Q2 둘다 전달했을것입니다.

3.Emitting logs

우리는 이 모델을 우리 로깅시스템에 사용할것입니다.
fanout  대신에 우리는 direct exchange에 메시지를 보낼겁니다.
우리는 log severity 를 routing key로 적용할 것입니다.
그 방법, 받는프로그램은 severity를 선택할수 있을 겁니다. 받기를 원할 것입니다.
우선 emitting logs에 우선 집중합시다.

매번 우리는 exchange을 우선 생성할 필요가 있습니다. 

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

그리고 우리는 메시지를 보낼 준비를 합니다. 

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

대상들을 분류하기위해 , 'severity'는 'info', 'waring', 'error' 중에 하나게 될수 있다라고 가장할 것입니다.

4. Subscribing

메시지를 받는 것은 이전 튜토리얼과 같이 동작할것입니다. 단 하나를 제외하구요.
- 우리가 관심있는 각각의 severity에 대해 새로운 바인딩을 생성할 것입니다.

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){
     channel.queueBind(queueName, EXCHANGE_NAME, serverity);
}

5. Putting it all together 





EmitLogDriect.class


package rabbitmq.tutorial04;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Created by dennis on 2017-05-15.
 */
public class EmitLogDirect {

    private static final String EXCHANGE_NAME "direct_logs";

    public static void main(String[] args) throws Exception{

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel= connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAMEBuiltinExchangeType.DIRECT);

        String severity = getSeverity(args);
        String message = getMessage(args);

        channel.basicPublish(EXCHANGE_NAMEseverity, null , message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + severity + "'  : '" + message +"'");

        channel.close();
        connection.close();
    }

    private static String getMessage(String[] args) {
        if(args.length 2)
            return "Hello World";
        return joinStrings(args" " 1);
    }

    private static String joinStrings(String[] argsString delimiter, int startIndex) {
        int length = args.length;
        if(length ==0return "";
        if(length < startIndex) return "";
        StringBuilder words = new StringBuilder(args[startIndex]);
        for(int i = startIndex +i <length i++){
            words.append(delimiter).append(args[i]);
        }
        return words.toString();
    }

    private static String getSeverity(String[] args) {
        if(args.length 1)
            return "info";
        return args[0];
    }


}


ReceiveeLogsDirect


public class ReceiveLogsDirect {
    private static final String EXCHANGE_NAME"direct_logs";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAMEBuiltinExchangeType.DIRECT);
        String queueName = channel.queueDeclare().getQueue();

        if(args.length 1){
            System.err.println("Usage : ReceiveLogsDirect [info] [warning] [error]");
            System.exit(1);
        }

        for(String severity : args){
            channel.queueBind(queueName,EXCHANGE_NAME,severity);
        }
        System.out.println("[*] Waiting for message. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTagEnvelope envelopeAMQP.BasicProperties properties, byte[] body) throws IOException{
                String message = new String(body,"UTF-8");
                System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message +"'");
            }
        };
        channel.basicConsume(queueName,true,consumer);
    }
}


파라미터 인자 값에 "[info]" 등을 넣어서 실행을 해보면 되겠죠?