06月28, 2018

使用Netty进行Socket编程

使用Netty进行Socket编程

Netty是业界最流行的Java-nio框架之一,它的健壮性,功能,性能,扩展性等在同类框架中都是首屈一指的存在。Java的nio和aio对于开发者来说显得并不友好。如果没有任何java-nio基础的读者,建议先看看我的上一篇介绍Java-I/O模型的文章。建议初学者跟着代码写一遍,写一遍加深印象能更好的理解java-nio的实现步骤和原理。

HelloWorld 入门示例

首先创建两个线程组yigeNioEventLoopGroup分别用于网络连接和网络读写,然后创建一个服务启动辅助类ServerBootstrap,绑定两个线程组,设置通道类型为NioServerSocketChannel客服端就为NioSocketChannel,然后设置一个ChannelOption.SO_BACKLOG最大socket连接数量,最后设置socket从处理操作。最后绑定端口启动,然后阻塞。

public class NettyService {
    public static void main(String[] args) throws InterruptedException {
        // 两个工作线程组,一个用于网络连接,一个网络读写
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 辅助类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    // 类型为NioServerSocketChannel,ServerSocketChannel
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());
            //绑定端口
            ChannelFuture future = serverBootstrap.bind(8080).sync();
            //阻塞 到这里
            future.channel().closeFuture().sync();

        }finally {
            //异常退出
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

ChildChannelHandler继承自ChannelInitializer需要实现一个抽象方法initChannel这方法定义了在通道初始化完毕之后,需要对通道中数据进行的处理操作。这里在这个方法中给socketChannel添加一个ChannelHandlerAdapter管道处理适配器,并重写了channelRead方法,这个方法的处理对应着NIO操作中的key.isReadable()中的处理操作,AIO中的socketChannel.read设置的回调处理函数。不过是NIO中的ByteBuffer换成了netty封装好了的ByteBuf。接下来就是写我们需要进行的业务处理。


public class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel){
        socketChannel.pipeline().addLast(new ChannelHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                ByteBuf byteBuf = (ByteBuf) msg;
                byte[] req = new byte[byteBuf.readableBytes()];
                byteBuf.readBytes(req);
                //读取请求内容
                String body = new String(req, "UTF-8");
                System.out.println("服务器收到客服端数据:" + body);

                //响应数据
                String response = "客服端你好!收到的数据为{" + body + "}";
                ByteBuf resp = Unpooled.copiedBuffer(response.getBytes());

                ctx.write(resp);
            }

            @Override
            public void channelReadComplete(ChannelHandlerContext ctx) {
                ctx.flush();
            }
        });
    }
}

TCP粘包/拆包

如果你运行了代码,会发现客户端打印出来的服务器返回数据并不完整(客服端在上一篇文章中有),接受服务器数据 : 客服端你好!收到的数据为{你好服务器,没有},而在我们的服务端代码中是有}的。这就是因为在这个过程中发生了TCP的拆包,}被拆到了另外一个数据包中,而我们只接受了第一个数据包,所以打印出来的结果中没有}

socket的数据传输是基于TCP/IP协议进行传输的,而TCP/IP协议传输数据时是以流的形式传输数据,通过java的I/O流我们知道,为了提升性能,流是有一定的缓冲区的,当数据写满这个缓冲时,数据才会真正的被发送。写满一个缓冲区后,这个缓冲区所有的数据会被打成一个包,具体每个包大小为多少跟操作系统或者具体的协议实现有关,而这些都是底层程序自己控制的,我们并不能决定让它们如何拆分数据包。

此时如果是原本应该是一次发送的数据,分成了多个数据包分多次发送的情况较拆包;本应该多次发送的数据被打成一个数据发送了,这种情况叫粘包

那么如何处理TCP/IP的粘包、拆包呢?这里提供三种常见比较简单的处理方式:

  1. 消息定长,例如每个报文的大小固定长度为200,如果不够,空位补空格;
  2. 在包尾增加回车符(或者其他特殊字符)进行分割;例如FTP,HTTP协议。
  3. 将消息分为消息头和消息体,消息头中包含了表示消息总长度的字段。例如HTTP协议中的content-length。

以换行符进行分割 LineBasedFrameDecoder

改造很简单,在ChildChannelHandler.initChannel方法中给socketChannel增加一个管道LineBasedFrameDecoder

channel.pipeline()
      // 按'\n' 结尾为一个数据包的编码器
      // 如果接收了1024个字节的数据后还没有出现换行符,则抛出异常
     .addLast(new LineBasedFrameDecoder(1024))
     .addLast(new ChannelHandlerAdapter() { ... })

但是socket两端处理信息是对等的,也就是说我们还需要对客户端进行修改,这里也使用netty写一个socket客户端。

客户端的创建方式和服务端类似,客户端只需要创建一个线程组,因为客户端不需要像服务器那样处理多个连接的问题。然后是使用辅助类进行绑定,配置等。客户端需要在通道建立之后向服务器发送数据,然后在服务器返回数据之后打印数据,所以这重写了channelActivechannelRead方法。

/**
 * LineBasedFrameDecoder 处理拆包,粘包 客服端
 */
public class NettyFrameClient {

    public static void main(String[] args) {
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();

        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline()
                                .addLast(new LineBasedFrameDecoder(1024))
                                .addLast(new ChannelHandlerAdapter() {
                                    int count = 0;

                                    /**
                                     * 通道初始化成之后向服务器发送数据
                                     * @param ctx
                                     * @throws Exception
                                     */
                                    @Override
                                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                        StringBuilder sb = new StringBuilder();
                                        // 这里组装了一个有101个换行符的字符串,在服务端LineBasedFrameDecoder时
                                        // 会被当做101个包处理,也就是会触发 101次 channelRead 方法
                                        sb.append("客服端将向服务器发送100条数据\n");
                                        for (int i = 0; i < 100; i++) {
                                            sb.append("客户端想服务发送的数据,第").append(count++).append("条\n");
                                        }
                                        ByteBuf byteBuf = Unpooled.copiedBuffer(sb.toString().getBytes());
                                        ctx.writeAndFlush(byteBuf);
                                    }

                                    /**
                                     * 接受服务器返回来的数据
                                     * @param ctx
                                     * @param msg
                                     * @throws Exception
                                     */
                                    @Override
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        ByteBuf byteBuf = (ByteBuf) msg;
                                        byte[] req = new byte[byteBuf.readableBytes()];
                                        byteBuf.readBytes(req);
                                        System.out.println("已经收到服务器返回消息");
                                        System.out.println(new String(req, "UTF-8"));

                                    }

                                    /**
                                     * 异常处理
                                     * @param ctx
                                     * @param cause
                                     */
                                    @Override
                                    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                                        // 释放资源
                                        cause.printStackTrace();
                                        ctx.close();
                                    }
                                });
                    }
                });

        ChannelFuture channelFuture = null;
        try {
            channelFuture = bootstrap.connect("localhost", 8080).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            eventLoopGroup.shutdownGracefully();
        }
    }
}

运行程序后会发现服务器收到了所有客服端发送的数据,并且触发了101次channelRead方法,但是其实客服端只发送一次数据而已,因为咱们使用的是以换行符进行分割,所以每一个换行符就是一个完整的数据包,101个换行符就是101个完整的数据包。然而还有一个问题就是,服务器返回的数据,客户似乎并没有收到,其实这是因为我们服务返回的数据中没有带上换行符,客户端认为服务器数据还没有发送完,还处于等待状态。我们只要在}的后边加上\n,重启服务器,重新运行客户端就行了。

除了LineBasedFrameDecoder之外,netty还提供了自定义分隔符的解码器DelimiterBasedFrameDecoder和固定长度解码器FixedLengthFrameDecoder用法几乎一样,这里也不再赘述。

编码问题

在前面的服务器和客服端开发过程中,我们发现每次客户服务端接收数据的时候需要从ByteBuf提取出数据,然后发送数据时还要把数据转换成ByteBuf,这都是一些重复的操作,有没有办法让它自动处理呢?答案当然是有的!如果只是简单的字符串处理,Netty都已经提供了现成的解决方案。

服务端和客服端都只需要添加一个管道处理器StringDecoder,但是要注意顺序,先是LineBasedFrameDecoder让程序保证能读取到完整的数据包,再对包里的字节数据进行解码操作。

socketChannel.pipeline()
    .addLast(new LineBasedFrameDecoder(1024))
    .addLast(new StringDecoder())

然后在服务器和客服端的自定义管道处理器中的channelRead方法中,就可以直接把Object参数直接转型成字符串即可。

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    String str = (String) msg;
    ...
}                

当然,在实际的项目中我们可以需要更多的功能,比如SpringMVC提供的自定义参数绑定功能,让我们可以不用自己一个个手动把请求中的参数放到bean中,同样,在使用netty开发的过程中,我们也希望在消息接收的时候直接给我们的数据就是Java对象了。到了这里相信你想要实现起来也并不难,所有的处理都是对channel添加一个管道处理器而已,无非就是在添加一个字符串转对象的处理操作罢了。

MessagePack 编码解码

MessagePack是一个高校的二进制序列化框架,它和Json一样支持多种语言的数据交换,但是效率更高,序列化后字节流更小传输更快。首先添加maven依赖

<dependency>
    <groupId>org.msgpack</groupId>
    <artifactId>msgpack</artifactId>
    <version>0.6.12</version>
</dependency>

然后编写一个小例子来简单介绍如何使用。首先创建一个vo,如下,@Message是一个序列化标记,类似于Java序列化中使用的Serializable接口作为一个标识。如果不加上这个注解在反序列化的时候程序会抛出异常。

@Message
public class Person {
    private String name;
    private Integer age;
    private String mobile;
    private String email;
    ... 省略getter,setter和构造器
}


 public static void main(String[] args) throws IOException {
        MessagePack msgpack = new MessagePack();
        Person person = new Person("张三",18,"15912345678","213012387@qq.com");
        byte[] personByte = msgpack.write(person);

        System.out.println(personByte);

        Person value = msgpack.read(personByte,new Person());
        System.out.println(value);
}

在了解到MessagePack的序列化操作之后我们来写一个简单的编码器和解码器

/**
 * 自定义解码器
 */
public class MessagePackDecoder extends MessageToMessageDecoder<ByteBuf> {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        MessagePack messagePack = new MessagePack();

        byte[] bytes = new byte[byteBuf.readableBytes()];
        byteBuf.getBytes(byteBuf.readerIndex(),bytes,0,bytes.length);

        Person person = messagePack.read(bytes, Person.class);
        list.add(person);
    }
}


/**
 * 自定义编码器
 */
public class MessagePackEncoder extends MessageToByteEncoder<Person> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Person msg, ByteBuf out) throws Exception {
        MessagePack messagePack = new MessagePack();

        byte[] write = messagePack.write(msg);
        out.writeBytes(write);
    }
}

现在假设有这样一个场景,客户端给服务器发送一个MessagePack编码的对象,然后服务器给客户端回复一个字符串。即,客户端需要一个MessagePack编码器把对象编码发送给服务器,一个字符串解码器把服务器返回的字符串解码;相反服务端需要一个MessagePack解码器和一个字符串编码器。

//服务端
socketChannel.pipeline()
        .addLast(new StringEncoder())
        .addLast(new MessagePackDecoder())

//客户端
socketChannel.pipeline()
        .addLast(new StringDecoder())
        .addLast(new MessagePackEncoder())

然后把上边客服端发送的字符串直接改成一个Person对象,服务端接收时直接把Object转换成响应的对象就可以了。

源码地址

源码地址:https://gitee.com/qsdy/netty-nio

本文链接:https://www.qiangshuidiyu.xin/post/netty-socket.html

-- EOF --

Comments