오래간만에 Netty- (3) Writing a Time Server/Client

2017. 5. 22. 17:36Java/Netty

이번에 구현할 프로토콜은 TIME 프로토콜 입니다.
이전 예제와는 다르게, 32-Bit integer를 포함하는 메시지를 보냅니다.
어떠한 요청을 받는 것 없이, 메시지를 한번 보내고 연결이 끊어집니다. 
이번 예제에서는 어떻게 구성하고, 메시지를 보내고, 연결을 닫는지에 대해서 완벽하게 알아보도록 합시다.

받은 데이터를 무시하겠지만, 연결이 이뤄지자 마자 메시지를 보내야 하기에, 이번에는 ChannelRead() 메소드를 사용할수 없습니다.
대신에 , channelActive() 메소드를 오버라이드 해야 합니다. 
아래를 보시옵소서 

package io.netty.example.time;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
       
        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }
   
    @Override
    public void exceptionCaught(ChannelHandlerContext ctxThrowable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

1) 설명한것과 같이 channelActive() 메소드는 연결이 구성되고, 트래픽이 생성이 될때 발생됩니다.
   이 메소드에서 현재 시간을 나타내는 32-bit integer를 작성합니다.

2) 새로운 메시지를 보내기위해선, 메시지를 포함할 새로운 버퍼 할당이 필요합니다.
   32-bit integer를 작성하기 위해, 해당 범위가 적어도 4bytes의 ByteBuf가 필요합니다.
   ChannelHandlerContext.alloc() 메서드를 통해 ByteBufAllocator 를 얻고, 새로운 buffer를 할당합니다. 

3) 일반적으로, 구성된 메시지를 작성합니다.
   잠시만, filp은 어디 있나? NIO에서 메시지를 보내기전에 java.nio.ByteBuffer.flip()를 호출하지 않았었나요?
    ByteBuf는 그런 메소드를 가지고 있지 않다. 왜냐하면 그건 두개의 포인터를 가지고 있기 때문에..
    읽는 동작과 쓰는 동작 이렇게 두개죠.
    Writer 인덱스는 ByteBuf에 뭔가를 쓸때 늘어나고, 반면 reader 인덱스는 변하지 않는다.
   reader 인덱스와 writer 인덱스는 메시지가 시작되고 끝나는 곳은 각각 나타냅니다.

   반대로,  NIO buffer는 filp 메소드 호출이 없이는 메시지 내용이 시작하고 종료하는 곳을 알아낼 명확한 방법을 제공하지 않습니다.
   아무것도 없는 데이터나 잘못된 데이터를 보냈기 때문에, 버퍼를 플립하는 것을 까먹었을땐, 문제가 있을것입니다.
   그런 에러는 네티에서는 발생하지 않습니다. 이유는 다른 동작에 대해서 다른 포인터를 가지고 있기 때문입니다.
   flipping 없는 삶으로 여러분은 보다 윤택한 생활을 누릴수가 있습니다.

   기억해둬야할 또 다른 점은 ChannelHandlerContext.write()(그리고 writeAndFlush()) 메소드는 ChannelFuture를 리턴한다는 점입니다.
   ChannelFuture는 아직 발생하지 않은 I/O 동작을 나타냅니다.
   그 말은 어떤 요청된 동작은 아직 실행되지 않았을 것이란 이야기 인데, 이유는 네티상에서는 모든 동작들이 비동기적이기 때문입니다.
   예를 들어, 아래 코드는 메시지를 보내기 전이라도 연결은 닫힐것입니다.

Channel ch = ...;
ch.writeAndFlush(message);
ch.close();
  그래서 ChannelFuture가 완성된 후에, write() 메소드에 의해 리턴되는  close() 메소드를 호출해야 합니다.
  그리고 write 동작이 완료될때 해당 리스터가 알려줍니다. 
  close()은 connection을 바로 닫지 않을 것이란 것과 ChannelFuture를 리턴한다는 것을 기억해주세요.

4) write 요청이 완료될때, 그땐 어떻게 우리가 통지할까요?
   리턴된 ChannelFuture 에 ChannelFutureListner를 추가함으로써 간단히 해결됩니다.
   여기, 동작이 완료될때 Channel를 닫으라는 익명 ChannelFutureListner를 생성합니다.
   미리 정의된 리스너를 사용해서 코드를 단순화 시킬수도 있습니다.

f.addListener(ChannelFutureListener.CLOSE);

테스트하기 위해서는, 우리 time server가 기대한대로 동작한다면, 여러분은 UNIX rdate 명령을 사용할 수 있습니다.

$ rdate -o <port> -p <host>

<port>는 메인 메소드에 명시해 놓은 포트 넘버이고, <host>는 일반적으로 localhost 입니다.

Writing a Time Client

DISCARD 나 ECHO 서버와는 다르게, TIME 프로토콜을 위해서는 클라이언트가 필요합니다.
이유는 사람이 32-bit binary data를 달력의 데이터로 변환할수 없기 때문이죠.
이번 섹션에서는, 서버가 정확하게 동작하는 것을 확실하게 하고 네티를 이용해 client를 작성하는 방법에 대해서 배워보도록 합시다.

네티에서 클라이언트와 서버간에 가장 크고 오직 다른 것은 다른 Boostrap 과 Channel 구현들이 사용되었다는 것입니다.
아래 코드를 보시죠


package io.netty.example.time;

public class TimeClient {
    public static void main(String[] argsthrows Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
       
        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVEtrue); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel chthrows Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
           
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

1) Bootstrap 은 client-side 나 연결을 읽은 channel 과 같은 non-server 채널들에 대한 것을 제외하고는 ServerBootstrap과 비슷합니다.
2) 딱 한개의 EventLoopGroup을 명시했다면, boss group 과 worker group 두가지 모든 형태로 사용될 것입니다.
    Boss worker는 클라이언트 측면라도 사용되지 않습니다. 
3) NioServerSocketChannel 대신에 NioSocketChannel이 client-side Channel 생성을 위해 사용되고 있습니다.
4) ServerBootstrap 에서 사용했던 childOption()를 사용하지 않을 건데, client-side SocketChannel은 부모를 가지고 있지 않기 때문입니다.
5) bind() 메소드 대신에 connect() 메소드를 사용해야 합니다.

위에서 보듯이, 서버 사이드 코드와 정말 다르진 않습니다.
ChannelHandler 구현에 대해서 알고 싶습니까?
서버로 부터 32-bit integer로 받아서,, 인간이 읽기 가능한 포멧으로 변환하고, 변환된 시간을 출력하고, 연결을 닫아야 합니다.

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctxObject msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L* 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctxThrowable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

1) TCP/IP에서는, 네티는 ByteBuf안에 하나의 peer로 부터 보낸 데이터를 읽습니다.

매우 쉬워 보이고, 서버 사이드 예제와 그리 달라 보이지 않습니다.
그러나, 이 핸들러는 가끔 IndexOutofBoundsException을 발생하는 작업을 거절합니다.
다음 장에 왜 이런일이 일어나는지에 대해서 알아보도록 합시다.


TimeServer 

17:24:09.039 [nioEventLoopGroup-2-1] INFO io.netty.handler.logging.LoggingHandler - [id: 0x3d514ca4] REGISTERED
17:24:09.043 [nioEventLoopGroup-2-1] INFO io.netty.handler.logging.LoggingHandler - [id: 0x3d514ca4] BIND: 0.0.0.0/0.0.0.0:8007
17:24:09.047 [nioEventLoopGroup-2-1] INFO io.netty.handler.logging.LoggingHandler - [id: 0x3d514ca4, L:/0:0:0:0:0:0:0:0:8007] ACTIVE

TimeClient 127.0.0.1 8007 

Mon May 22 17:25:52 KST 2017

TimeServer

17:25:52.453 [nioEventLoopGroup-2-1] INFO io.netty.handler.logging.LoggingHandler - [id: 0x3d514ca4, L:/0:0:0:0:0:0:0:0:8007] RECEIVED: [id: 0xf6292158, L:/127.0.0.1:8007 - R:/127.0.0.1:10220]
17:25:52.467 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacity: 32768
17:25:52.467 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
17:25:52.467 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 16
17:25:52.487 [nioEventLoopGroup-3-1] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.bytebuf.checkAccessible: true
17:25:52.490 [nioEventLoopGroup-3-1] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@3a93e877



Dealing with a Stream-based Transport

One Small Caveat of Socket Buffer

TCP/IP와 같이 스트림 기반 전송에서, 받은 데이터는 socket receive buffer 안에 저장됩니다.
불행하게도,  스트림 기반 전송의 버퍼는 패킷들의 큐가 아니고 bytes의 큐 입니다.
그 말인즉, 여러분이 두개의 메시지를 두개의 독립적인 패킷들로 보냈어도, OS는 두개의 메시지로 다루지 않고, 하나의 bunch of bytes 로 본다는 것입니다.
그래서 여러분이 읽을 것이 여러분의 원격 peer가 작성한 것이 정확하게 맞는지 보장하지 않는다란 뜻입니다.
예를 들면, OS의 TCP/IP 스택이 세 패킷을 받았다고 가정해봅시다.
  Three packets received as they were sent
스트림 기반 프로토콜의 일반적인 프로퍼티 때문에, 여러분에 어플리케이션에서는 아래 조각으로 쪼개진 형태로 위의 패킷들을 읽는 경우가 높다.
Three packets split and merged into four buffers

그래서 받은 부분은, 서버측인지 클라이언트 측인지 고려하지 않고(regardless), 어플리케이션 로직에 의해서 쉽게 이해될수 있는 좀 더 의미있는 프레임 또한 하나로 받은 데이터를 조합해야 합니다.
위의 예제의 경우에는, 받은 데이터는 아래와 같이 나눠져야 합니다.

Four buffers defragged into three

The First Solution

자, 다시 Time Client 예제로  되 돌아 가봅시다.
여기에는 약간의 문제를 가지고 있었습니다.
32-bit integer는 매우 작은 데이터 양이고, 가끔 쪼개지지 않습니다.
그러나 문제는 쪼개 질수 있다는 것 그리고 쪼개는 가능성은 트래픽 증가로 발전될것입니다.

가장 간단한 해결방법은 internal cumulative buffer 를 생성하는 것이고, 4 bytes 모드가 internal Buffer 안에 받아질때 까지 기다리는 것입니다.
문제를 해결한 수정된 TimeClientHandler  구현을 아래에 보여 드립니다.

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;
   
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)
    }
   
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }
   
    @Override
    public void channelRead(ChannelHandlerContext ctxObject msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();
       
        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L* 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }
   
    @Override
    public void exceptionCaught(ChannelHandlerContext ctxThrowable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

1) ChannelHandler 는 두개의 라이프 사이클 리스터 메소드들을 가지고 있습니다. (handlerAdded() 와 handlerRemoved())
   오랜 시간동안 막지 않는 한, arbitrary (de)initialization 업무를 동작할수 있습니다.
2) 첫번째, 모든 받은 데이터는 buf 안으로 쌓아 올려야 합니다. (be cumulated) 
3) 그 다음, 핸들러는 buf가 충분한 데이터를 가지고 있는지를 체크해야 합니다.
   이 예제에서는  4bytes 이고, 실제 비즈니스 로직으로 진행해야 합니다.
  다른 방법으로(Otherwise), 네티는 channelRead() 메소드를 보다 많은 데이터가 올때 다시 호출 할것입니다.
  그리고 최종적으로 4 bytes는 쌓이게 될것입니다.

The Second Solution

첫번째 해결방법이 Time 클라이언트로 문제를 해결했어도, 수정된 handler가 그리 깔끔해 보이진 않습니다.
다양한 길이의 필드와 같이 다수의 필드들로 구성된 복잡한 프로토콜들을 생각해보세요.
여러분의 ChannelInboundHandler 구현은 아쭈 빠르게 유지할수 없게 됩니다.

여러분이 알아차렸겠지만, ChannelPipeline에는 하나 이상의 ChannelHandler를 추가할수 있습니다.
그래서, 여러분은 어플리케이션의 복잡성을 줄이기 위해 다수의 모듈이 포함된 ChannelHandler를
하나의(monolithic) ChannelHandler로 세분화 할수 있습니다. 
예를 들어, TimeClientHandler를 두개의 handler로 나눌수 있었습니다.
 - 파편화 이슈를 다루는 TimeDecoder , 그리고
 - TimeClientHandler의 초기화 간단 버전.

다행이도, 네티는 박스외 첫번째 handler를 작성하는데 도움을 주는, 확장된 클래스를 제공합니다. 

package io.netty.example.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctxByteBuf inList<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return// (3)
        }
       
        out.add(in.readBytes(4)); // (4)
    }
}
 1) ByteToMessageDecoder 는 파편화 이슈를 다루는데 쉽게 만들어주는 ChannelInboundHandler의 구현체입니다.
 2) ByteToMessageDecoder는  새로운 데이터를 받을 때 마다,
     내부적으로  유지된 cumulative buffer를 가지고 있는 decode() 메소드라고 불립니다.
3) decode()는 cumulative buffer안에 충분한 데이터가 없는 곳인 out에 아무것도 추가하지 않도록 결정할수 있습니다.
   ByteToMessageDecoder는 좀더 많은 받은 데이터가 있을 때, 다시 decode()를 호출 할수 있습니다.
4) decode()가 out에 객체를 추가하면, decoder는 성공적으로 메시지를 decode 되었다는 뜻입니다.
   ByteToMessageDecoder는 cumulative buffer 의 read 부분을 버를 것입니다.
    다중 메시지들을 디코드 할 필요가 없는 것을 기억하세요.
   ByteToMessageDecoder는 out에 아무것도 추가되지 않을 때까지, decode() 메소드를 호출하는 것을 유지 하고 있을 것입니다.

자, ChannelPipeline안에 추가하기 위한 다른 handler를 갖고 있는데, 
TimeClient 안에 ChannelInitializer  구현체를 수정해야 합니다.

b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel chthrows Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});

도전심이 강하다면, 그 이상의 decodcer로 간단히 하는 ReplayingDecoder를  시도할것입니다.
여러분은 보다 많은 정보를 위해 API 레퍼런스 에 상담할 필요가 있겠습니다.

public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctxByteBuf inList<Object> out) {
        out.add(in.readBytes(4));
    }
}

추가적으로, 네티는 매우 쉽게 대부분의 프로토콜을 구현하도록 out-of-box를 제공하고, 단일(monolithic) 유지 가능하지 않은  핸들러 구현으로 마무리하는 것을 피하도록 돕고 있습니다.
보다 자세한 예제들에 대해서 , 아래 패키지들을 참고하도로 하세요.

- Binary protocol(io.netty.example.factorial) : http://netty.io/4.0/xref/io/netty/example/factorial/package-summary.html
- Text 기반 protocol(io.netty.example.telnet) : http://netty.io/4.0/xref/io/netty/example/telnet/package-summary.html


Speaking in POJO instead of ByteBuf

지금까지 모든 예제들은 프로토콜 메시지의 초기 데이터 구조로, ByteBuf를 사용해서 리뷰를 해왔다.
이번 섹션에서는, ByteBuf  대신에 POJO 를 사용해서 Time 프로토콜 서버와 클라이언트를 개선해보도록 하겠습니다.

여러분의 ChannelHandler에서 POJO를 사용하는 장점은 명백합니다.
여러분의 핸들러는 ByteBuf에서 정보를 추출한 코드를 분리함으로서 재사용성과 유지보수가 좀 더 수월해 주게 됩니다.
Time 클라이언트와 서버 예제에서 , 우리는 32-bit integer을 단지 읽는 것이지, 직접 ByteBuf 를 사용하는 주요 이슈는 아닙니다.
그러나, 실세계 프로토콜을 여러분이 구현하는 것으로의 분리를 만들기 위할 필요성을 찾을수 있을 것입니다.

첫번째, UnixTime 이라고 불리는 새로운 타입을 정의해봅시다.

package io.netty.example.time;

import java.util.Date;

public class UnixTime {

    private final long value;
   
    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }
   
    public UnixTime(long value) {
        this.value = value;
    }
       
    public long value() {
        return value;
    }
       
    @Override
    public String toString() {
        return new Date((value() - 2208988800L* 1000L).toString();
    }
}

ByteBuf 대신에 UnitTime를 생상하는 TimeDecoder로 이제 수정(revise) 할수 있습니다.

@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    if (in.readableBytes() < 4) {
        return;
    }

    out.add(new UnixTime(in.readUnsignedInt()));
}

수정된 decoder와 함께 TimeClientHandler는 더이상 ByteBuf를 사용하지 않습니다.

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    UnixTime m = (UnixTime) msg;
    System.out.println(m);
    ctx.close();
}

엄청 간단하고 우아하지 않습니까?
같은 기술이 서버상에도 적용될수 있습니다.
TimeServerHandler 도 우선 적용해보도록 합시다.

@Overridepublic void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    f.addListener(ChannelFutureListener.CLOSE);
}
자, 이제 빼놓은 부분은 ByteBuf 안에 UnixTime를 되돌려 변환하는 ChannelOutboundHandler의 구현체인 encoder 입니다.
decoder 작성하는 것보다 더 많이 쉽습니다. 이유는 패킷 분리(packeet fragmentation)을 다룰 필요가 없고, 메시지를 인코딩할 때 조합할 필요가 없기 때문이죠.

package io.netty.example.time;

public class TimeEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctxObject msgChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int)m.value());
        ctx.write(encoded, promise); // (1)
    }
}

1) 이 한줄에 꽤 중요한 것이 있습니다.
   첫째, as-os 기존 ChannelPromise를 전달합니다. Netty가 인코딩된 데이터가 wire 밖에 실제 작성되어질 때, 성공이나 실패냐를 표기하기 위해서
   둘째,  ctx.fulsh()를 호출하지 않았습니다.
         분리된 핸들러 메소드 인 void flush(ChannelHandlerContext ctx)가 있습니다.
         이건 flush() 동작을 오버라이드 하기 위한 목적으로 합니다.

보다 더 간단히 하기 위해서는, 여러분은 MessageToByteEncoder 사용을 만들수 있습니다.

public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctxUnixTime msgByteBuf out) {
        out.writeInt((int)msg.value());
    }
}

마지막 남은 작업은 TimeServerHandler 전, 서버단에 ChannelPipeline 안에  TimeEncoder를 추가하는 것입니다.
그리고 별거아닌 연습(trivial exercise)으로 남겨놓습니다. 


Shutting Down Your  Application

여러분이 만들어 놓은 모든 EventLoopsGroup들을 ShutdownGracefully()을 통해 셧다운하는 것으로 일반적으로 간단히 Netty 어플리케이션을 셧다운 시킬수 있습니다.
EventLoopGroup 이 완벽하게 종료되고, 그룹에 속한 모든 채널들이 닫혀질때, 여러분들에게 통지할 Future를 리턴합니다.

Summary

이 챕터에서는, 여러분은 Netty 상위상에서 네트워크 어플리케이션이 완전히 동작하는것을 만드는 방법에 대한 설명으로 짧은 네티 튜어를 했습니다.
다음 챕터에서는 Netty에 대한 보다 디테일한 정보가 있습니다.
io.netty.example pakcage 에서 Netty 예제들을 여러분들이 리뷰하면서 용기를 얻기를 바랍니다.

네티 커뮤니티는 항상 여러분의 질문과 아이디어들,Netty을 발전하고, 여러분들의 피드백에 기반한 문서들을 기다리고 있습니다.

마지막 기여는 19-May-2017