您的位置:首页 > 健康 >

Springboot集成Netty实现TCP通讯|当前头条


【资料图】

Netty测试客户端

package com.coremain;import com.coremain.handler.ServerListenerHandler;import io.netty.bootstrap.Bootstrap;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.util.CharsetUtil;import java.util.Scanner;/** * @description: 测试 * @author: GuoTong * @createTime: 2023-05-14 16:46 * @since JDK 1.8 OR 11 **/public class NettyClientTest {    public static void main(String[] args) throws InterruptedException {        // 客户端的线程池        EventLoopGroup workerGroup = new NioEventLoopGroup(8);        try {            // 创建Netty客户端端的启动对象            Bootstrap bootstrap = new Bootstrap();            // 使用链式编程来配置参数            bootstrap.group(workerGroup)                    .channel(NioSocketChannel.class).handler(new ChannelInitializer() {                @Override                protected void initChannel(SocketChannel ch) throws Exception {                    //对workerGroup的SocketChannel设置处理器                    ChannelPipeline pipeline = ch.pipeline();                    // 对于通道加入解码器                    pipeline.addLast("decoder", new StringDecoder());                    // 对于通道加入加码器                    pipeline.addLast("encoder", new StringDecoder());                    // 加入事件回调处理器                    pipeline.addLast(new ServerListenerHandler());                }            });            System.out.println("基于Netty的客户端接入启动完成....");            ChannelFuture cf = bootstrap.connect("127.0.0.1", 18023).sync();            // 获取连接通道            Channel channel = cf.channel();            System.out.println("+++++++" + channel.localAddress() + "=======");            // 客户端输入扫描器            Scanner scanner = new Scanner(System.in);            while (scanner.hasNext()) {                String next = scanner.next();                // 发送到服务端                channel.writeAndFlush(Unpooled.buffer().writeBytes(next.getBytes(CharsetUtil.UTF_8)));            }            // 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成,内部调用的是Object的wait()方法            cf.channel().closeFuture().sync();        } finally {            workerGroup.shutdownGracefully();        }    }}

Netty的Server启动类

package com.coremain.netty;import com.coremain.handler.NettyServerHTTPHandler;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelOption;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import javax.annotation.PreDestroy;/** * @description: Netty服务启动器 * netty服务端会跟随一起启动。 同时,在springboot关闭前,会先销毁netty服务。 * @author: GuoTong * @createTime: 2023-05-14 15:13 * @since JDK 1.8 OR 11 **/@Componentpublic class NettyServerHTTPRunning {    // log4j2的AsyncLogger本身的逻辑采用了缓冲区思想,使用的是disruptor框架来实现一个环形无锁队列。    private static final Logger log = LoggerFactory.getLogger(NettyServerHTTPRunning.class);    /**     * 主线程组     */    private  NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);    /**     * 工作线程组     */    private  NioEventLoopGroup workerGroup = new NioEventLoopGroup(10);    /**     * (http)主要服务端口     */    @Value("${iot.port1:18024}")    private int iot1;    /**     * (http)备用服务端口     */    @Value("${iot.port2:18025}")    private int iot2;    /**     * 启动 netty 服务     */    @PostConstruct    public void startServer() throws InterruptedException {        ServerBootstrap serverBootstrap = new ServerBootstrap();        serverBootstrap.group(bossGroup, workerGroup)                .channel(NioServerSocketChannel.class)                // 三次握手中的A、B队列总和最大值(第二次握手加入A, 第三次握手从A移动到B, accept 后从B取出)                .option(ChannelOption.SO_BACKLOG, 1024)                // 解决端口占用问题, 可以共用服务器端口(即使该端口已被其他端口占用)                .option(ChannelOption.SO_REUSEADDR, true)                // 接收消息缓冲区大小                .option(ChannelOption.SO_RCVBUF, 2048)                // 发送消息缓冲区大小                .option(ChannelOption.SO_SNDBUF, 2048)                // 用于启用或关于Nagle算法。如果要求高实时性,有数据发送时就马上发送,就将该选项设置为true关闭Nagle算法;                // 如果要减少发送次数减少网络交互,就设置为false等累积一定大小后再发送                .option(ChannelOption.TCP_NODELAY, true)                // 用于检测长时间没有数据传输的连接状态,当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文                .option(ChannelOption.SO_KEEPALIVE, true)                // 当用户调用close()方法的时候,函数返回,在可能的情况下,尽量发送数据,不一定保证全部发送成功                // 使用SO_LINGER可以阻塞close()的调用时间,直到数据完全发送                .option(ChannelOption.SO_LINGER, 2000)                .childHandler(new NettyServerHTTPHandler());        ChannelFuture service1 = serverBootstrap.bind(iot1).sync();        ChannelFuture service2 = serverBootstrap.bind(iot2).sync();        if (service1.isSuccess()) {            log.info("服务1启动成功, port: {}", iot1);        }        if (service2.isSuccess()) {            log.info("服务2启动成功, port: {}", iot2);        }    }    /**     * 销毁     */    @PreDestroy    public void destroy() {        bossGroup.shutdownGracefully().syncUninterruptibly();        workerGroup.shutdownGracefully().syncUninterruptibly();        log.info("关闭 Netty 成功");    }}

Netty的服务端核心类:ServerBootstrap

package com.coremain.config;import com.coremain.handler.NettyServerHandler;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelOption;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import org.springframework.boot.context.properties.EnableConfigurationProperties;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * @description: 配置NettyServer * @author: GuoTong * @createTime: 2023-05-14 14:54 * @since JDK 1.8 OR 11 **/@Configuration@EnableConfigurationPropertiespublic class NettyConfig {    private final NettyProperties nettyProperties;    public NettyConfig(NettyProperties nettyProperties) {        this.nettyProperties = nettyProperties;    }    /**     * boss线程池-进行客户端连接     *     * @return     */    @Bean    public NioEventLoopGroup boosGroup() {        return new NioEventLoopGroup(nettyProperties.getBoss());    }    /**     * worker线程池-进行业务处理     *     * @return     */    @Bean    public NioEventLoopGroup workerGroup() {        return new NioEventLoopGroup(nettyProperties.getWorker());    }    /**     * 服务端启动器,监听客户端连接     *     * @return     */    @Bean    public ServerBootstrap serverBootstrap() {        ServerBootstrap serverBootstrap = new ServerBootstrap()                // 指定使用的线程组                .group(boosGroup(), workerGroup())                // 指定使用的通道                .channel(NioServerSocketChannel.class)                // 三次握手中的A、B队列总和最大值(第二次握手加入A, 第三次握手从A移动到B, accept 后从B取出)                .option(ChannelOption.SO_BACKLOG, 1024)                // 指定连接超时时间                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout())                // 支持长连接                .option(ChannelOption.SO_KEEPALIVE, true)                // 接收消息缓冲区大小                .option(ChannelOption.SO_RCVBUF, 2048)                // 发送消息缓冲区大小                .option(ChannelOption.SO_SNDBUF, 2048)                // 指定worker处理器                .childHandler(new NettyServerHandler());        return serverBootstrap;    }}

Netty的通道处理器 ChannelInitializer

package com.coremain.handler;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.DelimiterBasedFrameDecoder;import org.springframework.stereotype.Component;/** * @description: Netty服务端回调处理 * @ChannelHandler.Sharable。这个注解的作用是标注一个Handler实例可以被多个通道安全地共享。 * @author: GuoTong * @createTime: 2023-05-14 14:57 * @since JDK 1.8 OR 11 **/@ChannelHandler.Sharable@Componentpublic class NettyServerHandler extends ChannelInitializer {    @Override    protected void initChannel(SocketChannel socketChannel) throws Exception {        // 数据分割符        String delimiterStr = "##@##";        ByteBuf delimiter = Unpooled.copiedBuffer(delimiterStr.getBytes());        ChannelPipeline pipeline = socketChannel.pipeline();        // 使用自定义处理拆包/沾包,并且每次查找的最大长度为1024字节        pipeline.addLast(new DelimiterBasedFrameDecoder(1024, delimiter));        // 将上一步解码后的数据转码为Message实例        pipeline.addLast(new MessageUTF8DecodeHandler());        // 对发送客户端的数据进行编码,并添加数据分隔符        pipeline.addLast(new MessageUTF8EncodeHandler(delimiterStr));        // 对数据进行最终处理        pipeline.addLast(new ServerListenerHandler());    }}

Netty回调处理器SimpleChannelInboundHandler

package com.coremain.handler;import com.coremain.handler.bean.MessageEnum;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.channel.socket.SocketChannel;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;import java.time.LocalDateTime;/** * @description: 数据处理器,针对不同类型数据分类处理 在处理不同接收数据时使用了枚举类型 * @ChannelHandler.Sharable。这个注解的作用是标注一个Handler实例可以被多个通道安全地共享。 * @author: GuoTong * @createTime: 2023-05-14 15:07 * @since JDK 1.8 OR 11 **/@ChannelHandler.Sharable@Componentpublic class ServerListenerHandler extends SimpleChannelInboundHandler {    private static final Logger log = LoggerFactory.getLogger(ServerListenerHandler.class);    /**     * 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据     */    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        SocketChannel channel = (SocketChannel) ctx.channel();        String currentData = LocalDateTime.now().format(CommonUtilHandler.dateTimeFormatter);        //通知客户端链接建立成功        String str = "通知客户端链接建立成功" + " " + currentData + " " + channel.localAddress().getHostString() + "\r\n";        ctx.writeAndFlush(str);    }    /**     * 设备接入连接时处理     *     * @param ctx     */    @Override    public void handlerAdded(ChannelHandlerContext ctx) {        log.info("有新的连接:[{}]", ctx.channel().id().asLongText());    }    /**     * 数据处理     *     * @param ctx     * @param msg     */    @Override    protected void channelRead0(ChannelHandlerContext ctx, MessageStrUTF8 msg) {        // 获取消息实例中的消息体        String content = msg.getContent();        // 对不同消息类型进行处理        MessageEnum type = MessageEnum.getStructureEnum(msg);        String currentData = LocalDateTime.now().format(CommonUtilHandler.dateTimeFormatter);        switch (type) {            case CONNECT:                // TODO 心跳消息处理            case STATE:                // TODO 设备状态            default:                log.info(currentData + type.content + " 消息内容" + content);        }    }    /**     * 设备下线处理     *     * @param ctx     */    @Override    public void handlerRemoved(ChannelHandlerContext ctx) {        log.info("设备下线了:{}", ctx.channel().id().asLongText());    }    /**     * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据     */    @Override    public void channelInactive(ChannelHandlerContext ctx) throws Exception {        log.info("客户端断开链接{}", ctx.channel().localAddress().toString());    }    /**     * 设备连接异常处理     *     * @param ctx     * @param cause     */    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        cause.printStackTrace();        // 打印异常        log.info("异常:{}", cause.getMessage());        // 关闭连接        ctx.close();    }}

maven依赖

                    org.springframework.boot            spring-boot-starter                                                org.springframework.boot                    spring-boot-starter-logging                                                                org.springframework.boot            spring-boot-starter-web                                                org.springframework.boot                    spring-boot-starter-tomcat                                                        org.springframework.boot            spring-boot-starter-undertow                            org.projectlombok            lombok            1.18.16                            junit            junit            4.12            test                                    org.springframework.boot            spring-boot-starter-log4j2                                    com.lmax            disruptor            ${disruptor-version}                                    io.netty            netty-all            4.1.68.Final                            com.alibaba.fastjson2            fastjson2            ${fastJson-version}            

项目结构

日志配置文件 log4j2.xml

                        ./NettyStudy/log4j2                application        %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level [%L] - %msg%n        20MB                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    

配置文件 application.yml

# netty 配置netty:  # boss线程数量  boss: 4  # worker线程数量  worker: 2  # 连接超时时间  timeout: 6000  # 服务器主端口  port: 18023  # 服务器备用端口  portSalve: 18026  # 服务器地址  host: 127.0.0.1spring:  application:    name: netty-server  mvc:    servlet:      load-on-startup: 1 #项目启动时执行初始化即可解决。server:  port: 15026  undertow:    accesslog:      enabled: false    direct-buffers: true # 是否分配的直接内存(NIO直接分配的堆外内存)    buffer-size: 1024  #每块buffer的空间大小,越小的空间被利用越充分    threads:      worker: 20 # 阻塞任务线程池, 它的值设置取决于系统线程执行任务的阻塞系数,默认值是IO线程数*8      io: 4  # CPU有几核,就填写几。  servlet:    context-path: /undertow

启动Netty服务端

启动Netty客户端

客户端发送消息

服务端收到

关键词:

相关新闻