基本思路:Netty服务端通过一个Map保存所有连接上来的客户端SocketChannel,以客户端的Id作为Map的key。服务器端每次要向某个客户端发送消息时,只需根据ClientId取出对应的SocketChannel,往里面写入message即可。心跳检测通过IdleStateEvent事件实现:客户端在写空闲时定时向服务端发送Ping消息,以检测SocketChannel是否中断。
环境:JDK 1.8 和 Netty 5
以下是具体的代码实现和介绍:
1公共的Share部分(主要包含消息协议类型的定义)
设计消息类型:
1
2
3
|
/**
 * Kinds of messages exchanged between the client and the server.
 *
 * <p>The declaration order is kept as-is so ordinals stay unchanged.
 */
public enum MsgType {
    /** Heartbeat message. */
    PING,

    /** Request message. */
    ASK,

    /** Response message. */
    REPLY,

    /** Login message. */
    LOGIN
}
Message基类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
//必须实现序列,serialVersionUID 一定要有,否者在netty消息序列化反序列化会有问题,接收不到消息!!! public abstract class BaseMsg implements Serializable { private static final long serialVersionUID = 1L; private MsgType type; //必须唯一,否者会出现channel调用混乱 private String clientId; //初始化客户端id public BaseMsg() { this .clientId = Constants.getClientId(); } public String getClientId() { return clientId; } public void setClientId(String clientId) { this .clientId = clientId; } public MsgType getType() { return type; } public void setType(MsgType type) { this .type = type; } } |
常量设置:
1
2
3
4
5
6
7
8
9
|
/**
 * Static holder for the client id of the running process.
 */
public class Constants {

    /** Current client id; stays {@code null} until {@link #setClientId(String)} is called. */
    private static String clientId;

    /**
     * Stores the client id.
     *
     * @param clientId the id to remember
     */
    public static void setClientId(String clientId) {
        Constants.clientId = clientId;
    }

    /**
     * @return the stored client id, or {@code null} if none has been set
     */
    public static String getClientId() {
        return clientId;
    }
}
登录类型消息:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
public class LoginMsg extends BaseMsg { private String userName; private String password; public LoginMsg() { super (); setType(MsgType.LOGIN); } public String getUserName() { return userName; } public void setUserName(String userName) { this .userName = userName; } public String getPassword() { return password; } public void setPassword(String password) { this .password = password; } } |
心跳检测Ping类型消息:
1
2
3
4
5
6
|
public class PingMsg extends BaseMsg { public PingMsg() { super (); setType(MsgType.PING); } } |
请求类型消息:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
public class AskMsg extends BaseMsg { public AskMsg() { super (); setType(MsgType.ASK); } private AskParams params; public AskParams getParams() { return params; } public void setParams(AskParams params) { this .params = params; } } //请求类型参数 //必须实现序列化接口 public class AskParams implements Serializable { private static final long serialVersionUID = 1L; private String auth; public String getAuth() { return auth; } public void setAuth(String auth) { this .auth = auth; } } |
响应类型消息:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
public class ReplyMsg extends BaseMsg { public ReplyMsg() { super (); setType(MsgType.REPLY); } private ReplyBody body; public ReplyBody getBody() { return body; } public void setBody(ReplyBody body) { this .body = body; } } //相应类型body对像 public class ReplyBody implements Serializable { private static final long serialVersionUID = 1L; } public class ReplyClientBody extends ReplyBody { private String clientInfo; public ReplyClientBody(String clientInfo) { this .clientInfo = clientInfo; } public String getClientInfo() { return clientInfo; } public void setClientInfo(String clientInfo) { this .clientInfo = clientInfo; } } public class ReplyServerBody extends ReplyBody { private String serverInfo; public ReplyServerBody(String serverInfo) { this .serverInfo = serverInfo; } public String getServerInfo() { return serverInfo; } public void setServerInfo(String serverInfo) { this .serverInfo = serverInfo; } } |
2 Server端:主要包含对SocketChannel引用的Map,ChannelHandler的实现和Bootstrap.
Map:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
public class NettyChannelMap { private static Map<String,SocketChannel> map= new ConcurrentHashMap<String, SocketChannel>(); public static void add(String clientId,SocketChannel socketChannel){ map.put(clientId,socketChannel); } public static Channel get(String clientId){ return map.get(clientId); } public static void remove(SocketChannel socketChannel){ for (Map.Entry entry:map.entrySet()){ if (entry.getValue()==socketChannel){ map.remove(entry.getKey()); } } } } |
Handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
public class NettyServerHandler extends SimpleChannelInboundHandler<BaseMsg> { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //channel失效,从Map中移除 NettyChannelMap.remove((SocketChannel)ctx.channel()); } @Override protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception { if (MsgType.LOGIN.equals(baseMsg.getType())){ LoginMsg loginMsg=(LoginMsg)baseMsg; if ( "robin" .equals(loginMsg.getUserName())&& "yao" .equals(loginMsg.getPassword())){ //登录成功,把channel存到服务端的map中 NettyChannelMap.add(loginMsg.getClientId(),(SocketChannel)channelHandlerContext.channel()); System.out.println( "client" +loginMsg.getClientId()+ " 登录成功" ); } } else { if (NettyChannelMap.get(baseMsg.getClientId())== null ){ //说明未登录,或者连接断了,服务器向客户端发起登录请求,让客户端重新登录 LoginMsg loginMsg= new LoginMsg(); channelHandlerContext.channel().writeAndFlush(loginMsg); } } switch (baseMsg.getType()){ case PING:{ PingMsg pingMsg=(PingMsg)baseMsg; PingMsg replyPing= new PingMsg(); NettyChannelMap.get(pingMsg.getClientId()).writeAndFlush(replyPing); } break ; case ASK:{ //收到客户端的请求 AskMsg askMsg=(AskMsg)baseMsg; if ( "authToken" .equals(askMsg.getParams().getAuth())){ ReplyServerBody replyBody= new ReplyServerBody( "server info $$$$ !!!" ); ReplyMsg replyMsg= new ReplyMsg(); replyMsg.setBody(replyBody); NettyChannelMap.get(askMsg.getClientId()).writeAndFlush(replyMsg); } } break ; case REPLY:{ //收到客户端回复 ReplyMsg replyMsg=(ReplyMsg)baseMsg; ReplyClientBody clientBody=(ReplyClientBody)replyMsg.getBody(); System.out.println( "receive client msg: " +clientBody.getClientInfo()); } break ; default : break ; } ReferenceCountUtil.release(baseMsg); } } |
ServerBootstrap:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
public class NettyServerBootstrap { private int port; private SocketChannel socketChannel; public NettyServerBootstrap( int port) throws InterruptedException { this .port = port; bind(); } private void bind() throws InterruptedException { EventLoopGroup boss= new NioEventLoopGroup(); EventLoopGroup worker= new NioEventLoopGroup(); ServerBootstrap bootstrap= new ServerBootstrap(); bootstrap.group(boss,worker); bootstrap.channel(NioServerSocketChannel. class ); bootstrap.option(ChannelOption.SO_BACKLOG, 128 ); //通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去 bootstrap.option(ChannelOption.TCP_NODELAY, true ); //保持长连接状态 bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true ); bootstrap.childHandler( new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline p = socketChannel.pipeline(); p.addLast( new ObjectEncoder()); p.addLast( new ObjectDecoder(ClassResolvers.cacheDisabled( null ))); p.addLast( new NettyServerHandler()); } }); ChannelFuture f= bootstrap.bind(port).sync(); if (f.isSuccess()){ System.out.println( "server start---------------" ); } } public static void main(String []args) throws InterruptedException { NettyServerBootstrap bootstrap= new NettyServerBootstrap( 9999 ); while ( true ){ SocketChannel channel=(SocketChannel)NettyChannelMap.get( "001" ); if (channel!= null ){ AskMsg askMsg= new AskMsg(); channel.writeAndFlush(askMsg); } TimeUnit.SECONDS.sleep( 5 ); } } } |
3 Client端:包含发起登录,发送心跳,及对应消息处理
handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
public class NettyClientHandler extends SimpleChannelInboundHandler<BaseMsg> { //利用写空闲发送心跳检测消息 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; switch (e.state()) { case WRITER_IDLE: PingMsg pingMsg= new PingMsg(); ctx.writeAndFlush(pingMsg); System.out.println( "send ping to server----------" ); break ; default : break ; } } } @Override protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception { MsgType msgType=baseMsg.getType(); switch (msgType){ case LOGIN:{ //向服务器发起登录 LoginMsg loginMsg= new LoginMsg(); loginMsg.setPassword( "yao" ); loginMsg.setUserName( "robin" ); channelHandlerContext.writeAndFlush(loginMsg); } break ; case PING:{ System.out.println( "receive ping from server----------" ); } break ; case ASK:{ ReplyClientBody replyClientBody= new ReplyClientBody( "client info **** !!!" ); ReplyMsg replyMsg= new ReplyMsg(); replyMsg.setBody(replyClientBody); channelHandlerContext.writeAndFlush(replyMsg); } break ; case REPLY:{ ReplyMsg replyMsg=(ReplyMsg)baseMsg; ReplyServerBody replyServerBody=(ReplyServerBody)replyMsg.getBody(); System.out.println( "receive client msg: " +replyServerBody.getServerInfo()); } default : break ; } ReferenceCountUtil.release(msgType); } } |
bootstrap
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
public class NettyClientBootstrap { private int port; private String host; private SocketChannel socketChannel; private static final EventExecutorGroup group = new DefaultEventExecutorGroup( 20 ); public NettyClientBootstrap( int port, String host) throws InterruptedException { this .port = port; this .host = host; start(); } private void start() throws InterruptedException { EventLoopGroup eventLoopGroup= new NioEventLoopGroup(); Bootstrap bootstrap= new Bootstrap(); bootstrap.channel(NioSocketChannel. class ); bootstrap.option(ChannelOption.SO_KEEPALIVE, true ); bootstrap.group(eventLoopGroup); bootstrap.remoteAddress(host,port); bootstrap.handler( new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast( new IdleStateHandler( 20 , 10 , 0 )); socketChannel.pipeline().addLast( new ObjectEncoder()); socketChannel.pipeline().addLast( new ObjectDecoder(ClassResolvers.cacheDisabled( null ))); socketChannel.pipeline().addLast( new NettyClientHandler()); } }); ChannelFuture future =bootstrap.connect(host,port).sync(); if (future.isSuccess()) { socketChannel = (SocketChannel)future.channel(); System.out.println( "connect server 成功---------" ); } } public static void main(String[]args) throws InterruptedException { Constants.setClientId( "001" ); NettyClientBootstrap bootstrap= new NettyClientBootstrap( 9999 , "localhost" ); LoginMsg loginMsg= new LoginMsg(); loginMsg.setPassword( "yao" ); loginMsg.setUserName( "robin" ); bootstrap.socketChannel.writeAndFlush(loginMsg); while ( true ){ TimeUnit.SECONDS.sleep( 3 ); AskMsg askMsg= new AskMsg(); AskParams askParams= new AskParams(); askParams.setAuth( "authToken" ); askMsg.setParams(askParams); bootstrap.socketChannel.writeAndFlush(askMsg); } } } |
具体的例子和相应pom.xml 见 https://github.com/WangErXiao/ServerClient