Netty的UDP確實顯得繁瑣,一些參考書中的例子也顯得更加複雜,因此本文給出幾段示例代碼。

1.最簡單的接收者和發送者

接收者只需要監聽一個接收UDP報文的埠,然後實現一個解碼器即可。這裡的解碼器是將UDP報文解碼為一個String對象,然後列印出來。為了簡單起見,連程序的結束代碼都沒有添加,需手動結束程序:

public class UdpReciever {
private static int R_PORT = 2222; //Reciever的埠
public static void main(String[] args) {
//1.NioEventLoopGroup是執行者
NioEventLoopGroup group = new NioEventLoopGroup();
System.out.println("NioEventLoopGroup in main :"+group);
//2.啟動器
Bootstrap bootstrap = new Bootstrap();
//3.配置啟動器
bootstrap.group(group)//3.1指定group
.channel(NioDatagramChannel.class)//3.2指定channel
.option(ChannelOption.SO_BROADCAST,true)//3.3指定為廣播模式
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
protected void initChannel(NioDatagramChannel nioDatagramChannel) throws Exception {
nioDatagramChannel.pipeline().addLast(new MyUdpDecoder());//3.4在pipeline中加入解碼器
}
});
try {
//4.bind到指定埠,並返回一個channel,該埠就是監聽UDP報文的埠
Channel channel = bootstrap.bind(R_PORT).sync().channel();
//5.等待channel的close
channel.closeFuture().sync();
//6.關閉group
group.shutdownGracefully();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

private static class MyUdpDecoder extends MessageToMessageDecoder<DatagramPacket> {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket, List<Object> list) throws Exception {
ByteBuf buf = datagramPacket.content();
String msg = buf.toString(CharsetUtil.UTF_8);
System.out.println("UdpReciever :"+msg);
}
}
}

發送者需要實現一個編碼器,將String對象編碼為一個UDP報文(DatagramPacket),然後朝著指定埠廣播出去即可,同上,也沒有負責結束的代碼:

public class UdpSender {
private static int S_PORT = 1111; //Sender的埠
private static int R_PORT = 2222; //Reciever的埠

public static void main(String[] args) {
//1.NioEventLoopGroup是執行者
NioEventLoopGroup group = new NioEventLoopGroup();
//2.啟動器
Bootstrap bootstrap = new Bootstrap();
//3.配置啟動器
bootstrap.group(group) //3.1指定group
.channel(NioDatagramChannel.class) //3.2指定channel
.option(ChannelOption.SO_BROADCAST,true) //3.3指定為廣播模式
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
protected void initChannel(NioDatagramChannel nioDatagramChannel) throws Exception {
nioDatagramChannel.pipeline().addLast(new MyUdpEncoder()); //3.4在pipeline中加入編碼器
}
});
try {
//4.bind並返回一個channel
Channel channel = bootstrap.bind(S_PORT).sync().channel();
for (int i = 0; i < 10; i++) {
TimeUnit.SECONDS.sleep(1);
//5.發送數據
channel.writeAndFlush("Send msg :" + i);
System.out.println("Send msg :" + i);
}

//6.等待channel的close
channel.closeFuture().sync();
//7.關閉group
group.shutdownGracefully();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

//編碼器,將要發送的消息(這裡是一個String)封裝到一個DatagramPacket中
private static class MyUdpEncoder extends MessageToMessageEncoder<String> {
//這裡是廣播的地址和埠
private InetSocketAddress remoteAddress = new InetSocketAddress("255.255.255.255", R_PORT);
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, String s, List<Object> list) throws Exception {
byte[] bytes = s.getBytes(CharsetUtil.UTF_8);
ByteBuf buf = channelHandlerContext.alloc().buffer(bytes.length);
buf.writeBytes(bytes);
DatagramPacket packet = new DatagramPacket(buf, remoteAddress);
list.add(packet);
}
}
}

2.互相收發UDP報文

上個例子是UdpSender發送報文,UdpReciever接收報文並列印到控制檯。為了發送報文,UdpSender的pipeline中添加了一個編碼器,將String轉換為DatagramPacket,然後發送出去;為了接收報文,UdpReciever用bind綁定某埠,對其進行監聽,並實現了一個解碼器,用來解析收到的DatagramPacket對象,並將其還原為String。

本例中,UdpReciever會在接收到報文後,添加一個後綴,並將報文對著UdpSender監聽的埠發送回去。為此,UdpSender的pipeline中需增加一個解碼器,而UdpReciever的pipeline中則需增加一個編碼器。代碼如下:

public class UdpSender2 {
private static int S_PORT = 1111; //Sender的埠
private static int R_PORT = 2222; //Reciever的埠

public static void main(String[] args) {
//1.NioEventLoopGroup是執行者
NioEventLoopGroup group = new NioEventLoopGroup();
//2.啟動器
Bootstrap bootstrap = new Bootstrap();
//3.配置啟動器
bootstrap.group(group) //3.1指定group
.channel(NioDatagramChannel.class) //3.2指定channel
.option(ChannelOption.SO_BROADCAST,true) //3.3指定為廣播模式
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
protected void initChannel(NioDatagramChannel nioDatagramChannel) throws Exception {
//3.4在pipeline中加入編碼器,和解碼器(用來處理返回的消息)
nioDatagramChannel.pipeline().addLast(new MyUdpEncoder()).addLast(new DecoderInSender());
}
});
try {
//4.bind並返回一個channel
Channel channel = bootstrap.bind(S_PORT).sync().channel();
for (int i = 0; i < 10; i++) {
TimeUnit.SECONDS.sleep(1);
//5.發送數據
channel.writeAndFlush("Send msg-" + i);
System.out.println("Send msg-" + i);
}

//6.等待channel的close
channel.closeFuture().sync();
//7.關閉group
group.shutdownGracefully();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

//編碼器,將要發送的消息(這裡是一個String)封裝到一個DatagramPacket中
private static class MyUdpEncoder extends MessageToMessageEncoder<String> {
//這裡是廣播的地址和埠
private InetSocketAddress remoteAddress = new InetSocketAddress("255.255.255.255", R_PORT);
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, String s, List<Object> list) throws Exception {
byte[] bytes = s.getBytes(CharsetUtil.UTF_8);
ByteBuf buf = channelHandlerContext.alloc().buffer(bytes.length);
buf.writeBytes(bytes);
DatagramPacket packet = new DatagramPacket(buf, remoteAddress);
list.add(packet);
}
}

//解碼器,用來處理返回的數據
private static class DecoderInSender extends MessageToMessageDecoder<DatagramPacket>{
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket, List<Object> list) throws Exception {
ByteBuf buf = datagramPacket.content();
String msg = buf.toString(CharsetUtil.UTF_8);
System.out.println("Sender receive: "+ msg);
}
}
}

接收器代碼如下:

public class UdpReciever2 {
private static int S_PORT = 1111; //Sender的埠
private static int R_PORT = 2222; //Reciever的埠
public static void main(String[] args) {
//1.NioEventLoopGroup是執行者
NioEventLoopGroup group = new NioEventLoopGroup();
//2.啟動器
Bootstrap bootstrap = new Bootstrap();
//3.配置啟動器
bootstrap.group(group)//3.1指定group
.channel(NioDatagramChannel.class)//3.2指定channel
.option(ChannelOption.SO_BROADCAST,true)//3.3指定為廣播模式
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
protected void initChannel(NioDatagramChannel nioDatagramChannel) throws Exception {
//3.4在pipeline中加入解碼器,和編碼器(用來發送UDP)
nioDatagramChannel.pipeline().addLast(new MyUdpDecoder()).addLast(new EncoderInReciever());
}
});
try {
//4.bind到指定埠,並返回一個channel,該埠就是監聽UDP報文的埠
Channel channel = bootstrap.bind(R_PORT).sync().channel();
//5.等待channel的close
channel.closeFuture().sync();
//6.關閉group
group.shutdownGracefully();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

private static class MyUdpDecoder extends MessageToMessageDecoder<DatagramPacket> {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket, List<Object> list) throws Exception {
ByteBuf buf = datagramPacket.content();
String msg = buf.toString(CharsetUtil.UTF_8);
System.out.println("UdpReciever :"+msg);

//將接收到的消息改變一下再發出去
msg += " from UdpReciever";
channelHandlerContext.channel().writeAndFlush(msg);
}
}

private static class EncoderInReciever extends MessageToMessageEncoder<String> {
//這裡是廣播的地址和埠
private InetSocketAddress remoteAddress = new InetSocketAddress("255.255.255.255", S_PORT);
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, String s, List<Object> list) throws Exception {
byte[] bytes = s.getBytes(CharsetUtil.UTF_8);
ByteBuf buf = channelHandlerContext.alloc().buffer(bytes.length);
buf.writeBytes(bytes);
DatagramPacket packet = new DatagramPacket(buf, remoteAddress);
list.add(packet);
}
}
}

如此就實現了UDP報文的互相收發。

3.在其他線程中收發報文

前面兩個例子都是在主線程中發送報文,實際上則很少這樣做。因為NioEventLoopGroup本身就是一個線程池,因此Netty應用程序一般在此線程池中取某個線程來進行各種操作。看下面的例子:

public class UdpSenderInOtherThread {
private static int S_PORT = 1111; //Sender的埠
private static int R_PORT = 2222; //Reciever的埠

public static void main(String[] args) {
//1.NioEventLoopGroup是執行者,也是線程池,線程池中初始化兩個線程
NioEventLoopGroup group = new NioEventLoopGroup(2);
//2.啟動器
Bootstrap bootstrap = new Bootstrap();
//3.配置啟動器
bootstrap.group(group) //3.1指定group
.channel(NioDatagramChannel.class) //3.2指定channel
.option(ChannelOption.SO_BROADCAST, true) //3.3指定為廣播模式
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
protected void initChannel(NioDatagramChannel nioDatagramChannel) throws Exception {
//3.4在pipeline中加入編碼器,和解碼器(用來處理返回的消息)
nioDatagramChannel.pipeline().addLast(new MyUdpEncoder()).addLast(new DecoderInSender());
}
});
try {
//4.bind並返回一個channel
Channel channel = bootstrap.bind(S_PORT).sync().channel();
System.out.println("main thread is "+Thread.currentThread());

//從線程池中取一個線程,每隔兩秒發送一個報文,group其實就是一個ServiceExecutor
int times = 0;
ScheduledFuture future = group.scheduleAtFixedRate(new Runnable() {
private int times = 0;
@Override
public void run() {
//5.發送數據
System.out.println(Thread.currentThread()+ " in scheduleAtFixedRate.");
channel.writeAndFlush("Send msg-" + times++);
System.out.println("Send msg-" + times);
}
}, 2, 2, TimeUnit.SECONDS);

//從線程池中取一個線程,10秒後運行代碼,結束定時發送報文,並關閉channel
group.schedule(new Runnable() {
@Override
public void run() {
System.out.println("cancel sending msg.");
future.cancel(true);
System.out.println("close channel.");
channel.close();
}
}, 10, TimeUnit.SECONDS);

//6.等待channel的close
channel.closeFuture().sync();
//7.關閉group
group.shutdownGracefully();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

//編碼器,將要發送的消息(這裡是一個String)封裝到一個DatagramPacket中
private static class MyUdpEncoder extends MessageToMessageEncoder<String> {
//這裡是廣播的地址和埠
private InetSocketAddress remoteAddress = new InetSocketAddress("255.255.255.255", R_PORT);

@Override
protected void encode(ChannelHandlerContext channelHandlerContext, String s, List<Object> list) throws Exception {
byte[] bytes = s.getBytes(CharsetUtil.UTF_8);
ByteBuf buf = channelHandlerContext.alloc().buffer(bytes.length);
buf.writeBytes(bytes);
DatagramPacket packet = new DatagramPacket(buf, remoteAddress);
list.add(packet);
//查看encode執行的線程,是group線程池中的某個線程
System.out.println(Thread.currentThread()+ " in encode.");
}
}

//解碼器,用來處理返回的數據
private static class DecoderInSender extends MessageToMessageDecoder<DatagramPacket> {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket, List<Object> list) throws Exception {
ByteBuf buf = datagramPacket.content();
String msg = buf.toString(CharsetUtil.UTF_8);
System.out.println("Sender receive: " + msg);
}
}
}

運行本例子的結果如下:

main thread is Thread[main,5,main]
Thread[nioEventLoopGroup-2-2,10,main] in scheduleAtFixedRate.
Send msg-1
Thread[nioEventLoopGroup-2-1,10,main] in encode.
Thread[nioEventLoopGroup-2-2,10,main] in scheduleAtFixedRate.
Send msg-2
Thread[nioEventLoopGroup-2-1,10,main] in encode.
Thread[nioEventLoopGroup-2-2,10,main] in scheduleAtFixedRate.
Send msg-3
Thread[nioEventLoopGroup-2-1,10,main] in encode.
Thread[nioEventLoopGroup-2-2,10,main] in scheduleAtFixedRate.
Send msg-4
Thread[nioEventLoopGroup-2-1,10,main] in encode.
Thread[nioEventLoopGroup-2-2,10,main] in scheduleAtFixedRate.
Send msg-5
Thread[nioEventLoopGroup-2-1,10,main] in encode.
cancel sending msg.
close channel.

這說明,由於限定了線程池的大小為2,因此程序一共三個工作線程。主線程中運行了netty配置相關代碼。運行定期發送報文scheduleAtFixedRate方法的是子線程2,而編碼器encode方法則在子線程1中運行。schedule方法也在子線程1中運行(這一點並不一定)。通過擴大線程池的大小,可以觀察到方法也會在其他子線程(例如子線程3)中執行。這說明,netty會盡量對各個線程代碼進行負載平衡。

程序員也應善用EventLoopGroup來進行多線程代碼的實現和管理。同時,本例子中,使用了ScheduledFuture 來優雅的結束了Channel。

4.UDP單播

上面的例子都是廣播,下面給出一個UDP單播的代碼實例。

發送方:

public class UpdUnicastSender {
private static int S_PORT = 1111; //Sender的埠
private static int R_PORT = 2222; //Reciever的埠
//目的地址
private static InetSocketAddress remoteAddress = new InetSocketAddress("192.168.31.196", R_PORT);

public static void main(String[] args) {
//1.NioEventLoopGroup是執行者,也是線程池,線程池中初始化兩個線程
NioEventLoopGroup group = new NioEventLoopGroup(10);
//2.啟動器
Bootstrap bootstrap = new Bootstrap();
//3.配置啟動器
bootstrap.group(group) //3.1指定group
.channel(NioDatagramChannel.class) //3.2指定channel
.remoteAddress(remoteAddress) //3.3指定目的地址
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
protected void initChannel(NioDatagramChannel nioDatagramChannel) throws Exception {
//3.4在pipeline中加入編碼器
nioDatagramChannel.pipeline().addLast(new MyUdpEncoder());
}
});
try {
//4.bind並返回一個channel
Channel channel = bootstrap.bind(S_PORT).sync().channel();
//從線程池中取一個線程,每隔兩秒發送一個報文
int times = 0;
ScheduledFuture future = group.scheduleAtFixedRate(new Runnable() {
private int times = 0;
@Override
public void run() {
//5.發送數據
String msg = "Send msg-" + times++;
channel.writeAndFlush(msg);
System.out.println(msg);
}
}, 2, 2, TimeUnit.SECONDS);

//從線程池中取一個線程,10秒後運行代碼,結束定時發送報文,並關閉channel
group.schedule(new Runnable() {
@Override
public void run() {
System.out.println("cancel sending msg.");
future.cancel(true);
System.out.println("close channel.");
//發送一個"close",提醒接收方關閉channel
channel.writeAndFlush("close");
channel.close();
}
}, 10, TimeUnit.SECONDS);

//6.等待channel的close
channel.closeFuture().sync();
//7.關閉group
group.shutdownGracefully();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

//編碼器,將要發送的消息封裝到一個DatagramPacket中
private static class MyUdpEncoder extends MessageToMessageEncoder<String> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, String s, List<Object> list) throws Exception {
byte[] bytes = s.getBytes(CharsetUtil.UTF_8 );
ByteBuf buf = channelHandlerContext.alloc().buffer(bytes.length);
buf.writeBytes(bytes);
DatagramPacket packet = new DatagramPacket(buf, remoteAddress);
list.add(packet);
}
}
}

接收方:

public class UdpUnicastReciever {
private static int R_PORT = 2222; //Reciever的埠
//本地ip和埠
private static InetSocketAddress localAddress = new InetSocketAddress("192.168.31.196", R_PORT);
public static void main(String[] args) {
//1.NioEventLoopGroup是執行者
NioEventLoopGroup group = new NioEventLoopGroup();
//2.啟動器
Bootstrap bootstrap = new Bootstrap();
//3.配置啟動器
bootstrap.group(group)//3.1指定group
.channel(NioDatagramChannel.class)//3.2指定channel
.localAddress(localAddress) //3.3指定本地地址
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
protected void initChannel(NioDatagramChannel nioDatagramChannel) throws Exception {
//3.4在pipeline中加入解碼器,和編碼器(用來發送UDP)
nioDatagramChannel.pipeline().addLast(new MyUdpDecoder());
}
});
try {
//4.bind到指定埠,並返回一個channel,該埠就是監聽UDP報文的埠
Channel channel = bootstrap.bind(R_PORT).sync().channel();
//5.等待channel的close
channel.closeFuture().sync();
//6.關閉group
group.shutdownGracefully();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

private static class MyUdpDecoder extends MessageToMessageDecoder<DatagramPacket> {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket, List<Object> list) throws Exception {
ByteBuf buf = datagramPacket.content();
InetSocketAddress sender = datagramPacket.sender();
InetSocketAddress recipient = datagramPacket.recipient();
String msg = buf.toString(CharsetUtil.UTF_8);
String msgInfo = msg +",sender = "+sender+", recipient = "+recipient;
System.out.println("UdpReciever :"+msgInfo);
//若接收到"close",則關閉channel
if (msg.equals("close")) {
System.out.println("close channel.");
channelHandlerContext.channel().close();
}
}
}
}

其中還加入了關閉channel的代碼。

推薦閱讀:

相關文章