웹소켓 업그레이드가 요청된 경우 참조 카운트를 retain 메서드를 통해 증가시키고 다음 핸들러인 WebSocketServerProtocolHandler로 넘긴다.
이외의 경로로 요청이 들어온 경우 index.html의 내용을 전송해야 한다. 암호화나 압축 요청이 없는 경우 DefaultFileRegion으로 담아 제로 카피 형태로 효율적으로 전송하도록 한다.
응답의 끝에는 LastHttpContent를 기록하고 flush하여 FullHttpResponse 형태로 응답을 처리하도록 한다.
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private final String wsUri;
private static final File INDEX;
static {
URL location = HttpRequestHandler.class
.getProtectionDomain()
.getCodeSource().getLocation();
try {
String path = location.toURI() + "index.html";
path = !path.contains("file:") ? path : path.substring(5);
INDEX = new File(path);
} catch (URISyntaxException e) {
throw new IllegalStateException(
"Unable to locate index.html", e);
}
}
public HttpRequestHandler(String wsUri) {
this.wsUri = wsUri;
}
@Override
public void channelRead0(ChannelHandlerContext ctx,
FullHttpRequest request) throws Exception {
if (wsUri.equalsIgnoreCase(request.getUri())) {
ctx.fireChannelRead(request.retain());
} else {
if (HttpHeaders.is100ContinueExpected(request)) {
send100Continue(ctx);
}
RandomAccessFile file = new RandomAccessFile(INDEX, "r");
HttpResponse response = new DefaultHttpResponse(
request.getProtocolVersion(), HttpResponseStatus.OK);
response.headers().set(
HttpHeaders.Names.CONTENT_TYPE,
"text/html; charset=UTF-8");
boolean keepAlive = HttpHeaders.isKeepAlive(request);
if (keepAlive) {
response.headers().set(
HttpHeaders.Names.CONTENT_LENGTH, file.length());
response.headers().set( HttpHeaders.Names.CONNECTION,
HttpHeaders.Values.KEEP_ALIVE);
}
ctx.write(response);
if (ctx.pipeline().get(SslHandler.class) == null) {
ctx.write(new DefaultFileRegion(
file.getChannel(), 0, file.length()));
} else {
ctx.write(new ChunkedNioFile(file.getChannel()));
}
ChannelFuture future = ctx.writeAndFlush(
LastHttpContent.EMPTY_LAST_CONTENT);
if (!keepAlive) {
future.addListener(ChannelFutureListener.CLOSE);
}
}
}
private static void send100Continue(ChannelHandlerContext ctx) {
FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
ctx.writeAndFlush(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
ctx.close();
}
}
웹소켓 프레임 처리
TextWebSocketFrame을 처리하는 핸들러로, 채널 그룹 내의 모든 웹소켓 연결을 추적한다.
userEventTriggered 메서드를 재정의하여 핸드쉐이크가 성공했다는 이벤트를 받으면, 기존에 있던 HttpRequestHandler를 파이프라인에서 제거하고, 모든 웹소켓 클라이언트에 새로운 클라이언트가 연결되었음을 알리고, 새로운 웹소켓 채널을 group에 추가한다.
TextWebSocketFrame를 수신하면 retain()를 통해 참조 카운트를 증가시키고 group에 전송하여 모든 웹소켓 채널에서 데이터를 받도록 한다.
참조 카운트를 증가시키는 이유는 channelRead0 메서드가 반환될 때 참조 카운트가 감소하는데, writeAndFlush 메서드는 비동기이기 때문에 나중에 수행될 때 해제되어버린 데이터에 접근하지 않도록 하기 위함이다.
public class TextWebSocketFrameHandler
extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private final ChannelGroup group;
public TextWebSocketFrameHandler(ChannelGroup group) {
this.group = group;
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx,
Object evt) throws Exception {
if (evt == WebSocketServerProtocolHandler
.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
ctx.pipeline().remove(HttpRequestHandler.class);
group.writeAndFlush(new TextWebSocketFrame(
"Client " + ctx.channel() + " joined"));
group.add(ctx.channel());
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelRead0(ChannelHandlerContext ctx,
TextWebSocketFrame msg) throws Exception {
group.writeAndFlush(msg.retain());
}
}
SSL 암호화 추가
채널파이프라인 초기화 클래스의 생성자에 SslContext를 입력받고 SslHandler 객체를 생성해 채널 파이프라인의 맨 앞에 추가하면 된다.
public class SecureChatServerInitializer extends ChatServerInitializer {
private final SslContext context;
public SecureChatServerInitializer(ChannelGroup group,
SslContext context) {
super(group);
this.context = context;
}
@Override
protected void initChannel(Channel ch) throws Exception {
super.initChannel(ch);
SSLEngine engine = context.newEngine(ch.alloc());
engine.setUseClientMode(false);
ch.pipeline().addFirst(new SslHandler(engine));
}
}
아래는 SslContext 객체를 사용해 채널파이프라인 초기화 클래스인 SecureChatServerInitializer의 객체를 생성하여 부트스트랩에서 사용하도록 하는 코드이다.
public class SecureChatServer extends ChatServer {
private final SslContext context;
public SecureChatServer(SslContext context) {
this.context = context;
}
@Override
protected ChannelInitializer<Channel> createInitializer(
ChannelGroup group) {
return new SecureChatServerInitializer(group, context);
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err.println("Please give port as argument");
System.exit(1);
}
int port = Integer.parseInt(args[0]);
SelfSignedCertificate cert = new SelfSignedCertificate();
SslContext context = SslContext.newServerContext(
cert.certificate(), cert.privateKey());
final SecureChatServer endpoint = new SecureChatServer(context);
ChannelFuture future = endpoint.start(new InetSocketAddress(port));
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
endpoint.destroy();
}
});
future.channel().closeFuture().syncUninterruptibly();
}
}