[vert.x] Vert.x core examples - 3. Event bus examples
2016. 3. 25. 11:58ㆍJava/Vert.x
Event bus examples
이 예제들은 Vert.x 안에 event bus의 사용에 대한 설명을 하고 있습니다.
Point to point
receiver 와 sender 사이에 point to point 메세징을 설명합니다.
receiver는 들어오는 메세지들을 위한 이벤트 버스상의 주소에 리스닝 합니다.
메세지를 받을 때, 메세지에 응답합니다.
sender는 매 초 그 주소상에 메세지를 보냅니다, 응답을 받을때, 그것에 대한 로그를 찍습니다.
@Override public void start() throws Exception { EventBus eb = vertx.eventBus(); eb.consumer("ping-address", message -> { System.out.println("Received message: " + message.body()); // Now send back reply message.reply("pong!"); }); System.out.println("Receiver ready!"); } |
java event bus sender : https://github.com/vert-x3/vertx-examples/blob/master/core-examples/src/main/java/io/vertx/example/core/eventbus/pointtopoint/Sender.java
@Override public void start() throws Exception { EventBus eb = vertx.eventBus(); // Send a message every second vertx.setPeriodic(1000, v -> { eb.send("ping-address", "ping!", reply -> { if (reply.succeeded()) { System.out.println("Received reply " + reply.result().body()); } else { System.out.println("No reply"); } }); }); } |
Java sender 와 receiver 를 여러분의 IDE 이나 컴앤드 라인에서 가동할수 있습니다.
컴앤드 라인에서는 다른 콘솔에서 -cluster 플래그를 사용해서 가동해야 합니다.
vertx run Receiver.java -cluster vertx run Sender.java -cluster |
-cluster 플래그는 이벤트 버스를 싱글 이벤트 버스 안으로 함께 네트워크상의 다른 Vert.x instance들을 클러스터하는 것이 가능토록한다.
Publish / Subscribe
이 예제는 receivers 와 sender 사이에 publish / subscribe 메세징을 설명합니다.
pub/sub 메세징과 함께 여러분은 publishers로 부터 모든 메세지들을 받는다수의 subscriber를 갖을 수 있습니다.
receiver 는 들어오는 메세지들을 위해 이벤트 버스 상에 주소를 리스닝합니다.
메세지를 받았을 때, 로그를 남깁니다.
Sender는 메세지를 그 주소에 매초 마다 전달 하고, 그 응답을 받을 때 마다 로그를 남깁니다.
Java event bus pubsub receiver : https://github.com/vert-x3/vertx-examples/blob/master/core-examples/src/main/java/io/vertx/example/core/eventbus/pubsub/Receiver.java
@Override public void start() throws Exception { EventBus eb = vertx.eventBus(); eb.consumer("news-feed", message -> System.out.println("Received news: " + message.body())); System.out.println("Ready!"); } |
Java event bus pubsub sender:https://github.com/vert-x3/vertx-examples/blob/master/core-examples/src/main/java/io/vertx/example/core/eventbus/pubsub/Sender.java
@Override public void start() throws Exception { EventBus eb = vertx.eventBus(); // Send a message every second vertx.setPeriodic(1000, v -> eb.publish("news-feed", "Some news!")); } |
정말 간단하다.
Java sender 와 receiver 를 여러분의 IDE 이나 컴앤드 라인에서 가동할수 있습니다.
컴앤드 라인에서는 다른 콘솔에서 -cluster 플래그를 사용해서 가동해야 합니다.
vertx run Receiver.java -cluster vertx run Sender.java -cluster |
-cluster 플래그는 이벤트 버스를 싱글 이벤트 버스 안으로 함께 네트워크상의 다른 Vert.x instance들을 클러스터하는 것이 가능토록한다.
MessageCodec
이 예제는 어떠한 객체 타입이든지 send / publish / receive 하기 위한 custom MessageCodec 을 작성하는 법을 설명합니다.
여러분은 String 과 같은 원시 타입 뿐만 아니라 EventBus를 통해 custom data type objects 를 직접 전달 할 수 있다라는 의미 입니다.
이 예제에서는 두가지 타입의 receiver가 있습니다.
첫번째 것은 sender로 부터 배포된 local type 이고, 다른 하나는 cluster의 다른 인스턴스로 부터 런칭된 cluster-wide type 입니다.
local EventBus 와 clustered EventBus 상에서 MessageCodec이 어떻게 다르게 동작하는지 볼 수 있습니다.
Java event bus custom message : https://github.com/vert-x3/vertx-examples/blob/master/core-examples/src/main/java/io/vertx/example/core/eventbus/messagecodec/util/CustomMessage.java
statusCode(int), resultCode(String), summary(String)의 멤버로 구성된 단순한 CustomMessage 입니다.
public class CustomMessage { private final int statusCode; private final String resultCode; private final String summary; public CustomMessage(int statusCode,String resultCode,String summary){ this.statusCode = statusCode; this.resultCode = resultCode; this.summary = summary; } @Override public String toString(){ final StringBuilder sb = new StringBuilder("CustomMessage{"); sb.append("statusCode=").append(statusCode); sb.append(", resultCode='").append(resultCode).append('\''); sb.append(", summary='").append(summary).append('\''); sb.append('}'); return sb.toString(); } public int getStatusCode() { return statusCode; } public String getResultCode() { return resultCode; } public String getSummary() { return summary; } } |
Java event bus custom message codec : https://github.com/vert-x3/vertx-examples/blob/master/core-examples/src/main/java/io/vertx/example/core/eventbus/messagecodec/util/CustomMessageCodec.java
MessageCodec 을 구현한 CustomMessageCodec 이다.
- encodeToWire : wire에 메세지를 마샬링 할 때 Vert.x에 의해 호출됩니다.
- decodeFromWire : wire로 부터 메세지를 디코딩 될 때 Vert.x에 의해 호출됩니다.
- transform : event bus를 건너 locally에서 메세지가 보내졌을때,
이 메소드는 보내진 S타입에서 받는 R타입로 메세지를 변환하기 위해서 호출되어 집니다.
- name : 코덱 이름. 각각의 코덱은 고유의 이름을 가져야 합니다.
이것은 메세지를 보낼때 코덱와 등록되지 않은 코덱들을 구분하기 위해 사용되어 집니다.
- systemCodecID : 시스템 codec들을 구분하기 위해 사용합니다. 사용자 codec은 항상 -1를 리턴합니다.
public class CustomMessageCodec implements MessageCodec<CustomMessage , CustomMessage>{ /** * Called by Vert.x when marshalling a message to the wire. * * @param buffer the message should be written into this buffer * @param customMessage the message that is being sent */ @Override public void encodeToWire(Buffer buffer, CustomMessage customMessage) { //Easiest ways is using JSON object JsonObject jsonToEncode = new JsonObject(); jsonToEncode.put("statusCode", customMessage.getStatusCode()); jsonToEncode.put("resultCode", customMessage.getResultCode()); jsonToEncode.put("summary", customMessage.getSummary()); //Encode object to string String jsonToStr = jsonToEncode.encode(); //Length of JSON : is NOT characters count int length = jsonToStr.getBytes().length; //Write data into given buffer buffer.appendInt(length); buffer.appendString(jsonToStr); } /** * Called by Vert.x when a message is decoded from the wire. * * @param position the position in the buffer where the message should be read from. * @param buffer the buffer to read the message from * @return the read message */ @Override public CustomMessage decodeFromWire(int position, Buffer buffer) { //My custom message starting from this *position* of buffer int _pos = position; //Length of JSON int length = buffer.getInt(_pos); // Get JSON string by it's length // Jump 4 because getInt() == 4 bytes String jsonStr = buffer.getString(_pos+=4,_pos+=length); JsonObject contentJson = new JsonObject(jsonStr); //Get fields int statusCode = contentJson.getInteger("statusCode"); String resultCode = contentJson.getString("resultCode"); String summary = contentJson.getString("summary"); //we can finally create custom message object return new CustomMessage(statusCode,resultCode,summary); } /** * If a message is sent <i>locally</i> across the event bus, this method is called to transform the message from * the sent type S to the received type R * * @param customMessage the sent message * @return the transformed message */ @Override public CustomMessage transform(CustomMessage customMessage) { //If a message is sent *locally* across the event bus. //This example sends message just as is return customMessage; } /** * The codec name. Each codec must have a unique name. This is used to identify a codec when sending a message and * for unregistering codecs. * * @return the name */ @Override public String name() { //Each codec must have a unique name. //This is used to identify a codec when sending a message and for unregistering codecs. return this.getClass().getSimpleName(); } /** * Used to identify system codecs. Should always return -1 for a user codec. * * @return -1 for a user codec. */ @Override public byte systemCodecID() { //Alway -1 return -1; } } |
Java event bus sender : https://github.com/vert-x3/vertx-examples/blob/master/core-examples/src/main/java/io/vertx/example/core/eventbus/messagecodec/Sender.java
1초 마다 eventBus에 clusterWideMessage를 던져서, 받으면... 응답 받았다고 로그를 찍죠.
LocalReceiver를 verticle로 배포합니다.
그리고 성공하면, 2초 간격으로 localMessage를 던져서, 받으면... 응답 받았다고 로그를 찍습니다.
@Override public void start() throws Exception { EventBus eventBus = getVertx().eventBus(); // Register codec for custom message eventBus.registerDefaultCodec(CustomMessage.class, new CustomMessageCodec()); // Custom message CustomMessage clusterWideMessage = new CustomMessage(200, "a00000001", "Message sent from publisher!"); CustomMessage localMessage = new CustomMessage(200, "a0000001", "Local message!"); // Send a message to [cluster receiver] every second getVertx().setPeriodic(1000, _id -> { eventBus.send("cluster-message-receiver", clusterWideMessage, reply -> { if (reply.succeeded()) { CustomMessage replyMessage = (CustomMessage) reply.result().body(); System.out.println("Received reply: "+replyMessage.getSummary()); } else { System.out.println("No reply from cluster receiver"); } }); }); // Deploy local receiver getVertx().deployVerticle(LocalReceiver.class.getName(), deployResult -> { // Deploy succeed if (deployResult.succeeded()) { // Send a message to [local receiver] every 2 second getVertx().setPeriodic(2000, _id -> { eventBus.send("local-message-receiver", localMessage, reply -> { if (reply.succeeded()) { CustomMessage replyMessage = (CustomMessage) reply.result().body(); System.out.println("Received local reply: "+replyMessage.getSummary()); } else { System.out.println("No reply from local receiver"); } }); }); // Deploy failed } else { deployResult.cause().printStackTrace(); } }); } |
Java event bus local receiver : https://github.com/vert-x3/vertx-examples/blob/master/core-examples/src/main/java/io/vertx/example/core/eventbus/messagecodec/LocalReceiver.java
@Override public void start() throws Exception { EventBus eventBus = getVertx().eventBus(); // Does not have to register codec because sender already registered /*eventBus.registerDefaultCodec(CustomMessage.class, new CustomMessageCodec());*/ // Receive message eventBus.consumer("local-message-receiver", message -> { CustomMessage customMessage = (CustomMessage) message.body(); System.out.println("Custom message received: "+customMessage.getSummary()); // Replying is same as publishing CustomMessage replyMessage = new CustomMessage(200, "a00000002", "Message sent from local receiver!"); message.reply(replyMessage); }); } |
Java event bus cluster-wide receiver : https://github.com/vert-x3/vertx-examples/blob/master/core-examples/src/main/java/io/vertx/example/core/eventbus/messagecodec/ClusterReceiver.java
@Override public void start() throws Exception { EventBus eventBus = getVertx().eventBus(); //Register codec for custom message eventBus.registerDefaultCodec(CustomMessage.class, new CustomMessageCodec()); //Receive message eventBus.consumer("cluster-message-receiver", message ->{ CustomMessage customMessage = (CustomMessage) message.body(); System.out.println("Custom Message received : " + customMessage.getSummary()); //Replying is sam as publishing CustomMessage replyMessage = new CustomMessage(200, "a00000002", "Message sent from cluster receiver!"); message.reply(replyMessage); }); } |
Java sender 와 receiver 를 여러분의 IDE 이나 컴앤드 라인에서 가동할수 있습니다.
컴앤드 라인에서는 다른 콘솔에서 -cluster 플래그를 사용해서 가동해야 합니다.
vertx run ClusterReceiver.java -cluster vertx run Sender.java -cluster |
-cluster 플래그는 이벤트 버스를 싱글 이벤트 버스 안으로 함께 네트워크상의 다른 Vert.x instance들을 클러스터하는 것이 가능토록한다.
'Java > Vert.x' 카테고리의 다른 글
[vert.x] Vert.x core examples - 5. Execute blocking & high Availability examples (0) | 2016.03.25 |
---|---|
[vert.x] Vert.x core examples - 4. Verticle examples (0) | 2016.03.25 |
[vert.x] Vert.x core examples - 2. Net Example (0) | 2016.03.25 |
[vert.x] Vert.x core examples - 1.시작 (0) | 2016.03.25 |
[vert.x] Maven Users (0) | 2016.03.22 |
비동기 네트워크 서버 프레임워크 Vert.x - 개요, 실습 (0) | 2016.03.18 |