netty实战之微信登录群聊4构建服务端与客户端pipeline

编程

我们可以把 解码,登录验证,消息处理,编码 设计成一个个channlhandler,每个handler处理相应的逻辑

package com.juejing.im.attribute;

import io.netty.util.AttributeKey;

public interface Attributes {

AttributeKey<Boolean> LOGIN = AttributeKey.newInstance("login");

}

package com.juejing.im.client.handle;

import com.juejing.im.protocol.request.LoginRequestPacket;

import com.juejing.im.protocol.response.LoginResponsePacket;

import com.juejing.im.util.LoginUtil;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import java.util.Date;

import java.util.UUID;

public class LoginResponseHandler extends SimpleChannelInboundHandler<LoginResponsePacket> {

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

// 创建登录对象

LoginRequestPacket loginRequestPacket = new LoginRequestPacket();

loginRequestPacket.setUserId(UUID.randomUUID().toString());

loginRequestPacket.setUserId("flash");

loginRequestPacket.setPassword("pwd");

// 写数据

ctx.channel().writeAndFlush(loginRequestPacket);

}

@Override

protected void channelRead0(ChannelHandlerContext ctx, LoginResponsePacket loginResponsePacket) throws Exception {

if(loginResponsePacket.isSuccess()){

System.out.println(new Date() + ": 客户端登录成功");

LoginUtil.markAsLogin(ctx.channel());

} else {

System.out.println(new Date() + ": 客户端登录失败,原因:" + loginResponsePacket.getReason());

}

}

}

package com.juejing.im.client.handle;

import com.juejing.im.protocol.response.MessageResponsePacket;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import java.util.Date;

public class MessageResponseHandler extends SimpleChannelInboundHandler<MessageResponsePacket> {

@Override

protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageResponsePacket messageResponsePacket) throws Exception {

System.out.println(new Date() + ": 收到服务端的消息: " + messageResponsePacket.getMessage());

}

}

package com.juejing.im.client;

import com.juejing.im.client.handle.LoginResponseHandler;

import com.juejing.im.client.handle.MessageResponseHandler;

import com.juejing.im.codec.PacketDecoder;

import com.juejing.im.codec.PacketEncoder;

import com.juejing.im.protocol.request.MessageRequestPacket;

import com.juejing.im.util.LoginUtil;

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.Channel;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioSocketChannel;

import java.util.Date;

import java.util.Scanner;

import java.util.concurrent.TimeUnit;

public class NettyClient {

private static final int MAX_RETRY = 5;

private static final String HOST = "127.0.0.1";

private static final int PORT = 8000;

public static void main(String[] args) {

NioEventLoopGroup workGroup = new NioEventLoopGroup();

Bootstrap bootstrap = new Bootstrap();

bootstrap.group(workGroup)

.channel(NioSocketChannel.class)

.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)

.option(ChannelOption.TCP_NODELAY, true)

.option(ChannelOption.SO_KEEPALIVE, true)

.handler(new ChannelInitializer<SocketChannel>() {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ch.pipeline().addLast(new PacketDecoder());

ch.pipeline().addLast(new LoginResponseHandler());

ch.pipeline().addLast(new MessageResponseHandler());

ch.pipeline().addLast(new PacketEncoder());

}

});

connect(bootstrap, HOST, PORT, MAX_RETRY);

}

private static void connect(Bootstrap bootstrap, String host, int port, int retry) {

bootstrap.connect(host, port).addListener(future -> {

if (future.isSuccess()) {

System.out.println(new Date() + ": 连接成功,启动控制台线程……");

Channel channel = ((ChannelFuture) future).channel();

startConsoleThread(channel);

} else if (retry == 0) {

System.err.println("重试次数已用完,放弃连接!");

} else {

// 第几次重连

int order = (MAX_RETRY - retry) + 1;

// 本次重连的间隔

int delay = 1 << order;

System.err.println(new Date() + ": 连接失败,第" + order + "次重连……");

bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit

.SECONDS);

}

});

}

private static void startConsoleThread(Channel channel) {

new Thread(() -> {

while (!Thread.interrupted()) {

if (LoginUtil.hasLogin(channel)) {

System.out.println("输入消息发送至服务端: ");

Scanner sc = new Scanner(System.in);

String line = sc.nextLine();

channel.writeAndFlush(new MessageRequestPacket(line));

}

}

}).start();

}

}

package com.juejing.im.codec;

import com.juejing.im.protocol.PacketCodeC;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class PacketDecoder extends ByteToMessageDecoder {

@Override

protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> list) throws Exception {

list.add(PacketCodeC.INSTANCE.decode(in));

}

}

package com.juejing.im.codec;

import com.juejing.im.protocol.Packet;

import com.juejing.im.protocol.PacketCodeC;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.MessageToByteEncoder;

public class PacketEncoder extends MessageToByteEncoder<Packet> {

@Override

protected void encode(ChannelHandlerContext channelHandlerContext, Packet packet, ByteBuf byteBuf) throws Exception {

PacketCodeC.INSTANCE.encode(byteBuf,packet);

}

}

package com.juejing.im.protocol.command;

public interface Command {

//登录请求

Byte LOGIN_REQUEST = 1;

//登录响应

Byte LOGIN_RESPONSE = 2;

//消息请求

Byte MESSAGE_REQUEST = 3;

//消息响应

Byte MESSAGE_RESPONSE = 4;

}

package com.juejing.im.protocol.request;

import com.juejing.im.protocol.Packet;

import lombok.Data;

import lombok.NoArgsConstructor;

import static com.juejing.im.protocol.command.Command.LOGIN_REQUEST;

/**

* 登录请求 消息体

*/

@Data

@NoArgsConstructor

public class LoginRequestPacket extends Packet {

private String userId;

private String password;

public LoginRequestPacket(String userId, String password) {

this.userId = userId;

this.password = password;

}

@Override

public Byte getCommand() {

return LOGIN_REQUEST;

}

}

package com.juejing.im.protocol.request;

import com.juejing.im.protocol.Packet;

import lombok.Data;

import lombok.NoArgsConstructor;

import static com.juejing.im.protocol.command.Command.MESSAGE_REQUEST;

/**

* 发送---消息体

*/

@Data

@NoArgsConstructor

public class MessageRequestPacket extends Packet {

private String message;

public MessageRequestPacket(String message) {

this.message = message;

}

@Override

public Byte getCommand() {

return MESSAGE_REQUEST;

}

}

package com.juejing.im.protocol.response;

import com.juejing.im.protocol.Packet;

import lombok.Data;

import static com.juejing.im.protocol.command.Command.LOGIN_RESPONSE;

/**

* 登录响应消息体

*/

@Data

public class LoginResponsePacket extends Packet {

private String reason;

private boolean success;

@Override

public Byte getCommand() {

return LOGIN_RESPONSE;

}

}

package com.juejing.im.protocol.response;

import com.juejing.im.protocol.Packet;

import lombok.Data;

import static com.juejing.im.protocol.command.Command.MESSAGE_RESPONSE;

/**

* 响应消息体

*/

@Data

public class MessageResponsePacket extends Packet {

private String message;

@Override

public Byte getCommand() {

return MESSAGE_RESPONSE;

}

}

package com.juejing.im.protocol;

import lombok.Data;

/**

* 协议版本

*/

@Data

public abstract class Packet {

//版本

private Byte version = 1;

/**

* 协议类型 --登录请求,登录响应、消息请求,消息响应

*

* @return

*/

public abstract Byte getCommand();

}

package com.juejing.im.protocol;

import com.juejing.im.protocol.request.LoginRequestPacket;

import com.juejing.im.protocol.request.MessageRequestPacket;

import com.juejing.im.protocol.response.LoginResponsePacket;

import com.juejing.im.protocol.response.MessageResponsePacket;

import com.juejing.im.serialize.JSONSerializer;

import com.juejing.im.serialize.Serializer;

import com.juejing.im.serialize.SerializerAlogrithm;

import io.netty.buffer.ByteBuf;

import java.util.HashMap;

import java.util.Map;

import static com.juejing.im.protocol.command.Command.*;

/**

* 消息体序列化工具类

*/

public class PacketCodeC {

private static final int MAGIC_NUMBER = 0x12345678;

public static final PacketCodeC INSTANCE = new PacketCodeC();

/**

* 协议版本类型

*/

private final Map<Byte, Class<? extends Packet>> packetTypeMap;

/**

* 协议序列化map

*/

private final Map<Byte, Serializer> serializerMap;

private PacketCodeC() {

packetTypeMap = new HashMap<>();

packetTypeMap.put(LOGIN_REQUEST, LoginRequestPacket.class);

packetTypeMap.put(LOGIN_RESPONSE, LoginResponsePacket.class);

packetTypeMap.put(MESSAGE_REQUEST, MessageRequestPacket.class);

packetTypeMap.put(MESSAGE_RESPONSE, MessageResponsePacket.class);

serializerMap = new HashMap<>();

Serializer serializer = new JSONSerializer();

serializerMap.put(serializer.getSerializerAlogrithm(), serializer);

}

public void encode(ByteBuf byteBuf, Packet packet) {

byte[] bytes = Serializer.DEFAULT.serialize(packet);

byteBuf.writeInt(MAGIC_NUMBER);

byteBuf.writeByte(packet.getVersion());

byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlogrithm());

byteBuf.writeByte(packet.getCommand());

byteBuf.writeInt(bytes.length);

byteBuf.writeBytes(bytes);

}

public Packet decode(ByteBuf byteBuf) {

//跳过 magic number

byteBuf.skipBytes(4);

//跳过版本

byteBuf.skipBytes(1);

// 序列化算法

byte serializeAlgorithm = byteBuf.readByte();

//指令

byte command = byteBuf.readByte();

//读取消息长度

int length = byteBuf.readInt();

byte[] bytes = new byte[length];

byteBuf.readBytes(bytes);

//序列化要返回的类型

Class<? extends Packet> requestType = getRequestType(command);

//需要的序列化算法

Serializer serializer = getSerializer(serializeAlgorithm);

if (requestType != null && serializer != null) {

return serializer.deserialize(requestType, bytes);

} else {

return null;

}

}

private Serializer getSerializer(byte serializeAlgorithm) {

return serializerMap.get(serializeAlgorithm);

}

private Class<? extends Packet> getRequestType(byte command) {

return packetTypeMap.get(command);

}

}

package com.juejing.im.serialize;

import com.alibaba.fastjson.JSON;

public class JSONSerializer implements Serializer {

@Override

public byte getSerializerAlogrithm() {

return SerializerAlogrithm.JSON;

}

@Override

public byte[] serialize(Object object) {

return JSON.toJSONBytes(object);

}

@Override

public <T> T deserialize(Class<T> clazz, byte[] bytes) {

return JSON.parseObject(bytes, clazz);

}

}

package com.juejing.im.serialize;

/**

* 序列华化算法

*/

public interface Serializer {

Serializer DEFAULT = new JSONSerializer();

/**

* 获取 序列化算法

*

* @return

*/

byte getSerializerAlogrithm();

/**

* java 对象 转化成二进制

*

* @param object

* @return

*/

byte[] serialize(Object object);

/**

* 二进制转java对象

*

* @param clazz

* @param bytes

* @param <T>

* @return

*/

<T> T deserialize(Class<T> clazz, byte[] bytes);

}

package com.juejing.im.serialize;

/**

* 序列化种类

*/

public interface SerializerAlogrithm {

/**

* json序列化

*/

byte JSON = 1;

}

package com.juejing.im.server.handle;

import com.juejing.im.protocol.request.LoginRequestPacket;

import com.juejing.im.protocol.response.LoginResponsePacket;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {

@Override

protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) throws Exception {

System.out.println("收到客户端请求-----------------");

LoginResponsePacket loginResponsePacket = new LoginResponsePacket();

loginResponsePacket.setVersion(loginRequestPacket.getVersion());

if (valid(loginRequestPacket)) {

loginResponsePacket.setSuccess(true);

System.out.println("user=" + loginRequestPacket.getUserId() + ": 登录成功!");

} else {

loginResponsePacket.setReason("账号密码校验失败");

loginResponsePacket.setSuccess(false);

System.out.println("user=" + loginRequestPacket.getUserId() + ": 登录失败!");

}

ctx.channel().writeAndFlush(loginResponsePacket);

}

private boolean valid(LoginRequestPacket loginRequestPacket) {

return true;

}

}

package com.juejing.im.server.handle;

import com.juejing.im.protocol.request.MessageRequestPacket;

import com.juejing.im.protocol.response.MessageResponsePacket;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

public class MessageRequestHandler extends SimpleChannelInboundHandler<MessageRequestPacket> {

@Override

protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageRequestPacket messageRequestPacket) throws Exception {

MessageResponsePacket messageResponsePacket = new MessageResponsePacket();

System.out.println("收到客户端消息: " + messageRequestPacket.getMessage());

messageResponsePacket.setMessage("服务端回复【" + messageRequestPacket.getMessage() + "】");

channelHandlerContext.channel().writeAndFlush(messageResponsePacket);

}

}

package com.juejing.im.server;

import com.juejing.im.codec.PacketDecoder;

import com.juejing.im.codec.PacketEncoder;

import com.juejing.im.server.handle.LoginRequestHandler;

import com.juejing.im.server.handle.MessageRequestHandler;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyServer {

private static final int PORT = 8000;

public static void main(String[] args) {

NioEventLoopGroup bossGroup = new NioEventLoopGroup();

NioEventLoopGroup workGroup = new NioEventLoopGroup();

ServerBootstrap bootstrap = new ServerBootstrap();

bootstrap.group(bossGroup,workGroup)

.channel(NioServerSocketChannel.class)

.option(ChannelOption.SO_BACKLOG,1024)

.childOption(ChannelOption.SO_KEEPALIVE,true)

.childOption(ChannelOption.TCP_NODELAY,true)

.childHandler(new ChannelInitializer<NioSocketChannel>() {

@Override

protected void initChannel(NioSocketChannel ch) throws Exception {

ch.pipeline().addLast(new PacketDecoder());

ch.pipeline().addLast(new LoginRequestHandler());

ch.pipeline().addLast(new MessageRequestHandler());

ch.pipeline().addLast(new PacketEncoder());

}

});

bind(bootstrap,PORT);

}

private static void bind(ServerBootstrap bootstrap, int port) {

bootstrap.bind(port).addListener(future -> {

if (future.isSuccess()){

System.out.println("端口"+port+"绑定成功-----------------------------------");

}else {

System.out.println("端口"+port+"绑定失败-----------------------------------");

}

});

}

}

package com.juejing.im.util;

import com.juejing.im.attribute.Attributes;

import io.netty.channel.Channel;

import io.netty.util.Attribute;

public class LoginUtil {

public static void markAsLogin(Channel channel){

channel.attr(Attributes.LOGIN).set(true);

}

public static boolean hasLogin(Channel channel){

Attribute<Boolean> loginAttr = channel.attr(Attributes.LOGIN);

return loginAttr.get() != null;

}

}

 

以上是 netty实战之微信登录群聊4构建服务端与客户端pipeline 的全部内容, 来源链接: utcz.com/z/516330.html

回到顶部