博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Netty服务端与客户端(源码一)
阅读量:5260 次
发布时间:2019-06-14

本文共 7126 字,大约阅读时间需要 23 分钟。

首先,整理NIO进行服务端开发的步骤:

  (1)创建ServerSocketChannel,配置它为非阻塞模式

  (2)绑定监听配置TCP参数backlog的大小

  (3)创建一个独立的I/O线程,用于轮询多路复用器Selector

  (4)创建Selector,将之前创建的ServerSocketChannel注册到Selector上监听SelectionKeyACCEPT

  (5)启动I/O线程,在循环体中执行Selector.select()方法,轮训就绪的Channel。

  (6)当轮询到了处于就绪状态的Channel时,需要对其进行判断,如果是OP_ACCEPT状态,说明是新的客户端接入,则调用ServerSocketChannel.accept()方法接受新的客户端。

  (7)设置新接入的客户端链路SocketChannel为非阻塞模式,配置其他的一些TCP参数。

  (8)将SocketChannel注册到Selector,监听OP_READ操作位

  (9)如果轮询的Channel为OP_READ,则说明SocketChannel中有新的就绪的数据包需要读取,则构造ByteBuffer对象,读取数据包。

  (10)如果轮询的Channel为OP_WRITE,则说明还有数据没有发送完成,需要继续发送。  

 Netty时间服务器服务端 TimeServer:

1 package netty; 2  3 import io.netty.bootstrap.ServerBootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel;10 import io.netty.channel.socket.nio.NioServerSocketChannel;11 12 13 public class TimeServer {14     15     public void bind(int port) throws Exception{16         //配置服务端的NIO线程组 一个用于服务端接收客户端的连接,另一个用于进行SocketChannel的网络读写17         EventLoopGroup bossGroup = new NioEventLoopGroup();18         EventLoopGroup workerGroup = new NioEventLoopGroup();19         try{20             ServerBootstrap b = new ServerBootstrap();21             b.group(bossGroup,workerGroup)22             .channel(NioServerSocketChannel.class)23             .option(ChannelOption.SO_BACKLOG, 1024)24             .childHandler(new ChildChannelHandler());25             //绑定端口,同步等待成功26             ChannelFuture f = b.bind(port).sync();27             //等待服务器监听端口关闭28             f.channel().closeFuture().sync();29         }finally{30             //优雅退出,释放线程池资源31             bossGroup.shutdownGracefully();32             workerGroup.shutdownGracefully();33         }34     }35     36     private class ChildChannelHandler extends ChannelInitializer
{37 protected void initChannel(SocketChannel arg0) throws Exception{38 arg0.pipeline().addLast(new TimeServerHandler());39 }40 }41 }

ServerBootstrap是Netty用于启动NIO服务端的辅助类,目的是降低服务端的开发难度。

绑定childChannelHandler,其作用类似于Reactor模式中的handler类,主要用于处理网络I/O事件,例如记录日志、对消息进行编解码等。

使用bind绑定监听端口,随后,调用它的同步阻塞方法sync等待绑定操作完成,完成后Netty会返回一个ChannelFuture,主要用于异步操作的通知回调

 

Netty时间服务器服务端 TimeServerHandler:

1 package netty; 2  3 import java.io.IOException; 4 import io.netty.buffer.ByteBuf; 5 import io.netty.buffer.Unpooled; 6 import io.netty.channel.ChannelHandlerAdapter; 7 import io.netty.channel.ChannelHandlerContext; 8  9 public class TimeServerHandler extends ChannelHandlerAdapter{10     11     public void channelRead(ChannelHandlerContext ctx,Object msg) throws IOException{12         //将msg转换成Netty的ByteBuf对象13         ByteBuf buf = (ByteBuf)msg;14         //将缓冲区中的字节数组复制到新建的byte数组中,15         byte[] req = new byte[buf.readableBytes()];16         buf.readBytes(req);17         //获取请求消息18         String body = new String(req,"UTF-8");19         System.out.println("The time server receive order:" + body);20         //如果是"QUERY TIME ORDER"则创建应答消息21         String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(22                 System.currentTimeMillis()).toString() : "BAD ORDER";23                 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());24         //异步发送应答消息给客户端25         ctx.write(resp);26     }27     28     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception{29         ctx.flush();30     }31     32     public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){33         ctx.close();34     }35 }

相比昨天原生的NIO服务端,代码量大大减少。

 

 Netty时间服务器客户端 TimeClient:

1 package netty; 2  3 import io.netty.bootstrap.Bootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel;10 import io.netty.channel.socket.nio.NioSocketChannel;11 12 public class TimeClient {13     14     public void connect(int port,String host) throws Exception{15         //创建客户端处理I/O读写的NioEventLoopGroup Group线程组16         EventLoopGroup group = new NioEventLoopGroup();17         try{18             //创建客户端辅助启动类Bootstrap19             Bootstrap b = new Bootstrap();20             b.group(group).channel(NioSocketChannel.class)21             .option(ChannelOption.TCP_NODELAY, true)22             .handler(new ChannelInitializer
(){23 //将ChannelHandler设置到ChannelPipleline中,用于处理网络I/O事件24 @Override25 protected void initChannel(SocketChannel ch) throws Exception {26 ch.pipeline().addLast(new TimeClientHandler());27 }28 });29 //发起异步连接操作,然后调用同步方法等待连接成功。30 ChannelFuture f = b.connect(host,port).sync();31 32 //等待客户端链路关闭33 f.channel().closeFuture().sync();34 }finally{35 //优雅退出,释放NIO线程组36 group.shutdownGracefully();37 }38 }39 40 public static void main(String[] args) throws Exception{41 int port = 8080;42 if(args != null && args.length > 0){43 try{44 port = Integer.valueOf(args[0]);45 }catch(NumberFormatException e){46 //采用默认值47 }48 }49 new TimeClient().connect(port, "127.0.0.1");50 }51 52 }

 Netty时间服务器客户端 TimeClientHandler:

1 package netty; 2  3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelHandlerAdapter; 6 import io.netty.channel.ChannelHandlerContext; 7  8 import java.util.logging.Logger; 9 10 public class TimeClientHandler extends ChannelHandlerAdapter{11     12     private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName());13     14     private final ByteBuf firstMessage;15     16     public TimeClientHandler(){17         byte[] req = "QUERY TIME ORDER".getBytes();18         firstMessage = Unpooled.buffer(req.length);19         firstMessage.writeBytes(req);20     }21     22     //当客户端与服务端TCP链路简历成功后,Netty的NIO线程会调用该方法,发送查询时间的指令给服务器23     public void channelActive(ChannelHandlerContext ctx){24         //将请求消息发送给服务端25         ctx.writeAndFlush(firstMessage);26     }27     28     //当服务器返回应答消息时,该方法被调用29     public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{30         ByteBuf buf = (ByteBuf) msg;31         byte[] req = new byte[buf.readableBytes()];32         buf.readBytes(req);33         String body = new String(req,"UTF-8");34         System.out.println("Now is :" + body);35     }36     37     public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){38         39         //释放资源40         logger.warning("Unexpected exception from downstream :" + cause.getMessage());41         ctx.close();42     }43 }

 运行结果:

Server:

Client:

 

转载于:https://www.cnblogs.com/yangsy0915/p/6139648.html

你可能感兴趣的文章
多线程以及线程池1
查看>>
15软工课后作业01 P18 第四题
查看>>
DataGrid 显示选中的item
查看>>
loadrunner常用函数整理
查看>>
第85节:Java中的JavaScript
查看>>
c#系统中类的方法 Console、Object,ToolStripDropDownItem,string
查看>>
【windows核心编程】DLL相关(1)
查看>>
IOS NSThread 线程间通信
查看>>
11.14
查看>>
CAS单点登录配置[4]:客户端配置
查看>>
Flex beta2+XFire开发实例 (三)
查看>>
jira汉化
查看>>
opencv 训练自己的分类器汇总
查看>>
codeforces
查看>>
tn文本分析语言(四) 实现自然语言计算器
查看>>
Algorithmic Graph Drawing in TikZ
查看>>
『设计模式』学习笔记目录索引
查看>>
linux网络管理/
查看>>
给自己一条退路,再次比较Erlang和Golang
查看>>
LeetCode#48 Rotate Image
查看>>