[vert.x] Vert.x core examples - 3. Event bus examples

2016. 3. 25. 11:58Java/Vert.x


Event bus examples

이 예제들은 Vert.x 안에 event bus의 사용에 대한 설명을 하고 있습니다. 

Point to point

receiver 와 sender 사이에 point to point 메세징을 설명합니다.

receiver는 들어오는 메세지들을 위한 이벤트 버스상의 주소에 리스닝 합니다.
메세지를 받을 때, 메세지에 응답합니다.

sender는 매 초 그 주소상에 메세지를 보냅니다, 응답을 받을때, 그것에 대한 로그를 찍습니다. 

@Override
public void start() throws Exception {
    EventBus eb = vertx.eventBus();
    eb.consumer("ping-address"message -> {
        System.out.println("Received message: " + message.body());
        // Now send back reply
        message.reply("pong!");
    });
    System.out.println("Receiver ready!");
}




@Override
public void start() throws Exception {
    EventBus eb = vertx.eventBus();
    // Send a message every second
    vertx.setPeriodic(1000v -> {
        eb.send("ping-address""ping!"reply -> {
            if (reply.succeeded()) {
                System.out.println("Received reply " + reply.result().body());
           else {
                System.out.println("No reply");
            }
        });
    });
}

Java sender 와 receiver 를 여러분의 IDE 이나 컴앤드 라인에서 가동할수 있습니다.
컴앤드 라인에서는 다른 콘솔에서 -cluster 플래그를 사용해서 가동해야 합니다.

vertx run Receiver.java -cluster

vertx run Sender.java -cluster

-cluster 플래그는 이벤트 버스를   싱글 이벤트 버스 안으로 함께 네트워크상의 다른 Vert.x instance들을 클러스터하는 것이 가능토록한다.

Publish / Subscribe

이 예제는 receivers 와 sender 사이에 publish / subscribe 메세징을 설명합니다. 
pub/sub 메세징과 함께 여러분은 publishers로 부터 모든 메세지들을 받는다수의 subscriber를 갖을 수 있습니다. 

receiver 는 들어오는 메세지들을 위해 이벤트 버스 상에 주소를 리스닝합니다. 
메세지를 받았을 때, 로그를 남깁니다.

Sender는 메세지를 그 주소에 매초 마다 전달 하고, 그 응답을 받을 때 마다 로그를 남깁니다.



@Override
public void start() throws Exception {
    EventBus eb = vertx.eventBus();
    eb.consumer("news-feed"message -> System.out.println("Received news: " + message.body()));
    System.out.println("Ready!");
}




@Override
public void start() throws Exception {
    EventBus eb = vertx.eventBus();
    // Send a message every second
    vertx.setPeriodic(1000v -> eb.publish("news-feed""Some news!"));
}

정말 간단하다. 

Java sender 와 receiver 를 여러분의 IDE 이나 컴앤드 라인에서 가동할수 있습니다.
컴앤드 라인에서는 다른 콘솔에서 -cluster 플래그를 사용해서 가동해야 합니다.

vertx run Receiver.java -cluster

vertx run Sender.java -cluster

-cluster 플래그는 이벤트 버스를   싱글 이벤트 버스 안으로 함께 네트워크상의 다른 Vert.x instance들을 클러스터하는 것이 가능토록한다.

MessageCodec

이 예제는 어떠한 객체 타입이든지 send / publish / receive 하기 위한 custom MessageCodec 을 작성하는 법을 설명합니다.
여러분은  String 과 같은 원시 타입 뿐만 아니라 EventBus를 통해 custom data type objects 를 직접 전달 할 수 있다라는 의미 입니다.

이 예제에서는 두가지 타입의 receiver가 있습니다.
첫번째 것은 sender로 부터 배포된 local type 이고, 다른 하나는 cluster의 다른 인스턴스로 부터 런칭된 cluster-wide type 입니다. 
local EventBus 와 clustered EventBus 상에서 MessageCodec이 어떻게 다르게 동작하는지 볼 수 있습니다.


statusCode(int), resultCode(String), summary(String)의 멤버로 구성된 단순한 CustomMessage 입니다.
public class CustomMessage {
    private final int statusCode;
    private final String resultCode;
    private final String summary;

    public CustomMessage(int statusCode,String resultCode,String summary){
        this.statusCode = statusCode;
        this.resultCode = resultCode;
        this.summary = summary;
    }

    @Override
    public String toString(){
        final StringBuilder sb = new StringBuilder("CustomMessage{");
        sb.append("statusCode=").append(statusCode);
        sb.append(", resultCode='").append(resultCode).append('\'');
        sb.append(", summary='").append(summary).append('\'');
        sb.append('}');
        return sb.toString();
    }

    public int getStatusCode() {
        return statusCode;
    }

    public String getResultCode() {
        return resultCode;
    }

    public String getSummary() {
        return summary;
    }
}



MessageCodec 을 구현한 CustomMessageCodec 이다.

- encodeToWire : wire에 메세지를 마샬링 할 때 Vert.x에 의해 호출됩니다.
- decodeFromWire : wire로 부터 메세지를 디코딩 될 때 Vert.x에 의해 호출됩니다.
- transform : event bus를 건너 locally에서 메세지가 보내졌을때,
                          이 메소드는 보내진 S타입에서 받는 R타입로 메세지를 변환하기 위해서 호출되어 집니다.
- name : 코덱 이름. 각각의 코덱은 고유의 이름을 가져야 합니다.
                  이것은 메세지를 보낼때 코덱와 등록되지 않은 코덱들을 구분하기 위해 사용되어 집니다. 
- systemCodecID : 시스템 codec들을 구분하기 위해 사용합니다. 사용자 codec은 항상 -1를 리턴합니다.

public class CustomMessageCodec implements MessageCodec<CustomMessage CustomMessage>{

    /**
     * Called by Vert.x when marshalling a message to the wire.
     *
     * @param buffer        the message should be written into this buffer
     * @param customMessage the message that is being sent
     */
    @Override
    public void encodeToWire(Buffer bufferCustomMessage customMessage) {
        //Easiest ways is using JSON object
        JsonObject jsonToEncode = new JsonObject();
        jsonToEncode.put("statusCode"customMessage.getStatusCode());
        jsonToEncode.put("resultCode"customMessage.getResultCode());
        jsonToEncode.put("summary"customMessage.getSummary());

        //Encode object to string
        String jsonToStr = jsonToEncode.encode();

        //Length of JSON : is NOT characters count
        int length = jsonToStr.getBytes().length;

        //Write data into given buffer
        buffer.appendInt(length);
        buffer.appendString(jsonToStr);
    }

    /**
     * Called by Vert.x when a message is decoded from the wire.
     *
     * @param position    the position in the buffer where the message should be read from.
     * @param buffer the buffer to read the message from
     * @return the read message
     */
    @Override
    public CustomMessage decodeFromWire(int positionBuffer buffer) {
        //My custom message starting from this *position* of buffer
        int _pos = position;

        //Length of JSON
        int length = buffer.getInt(_pos);

        // Get JSON string by it's length
        // Jump 4 because getInt() == 4 bytes
        String jsonStr = buffer.getString(_pos+=4,_pos+=length);
        JsonObject contentJson = new JsonObject(jsonStr);

        //Get fields
        int statusCode = contentJson.getInteger("statusCode");
        String resultCode = contentJson.getString("resultCode");
        String summary = contentJson.getString("summary");

        //we can finally create custom message object
        return new CustomMessage(statusCode,resultCode,summary);
    }

    /**
     * If a message is sent <i>locally</i> across the event bus, this method is called to transform the message from
     * the sent type S to the received type R
     *
     * @param customMessage the sent message
     * @return the transformed message
     */
    @Override
    public CustomMessage transform(CustomMessage customMessage) {
        //If a message is sent *locally* across the event bus.
        //This example sends message just as is
        return customMessage;
    }

    /**
     * The codec name. Each codec must have a unique name. This is used to identify a codec when sending a message and
     * for unregistering codecs.
     *
     * @return the name
     */
    @Override
    public String name() {
        //Each codec must have a unique name.
        //This is used to identify a codec when sending a message and for unregistering codecs.
        return this.getClass().getSimpleName();
    }

    /**
     * Used to identify system codecs. Should always return -1 for a user codec.
     *
     * @return -1 for a user codec.
     */
    @Override
    public byte systemCodecID() {
        //Alway -1
        return -1;
    }
}



 1초 마다 eventBus에 clusterWideMessage를 던져서, 받으면... 응답 받았다고 로그를 찍죠.

LocalReceiver를 verticle로 배포합니다.
그리고 성공하면, 2초 간격으로 localMessage를 던져서, 받으면... 응답 받았다고 로그를 찍습니다.

@Override
public void start() throws Exception {
    EventBus eventBus = getVertx().eventBus();

    // Register codec for custom message
    eventBus.registerDefaultCodec(CustomMessage.class, new CustomMessageCodec());

    // Custom message
    CustomMessage clusterWideMessage = new CustomMessage(200"a00000001""Message sent from publisher!");
    CustomMessage localMessage = new CustomMessage(200"a0000001""Local message!");

    // Send a message to [cluster receiver] every second
    getVertx().setPeriodic(1000_id -> {
        eventBus.send("cluster-message-receiver"clusterWideMessagereply -> {
            if (reply.succeeded()) {
                CustomMessage replyMessage = (CustomMessage) reply.result().body();
                System.out.println("Received reply: "+replyMessage.getSummary());
           else {
                System.out.println("No reply from cluster receiver");
            }
        });
    });


    // Deploy local receiver
    getVertx().deployVerticle(LocalReceiver.class.getName()deployResult -> {
        // Deploy succeed
        if (deployResult.succeeded()) {
            // Send a message to [local receiver] every 2 second
            getVertx().setPeriodic(2000_id -> {
                eventBus.send("local-message-receiver"localMessagereply -> {
                    if (reply.succeeded()) {
                        CustomMessage replyMessage = (CustomMessage) reply.result().body();
                        System.out.println("Received local reply: "+replyMessage.getSummary());
                   else {
                        System.out.println("No reply from local receiver");
                    }
                });
            });

            // Deploy failed
       else {
            deployResult.cause().printStackTrace();
        }
    });
}




@Override
public void start() throws Exception {
    EventBus eventBus = getVertx().eventBus();

    // Does not have to register codec because sender already registered
/*eventBus.registerDefaultCodec(CustomMessage.class, new CustomMessageCodec());*/

    // Receive message
    eventBus.consumer("local-message-receiver"message -> {
        CustomMessage customMessage = (CustomMessage) message.body();

        System.out.println("Custom message received: "+customMessage.getSummary());

        // Replying is same as publishing
        CustomMessage replyMessage = new CustomMessage(200"a00000002""Message sent from local receiver!");
        message.reply(replyMessage);
    });
}





@Override
public void start() throws Exception {
    EventBus eventBus = getVertx().eventBus();

    //Register codec for custom message
    eventBus.registerDefaultCodec(CustomMessage.class, new CustomMessageCodec());

    //Receive message
    eventBus.consumer("cluster-message-receiver"message ->{
        CustomMessage customMessage = (CustomMessage) message.body();

        System.out.println("Custom Message received : " + customMessage.getSummary());

        //Replying is sam as publishing
        CustomMessage replyMessage = new CustomMessage(200"a00000002""Message sent from cluster receiver!");
        message.reply(replyMessage);
    });
}

Java sender 와 receiver 를 여러분의 IDE 이나 컴앤드 라인에서 가동할수 있습니다.
컴앤드 라인에서는 다른 콘솔에서 -cluster 플래그를 사용해서 가동해야 합니다.

vertx run ClusterReceiver.java -cluster

vertx run Sender.java -cluster

-cluster 플래그는 이벤트 버스를   싱글 이벤트 버스 안으로 함께 네트워크상의 다른 Vert.x instance들을 클러스터하는 것이 가능토록한다.