Java BIO实现聊天程序

本文实例为大家分享了Java BIO实现聊天程序的具体代码,供大家参考,具体内容如下

我们使用一个聊天程序来说本文的主题

1、BIO 客户端服务器通讯

public class ChatServer {

public static void main(String[] args) throws IOException {

ServerSocket serverSocket = new ServerSocket(9000);

while (true) {

try {

System.out.println("聊天服务已启动,等待客户连接....");

Socket socket = serverSocket.accept();

System.out.printf("建立了与%s的连接!\n",socket.getRemoteSocketAddress());

loopReadRequest(socket);

} catch (IOException e) {

e.printStackTrace();

}

}

}

public static String loopReadRequest(Socket socket) throws IOException {

InputStreamReader reader = new InputStreamReader(socket.getInputStream());

StringBuilder sb = new StringBuilder();

char[] cbuf = new char[256];

// 循环读取socket的输入数据流

while (true) {

// read方法,读出内容写入 char 数组,read 方法会一直阻塞

// 直到有输入内容 或 发生I/O错误 或 输入流结束(对方关闭了socket)

// 正常读取时方法会返回读取的字符数,当输入流结束时(对方关闭了socket)方法返回 -1

int readed = reader.read(cbuf);

SocketAddress remoteSocketAddress = socket.getRemoteSocketAddress();

// 客户端执行了socket.close()

if (readed == -1) {

System.out.println(remoteSocketAddress + " 断开了连接!");

reader.close();

socket.close();

break;

}

String readedStr = new String(cbuf, 0, readed);

sb.append(readedStr);

// ready()用来判断流是否可被读取,如果reader缓冲区不是空则返回true,否则返回false

if (!reader.ready()) {//reader缓冲区为空,表示数据流已读完

// 数据流已读完,此时向客户端发送响应

socket.getOutputStream().write((remoteSocketAddress+"你好,"+sb+"已收到").getBytes());

System.out.println("收到内容:"+sb);

// 清除sb的内容,准备接收下一个请求内容

sb.setLength(0);

System.out.println("等待客户端消息....");

}

}

return sb.toString();

}

}

public class ChatClient {

public static void main(String[] args) {

try {

Socket socket = new Socket("localhost", 9000);

Scanner scanner = new Scanner(System.in);

while (true) {

System.out.print(">");

String line = scanner.nextLine();

if("".equals(line)){

continue;

}

if ("quit".equals(line)) {

scanner.close();

socket.close();

break;

}

socket.getOutputStream().write(line.getBytes());

System.out.println(readRequest(socket));

}

} catch (IOException e) {

e.printStackTrace();

}

}

public static String readRequest(Socket socket) throws IOException {

InputStreamReader reader = new InputStreamReader(socket.getInputStream());

StringBuilder sb = new StringBuilder();

char[] cbuf = new char[256];

while (true) {

int readed = reader.read(cbuf);

// 读出内容写入 char 数组,read 方法会一直阻塞

// 直到有输入内容 或 发生I/O错误 或 输入流结束(对方关闭了socket)

// 正常读取,方法会返回读取的字符数,而当输入流结束(对方关闭了socket)则返回 -1

if (readed == -1) {

System.out.println(socket.getRemoteSocketAddress() + " 断开了连接!");

reader.close();

socket.close();

break;

}

String readedStr = new String(cbuf, 0, readed);

sb.append(readedStr);

if(!reader.ready()){

break;

}

}

return sb.toString();

}

}

ChatServer与ChatClient建立了长连接,且ChatServer阻塞等待ChatClient发送消息过来,程序中 Server端只能与一个Client建立连接。程序这么写,只能实现一个客户端和服务端进行通信。

如何支持多个Client的连接呢? 使用独立的线程去读取socket

2、多线程实现单聊,群聊

单聊发送 格式:-c 对方端口号 消息内容, 群聊直接发送信息就可以了,具体发送逻辑看下面的程序

public class ChatServer {

private static Map<String, Socket> connnectedSockets = new ConcurrentHashMap<>();

public static void main(String[] args) throws IOException {

// 1、服务端初始化工作

ServerSocket serverSocket = new ServerSocket(9000);

ExecutorService executorService = getExecutorService();

// 2、主线程- 循环阻塞接收新的连接请求

while (true) {

Socket socket = serverSocket.accept();

cacheSocket(socket);

// 3、一个socket对应一个读取任务,交给线程池中的线程执行

// 如果使用fixed线程池,会操作读取任务分配不到线程的情况

// 现象就是发送的消息别人收不到(暂存在Socket缓存中)

executorService.submit(createLoopReadTask(socket));

}

}

private static Runnable createLoopReadTask(Socket socket) {

return new Runnable() {

public void run() {

try {

loopReadRequestAndRedirect(socket);

} catch (IOException e) {

e.printStackTrace();

}

}

};

}

private static ExecutorService getExecutorService() {

ExecutorService executorService = Executors.newCachedThreadPool();

int nThreads = Runtime.getRuntime().availableProcessors();

nThreads = 1;

// 如果只设置一个线程,那么最先连接进来的客户端可以发送消息

// 因为程序阻塞读取第一个socket连接的数据流,没有其他线程资源去读后面建立的socket了

executorService = Executors.newFixedThreadPool(nThreads);

return executorService;

}

private static void cacheSocket(Socket socket) {

SocketAddress remoteSocketAddress = socket.getRemoteSocketAddress();

String[] split = remoteSocketAddress.toString().split(":");

connnectedSockets.put(split[1], socket);

}

public static String loopReadRequestAndRedirect(Socket socket) throws IOException {

InputStreamReader reader = new InputStreamReader(socket.getInputStream());

StringBuilder sb = new StringBuilder();

char[] cbuf = new char[256];

while (true) {

SocketAddress remoteSocketAddress = socket.getRemoteSocketAddress();

System.out.println(Thread.currentThread() + "执行 " + remoteSocketAddress + "发送的消息");

// 读出内容写入 char 数组,read 方法会一直阻塞

// 直到有输入内容 或 发生I/O错误 或 输入流结束(对方关闭了socket)

// 正常读取时方法会返回读取的字符数,当输入流结束(对方关闭了socket)时返回 -1

int readed = reader.read(cbuf);

if (readed == -1) {

System.out.println(remoteSocketAddress + " 断开了连接!");

reader.close();

socket.close();

break;

}

String readedStr = new String(cbuf, 0, readed);

sb.append(readedStr);

//ready()用来判断流是否可被读取,如果reader缓冲区不是空则返回true,否则返回false

boolean oneReqeustStreamReaded = !reader.ready();

if (oneReqeustStreamReaded) {

String requestContent = sb.toString().trim();

String prifix = requestContent.substring(0, 2);

// 单聊

if ("-c".equals(prifix)) {

requestContent = requestContent.substring(3);

String port = requestContent.substring(0, requestContent.indexOf(" "));

requestContent = requestContent.replaceFirst(port, "");

sendToOneSocket(connnectedSockets.get(port), requestContent);

// 群聊

} else {

// 向客户端发送响应

socket.getOutputStream().write(("您发送的消息-'" + sb + "' 已收到").getBytes());

sendToAllSocket(sb.toString(), socket);

}

sb.setLength(0);

}

}

return sb.toString();

}

/**

* 发送消息给某个socket

*

* @param socket

* @param msg

*/

private static void sendToOneSocket(Socket socket, String msg) {

// 对于同一个socket,同一时刻只有一个线程使用它发送消息

synchronized (socket) {

try {

socket.getOutputStream().write(msg.getBytes("UTF-8"));

} catch (IOException e) {

e.printStackTrace();

}

}

}

/**

* 发送消息给所有的socket

*

* @param msg

*/

private static void sendToAllSocket(String msg, Socket selfSocket) {

for (String key : connnectedSockets.keySet()) {

Socket socket = connnectedSockets.get(key);

if (socket.equals(selfSocket)) {

continue;

}

sendToOneSocket(socket, msg);

}

}

}

public class ChatClient {

public static void main(String[] args) throws IOException {

new ChatClient().start();

}

public void start() throws IOException {

Socket socket = new Socket("localhost", 9000);

ExecutorService executorService = Executors.newFixedThreadPool(2);

Runnable readTask = new Runnable() {

public void run() {

try {

loopReadRequest(socket);

} catch (IOException e) {

e.printStackTrace();

}

}

};

executorService.submit(readTask);

Runnable sendMsgTask = new Runnable() {

public void run() {

try {

Scanner scanner = new Scanner(System.in);

while (true) {

System.out.print(">");

String line = scanner.nextLine();

if ("".equals(line)) {

continue;

}

if ("quit".equals(line)) {

scanner.close();

socket.close();

break;

}

socket.getOutputStream().write(line.getBytes());

}

} catch (IOException e) {

e.printStackTrace();

}

}

};

executorService.submit(sendMsgTask);

}

public void loopReadRequest(Socket socket) throws IOException {

InputStreamReader reader = new InputStreamReader(socket.getInputStream());

StringBuilder sb = new StringBuilder();

char[] cbuf = new char[256];

while (true) {

int readed = reader.read(cbuf);

// 读出内容写入 char 数组,read 方法会一直阻塞

// 直到有输入内容 或 发生I/O错误 或 输入流结束(对方关闭了socket)

// 正常读取,方法会返回读取的字符数,而当输入流结束(对方关闭了socket)则返回 -1

if (readed == -1) {

System.out.println(socket.getRemoteSocketAddress() + " 断开了连接!");

reader.close();

socket.close();

break;

}

String readedStr = new String(cbuf, 0, readed);

sb.append(readedStr);

if (!reader.ready()) {

System.out.println(sb);

sb.setLength(0);

}

}

}

}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。

以上是 Java BIO实现聊天程序 的全部内容, 来源链接: utcz.com/p/250990.html

回到顶部