RabbitMQ Tutorial -2. Work Queues

2017. 5. 16. 14:53DB&NoSQL/RabbitMQ

정리하면 아래와 같다.

다수의 Consumer를 만들어 대기를 시키면... RabbitMQ에서는 Round-Robin 분배를 통해,
비어있는 Consumer에게 메시지를 보냅니다.

공정하게 분배 하는 설정도 있으며,
basicQos 메소드를 prefetchCount = 1 설정하면, RabbitMQ에게 작업자가 한번에 하나 이상의 메시지를 주지 않도록 한다.
Consumer는 RabbitMQ에게 ack(nowlgdegment)을 되돌려 주는데.
이는 특정 메시지가 처리되었음을 알려주고, RabbitMQ가 저장된 해당 메시지를 지울 수 있게 해 준다.

또한 RabbitMQ 서버가 죽어 있더라도.. 메시지들이 사라지지 않는 내구력을 보장해주고 있는데, 
RabbitMQ가 queue을 잊지 않기 위해, queue를 durable 로 선언해야 한다.


이번에는 work queue 를 만들어 봅시다.

첫번째 튜토리얼에서는 하나의 이름지어진  queue로 부터 메세지를 주고, 받고 하는 프로그램을 작성해 봤다.
이번에는 Work Queue를 생성할건데,, 다수의 작업자들사이에 시간이 소요되는 작업을 분배하는데 사용되어 진다.

Work Queues(aka : Task Queues) 이전에 메인 아이디어는 직접적으로 자원 집약적(resource-intensive) 업무을 즉시 하거나, 
그 작업을 완료하는데 까지 대기하는 것들을 피하기 위함이다.
대신 이후에 마치도록 업무를 스케줄하는 것입니다.
하나의 메시지로 업무를 캡슐화 하고, 그걸 큐에 보냅니다.
백그라운드에서 동작하는 Work process는 그 업무를 끄집어 내고 최종적으로 작업을 실행합니다.
많은 작업자들을 운영할때,위 업무들은 작업자들사이에 공유 될 것입니다.

해당 컨셉은 그것은 짧은 HTTP request windows 동안에 복잡한 업무를 다루기 불가능 한  웹 어플리케이션들에겐 아주 유용합니다. 

1.Preparation

앞서서 "Hello World!"가 포함된 메시지를 보냈는데, 이젠  복잡한 업무에 견디는 문자열들을 보낸것이 있을 것입니다.
우리는 리사이징 되는 이미지 또는 렌더링되는 pdf 파일들 과 같이 현실세계 업무를 가지고 있지 않습니다.
그래서 바쁜척 하는 것으로 속이도록 할겁니다. ( Thread.sleep() 함수를 사용해서요)
복잡성을 표현하기 위해, 문자열안에 마침표 개수를 넣을 예정인데;각각의 마침표는  1초가 소요되는 " work"로 간주할겁니다.
 
예를 들어 "Hello ..."와 같은 속임수 업무는 3초가 걸리겠죠 
커맨드 라인에서 보낸 임의의 메시지들을 허용하기 위해 Send.java 코드를 약간 수정합니다.
이 프로그램은 우리 work queue에게 작업을 스케줄을 걸껀데... 보시죠 





2.NewTask.class

public class NewTask {
    private static final String TASK_QUEUE_NAME "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        String message = getMessage(argv);

        channel.basicPublish(""TASK_QUEUE_NAME,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }

    private static String getMessage(String[] strings) {
        if (strings.length 1)
            return "Hello World!. This is my style.... hahaha";
        return joinStrings(strings" ");
    }

    private static String joinStrings(String[] stringsString delimiter) {
        int length = strings.length;
        if (length == 0return "";
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1i < lengthi++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}


3.Worker.class

public class Worker {

    private static final String TASK_QUEUE_NAME "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        channel.basicQos(1);

        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTagEnvelope envelopeAMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body"UTF-8");

                System.out.println(" [x] Received '" + message + "'");
                try {
                    doWork(message);
               finally {
                    System.out.println(" [x] Done");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
    }

    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            System.out.println(ch);
            if (ch == '.') {
                try {
                    Thread.sleep(5000);
               catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}


4. Round-robin dispatching (라운드-로빈 분배하기)

Task queue를 사용하는 이점 중에 하나는 쉽게 병렬 작업을 할수 있는 능력이다.
작업의 잔무를 처리하고 있다면, 작업자를 좀 더 추가하여 쉽게 규모를 키울수 있다.
동시에 두개의 Worker 클래스를 실행하자. 이들은 모두 queue로부터 메시지를 얻으려고 하겠지만, 정확시 어떻게?
두개는 Work 프로그램 돌리고, 하나는 newTasks 를 게시하면, 순서대로 차근차근 나눠서 분배가 된다.

RabbitMQ는 기본적으로 순차적으로  consumer에게 메세지를 각각 전달한다. (이게 Round-Robin 방식이다.)


5.Message acknowledgement(메시지 승인)

작업을 수행하는 것은 몇초가 걸릴 수 있다.
만약 consumer 중에 하나가 긴 작업을 시작했고, 그 작업을 부분적으로 수행하고 죽는다면 무슨일이 발생할지 궁금하다.
현재 코드 상에서는 RabbitMQ가 메시지를 consumer에게 전달하면 즉시 메모리로부터 그 메시지를 제거한다.
이런 경우에 만약 consumer를 죽이면 진행중이던 그 메시지를 잃게 된다.
메시지를 결코 잃어버리지 않게 하기 위해서, RabbitMQ는 message acknowledgement를 제공한다.
Consumer는 RabbitMQ에게 ack(nowlgdegment)을 되돌려 주는데.
이는 특정 메시지가 처리되었음을 알려주고, RabbitMQ가 그 메시지를 지울 수 있게 해준다.

consumer가 ack 전송없이 죽는다면(channel 이 닫히거나, connection이 닫히거나, TCP connection 이 종료된다면)
RabbitMQ는 그 메시지가 완전하게 처리되지 않았다고 판단하여 그 메시지를 다시 큐잉한다.

메시지 타임아웃은 없다.
RabbitMQ는 consumer가 죽을 때 그 메시지를 재배달할 것이다.
이것은 메시지 처리에 매우매우 긴 시간이 필요할 때 유용하다.

message acknowledgement 는 기본적으로 동작하게 되어 있다.
autoAck 파라미터를 false 로 설정 이 기능을 껐었지..
 

channel.basicConsume(TASK_QUEUE_NAME, false, consumer);



*Forgotten acknowledgment

basicAct를 놓치는 일반적인 실수이다.쉬운 에러이나, 결과는 치명적이다.
여러분의 클라이언트가 나갔을때 ( 랜덤 재전송과 같이 보일지도 모른다) 메세지들은 제전송되겠지만, 
RabbitMQ은 어떤 unacked 메시지들을 릴리즈 할수 없을 정도로 메모리를 좀 더 많이 먹을 것이다.
이런 종류의 실수를 디버깅하기 위해서는 message_unacknowledged 필드를 프린트하는 rabbitmqctl 을 사용 하도록 하세요

rabbitmqctl list_queues name message_ready message_unacknowledged



6.Message durability(메시지 내구력)

consumer가 죽더라도 작업을 잃지 않는 방법을 배웠으나 , RabbitMQ 서버가 중지되면 모든 작업을 잃게 된다.
RabbitMQ가 queue을 잊지 않기 위해, queue를 durable 로 선언해야 한다.

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

위에 두번째 파라미터가 durable 이다.

위 명령이 올바르다 하더라도,  동작하지 않는건 내구성이 없는 hello queue을 이전에 선언했기 때문에,
RabbitMQ는 이미 존재하는 queue를 다른 파라미터로 재정의 하는 것을 허용하지 않는다.
다른 이름으로 해라.

그렇게 하면 이젠 RabbitMQ가 재시작되더라도 잃어버리지 않는다.

이번에는 메시지가 영속적이 되도록 해야한다.

channel.basicPublish(""TASK_QUEUE_NAME,
        MessageProperties.PERSISTENT_TEXT_PLAIN,
        message.getBytes("UTF-8"));



Note on message persistence

persistent 라고 메시지를 표시했어도, 메시지를 잃어버리지 않겠다라고 완전 장담하지 않는다.
RabbitMQ가 비록 메세지를 디스크에 저장한다고 했어도, RabbitMQ가 메시지를 받고 그걸 저장하는데 까진, 아주 짧은 순간이다.
그래도 RabbitMQ은 fsync 를 모든 메시지에 하지 않는다. --- 캐시하기 위해 저장하고, 디스크에 정말 기록하지 않는다. 
Persistence guarantees 는 강하진 않지만, 우리 단순한 task queue에 대해서는 충분하다.
보다 강력한 guarantee가 필요하다면 publisher confirm(https://www.rabbitmq.com/confirms.html)를  사용해라.





7. Fair dispatch(공평한 분배) 
basicQos 메소드를 prefetchCount = 1 설정으로 사용할수 있다.
이것은 RabbitMQ에게 작업자가 한번에 하나 이상의 메시지를 주지 않도록 한다.


// accept only one unack-ed message at a time 
channel.basicQos(1);

 그렇게 되면 큐는 점점 채워지겠지.. 작업을 하는 중이라면 


acknowledgments 와 prefetchCount 를 사용하는 것은 여러분이 work queue를 셋업할수 있다.
내구력 옵션은 RabbitMQ를 재시작하더라도 작업이 유지시킨다.

그렇다....


끝...