RabbitMQ Tutorial -1. Producer, Queue, Consumer의 정의와 Message Send & Receive

2017. 5. 16. 13:59DB&NoSQL/RabbitMQ

RabbitMQ의 튜토리얼을 읽어보도록 합니다


Hello World - The simplest thing that does something

1.Introduction

RabbitMQ는 메시지 브로커 입니다: 메세지들을 받고 전달합니다.
우체국과 같다고 볼수 있는데요, 여러분이 메일을 우체통에 넣을땐, 우체부가 수신자에게 메일을 배달해줄것이라 확신할수 있죠.
이런것처럼, RabbitMQ는 우체통, 우체국 그리고 우체부 랍니다.

RabbitMQ와 우체국가 가장 다른 점은 종이를 다루지 않고, 이진 데이터 덩어리인 메시지들을 받고, 저장하고 전달하는 것입니다.

RabbitMQ 와 일반적인 메시징은 몇가지 특정 용어(jargon : special words or expression that are used bya particular profession or group and are difficult for others to understand) 을 사용합니다.


Producer
Producing 은 보내는일 외에는 아무것도 의미하지 않습니다.
"메세지들을 보내는 프로그램은 producer 입니다"
Queue
Queue는 RabbitMQ안에 살고 있는 우체통에 대한 이름입니다.
메시지들은 RabbitMQ와 여러분의 어플리케이션들을 통해 흐르겠지만, 메시지들은 queue안에  단지 저장될수 있습니다. queue는 host의 메모리와 디스크 한계에 제약이 있을뿐입니다.
queue는 근본적으로 하나의 큰 메시지 공간입니다. 많은 producer들은 메시지들을 보낼수 있습니다. 메세지들은 하나의 queue로 가고, 많은 consumer들은 하나의 queue로부터 데이터를 받으려고 할수 있습니다. 
Consumer
Consuming 은 receiving에 비슷한 의미를 갖고 있습니다.
Consumer는 대부분 메시지들을 받기 위해 대기하고 있는 프로그램입니다.

producer. consumer 그리고 broker 는 같은 host 상에 존재할 필요는 없습니다. 
실제로 대부분은 어플리케이션이 그렇게 하고 있지 않답니다.

2."Hello World" 

두개의 자바 프로그래을 짜보도록 하겠는데,
우선 첫번째는 하나의 메시지를 보내는 producer 와 두번째는 메시지를 받고 출력하는 consumer 이 두가지를 만들어 봅시다.


사전에 필요로 한 Java Client 와 추가적인 디펜던시들을은 갖고 있어야 한답니다.


2-1.Sending


이제부터 우리의 메시지 게시자(publisher, sender)는 Send , 메시지 소비자 (consumer, receiver)는 Recv 라고 부를 겁니다.
publisher는 RabbitMQ에 연결하고, 싱글 메시지들을 보내고 난 뒤 나갑니다. 

public class Send {

    private final static Logger logger = LoggerFactory.getLogger(Send.class);
    private final static String QUEUE_NAME "hello";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection  = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        String message = "우리나라만세!!!";
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));

        logger.info(" [x] Sent ' " + message + "'");

        channel.close();
        connection.close();
    }
}

connection은 소켓 연결을 추상화하고, 프로토콜 버전 협의, 인증 등등에 대한 관리합니다.
로컬머신, localhost에 브로커에 연결합니다.
혹 다른 머신상에 브로커에 연결하려면, name 와 ip address 로 변경하면 됩니다.
그리고 나서, 일을 처리하기 위한 대부분의 API가 포함된 채널을 생성합니다. 
우리는 우리에게 메시지를 보내기 위해 큐를 선언해야 합니다.
그리고 난 후에 우리는 queue에게 메세지를 게시 할 수 있습니다. 

큐를 선언하는 것은 멱등성(idempotent)입니다. 이는 이미 큐가 존재하지 않을 경우,  유일하게 만들어 져야 합니다.
메세지 내용은 바이트 배열 입니다. 그래서 여러분은 좋아하는 뭐든지 인코딩 할수 있습니다.

마지막으로, 채널과 커넥션을 닫아주면 끝

  • idempotent (멱등성)
    • 연산을 여러 번 적용하도라도 결과가 달라지 않은 성질을 멱등성이라고 한다.
      예를 들면, 읽기 전용 메서드와 같이 일반적으로 서버 측의 어떤 상태도 변경하지 못하는 메서드도 멱등이다.
    • denoting an element of a st that is unchanged in value when multiplied or otherwise operated on by itself.


Sending이 안될 경우에는 충분한 디스크 용량이 없어서 그럴것이다.(기본적으로 200MB 정도 여유가 필요하다고 합니다.)
disk_free_limit 을 어떻게 세팅하는지 여기서 http://www.rabbitmq.com/configure.html#config-items확인 해주세요

2-2. Receiving

publisher는 거기까지고,  consumer는 RabbitMQ로부터 메세지를 푸쉬받는다. 그래서 싱글 메시지를 게시하는 publisher와는 다르게,
우린 메세지들을 대기하기 위한 동작을 유지하고 메세지들을 출력할 것이야.


대부분 코드가 비슷하다.
DefaultConsumer 는 Consumer 인터페이스를 구현한 클래스이고, 우리는 서버에서 푸쉬된 메시지들을 채우는데 사용할 것입니다.

셋업은 publisher과 같다. 커넥션과 채널을 열고, 소비할 곳으로부터 큐를 선언합니다.
send 가 publish 하는 큐와 매치를 해야하는 하죠.

public class Recv {
    private final  static String QUEUE_NAME "hello";
    private final static Logger logger = LoggerFactory.getLogger(Recv.class);

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        logger.info(" [*] Waiting for messages. to exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTagEnvelope envelopeAMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"UTF-8");
                logger.info(" [x] Received ' " + message +"'");
            }
        };
        channel.basicConsume(QUEUE_NAME,true,consumer);
    }
}

마찬가지로, 큐를 역시 선언했다.
publisher 전에 consumer가 시작될지도 모르기 때문에, 큐에서 메시지들을 소비하기 전에, 큐를 존재하는지 확실하길 바라죠.

이제 막 서버에게 큐에서 부터 메세지들을 우리에게 전달해달라고 요청합니다.
비동기적으로  메시지를 우리에게 서버가 푸시하면, 우리가 그메시지들을 사용할 준비가 될 때까지,
메세지들을 버퍼할 객체의 형태안의 콜백을 제공합니다.

그게 DefaultConsumer 서브클래스가 하는 것입니다. 

*buffer : 어떤 장치에서 다른 장치로 데이터를 송신할 때 일어나느 시간의 차이나 데이터 흐름의 속도 차이를 조정하기 위하여
일시적으로 데이터를 기억시키는 장치???  
* buffer storage :  a part of RAM used for temporary storage of data that is waiting to be sent to a device.


Putting it all together

뭐 다 같이 놓고 테스트 해보면 됩니다.
IDE 쓰는 사람들은 대부분 Application 상에서 Running 해보시면 되겠죠?


그리고 난 이후에 테스트를 해보도록 합니다. 하지만 현재 큐에 대한 상태를 보고 싶은데, 보고 싶은 방법이 없네..
아래 명령을 통해 Queue 리스트 볼수 있다.
Send를 통해 "hello" 라는 큐에 100개의 메시지를 날려놓고 ... Receiver 를 실행하지 않고, 보면, "hello" 큐라는 것이 있고, 100개 있다고 나오네요.

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.9\sbin>rabbitmqctl list_queues
Listing queues ...
hello   0

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.9\sbin>rabbitmqctl list_queues
Listing queues ...
hello   100


끝!!!




'DB&NoSQL > RabbitMQ' 카테고리의 다른 글

RabbitMQ Tutorial - 6. RPC  (0) 2017.05.17
RabbitMQ Tutorial - 5. Topics  (0) 2017.05.17
RabbitMQ Tutorial -4. Routing  (0) 2017.05.16
RabbitMQ Tutorial -3. Publish / Subscribe  (0) 2017.05.16
RabbitMQ Tutorial -2. Work Queues  (0) 2017.05.16
Messaging with RabbitMQ  (0) 2017.05.11