【资料图】
Netty测试客户端
package com.coremain;import com.coremain.handler.ServerListenerHandler;import io.netty.bootstrap.Bootstrap;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.util.CharsetUtil;import java.util.Scanner;/** * @description: 测试 * @author: GuoTong * @createTime: 2023-05-14 16:46 * @since JDK 1.8 OR 11 **/public class NettyClientTest { public static void main(String[] args) throws InterruptedException { // 客户端的线程池 EventLoopGroup workerGroup = new NioEventLoopGroup(8); try { // 创建Netty客户端端的启动对象 Bootstrap bootstrap = new Bootstrap(); // 使用链式编程来配置参数 bootstrap.group(workerGroup) .channel(NioSocketChannel.class).handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { //对workerGroup的SocketChannel设置处理器 ChannelPipeline pipeline = ch.pipeline(); // 对于通道加入解码器 pipeline.addLast("decoder", new StringDecoder()); // 对于通道加入加码器 pipeline.addLast("encoder", new StringDecoder()); // 加入事件回调处理器 pipeline.addLast(new ServerListenerHandler()); } }); System.out.println("基于Netty的客户端接入启动完成...."); ChannelFuture cf = bootstrap.connect("127.0.0.1", 18023).sync(); // 获取连接通道 Channel channel = cf.channel(); System.out.println("+++++++" + channel.localAddress() + "======="); // 客户端输入扫描器 Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String next = scanner.next(); // 发送到服务端 channel.writeAndFlush(Unpooled.buffer().writeBytes(next.getBytes(CharsetUtil.UTF_8))); } // 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成,内部调用的是Object的wait()方法 cf.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } }}
Netty的Server启动类
package com.coremain.netty;import com.coremain.handler.NettyServerHTTPHandler;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelOption;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import javax.annotation.PreDestroy;/** * @description: Netty服务启动器 * netty服务端会跟随一起启动。 同时,在springboot关闭前,会先销毁netty服务。 * @author: GuoTong * @createTime: 2023-05-14 15:13 * @since JDK 1.8 OR 11 **/@Componentpublic class NettyServerHTTPRunning { // log4j2的AsyncLogger本身的逻辑采用了缓冲区思想,使用的是disruptor框架来实现一个环形无锁队列。 private static final Logger log = LoggerFactory.getLogger(NettyServerHTTPRunning.class); /** * 主线程组 */ private NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); /** * 工作线程组 */ private NioEventLoopGroup workerGroup = new NioEventLoopGroup(10); /** * (http)主要服务端口 */ @Value("${iot.port1:18024}") private int iot1; /** * (http)备用服务端口 */ @Value("${iot.port2:18025}") private int iot2; /** * 启动 netty 服务 */ @PostConstruct public void startServer() throws InterruptedException { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // 三次握手中的A、B队列总和最大值(第二次握手加入A, 第三次握手从A移动到B, accept 后从B取出) .option(ChannelOption.SO_BACKLOG, 1024) // 解决端口占用问题, 可以共用服务器端口(即使该端口已被其他端口占用) .option(ChannelOption.SO_REUSEADDR, true) // 接收消息缓冲区大小 .option(ChannelOption.SO_RCVBUF, 2048) // 发送消息缓冲区大小 .option(ChannelOption.SO_SNDBUF, 2048) // 用于启用或关于Nagle算法。如果要求高实时性,有数据发送时就马上发送,就将该选项设置为true关闭Nagle算法; // 如果要减少发送次数减少网络交互,就设置为false等累积一定大小后再发送 .option(ChannelOption.TCP_NODELAY, true) // 用于检测长时间没有数据传输的连接状态,当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文 .option(ChannelOption.SO_KEEPALIVE, true) // 当用户调用close()方法的时候,函数返回,在可能的情况下,尽量发送数据,不一定保证全部发送成功 // 使用SO_LINGER可以阻塞close()的调用时间,直到数据完全发送 .option(ChannelOption.SO_LINGER, 2000) .childHandler(new NettyServerHTTPHandler()); ChannelFuture service1 = serverBootstrap.bind(iot1).sync(); ChannelFuture service2 = serverBootstrap.bind(iot2).sync(); if (service1.isSuccess()) { log.info("服务1启动成功, port: {}", iot1); } if (service2.isSuccess()) { log.info("服务2启动成功, port: {}", iot2); } } /** * 销毁 */ @PreDestroy public void destroy() { bossGroup.shutdownGracefully().syncUninterruptibly(); workerGroup.shutdownGracefully().syncUninterruptibly(); log.info("关闭 Netty 成功"); }}
Netty的服务端核心类:ServerBootstrap
package com.coremain.config;import com.coremain.handler.NettyServerHandler;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelOption;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import org.springframework.boot.context.properties.EnableConfigurationProperties;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * @description: 配置NettyServer * @author: GuoTong * @createTime: 2023-05-14 14:54 * @since JDK 1.8 OR 11 **/@Configuration@EnableConfigurationPropertiespublic class NettyConfig { private final NettyProperties nettyProperties; public NettyConfig(NettyProperties nettyProperties) { this.nettyProperties = nettyProperties; } /** * boss线程池-进行客户端连接 * * @return */ @Bean public NioEventLoopGroup boosGroup() { return new NioEventLoopGroup(nettyProperties.getBoss()); } /** * worker线程池-进行业务处理 * * @return */ @Bean public NioEventLoopGroup workerGroup() { return new NioEventLoopGroup(nettyProperties.getWorker()); } /** * 服务端启动器,监听客户端连接 * * @return */ @Bean public ServerBootstrap serverBootstrap() { ServerBootstrap serverBootstrap = new ServerBootstrap() // 指定使用的线程组 .group(boosGroup(), workerGroup()) // 指定使用的通道 .channel(NioServerSocketChannel.class) // 三次握手中的A、B队列总和最大值(第二次握手加入A, 第三次握手从A移动到B, accept 后从B取出) .option(ChannelOption.SO_BACKLOG, 1024) // 指定连接超时时间 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout()) // 支持长连接 .option(ChannelOption.SO_KEEPALIVE, true) // 接收消息缓冲区大小 .option(ChannelOption.SO_RCVBUF, 2048) // 发送消息缓冲区大小 .option(ChannelOption.SO_SNDBUF, 2048) // 指定worker处理器 .childHandler(new NettyServerHandler()); return serverBootstrap; }}
Netty的通道处理器 ChannelInitializer
package com.coremain.handler;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.DelimiterBasedFrameDecoder;import org.springframework.stereotype.Component;/** * @description: Netty服务端回调处理 * @ChannelHandler.Sharable。这个注解的作用是标注一个Handler实例可以被多个通道安全地共享。 * @author: GuoTong * @createTime: 2023-05-14 14:57 * @since JDK 1.8 OR 11 **/@ChannelHandler.Sharable@Componentpublic class NettyServerHandler extends ChannelInitializer { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 数据分割符 String delimiterStr = "##@##"; ByteBuf delimiter = Unpooled.copiedBuffer(delimiterStr.getBytes()); ChannelPipeline pipeline = socketChannel.pipeline(); // 使用自定义处理拆包/沾包,并且每次查找的最大长度为1024字节 pipeline.addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); // 将上一步解码后的数据转码为Message实例 pipeline.addLast(new MessageUTF8DecodeHandler()); // 对发送客户端的数据进行编码,并添加数据分隔符 pipeline.addLast(new MessageUTF8EncodeHandler(delimiterStr)); // 对数据进行最终处理 pipeline.addLast(new ServerListenerHandler()); }}
Netty回调处理器SimpleChannelInboundHandler
package com.coremain.handler;import com.coremain.handler.bean.MessageEnum;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.channel.socket.SocketChannel;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;import java.time.LocalDateTime;/** * @description: 数据处理器,针对不同类型数据分类处理 在处理不同接收数据时使用了枚举类型 * @ChannelHandler.Sharable。这个注解的作用是标注一个Handler实例可以被多个通道安全地共享。 * @author: GuoTong * @createTime: 2023-05-14 15:07 * @since JDK 1.8 OR 11 **/@ChannelHandler.Sharable@Componentpublic class ServerListenerHandler extends SimpleChannelInboundHandler { private static final Logger log = LoggerFactory.getLogger(ServerListenerHandler.class); /** * 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { SocketChannel channel = (SocketChannel) ctx.channel(); String currentData = LocalDateTime.now().format(CommonUtilHandler.dateTimeFormatter); //通知客户端链接建立成功 String str = "通知客户端链接建立成功" + " " + currentData + " " + channel.localAddress().getHostString() + "\r\n"; ctx.writeAndFlush(str); } /** * 设备接入连接时处理 * * @param ctx */ @Override public void handlerAdded(ChannelHandlerContext ctx) { log.info("有新的连接:[{}]", ctx.channel().id().asLongText()); } /** * 数据处理 * * @param ctx * @param msg */ @Override protected void channelRead0(ChannelHandlerContext ctx, MessageStrUTF8 msg) { // 获取消息实例中的消息体 String content = msg.getContent(); // 对不同消息类型进行处理 MessageEnum type = MessageEnum.getStructureEnum(msg); String currentData = LocalDateTime.now().format(CommonUtilHandler.dateTimeFormatter); switch (type) { case CONNECT: // TODO 心跳消息处理 case STATE: // TODO 设备状态 default: log.info(currentData + type.content + " 消息内容" + content); } } /** * 设备下线处理 * * @param ctx */ @Override public void handlerRemoved(ChannelHandlerContext ctx) { log.info("设备下线了:{}", ctx.channel().id().asLongText()); } /** * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("客户端断开链接{}", ctx.channel().localAddress().toString()); } /** * 设备连接异常处理 * * @param ctx * @param cause */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); // 打印异常 log.info("异常:{}", cause.getMessage()); // 关闭连接 ctx.close(); }}
maven依赖
org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-logging org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-tomcat org.springframework.boot spring-boot-starter-undertow org.projectlombok lombok 1.18.16 junit junit 4.12 test org.springframework.boot spring-boot-starter-log4j2 com.lmax disruptor ${disruptor-version} io.netty netty-all 4.1.68.Final com.alibaba.fastjson2 fastjson2 ${fastJson-version}
项目结构
日志配置文件 log4j2.xml
./NettyStudy/log4j2 application %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level [%L] - %msg%n 20MB
配置文件 application.yml
# netty 配置netty: # boss线程数量 boss: 4 # worker线程数量 worker: 2 # 连接超时时间 timeout: 6000 # 服务器主端口 port: 18023 # 服务器备用端口 portSalve: 18026 # 服务器地址 host: 127.0.0.1spring: application: name: netty-server mvc: servlet: load-on-startup: 1 #项目启动时执行初始化即可解决。server: port: 15026 undertow: accesslog: enabled: false direct-buffers: true # 是否分配的直接内存(NIO直接分配的堆外内存) buffer-size: 1024 #每块buffer的空间大小,越小的空间被利用越充分 threads: worker: 20 # 阻塞任务线程池, 它的值设置取决于系统线程执行任务的阻塞系数,默认值是IO线程数*8 io: 4 # CPU有几核,就填写几。 servlet: context-path: /undertow
启动Netty服务端
启动Netty客户端
客户端发送消息
服务端收到
关键词: