
所谓心跳,即在 TCP 长连接中,客户端和服务器之间定期发送的一种特殊的数据包,通知对方自己还在线,以确保 TCP 连接的有效性。
心跳检测机制:客户端每隔一段时间发送PING消息给服务端,服务端接受到后回复PONG消息。客户端如果在一定时间内没有收到PONG响应,则认为连接断开,服务端如果在一定时间内没有收到来自客户端的PING请求,则认为连接已经断开。通过这种来回的PING-PONG消息机制侦测连接的活跃性。
在 Netty 中,本身也提供了 IdleStateHandler 用于检测连接闲置,该Handler可以检测连接未发生读写事件而触发相应事件。
看下它的构造器:
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
复制代码
解释下三个参数的含义:
- eaderIdleTimeSeconds 读超时:当在指定的时间间隔内没有从 Channel 读取到数据时,会触发一个 READER_IDLE 的 IdleStateEvent 事件。
- riterIdleTimeSeconds 写超时:即当在指定的时间间隔内没有数据写入到 Channel 时,会触发一个 WRITER_IDLE 的 IdleStateEvent 事件。
- llIdleTimeSeconds 读/写超时:即当在指定的时间间隔内没有读或写 *** 作时,会触发一个 ALL_IDLE 的 IdleStateEvent 事件。
注:这三个参数默认的时间单位是秒。若需要指定其他时间单位,可以使用另一个构造方法:
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
复制代码
要实现 Netty 服务端心跳检测机制需要在服务器端的 ChannelInitializer 中加入如下的代码:
pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS)); 复制代码
初步地看下 IdleStateHandler 源码,先看下 IdleStateHandler 中的 channelRead 方法:
红框代码其实表示该方法只是进行了透传,不做任何业务逻辑处理,让 channelPipe 中的下一个 handler 处理channelRead 方法
我们再看看 channelActive 方法:
这里有个 initialize 的方法,这是 IdleStateHandler 的精髓,接着探究:
这边会触发一个 Task,ReaderIdleTimeoutTask,这个task里的 run 方法源码是这样的:
第一个红框代码是用当前时间减去最后一次 channelRead 方法调用的时间,假如这个结果是6s,说明最后一次调用channelRead 已经是6s之前的事情了,你设置的是5s,那么 nextDelay 则为-1,说明超时了,那么第二个红框代码则会触发下一个 handler 的 userEventTriggered 方法:
如果没有超时则不触发 userEventTriggered 方法。
服务端代码public class HeartBeatServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();
channelPipeline.addLast("decoder",new StringDecoder());
channelPipeline.addLast("encoder",new StringEncoder());
channelPipeline.addLast(new IdleStateHandler(3,0,0, TimeUnit.SECONDS));
channelPipeline.addLast(new HeartBeatServerHandler());
}
});
System.out.println("netty server start。。");
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
复制代码
继承 SimpleChannelInboundHandler 重写 channelRead 方法和 userEventTriggered 方法
public class HeartBeatServerHandler extends SimpleChannelInboundHandler客户端代码{ int readIdleTimes = 0; @Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { System.out.println(" ====== > [server] message received : " + s); if ("Heartbeat Packet".equals(s)){ ctx.channel().writeAndFlush("ok"); }else{ System.out.println(" 其他信息处理..."); } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { IdleStateEvent event = (IdleStateEvent) evt; String eventType = null; switch (event.state()){ case READER_IDLE: eventType = "读空闲"; readIdleTimes++; // 读空闲的计数加1 break; case WRITER_IDLE: eventType = "写空闲"; // 不处理 break; case ALL_IDLE: eventType = "读写空闲"; // 不处理 break; } System.out.println(ctx.channel().remoteAddress() + "超时事件:" + eventType); if (readIdleTimes > 3){ System.out.println(" [server]读空闲超过3次,关闭连接,释放更多资源"); ctx.channel().writeAndFlush("idle close"); ctx.channel().close(); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.err.println("=== " + ctx.channel().remoteAddress() + " is active ==="); } } 复制代码
public class HeartBeatClient {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();
channelPipeline.addLast("decoder",new StringDecoder());
channelPipeline.addLast("encoder",new StringEncoder());
channelPipeline.addLast(new HeartBeatClientHandler());
}
});
System.out.println("netty client start。。");
Channel channel = bootstrap.connect("127.0.0.1", 8888).sync().channel();
String text = "Heartbeat Packet";
Random random = new Random();
while(channel.isActive()){
int num = random.nextInt(8);
Thread.sleep(num*1000);
channel.writeAndFlush(text);
}
}catch (Exception e){
e.printStackTrace();
}finally {
group.shutdownGracefully();
}
}
static class HeartBeatClientHandler extends SimpleChannelInboundHandler{
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(" client received :" + msg);
if (msg != null && msg.equals("idle close")) {
System.out.println(" 服务端关闭连接,客户端也关闭");
ctx.channel().closeFuture();
}
}
}
} 欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)