java socket 模拟im 即时通讯

java

自己想了一下怎么实现,就写了,没有深究是否合理.更多处理没有写下去,例如收件人不在线,应该保存在数据库,等下一次连接的时候刷新map,再把数据发送过去,图片发送也没有做,也没有用json格式

socket很奇怪,我用客户端连接上了服务器,没有发送消息的情况下,断开电脑网络,是不会出现问题,然后在把电脑网络连接上,通讯依然正常,正常断开也不出问题,但是用idea直接按stop键,那么服务端就会出问题了,读取事件会一直为true,造成死循环,消耗CPU,所以必须要判断一下客户端连接是否断开了

 

只需要把客户端代码启动几个,修改一些userName以及收件人,就可以测试,实现类似QQ微信即时通讯,聊天功能

 

 

服务端代码

package serversocketchannel;

import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.ByteBuffer;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.nio.channels.ServerSocketChannel;

import java.nio.channels.SocketChannel;

import java.nio.charset.Charset;

import java.util.Iterator;

import java.util.concurrent.ConcurrentHashMap;

/**

*

* @author ZhenWeiLai

*

*/

public class ServerSocketChannelNonBlocking {

private static ServerSocketChannel serverSocketChannel = null;

private static Charset charset = Charset.forName("GBK");//设置编码集,用于编码,解码

private static Selector selector = null;

//保存客户端的map

private static final ConcurrentHashMap<String,SocketChannel> clientSockets = new ConcurrentHashMap<>();

static{

try {

serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.socket().setReuseAddress(true);

serverSocketChannel.socket().bind(new InetSocketAddress(8000));

serverSocketChannel.configureBlocking(false);//设置为非阻塞

selector = Selector.open();//实例化一个选择器

} catch (IOException e) {

e.printStackTrace();

}

}

public static void main(String[] args) {

service();

}

private static void service(){

SocketChannel clientChannel = null;

SelectionKey selectionKey = null;

SocketChannel targetChannel = null;

try {

serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);//服务端监听连接

while(true){

selector.select();//阻塞至有新的连接就开始处理

Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator();

while(selectionKeys.hasNext()){

selectionKey = selectionKeys.next();

if(selectionKey.isAcceptable()){//如果事件是连接事件

ServerSocketChannel serverChannel = (ServerSocketChannel)selectionKey.channel();//获取事件绑定的channel

clientChannel = serverChannel.accept();//连接获取带客户端信息的socketChannel

clientChannel.configureBlocking(false);//客户设置为非阻塞,因为非阻塞才支持选择器.避免盲等浪费资源

ByteBuffer byteBuffer = ByteBuffer.allocate(1024);//作为每一个客户端的附件缓冲器

/**

* 只监听读事件,这里千万别监听写事件,因为只要连接有效,那么写事件会一直为true,导致死循环,很耗资源

* 可以跟serverSocket用同一个选择器,因为绑定的channel不同

*/

clientChannel.register(selector,SelectionKey.OP_READ,byteBuffer);

}else if(selectionKey.isReadable()){//只要有客户端写入,那么就可以处理

//获取客户端附件,也就是写入的数据

ByteBuffer byteBuffer = (ByteBuffer)selectionKey.attachment();

//从selectionKey获取客户端的channel

SocketChannel socketChannel = (SocketChannel)selectionKey.channel();

//把附件读出,解码为字符串

String msg = read(socketChannel,byteBuffer);

//这里用了->分割收件人,->后面跟着的字符串是收件人

if(msg.indexOf("->")!=-1){

//内容

String content = msg.substring(0,msg.lastIndexOf("->"));

//从map里获取收件人的socket

targetChannel = clientSockets.get(msg.substring(msg.lastIndexOf("->")+2));

//实例化一个缓冲区,用来写出到收件人的socketChannel

ByteBuffer temp = ByteBuffer.allocate(1024);

temp.put(charset.encode(content));

//写出

handleWrite(targetChannel,temp);

}else{

//如果内容没有收件人,那么视为第一次连接,客户端发过来的userName,作为KEY存入MAP

clientSockets.put(msg,socketChannel);

}

}

selectionKeys.remove();

}

}

} catch (IOException e) {

try {

if(selectionKey!=null)selectionKey.cancel();

if(clientChannel!=null){

clientChannel.shutdownInput();

clientChannel.shutdownOutput();

clientChannel.close();

}

if(targetChannel!=null){

targetChannel.shutdownInput();

targetChannel.shutdownOutput();

targetChannel.close();

}

} catch (IOException e1) {

// TODO Auto-generated catch block

e1.printStackTrace();

}

e.printStackTrace();

}

}

private static String read(SocketChannel socketChannel,ByteBuffer byteBuffer){

//重置position limit为写入做准备

byteBuffer.clear();

try {

int flag =socketChannel.read(byteBuffer);

//判断客户端是否断开连接

if(flag==-1){

//如果客户端无故断开,一定要关闭,否则读取事件一直为true造成死循环,非常耗资源

socketChannel.close();

}

} catch (IOException e) {

try {

socketChannel.close();

} catch (IOException e1) {

e1.printStackTrace();

}

e.printStackTrace();

}

//position =0 limit等于有效下标,为写出做准备

byteBuffer.flip();

return charset.decode(byteBuffer).toString();

}

//写出

private static void handleWrite(SocketChannel socketChannel,ByteBuffer byteBuffer){

synchronized (byteBuffer) {

byteBuffer.flip();

try {

socketChannel.write(byteBuffer);

} catch (IOException e) {

try {

socketChannel.close();

} catch (IOException e1) {

e1.printStackTrace();

}

e.printStackTrace();

}

}

}

}

客户端代码

package socketchannel;

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import java.net.InetSocketAddress;

import java.net.SocketAddress;

import java.nio.ByteBuffer;

import java.nio.channels.ClosedChannelException;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.nio.channels.SocketChannel;

import java.nio.charset.Charset;

import java.util.Iterator;

/**

* Created by lzw on 17-2-28.

*/

public class SocketChannelNonBlockingClient {

private static Charset charset = Charset.forName("GBK");

private static ByteBuffer receiveBuffer = ByteBuffer.allocate(10240);

private static ByteBuffer sendBuffer = ByteBuffer.allocate(10240);

private static SocketChannel socketChannel = null;

private static Selector selector = null;

private static String userName = "client1";//客户端名

private static String targetName = "client2";//收件人名

public static void main(String[] args) {

try {

socketChannel = SocketChannel.open();

//连接到服务端

SocketAddress socketAddress = new InetSocketAddress("19.95.103.112",8000);

selector = Selector.open();//实例化一个选择器

socketChannel.configureBlocking(false);//设置为非阻塞

//先监听一个连接事件

socketChannel.register(selector,SelectionKey.OP_CONNECT);

//连接

socketChannel.connect(socketAddress);

//jdk 1.8的lambda表达式,用一个线程监控控制台输入

new Thread(()->{

try {

receiveFromUser();

} catch (IOException e) {

e.printStackTrace();

}

}).start();

talk();

} catch (IOException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

private static void talk(){

try {

while(true){

selector.select();//阻塞直到连接事件

Iterator<SelectionKey> readyKeys = selector.selectedKeys().iterator();

while(readyKeys.hasNext()){

SelectionKey key =readyKeys.next();

if(key.isConnectable()){

//非阻塞的情况下可能没有连接完成,这里调用finishConnect阻塞至连接完成

socketChannel.finishConnect();

//连接完成以后,先发送自己的userName以便保存在服务端的客户端map里面

synchronized (sendBuffer){

SocketChannel socketChannel1 = (SocketChannel)key.channel();

sendBuffer.clear();

sendBuffer.put(charset.encode(userName));

send(socketChannel1);

socketChannel.register(selector,SelectionKey.OP_READ);//仅监听一个读取事件

}

}else if(key.isReadable()){

//处理读事件

receive(key);

}

readyKeys.remove();

}

}

} catch (ClosedChannelException e) {

try {

socketChannel.close();

} catch (IOException e1) {

e1.printStackTrace();

}

e.printStackTrace();

} catch (IOException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

/**

* 从控制台获取用户输入

* @throws IOException

*/

private static void receiveFromUser() throws IOException{

//阻塞直到控制台有输入

BufferedReader br = new BufferedReader(new InputStreamReader(System.in));

for(String msg = br.readLine();msg!=null&&!msg.equals("bye");msg = br.readLine()){

//同步锁避免线程竞争

synchronized (sendBuffer) {

sendBuffer.clear();

//编码

sendBuffer.put(charset.encode(msg));

//分割副

sendBuffer.put(charset.encode("->"));

//目标名

sendBuffer.put(charset.encode(targetName));

send(socketChannel);

}

}

}

/**

* 接收服务端的数据

* @param key

*/

private static void receive(SelectionKey key) throws IOException {

//获取服务端的channel

SocketChannel channel = (SocketChannel) key.channel();

//为写入缓冲器做准备position=0,limit=capacity

receiveBuffer.clear();

//从服务端的channel把数据读入缓冲器

channel.read(receiveBuffer);

//position=0,limit=有效下标最后一位

receiveBuffer.flip();

//解码

String msg = charset.decode(receiveBuffer).toString();

//输出到控制台

System.out.println(msg);

}

/**

* 发送到服务端

*/

private static void send(SocketChannel sendChannel) throws IOException {

if(sendBuffer.remaining()!=0){

synchronized (sendBuffer){

sendBuffer.flip();

sendChannel.write(sendBuffer);

}

}

}

}

 

以上是 java socket 模拟im 即时通讯 的全部内容, 来源链接: utcz.com/z/389692.html

回到顶部