Copy public static final class TextFrameHandler extends
SimpleChannelInboundHandler < TextWebSocketFrame > {
@ Override
public void channelRead0 ( ChannelHandlerContext ctx ,
TextWebSocketFrame msg) throws Exception {
// Handle text frame
}
}
/**
 * Inbound handler dedicated to {@link BinaryWebSocketFrame} messages.
 *
 * <p>{@link #channelRead0} is currently an empty stub.
 */
public static final class BinaryFrameHandler
        extends SimpleChannelInboundHandler<BinaryWebSocketFrame> {

    /**
     * Receives one inbound binary frame.
     *
     * @param ctx the context of this handler within the channel pipeline
     * @param msg the binary frame that was read
     * @throws Exception declared by the overridden method; the current body
     *     throws nothing
     */
    @Override
    public void channelRead0(
            final ChannelHandlerContext ctx,
            final BinaryWebSocketFrame msg) throws Exception {
        // Binary frame processing goes here.
    }
}
/**
 * Inbound handler dedicated to {@link ContinuationWebSocketFrame} messages.
 *
 * <p>{@link #channelRead0} is currently an empty stub.
 */
public static final class ContinuationFrameHandler
        extends SimpleChannelInboundHandler<ContinuationWebSocketFrame> {

    /**
     * Receives one inbound continuation frame.
     *
     * @param ctx the context of this handler within the channel pipeline
     * @param msg the continuation frame that was read
     * @throws Exception declared by the overridden method; the current body
     *     throws nothing
     */
    @Override
    public void channelRead0(
            final ChannelHandlerContext ctx,
            final ContinuationWebSocketFrame msg) throws Exception {
        // Continuation frame processing goes here.
    }
}
Copy public class ChatServer {
private final ChannelGroup channelGroup =
new DefaultChannelGroup( ImmediateEventExecutor . INSTANCE ) ;
private final EventLoopGroup group = new NioEventLoopGroup() ;
private Channel channel;
public ChannelFuture start ( InetSocketAddress address) {
ServerBootstrap bootstrap = new ServerBootstrap() ;
bootstrap . group (group)
. channel ( NioServerSocketChannel . class )
. childHandler ( createInitializer(channelGroup) );
ChannelFuture future = bootstrap . bind (address);
future . syncUninterruptibly ();
channel = future . channel ();
return future;
}
protected ChannelInitializer < Channel > createInitializer (
ChannelGroup group) {
return new ChatServerInitializer(group) ;
}
public void destroy () {
if (channel != null ) {
channel . close ();
}
channelGroup . close ();
group . shutdownGracefully ();
}
public static void main ( String [] args) throws Exception {
if ( args . length != 1 ) {
System . err . println ( "Please give port as argument" );
System . exit ( 1 );
}
int port = Integer . parseInt (args[ 0 ]);
final ChatServer endpoint = new ChatServer() ;
ChannelFuture future = endpoint . start (
new InetSocketAddress(port) );
Runtime . getRuntime () . addShutdownHook ( new Thread() {
@ Override
public void run () {
endpoint . destroy ();
}
});
future . channel () . closeFuture () . syncUninterruptibly ();
}
}
Copy public class ChatServerInitializer extends ChannelInitializer < Channel > {
private final ChannelGroup group;
public ChatServerInitializer ( ChannelGroup group) {
this . group = group;
}
@ Override
protected void initChannel ( Channel ch) throws Exception {
ChannelPipeline pipeline = ch . pipeline ();
pipeline . addLast ( new HttpServerCodec() );
pipeline . addLast ( new ChunkedWriteHandler() );
pipeline . addLast ( new HttpObjectAggregator( 64 * 1024 ) );
pipeline . addLast ( new HttpRequestHandler( "/ws" ) );
pipeline . addLast ( new WebSocketServerProtocolHandler( "/ws" ) );
pipeline . addLast ( new TextWebSocketFrameHandler(group) );
}
}
Copy public class HttpRequestHandler extends SimpleChannelInboundHandler < FullHttpRequest > {
private final String wsUri;
private static final File INDEX;
static {
URL location = HttpRequestHandler . class
. getProtectionDomain ()
. getCodeSource () . getLocation ();
try {
String path = location . toURI () + "index.html" ;
path = ! path . contains ( "file:" ) ? path : path . substring ( 5 );
INDEX = new File(path) ;
} catch ( URISyntaxException e) {
throw new IllegalStateException(
"Unable to locate index.html" , e) ;
}
}
public HttpRequestHandler ( String wsUri) {
this . wsUri = wsUri;
}
@ Override
public void channelRead0 ( ChannelHandlerContext ctx ,
FullHttpRequest request) throws Exception {
if ( wsUri . equalsIgnoreCase ( request . getUri ())) {
ctx . fireChannelRead ( request . retain ());
} else {
if ( HttpHeaders . is100ContinueExpected (request)) {
send100Continue(ctx) ;
}
RandomAccessFile file = new RandomAccessFile(INDEX , "r" ) ;
HttpResponse response = new DefaultHttpResponse(
request . getProtocolVersion() , HttpResponseStatus . OK ) ;
response . headers () . set (
HttpHeaders . Names . CONTENT_TYPE ,
"text/html; charset=UTF-8" );
boolean keepAlive = HttpHeaders . isKeepAlive (request);
if (keepAlive) {
response . headers () . set (
HttpHeaders . Names . CONTENT_LENGTH , file . length ());
response . headers () . set ( HttpHeaders . Names . CONNECTION ,
HttpHeaders . Values . KEEP_ALIVE );
}
ctx . write (response);
if ( ctx . pipeline () . get ( SslHandler . class ) == null ) {
ctx . write ( new DefaultFileRegion(
file . getChannel() , 0 , file . length()) );
} else {
ctx . write ( new ChunkedNioFile( file . getChannel()) );
}
ChannelFuture future = ctx . writeAndFlush (
LastHttpContent . EMPTY_LAST_CONTENT );
if ( ! keepAlive) {
future . addListener ( ChannelFutureListener . CLOSE );
}
}
}
private static void send100Continue ( ChannelHandlerContext ctx) {
FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion . HTTP_1_1 , HttpResponseStatus . CONTINUE ) ;
ctx . writeAndFlush (response);
}
@ Override
public void exceptionCaught ( ChannelHandlerContext ctx , Throwable cause)
throws Exception {
cause . printStackTrace ();
ctx . close ();
}
}
Copy public class TextWebSocketFrameHandler
extends SimpleChannelInboundHandler < TextWebSocketFrame > {
private final ChannelGroup group;
public TextWebSocketFrameHandler ( ChannelGroup group) {
this . group = group;
}
@ Override
public void userEventTriggered ( ChannelHandlerContext ctx ,
Object evt) throws Exception {
if (evt == WebSocketServerProtocolHandler
. ServerHandshakeStateEvent . HANDSHAKE_COMPLETE ) {
ctx . pipeline () . remove ( HttpRequestHandler . class );
group . writeAndFlush ( new TextWebSocketFrame(
"Client " + ctx . channel() + " joined" ) );
group . add ( ctx . channel ());
} else {
super . userEventTriggered (ctx , evt);
}
}
@ Override
public void channelRead0 ( ChannelHandlerContext ctx ,
TextWebSocketFrame msg) throws Exception {
group . writeAndFlush ( msg . retain ());
}
}
Copy public class SecureChatServerInitializer extends ChatServerInitializer {
private final SslContext context;
public SecureChatServerInitializer ( ChannelGroup group ,
SslContext context) {
super(group);
this . context = context;
}
@ Override
protected void initChannel ( Channel ch) throws Exception {
super . initChannel (ch);
SSLEngine engine = context . newEngine ( ch . alloc ());
engine . setUseClientMode ( false );
ch . pipeline () . addFirst ( new SslHandler(engine) );
}
}
Copy public class SecureChatServer extends ChatServer {
private final SslContext context;
public SecureChatServer ( SslContext context) {
this . context = context;
}
@ Override
protected ChannelInitializer < Channel > createInitializer (
ChannelGroup group) {
return new SecureChatServerInitializer(group , context) ;
}
public static void main ( String [] args) throws Exception {
if ( args . length != 1 ) {
System . err . println ( "Please give port as argument" );
System . exit ( 1 );
}
int port = Integer . parseInt (args[ 0 ]);
SelfSignedCertificate cert = new SelfSignedCertificate() ;
SslContext context = SslContext . newServerContext (
cert . certificate () , cert . privateKey ());
final SecureChatServer endpoint = new SecureChatServer(context) ;
ChannelFuture future = endpoint . start ( new InetSocketAddress(port) );
Runtime . getRuntime () . addShutdownHook ( new Thread() {
@ Override
public void run () {
endpoint . destroy ();
}
});
future . channel () . closeFuture () . syncUninterruptibly ();
}
}