1 Channel
A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind.A channel provides a user:
- the current state of the channel (e.g. is it open? is it connected?),
- the of the channel (e.g. receive buffer size),
- the I/O operations that the channel supports (e.g. read, write, connect, and bind), and
- the which handles all I/O events and requests associated with the channel.
Java NIO中,Channel
的作用类似于Java IO的Stream
(实际上有所不同,参考上一篇文章)。文档里也说的很清楚,在客户端与服务端建立连接后,网络IO操作是在Channel
对象上进行的。
2 ChannelHandler
public interface ChannelHandler
当客户端与服务器建立起连接后,ChannelHandler
的方法是被网络event(这里的event是广义的)触发的,由ChannelHandler
直接处理输入输出数据,并传递到管道中的下一个ChannelHandler
中。
通过Channel
或者ChannelHandlerContext
发生的请求/响应event 就是在管道中ChannelHandler
传递。
ChannelInboundHandler对从客户端发往服务器的报文进行处理,一般用来执行解码、读取客户端数据、进行业务处理等;ChannelOutboundHandler对从服务器发往客户端的报文进行处理,一般用来进行编码、发送报文到客户端。
一般就是继承ChannelInboundHandlerAdapter
和ChannelOutboundHandlerAdapter
,因为Adapter把定制(custom) ChannelHandler的麻烦减小到了最低,Adapter本身已经实现了基础的数据处理逻辑(例如将event转发到下一个handler),你可以只重写那些你想要特别实现的方法。
示例:
/**
* Handles a server-side channel.
*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter{ // (1)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);// (3)
String body = new String(req, "UTF-8");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
具体使用:
- 一般做法是继承
ChannelInboundHandlerAdapter
,这个类实现了ChannelHandler
接口,ChannelHandler
提供了许多事件处理的接口方法,然后你可以覆盖这些方法。现在仅仅只需要继承ChannelHandlerAdapter类而不是你自己去实现接口方法。 - 以上我们覆盖了channelRead()事件处理方法。每当从客户端收到新的数据时,这个方法会在收到消息时被调用,这个例子中,收到的消息的类型是。
- 和NIO一样,读取数据时,它是直接读到缓冲区中;在写入数据时,它也是写入到缓冲区中。在TCP/IP中,NETTY会把读到的数据放到ByteBuf的数据结构中。所以这里读取在ByteBuf的信息,得到服务器返回的内容。
3 ChannelPipeline
public interface ChannelPipeline extends Iterable < Map.Entry < String , ChannelHandler >>
ChannelPipeline
作为放置ChannelHandler
的容器,采用了J2EE的 拦截过滤模式,用户可以定义管道中的ChannelHandler
以哪种规则去拦截并处理事件以及在管道中的ChannelHandler
之间如何通信。每个Channel都有它自己的Pipeline,当一个新的Channel被创建时会自动被分配到一个Pipeline中。
ChannelHandler
按如下步骤安装到ChannelPipeline
中:
- 一个ChannelInitializer接口实现被注册到一个Bootstrap上:
bootstrap.handler(new ChannelInitializer<SocketChannel>() {//(1)
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new LineBasedFrameDecoder(Integer.MAX_VALUE));
pipeline.addLast(nettyClientHandler);//自定义的ChannelInboundHandlerAdapter子类
}
});
注意:上面编码器(encoders),解码器(decoders),和ChannelInboundHandlerAdapter的子类都属于ChannelHandler。
- 当
ChannelInitializer.initChannel()
被调用时,这个ChannelInitializer会往管道(pipeline)中安装定制的一组ChannelHandler - 然后这个ChannelInitializer把自己从ChannelPipeline中移除
4 NioEventLoopGroup
5 Bootstrap
Bootstrap
以及 ServerBootstrap
类都继承自 AbstractBootstrap
。官方API文档中的解释是:
Bootstrap中文翻译就是引导程序,就是作为管理Channel的一个辅助类。可以通过“方法链”的代码形式(类似Builder模式)去配置一个Bootstrap,创建Channel并发起请求。Bootstrap类为一个应用的网络层配置提供了容器,客户端通过它来jianjie。
示例:
EventLoopGroup group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class)
.group(group);
.handler(new ChannelInitializer<SocketChannel>() {//(1)
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new LineBasedFrameDecoder(Integer.MAX_VALUE));
pipeline.addLast(nettyClientHandler);//(2)
}
});
-
是一个特殊的
ChannelHandler
类,作用是帮助使用者配置一个新的。也许你想通过增加一些新的ChannelHandler子类来操作一个新的或者通过其对应的来实现你的网络程序。当你的程序变的复杂时,可能会增加更多的ChannelHandler子类到pipeline上。 - 每个Channel都有ChannelPipeline,在Channel的pipeline中加入handler,这里的ChannelHandler类经常会被用来处理一个最近的已经接收的Channel。所以这里的Channel已经不是NIO中的Channel了,她是netty的Channel。
6 建立连接并发起请求
public class NettyClient {
private Channel channel ;
public void connect(int port,String host){
EventLoopGroup group = new NioEventLoopGroup();
try {//配置Bootstrap
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new LineBasedFrameDecoder(Integer.MAX_VALUE));
pipeline.addLast(new NettyClientHandler () );
}
});
//发起异步连接操作
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
channel = channelFuture.channel();
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
//关闭,释放线程资源
group.shutdownGracefully();
}
}
public void sendMessage(String msg){//连接成功后,通过Channel提供的接口进行IO操作
try {
if (channel != null && channel.isOpen()) {
channel.writeAndFlush(sendMsg).sync(); //(1)
Log.d(TAG, "send succeed " + sendMsg);
} else {
throw new Exception("channel is null | closed");
}
} catch (Exception e) {
sendReconnectMessage();
e.printStackTrace();
}
}
@Test
public void nettyClient(){
new NettyClient().connect(8989, "localhost");
}
}
- Channel 及对象提供了许多操作,能够触发各种各样的I/O事件和操作。
write(Object)方法不会使消息写入到Channel上,他被缓冲在了内部,你需要调用flush()方法来把缓冲区中数据强行输出。或者可以用更简洁的writeAndFlush(msg)以达到同样的目的。
7 总结
简而言之,在客户端上使用Netty,业务流程如下:
- 构建Bootstrap,其中包括设置好ChannelHandler来处理将来接收到的数据。
- 由Boostrap发起连接。
- 连接成功建立后,得到一个ChannelFuture对象,代表了一个还没有发生的I/O操作。这意味着任何一个请求操作都不会马上被执行,因为在Netty里所有的操作都是异步的。
- 通过Channel对象的
writeAndFlush(Object msg)
方法往服务端发送数据,接收到的数据 会在ChannelHandler的实现类中的channelRead(ChannelHandlerContext ctx, Object msg)
中获取到被读到缓冲区的数据——(ByteBuf) msg
。
8 问题
- TCP连接中,NETTY会把读到的数据放到ByteBuf的数据结构中。基于流的传输并不是一个数据包队列,而是一个字节队列。即使服务器发送了2个独立的消息,客户端也不会作为2次消息处理而仅仅是作为一连串的字节进行读取。因此这是不能保证你远程写入的数据就会准确地读取。尤其是 服务器发回的消息长度 过长的时候,一次消息将有可能被拆分到不同的ByteBuf数据段中,很有可能需要多次读取ByteBuf,才能把一个消息完整拿到。
解决方案:
是的一个实现类,他可以在处理数据拆分的问题上变得很简单。