UDP 브로드캐스팅
UDP 개념
UDP는 비연결 프로토콜으로 각 메시지가 독립적으로 전송된다.
모든 핸드쉐이크와 메시지 관리가 배제되어 TCP보다 훨씬 빠르다.
하지만 메시지 전달 순서 보장, 오류 수정 메커니즘 등이 없어 안정성을 보장하지 못한다.
UDP는 여러 수신자에게 전송하는 멀티캐스트 모드, 브로드캐스트 모드를 제공한다.
멀티캐스트 모드를 이용하면 정의된 호스트 그룹으로 메시지를 전송할 수 있다.
브로드캐스트 모드를 이용하면 동일 네트워크 상의 모든 호스트가 메시지를 수신할 수 있다.
아래와 같이 UDP 포트를 수신하는 모든 이벤트 모니터는 브로드캐스팅 메시지를 수신할 수 있다.
브로드캐스터
브로드캐스터는 로그 파일로부터 전달된 LogEvent 메시지를 DatagramPacket으로 변환하여 DatagramChannel에서 원격 피어와 통신할 수 있도록 한다.
이를 위해 내부적으로 LogEventEncoder를 사용해 LogEvent 메시지를 DatagramPacket 메시지로 변환한다.
public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
private final InetSocketAddress remoteAddress;
public LogEventEncoder(InetSocketAddress remoteAddress) {
this.remoteAddress = remoteAddress;
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext,
LogEvent logEvent, List<Object> out) throws Exception {
byte[] file = logEvent.getLogfile().getBytes(CharsetUtil.UTF_8);
byte[] msg = logEvent.getMsg().getBytes(CharsetUtil.UTF_8);
ByteBuf buf = channelHandlerContext.alloc()
.buffer(file.length + msg.length + 1);
buf.writeBytes(file);
buf.writeByte(LogEvent.SEPARATOR);
buf.writeBytes(msg);
out.add(new DatagramPacket(buf, remoteAddress));
}
}
아래는 브로드캐스터 클래스로, run 메서드를 통해 부트스트랩으로 서버를 구동하고 로그 파일에 새로운 내용이 있으면 채널에 write하도록 한다.
-Dlogfile=/var/log/messages -DPort=9999
와 같이 모니터링할 로그 파일과 바인드할 포트를 환경변수로 설정한 후 실행시키면 된다.
public class LogEventBroadcaster {
private final EventLoopGroup group;
private final Bootstrap bootstrap;
private final File file;
public LogEventBroadcaster(InetSocketAddress address, File file) {
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new LogEventEncoder(address));
this.file = file;
}
public void run() throws Exception {
Channel ch = bootstrap.bind(0).sync().channel();
long pointer = 0;
for (;;) {
long len = file.length();
if (len < pointer) {
// file was reset
pointer = len;
} else if (len > pointer) {
// Content was added
RandomAccessFile raf = new RandomAccessFile(file, "r");
raf.seek(pointer);
String line;
while ((line = raf.readLine()) != null) {
ch.writeAndFlush(new LogEvent(null, -1,
file.getAbsolutePath(), line));
}
pointer = raf.getFilePointer();
raf.close();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.interrupted();
break;
}
}
}
public void stop() {
group.shutdownGracefully();
}
public static void main(String[] args) throws Exception {
if (args.length != 2) {
throw new IllegalArgumentException();
}
LogEventBroadcaster broadcaster = new LogEventBroadcaster(
new InetSocketAddress("255.255.255.255",
Integer.parseInt(args[0])), new File(args[1]));
try {
broadcaster.run();
}
finally {
broadcaster.stop();
}
}
}
이벤트 모니터
LogEventBroadcaster가 브로드캐스팅하는 UDP DatagramPacket을 수신하고 LogEvent로 디코딩하여 출력하는 역할을 한다.
브로드캐스터와 마찬가지로 내부적으로 LogEventDecoder를 사용해 DatagramPacket 메시지를 LogEvent 메시지로 변환한다.
public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {
@Override
protected void decode(ChannelHandlerContext ctx,
DatagramPacket datagramPacket, List<Object> out)
throws Exception {
ByteBuf data = datagramPacket.content();
int idx = data.indexOf(0, data.readableBytes(), LogEvent.SEPARATOR);
String filename = data.slice(0, idx).toString(CharsetUtil.UTF_8);
String logMsg = data.slice(idx + 1, data.readableBytes()).toString(CharsetUtil.UTF_8);
LogEvent event = new LogEvent(datagramPacket.sender(),
System.currentTimeMillis(), filename, logMsg);
out.add(event);
}
}
그리고 LogEventHandler를 등록해 간단하게 LogEvent 메시지를 출력한다.
public class LogEventHandler
extends SimpleChannelInboundHandler<LogEvent> {
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
@Override
public void channelRead0(ChannelHandlerContext ctx,
LogEvent event) throws Exception {
System.out.println(event.toString());
}
}
아래는 이벤트 모니터 클래스로, 부트스트랩에 파이프라인을 구성하여 서버와 연결하면 브로드캐스트된 데이터를 출력하게 된다.
public class LogEventMonitor {
private final EventLoopGroup group;
private final Bootstrap bootstrap;
public LogEventMonitor(InetSocketAddress address) {
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler( new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel)
throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new LogEventDecoder());
pipeline.addLast(new LogEventHandler());
}
} )
.localAddress(address);
}
public Channel bind() {
return bootstrap.bind().syncUninterruptibly().channel();
}
public void stop() {
group.shutdownGracefully();
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
throw new IllegalArgumentException(
"Usage: LogEventMonitor <port>");
}
LogEventMonitor monitor = new LogEventMonitor(
new InetSocketAddress(Integer.parseInt(args[0])));
try {
Channel channel = monitor.bind();
System.out.println("LogEventMonitor running");
channel.closeFuture().sync();
} finally {
monitor.stop();
}
}
}
Last updated