您的当前位置:首页Android开发之使用Netty进行Socket编程(二)

Android开发之使用Netty进行Socket编程(二)

2024-12-15 来源:哗拓教育

1 Channel

A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind.A channel provides a user:

  1. the current state of the channel (e.g. is it open? is it connected?),
  2. the of the channel (e.g. receive buffer size),
  3. the I/O operations that the channel supports (e.g. read, write, connect, and bind), and
  4. the which handles all I/O events and requests associated with the channel.

Java NIO中,Channel的作用类似于Java IO的Stream(实际上有所不同,参考上一篇文章)。文档里也说的很清楚,在客户端与服务端建立连接后,网络IO操作是在Channel对象上进行的。

2 ChannelHandler

public interface ChannelHandler

当客户端与服务器建立起连接后,ChannelHandler的方法是被网络event(这里的event是广义的)触发的,由ChannelHandler直接处理输入输出数据,并传递到管道中的下一个ChannelHandler中。
通过Channel或者ChannelHandlerContext发生的请求/响应event 就是在管道中ChannelHandler传递。

ChannelInboundHandler对从客户端发往服务器的报文进行处理,一般用来执行解码、读取客户端数据、进行业务处理等;ChannelOutboundHandler对从服务器发往客户端的报文进行处理,一般用来进行编码、发送报文到客户端。
一般就是继承ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter,因为Adapter把定制(custom) ChannelHandler的麻烦减小到了最低,Adapter本身已经实现了基础的数据处理逻辑(例如将event转发到下一个handler),你可以只重写那些你想要特别实现的方法。

示例:

/**
 * Handles a server-side channel.
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter{ // (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);// (3)
        String body = new String(req, "UTF-8");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

具体使用:

  1. 一般做法是继承ChannelInboundHandlerAdapter,这个类实现了ChannelHandler接口,ChannelHandler提供了许多事件处理的接口方法,然后你可以覆盖这些方法。现在仅仅只需要继承ChannelHandlerAdapter类而不是你自己去实现接口方法。
  2. 以上我们覆盖了channelRead()事件处理方法。每当从客户端收到新的数据时,这个方法会在收到消息时被调用,这个例子中,收到的消息的类型是。
  3. 和NIO一样,读取数据时,它是直接读到缓冲区中;在写入数据时,它也是写入到缓冲区中。在TCP/IP中,NETTY会把读到的数据放到ByteBuf的数据结构中。所以这里读取在ByteBuf的信息,得到服务器返回的内容。

3 ChannelPipeline

public interface ChannelPipeline extends Iterable < Map.Entry < String , ChannelHandler >>

ChannelPipeline作为放置ChannelHandler的容器,采用了J2EE的 拦截过滤模式,用户可以定义管道中的ChannelHandler以哪种规则去拦截并处理事件以及在管道中的ChannelHandler之间如何通信。每个Channel都有它自己的Pipeline,当一个新的Channel被创建时会自动被分配到一个Pipeline中。
ChannelHandler按如下步骤安装到ChannelPipeline中:

  1. 一个ChannelInitializer接口实现被注册到一个Bootstrap上:
bootstrap.handler(new ChannelInitializer<SocketChannel>() {//(1)
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new LineBasedFrameDecoder(Integer.MAX_VALUE));
                            pipeline.addLast(nettyClientHandler);//自定义的ChannelInboundHandlerAdapter子类
                        }
                    });

注意:上面编码器(encoders),解码器(decoders),和ChannelInboundHandlerAdapter的子类都属于ChannelHandler

  1. ChannelInitializer.initChannel()被调用时,这个ChannelInitializer会往管道(pipeline)中安装定制的一组ChannelHandler
  2. 然后这个ChannelInitializer把自己从ChannelPipeline中移除

4 NioEventLoopGroup

5 Bootstrap

Bootstrap以及 ServerBootstrap类都继承自 AbstractBootstrap。官方API文档中的解释是:

Bootstrap中文翻译就是引导程序,就是作为管理Channel的一个辅助类。可以通过“方法链”的代码形式(类似Builder模式)去配置一个Bootstrap,创建Channel并发起请求。Bootstrap类为一个应用的网络层配置提供了容器,客户端通过它来jianjie。
示例:

EventLoopGroup group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class)
              .group(group);
              .handler(new ChannelInitializer<SocketChannel>() {//(1)
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new LineBasedFrameDecoder(Integer.MAX_VALUE));
                            pipeline.addLast(nettyClientHandler);//(2)
                        }
                    });
  1. 是一个特殊的ChannelHandler类,作用是帮助使用者配置一个新的。也许你想通过增加一些新的ChannelHandler子类来操作一个新的或者通过其对应的来实现你的网络程序。当你的程序变的复杂时,可能会增加更多的ChannelHandler子类到pipeline上。
  2. 每个Channel都有ChannelPipeline,在Channel的pipeline中加入handler,这里的ChannelHandler类经常会被用来处理一个最近的已经接收的Channel。所以这里的Channel已经不是NIO中的Channel了,她是netty的Channel。

6 建立连接并发起请求

public class NettyClient {
    private Channel channel ;

    public void connect(int port,String host){
        EventLoopGroup group = new NioEventLoopGroup();
        try {//配置Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new LineBasedFrameDecoder(Integer.MAX_VALUE));
                            pipeline.addLast(new NettyClientHandler () );
                }
            });

            //发起异步连接操作
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            
             channel = channelFuture.channel();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally{
            //关闭,释放线程资源
            group.shutdownGracefully();
        }
    }


  public void sendMessage(String msg){//连接成功后,通过Channel提供的接口进行IO操作
   try {
         if (channel != null && channel.isOpen()) {
            channel.writeAndFlush(sendMsg).sync();     //(1)
            Log.d(TAG, "send succeed " + sendMsg);
                        } else {
                            throw new Exception("channel is null | closed");
                        }
                    } catch (Exception e) {
                        sendReconnectMessage();
                        e.printStackTrace();
                    }  
  } 
    @Test
    public void nettyClient(){
        new NettyClient().connect(8989, "localhost");
    }
    
}
  1. Channel 及对象提供了许多操作,能够触发各种各样的I/O事件和操作。
    write(Object)方法不会使消息写入到Channel上,他被缓冲在了内部,你需要调用flush()方法来把缓冲区中数据强行输出。或者可以用更简洁的writeAndFlush(msg)以达到同样的目的。

7 总结

简而言之,在客户端上使用Netty,业务流程如下:

  1. 构建Bootstrap,其中包括设置好ChannelHandler来处理将来接收到的数据。
  2. 由Boostrap发起连接。
  3. 连接成功建立后,得到一个ChannelFuture对象,代表了一个还没有发生的I/O操作。这意味着任何一个请求操作都不会马上被执行,因为在Netty里所有的操作都是异步的。
  4. 通过Channel对象的writeAndFlush(Object msg)方法往服务端发送数据,接收到的数据 会在ChannelHandler的实现类中的channelRead(ChannelHandlerContext ctx, Object msg)中获取到被读到缓冲区的数据——(ByteBuf) msg

8 问题

  1. TCP连接中,NETTY会把读到的数据放到ByteBuf的数据结构中。基于流的传输并不是一个数据包队列,而是一个字节队列。即使服务器发送了2个独立的消息,客户端也不会作为2次消息处理而仅仅是作为一连串的字节进行读取。因此这是不能保证你远程写入的数据就会准确地读取。尤其是 服务器发回的消息长度 过长的时候,一次消息将有可能被拆分到不同的ByteBuf数据段中,很有可能需要多次读取ByteBuf,才能把一个消息完整拿到。
    解决方案:
    是的一个实现类,他可以在处理数据拆分的问题上变得很简单。

相关链接

显示全文