RabbitMQ Tutorial - 6. RPC

2017. 5. 17. 16:59DB&NoSQL/RabbitMQ


Remote Procedure Call (RPC)

두번째 튜토리얼에서 다수의 작업자들사이에 시간이 소요되는 업무에 대해서 분산처리하는 Work Queue에 대한 사용법에 대해서 배웠습니다.

원격 컴퓨터에서 함수를 호출하고 그 결과를 기다려야 할 필요가 있다면?
글쎄, 그런 좀 어려운 이야기 인데.이 패턴은 일반적으로 RPC 라고 알려진 패턴이죠. 

이 튜토리얼에서는  클라이언트 와 확장가능한 RPC 서버 -RPC 시스템을 빌드하기위해 RabbitMQ을 사용 할 것입니다.
분산처리할 가치가 있는 시간이 걸리는 업무를 갖고 있지 않기 때문에, 우리는  피보나치 수열(Fibonacci numbers)을 리턴하는 더미 RPC 서비스를 만들도록 하겠습니다.

1. Client interface

RPC 서비스를 사용할수 있는 방법에 대해서 설정하기 위해, 우리는 간단한 클라이언트 클래스를 만들도록 하겠습니다.
RPC 요청을 보내고, 응답을 받을 때까지 블록하는 call 이라는 메소드를 내놓을껍니다.


FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println("fib(4) is " + result);


A note on RPC

RPC는 컴퓨팅에서 매우 일반적인 패턴이지만. 가끔씩 비판도 받는다.
문제들은 프로그래머들이 함수콜이 로컬 인지 느린 RPC 인지를 주의하지 않을 때 발생한다.
그런 혼란들은 예상치 못한 시스템의 결과로 초래되고 불필요한 디버깅의 복잡함이 추가된다.
소프트웨어를 단순화하는 대신, 잘못 사용된 RPC는  유지할수 없는 스파게티 코드의 결과를 초래한다.  

그걸 염두해 두고, 아래 충고를 따르길 바랍니다.

함수 호출이 로컬인지 리모트인지를 명확하게 하도록 하세요.
여러분의 시스템을 문서화 하세요.
컴포넌트들을 사이의 의존관계를 명확하게 하세요
에러 케이스를 잘 관리하세요.
오랜 시간동안 RPC 서버가 다운되었을때 클라이언트는 어떻게 행동해야 하나?

의심되는 경우에는 RPC를 웬만하면 피하시고, 가능하면, 블로킹하는 RPC 대신에 비동기 파이프라인을 사용해야 하는게 좋습니다.
결과들은 비동기적으로 다음 계산 단계로  푸쉬되어집니다.


2.Callback queue

일반적으로 RabbitMQ로 RPC를 하는 것은 쉽다.
클라이언트는 요청 메시지를 보내고 서버는 응답메시지로 응답합니다.
응답을 받기 위해 우리는 'callback' 큐 주소를 요청과 함께 보낼 필요가 있습니다.
우린 기본 큐를 사용하겠습니다. ( 자바 클라이언트에만 있는)


callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties.Builder().replayTo(callbackQueueName).build();

channel.basicPublish("","rpc_queue", props, message.getBytes());

//.... then code to read a response message from the callback_queue ...


Message properties

AMQP 0-9-1 프로토콜은 메세지와 함께 전달되는 14가지 프로퍼티 셋을 사전에 정의 했습니다.
아래 내용을 제외하고, 대부분의 프로퍼티는 드물게 사용됩니다. 

deliveryMode : 영속적(2의 값을 포함)이거나 임시적(다른 값)으로  메시지를 표시합니다. 
                      두번째 튜토리얼에서 사용했던 이 속성을 기억하시겠습니까?
contentType : 인코인의 mime-Type을 작성하는데 사용합니다.
                      예를 들면 JSON 인코딩을 종종 사용하기 위해서, "application/json"으로 설정하면 좋습니다. 
replyTo : 일반적으로는 Callback queue 이름으로 사용되어 집니다.
correlationId : 요청들과 함께 RPC 응답들을 대비하는데 유용합니다.

아래 새로운 import 가 필요하겠습니다.

import com.rabbitmq.client.AMQP.BasicProperties;

3.Correlation Id 

위에서 제공된 메소드에, 우리는 모든 RPC request에 Callback queue을 생성하는 것을 제안했습니다.
그건 매우 비효율적이지만, 운좋게 더 나은 방법이 있습니다. 
클라이언트 마다 단독 Callback queue을 생성합시다.

새로운 이슈가 발생하는데, 그 큐에 응답을 받는 것은, 응답이 속한 요청이 명확하지 않습니다.
그것이 바로 correlationId 속성이 사용될 때 입니다.
매 요청에 유니크한 값을 세팅 할 것입니다.
그리고 나서, 우리가 콜백 큐 상에서 메시지를 받을 때, 우리는 이 property를 확인하고 
요청을 포함한 응답이 매칭할수 있을 것에 기반했습니다.
만약 모르는 correlationID 값을 봤을 땐, 우리는 안전하게 이 메시지를 버립니다. - 요건 우리 요청에 속하지 않습니다. 

왜 에러를  포함해 실패하는 것보다 차라리 콜백큐에 모르는 메시지를 무시한 것이 나은지를 궁금할 것이다.
서버측면에서 race 상태의 가능성 때문입니다.
비록 같지 않더라도, 응답을 우리에게 보낸 후,하지만 요청에 대한 acknowledgment 메시지를 보내기 전에 RPC 서버가 죽을 가능성이 있다.
만약 위와 같은 일이 발생하면 재시작한 RPC 서버는 요청 프로세스를 다시 진행 할 것입니다.
그게 바로 우리가 클라이언트 상에 중복 응답 처리를 우와하게 해야하는 이유이고, RPC는 이상적으로 멱등이 되어야만 한다.

Summary


RPC는 아래와 같이 동작할 것입니다.

Client가 기동되었을때, 익명의 독립적인 callback queue가 생성됩니다.
RPC request 동안에, Client는 두개의 properties를 가지고 메시지를 보냅니다. 
     - replyTo(callback queue에 Set), correlationId ( 모든 request에 unique 값을 Set)
RPC worker(aka:Server)는 queue에 requests를 기다린다.
request가 나타났을때, 작업을 하고replyTo 필드로부터 큐를 사용하는 클라이언트에게  돌려줄 결과와 함께 메시지를 보냅니다. 
 클라이언트는 callback queue 상에 데이터를 기다립니다.
메시지가 나타났을때, correlationId 속성을 체크합니다. 
만약  request로부터 값이 매칭되면, 어플케이션에 응답을 리턴합니다.


Putting it all  together

The Fibonacci task

private static int fib(int n){
     if (n == 0) return 0;
     if (n == 1) return 1;
     return fib(n-1) + fib(n-2);
}

피보나치 함수를 선언합니다.
유효한 정수 값이라고 가정합니다. ( 큰수들을 위한 작업을 하기 위한 것이라 기대하지 말자.그리고 아마도 가장 느린 재귀 구현 일것이다.)

RPCServer

public class RPCServer {

    private static final String RPC_QUEUE_NAME "rpc_queue";

    private static int fib(int n){
        if(n==0return 0;
        if(n==1return 1;
        return fib(n-1) + fib(n-2);
    }

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = null;
        try{
            connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.queueDeclare(RPC_QUEUE_NAME,false,false,false,null);
            channel.basicQos(1);

            System.out.println("[x] Awaiting RPC requests");

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTagEnvelope envelopeAMQP.BasicProperties properties, byte[] body) throws IOException {
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                            .Builder()
                            .correlationId(properties.getCorrelationId())
                            .build();

                    String response = "";

                    try{
                        String message = new String(body,"UTF-8");
                        int n = Integer.parseInt(message);

                        System.out.println(" [.] fib(" + message +")");
                        response += fib(n);
                    }catch (RuntimeException e){
                        System.out.println(" [.] " + e.toString());
                    }finally {
                        channel.basicPublish("",properties.getReplyTo()replyProps,response.getBytes("UTF-8"));

                        channel.basicAck(envelope.getDeliveryTag(),false);
                    }
                }
            };

            channel.basicConsume(RPC_QUEUE_NAME,false,consumer);

            //loop to prevent reaching finally block
            while(true){
                try{
                    Thread.sleep(100);
                }catch (InterruptedException _ignore){}
            }

        }catch(IOException | TimeoutException e){
            e.printStackTrace();
        }finally {
            if(connection != null){
                try{
                    connection.close();
                }catch (Exception ignore){}
            }
        }
    }
}

서버코드는 오히려 수월하다 :

평소같이,  connection 과 channel을 생성하고, queue을 선언하는 것으로 시작합니다.
우리는 하나의 서버 프로세스보다 더 많이 구동하기를 원합니다.
다수의 서버상에 동일하게 부하를 퍼트리기 위해서는, 우리는 prefetchCount 세팅은 channel.basicQos 상에 세팅할 필요가 있습니다.
basicConsume은 업무를 처리하고 response을 되돌려주는 (DefaultConsumer ) 객체 형태로 콜백을 제공하는 Queue에 접근하는데 사용합니다.


RPCClient


public class RPCClient {

    private Connection connection;
    private Channel channel;
    private String requestQueueName "rpc_queue";
    private String replyQueueName;

    public RPCClient() throws IOException,TimeoutException{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        connection = factory.newConnection();
        channel connection.createChannel();

        replyQueueName channel.queueDeclare().getQueue();
    }

    public String call(String message) throws IOException,InterruptedException{
        String corrID = UUID.randomUUID().toString();

        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrID)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("",requestQueueName,props,message.getBytes("UTF-8"));

        final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);

        channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTagEnvelope envelopeAMQP.BasicProperties properties, byte[] body) throws IOException {
                if(properties.getCorrelationId().equals(corrID)){
                    response.offer(new String(body,"UTF-8"));
                }
            }
        });

        return response.take();
    }

    public void close() throws IOException{
        connection.close();
    }

    public static void main(String[] args) {
        RPCClient fibonacciRpc = null;
        String response = null;
        try{
            fibonacciRpc = new RPCClient();

            System.out.println(" [x] Requesting fib(30)");
            response = fibonacciRpc.call("30");
            System.out.println("[.] Got '" + response +"'");
        }catch(IOException | TimeoutException | InterruptedException e){
            e.printStackTrace();
        }finally {
            if(fibonacciRpc != null){
                try{
                    fibonacciRpc.close();
                }catch (IOException _ignore){}
            }
        }
    }
}

클라이언트 코드는 좀 더 있습니다 :

connection과 channel을 만들고, replies 를 위해 독립적인 'callback' queue을 선언했습니다.
우린 'callback' queue를 구독합니다 그래서 RPC response들을 받을수 있습니다.
call 메소드는 실제 RPC 요청을 만듭니다.
여기, 유니크한 correlationId 수를 생성하는 것이 첫번째고 그걸 저장합니다. 
- DefaultConsumer안에 handleDelivery 에 대한 구현은 적절한 response를 잡기위한 값으로 사용합니다.
다음, 우리는 request 메시지를 게시합니다. 두개의 파라미터와 함게
 -  replyTo 와 correlationId
이때, sit back 하고 proper response 가 도착할때 까지 기다립니다.
consumer delivery handing이 분리된 쓰레드 상에 발생하면, 우리는 response가 도착하기 전에,
메인 쓰레드를 중기하기 위한 무언가가 필요합니다. 
BlockingQueue의 사용은 하나의 방법입니다.
여기 우리는 오직 하나의 response에 대기하는데 필요로한 capacity set을 1로 맞춘,  ArrayBlockingQueue를 생성하고 있습니다.
handleDelivery 메소드는 매번 소비된 response 메시지들을 위해 아주 간단한 일을 하고 있습니다.
correlationId가 우리가 찾고 있는  것인지를 체크합니다.
만약 찾으면, BlockingQueu에 응답을 넣어줍니다.
같은 시점에 메인 쓰레드는 BlockingQueue에서 부터  correlationID를 갖고 있는 response에 대해 대기하고 있습니다.
최종적으로 우리는 사용자에게 response를 돌려주게 됩니다.



여기에 표현된 설계는 오직 RPC 서비스 구현만 가능한 건 아니고, 몇가지 중요한 장점들도 갖고 있습니다.
혹시 RPC 서비스가 너무 느리다면, 다른 RPC서버를 구동하는 것만으로 스케일업 할수 있습니다.
두번째 RPCServer를 새로운 콘솔에 실행해보세요.
클라이언트 측면에서, RPC는 오직 한 메시지만 주고 받는 것을 요구합니다.
queueDelcare를 요구한것 같이, 어떤 동기화 호출이 없습니다.
결과적으로 RPC 클라이언트은 싱글 RPC요청에 대해서 하나의 네트워크 라운드 트립만 필요합니다.

우리 코드는 매우 단순하지만, 보다 복잡한 문제들을 해결하기 위해 사용하지 마세요. 아래와 같이

- 가동되는 서버가 없다면, 클라이언트 재동작은 어떻게 해야만 하나?
- 클라이언트는 RPC에 대한 타임아웃과 같은 종류를 가져야만 하나?
- 만약 서버가 엉망으로 동작하고, 예외를 발생시킨다면, 클라이언트에게 어떻게 전달해야 하나?
- 프로세스 처리 전에, 명확하지 않은 유입 메세지들(eg bounds,type 체킹)에 대한 방어 방법.


실험해보기 원하면, queues를 보기 위한 유용한 관리 UI을 찾을 것입니다.