netty实战之微信登录群聊4构建服务端与客户端pipeline
我们可以把 解码,登录验证,消息处理,编码 设计成一个个channlhandler,每个handler处理相应的逻辑
package com.juejing.im.attribute;import io.netty.util.AttributeKey;
public interface Attributes {
AttributeKey<Boolean> LOGIN = AttributeKey.newInstance("login");
}
package com.juejing.im.client.handle;import com.juejing.im.protocol.request.LoginRequestPacket;
import com.juejing.im.protocol.response.LoginResponsePacket;
import com.juejing.im.util.LoginUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.Date;
import java.util.UUID;
public class LoginResponseHandler extends SimpleChannelInboundHandler<LoginResponsePacket> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 创建登录对象
LoginRequestPacket loginRequestPacket = new LoginRequestPacket();
loginRequestPacket.setUserId(UUID.randomUUID().toString());
loginRequestPacket.setUserId("flash");
loginRequestPacket.setPassword("pwd");
// 写数据
ctx.channel().writeAndFlush(loginRequestPacket);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginResponsePacket loginResponsePacket) throws Exception {
if(loginResponsePacket.isSuccess()){
System.out.println(new Date() + ": 客户端登录成功");
LoginUtil.markAsLogin(ctx.channel());
} else {
System.out.println(new Date() + ": 客户端登录失败,原因:" + loginResponsePacket.getReason());
}
}
}
package com.juejing.im.client.handle;
import com.juejing.im.protocol.response.MessageResponsePacket;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.Date;
public class MessageResponseHandler extends SimpleChannelInboundHandler<MessageResponsePacket> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageResponsePacket messageResponsePacket) throws Exception {
System.out.println(new Date() + ": 收到服务端的消息: " + messageResponsePacket.getMessage());
}
}
package com.juejing.im.client;
import com.juejing.im.client.handle.LoginResponseHandler;
import com.juejing.im.client.handle.MessageResponseHandler;
import com.juejing.im.codec.PacketDecoder;
import com.juejing.im.codec.PacketEncoder;
import com.juejing.im.protocol.request.MessageRequestPacket;
import com.juejing.im.util.LoginUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.Date;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
public class NettyClient {
private static final int MAX_RETRY = 5;
private static final String HOST = "127.0.0.1";
private static final int PORT = 8000;
public static void main(String[] args) {
NioEventLoopGroup workGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new LoginResponseHandler());
ch.pipeline().addLast(new MessageResponseHandler());
ch.pipeline().addLast(new PacketEncoder());
}
});
connect(bootstrap, HOST, PORT, MAX_RETRY);
}
private static void connect(Bootstrap bootstrap, String host, int port, int retry) {
bootstrap.connect(host, port).addListener(future -> {
if (future.isSuccess()) {
System.out.println(new Date() + ": 连接成功,启动控制台线程……");
Channel channel = ((ChannelFuture) future).channel();
startConsoleThread(channel);
} else if (retry == 0) {
System.err.println("重试次数已用完,放弃连接!");
} else {
// 第几次重连
int order = (MAX_RETRY - retry) + 1;
// 本次重连的间隔
int delay = 1 << order;
System.err.println(new Date() + ": 连接失败,第" + order + "次重连……");
bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit
.SECONDS);
}
});
}
private static void startConsoleThread(Channel channel) {
new Thread(() -> {
while (!Thread.interrupted()) {
if (LoginUtil.hasLogin(channel)) {
System.out.println("输入消息发送至服务端: ");
Scanner sc = new Scanner(System.in);
String line = sc.nextLine();
channel.writeAndFlush(new MessageRequestPacket(line));
}
}
}).start();
}
}
package com.juejing.im.codec;import com.juejing.im.protocol.PacketCodeC;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class PacketDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> list) throws Exception {
list.add(PacketCodeC.INSTANCE.decode(in));
}
}
package com.juejing.im.codec;
import com.juejing.im.protocol.Packet;
import com.juejing.im.protocol.PacketCodeC;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class PacketEncoder extends MessageToByteEncoder<Packet> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Packet packet, ByteBuf byteBuf) throws Exception {
PacketCodeC.INSTANCE.encode(byteBuf,packet);
}
}
package com.juejing.im.protocol.command;
public interface Command {
//登录请求
Byte LOGIN_REQUEST = 1;
//登录响应
Byte LOGIN_RESPONSE = 2;
//消息请求
Byte MESSAGE_REQUEST = 3;
//消息响应
Byte MESSAGE_RESPONSE = 4;
}
package com.juejing.im.protocol.request;
import com.juejing.im.protocol.Packet;
import lombok.Data;
import lombok.NoArgsConstructor;
import static com.juejing.im.protocol.command.Command.LOGIN_REQUEST;
/**
* 登录请求 消息体
*/
@Data
@NoArgsConstructor
public class LoginRequestPacket extends Packet {
private String userId;
private String password;
public LoginRequestPacket(String userId, String password) {
this.userId = userId;
this.password = password;
}
@Override
public Byte getCommand() {
return LOGIN_REQUEST;
}
}
package com.juejing.im.protocol.request;
import com.juejing.im.protocol.Packet;
import lombok.Data;
import lombok.NoArgsConstructor;
import static com.juejing.im.protocol.command.Command.MESSAGE_REQUEST;
/**
* 发送---消息体
*/
@Data
@NoArgsConstructor
public class MessageRequestPacket extends Packet {
private String message;
public MessageRequestPacket(String message) {
this.message = message;
}
@Override
public Byte getCommand() {
return MESSAGE_REQUEST;
}
}
package com.juejing.im.protocol.response;
import com.juejing.im.protocol.Packet;
import lombok.Data;
import static com.juejing.im.protocol.command.Command.LOGIN_RESPONSE;
/**
* 登录响应消息体
*/
@Data
public class LoginResponsePacket extends Packet {
private String reason;
private boolean success;
@Override
public Byte getCommand() {
return LOGIN_RESPONSE;
}
}
package com.juejing.im.protocol.response;
import com.juejing.im.protocol.Packet;
import lombok.Data;
import static com.juejing.im.protocol.command.Command.MESSAGE_RESPONSE;
/**
* 响应消息体
*/
@Data
public class MessageResponsePacket extends Packet {
private String message;
@Override
public Byte getCommand() {
return MESSAGE_RESPONSE;
}
}
package com.juejing.im.protocol;
import lombok.Data;
/**
* 协议版本
*/
@Data
public abstract class Packet {
//版本
private Byte version = 1;
/**
* 协议类型 --登录请求,登录响应、消息请求,消息响应
*
* @return
*/
public abstract Byte getCommand();
}
package com.juejing.im.protocol;
import com.juejing.im.protocol.request.LoginRequestPacket;
import com.juejing.im.protocol.request.MessageRequestPacket;
import com.juejing.im.protocol.response.LoginResponsePacket;
import com.juejing.im.protocol.response.MessageResponsePacket;
import com.juejing.im.serialize.JSONSerializer;
import com.juejing.im.serialize.Serializer;
import com.juejing.im.serialize.SerializerAlogrithm;
import io.netty.buffer.ByteBuf;
import java.util.HashMap;
import java.util.Map;
import static com.juejing.im.protocol.command.Command.*;
/**
* 消息体序列化工具类
*/
public class PacketCodeC {
private static final int MAGIC_NUMBER = 0x12345678;
public static final PacketCodeC INSTANCE = new PacketCodeC();
/**
* 协议版本类型
*/
private final Map<Byte, Class<? extends Packet>> packetTypeMap;
/**
* 协议序列化map
*/
private final Map<Byte, Serializer> serializerMap;
private PacketCodeC() {
packetTypeMap = new HashMap<>();
packetTypeMap.put(LOGIN_REQUEST, LoginRequestPacket.class);
packetTypeMap.put(LOGIN_RESPONSE, LoginResponsePacket.class);
packetTypeMap.put(MESSAGE_REQUEST, MessageRequestPacket.class);
packetTypeMap.put(MESSAGE_RESPONSE, MessageResponsePacket.class);
serializerMap = new HashMap<>();
Serializer serializer = new JSONSerializer();
serializerMap.put(serializer.getSerializerAlogrithm(), serializer);
}
public void encode(ByteBuf byteBuf, Packet packet) {
byte[] bytes = Serializer.DEFAULT.serialize(packet);
byteBuf.writeInt(MAGIC_NUMBER);
byteBuf.writeByte(packet.getVersion());
byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlogrithm());
byteBuf.writeByte(packet.getCommand());
byteBuf.writeInt(bytes.length);
byteBuf.writeBytes(bytes);
}
public Packet decode(ByteBuf byteBuf) {
//跳过 magic number
byteBuf.skipBytes(4);
//跳过版本
byteBuf.skipBytes(1);
// 序列化算法
byte serializeAlgorithm = byteBuf.readByte();
//指令
byte command = byteBuf.readByte();
//读取消息长度
int length = byteBuf.readInt();
byte[] bytes = new byte[length];
byteBuf.readBytes(bytes);
//序列化要返回的类型
Class<? extends Packet> requestType = getRequestType(command);
//需要的序列化算法
Serializer serializer = getSerializer(serializeAlgorithm);
if (requestType != null && serializer != null) {
return serializer.deserialize(requestType, bytes);
} else {
return null;
}
}
private Serializer getSerializer(byte serializeAlgorithm) {
return serializerMap.get(serializeAlgorithm);
}
private Class<? extends Packet> getRequestType(byte command) {
return packetTypeMap.get(command);
}
}
package com.juejing.im.serialize;
import com.alibaba.fastjson.JSON;
public class JSONSerializer implements Serializer {
@Override
public byte getSerializerAlogrithm() {
return SerializerAlogrithm.JSON;
}
@Override
public byte[] serialize(Object object) {
return JSON.toJSONBytes(object);
}
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
return JSON.parseObject(bytes, clazz);
}
}
package com.juejing.im.serialize;
/**
* 序列华化算法
*/
public interface Serializer {
Serializer DEFAULT = new JSONSerializer();
/**
* 获取 序列化算法
*
* @return
*/
byte getSerializerAlogrithm();
/**
* java 对象 转化成二进制
*
* @param object
* @return
*/
byte[] serialize(Object object);
/**
* 二进制转java对象
*
* @param clazz
* @param bytes
* @param <T>
* @return
*/
<T> T deserialize(Class<T> clazz, byte[] bytes);
}
package com.juejing.im.serialize;
/**
* 序列化种类
*/
public interface SerializerAlogrithm {
/**
* json序列化
*/
byte JSON = 1;
}
package com.juejing.im.server.handle;
import com.juejing.im.protocol.request.LoginRequestPacket;
import com.juejing.im.protocol.response.LoginResponsePacket;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) throws Exception {
System.out.println("收到客户端请求-----------------");
LoginResponsePacket loginResponsePacket = new LoginResponsePacket();
loginResponsePacket.setVersion(loginRequestPacket.getVersion());
if (valid(loginRequestPacket)) {
loginResponsePacket.setSuccess(true);
System.out.println("user=" + loginRequestPacket.getUserId() + ": 登录成功!");
} else {
loginResponsePacket.setReason("账号密码校验失败");
loginResponsePacket.setSuccess(false);
System.out.println("user=" + loginRequestPacket.getUserId() + ": 登录失败!");
}
ctx.channel().writeAndFlush(loginResponsePacket);
}
private boolean valid(LoginRequestPacket loginRequestPacket) {
return true;
}
}
package com.juejing.im.server.handle;
import com.juejing.im.protocol.request.MessageRequestPacket;
import com.juejing.im.protocol.response.MessageResponsePacket;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class MessageRequestHandler extends SimpleChannelInboundHandler<MessageRequestPacket> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageRequestPacket messageRequestPacket) throws Exception {
MessageResponsePacket messageResponsePacket = new MessageResponsePacket();
System.out.println("收到客户端消息: " + messageRequestPacket.getMessage());
messageResponsePacket.setMessage("服务端回复【" + messageRequestPacket.getMessage() + "】");
channelHandlerContext.channel().writeAndFlush(messageResponsePacket);
}
}
package com.juejing.im.server;
import com.juejing.im.codec.PacketDecoder;
import com.juejing.im.codec.PacketEncoder;
import com.juejing.im.server.handle.LoginRequestHandler;
import com.juejing.im.server.handle.MessageRequestHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyServer {
private static final int PORT = 8000;
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childOption(ChannelOption.SO_KEEPALIVE,true)
.childOption(ChannelOption.TCP_NODELAY,true)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new LoginRequestHandler());
ch.pipeline().addLast(new MessageRequestHandler());
ch.pipeline().addLast(new PacketEncoder());
}
});
bind(bootstrap,PORT);
}
private static void bind(ServerBootstrap bootstrap, int port) {
bootstrap.bind(port).addListener(future -> {
if (future.isSuccess()){
System.out.println("端口"+port+"绑定成功-----------------------------------");
}else {
System.out.println("端口"+port+"绑定失败-----------------------------------");
}
});
}
}
package com.juejing.im.util;
import com.juejing.im.attribute.Attributes;
import io.netty.channel.Channel;
import io.netty.util.Attribute;
public class LoginUtil {
public static void markAsLogin(Channel channel){
channel.attr(Attributes.LOGIN).set(true);
}
public static boolean hasLogin(Channel channel){
Attribute<Boolean> loginAttr = channel.attr(Attributes.LOGIN);
return loginAttr.get() != null;
}
}
以上是 netty实战之微信登录群聊4构建服务端与客户端pipeline 的全部内容, 来源链接: utcz.com/z/516330.html