【Java】dubbo实现动态负载均衡

引言

本文续写上一篇博客 dubbo框架,这里主要简单演示dubbo如何实现动态负载均衡的。

模块构建

新建子模块 springboot_dubbo_load
【Java】dubbo实现动态负载均衡

该模块 pom文件中引入ZK客户端依赖:

    <dependencies>

<!--ZK客户端工具-->

<dependency>

<groupId>com.101tec</groupId>

<artifactId>zkclient</artifactId>

<!-- <exclusions>-->

<!-- <exclusion>-->

<!-- <groupId>org.slf4j</groupId>-->

<!-- <artifactId>slf4j-log4j12</artifactId>-->

<!-- </exclusion>-->

<!-- </exclusions>-->

<version>0.10</version>

</dependency>

</dependencies>

ZkServerSocket Socket服务端:

package com.baba.wlb.server;

import org.I0Itec.zkclient.ZkClient;

import java.io.IOException;

import java.net.ServerSocket;

import java.net.Socket;

/**

* @Author wulongbo

* @Date 2021/1/5 9:56

* @Version 1.0

*/

// ServerSocket服务端

public class ZkServerSocket implements Runnable {

private static int port = 18081;

private String parentService = "/service";

/**

* 服务器端:<br>

* 1.服务器启动的时候,会将当前服务器信息注册到注册中心。首先先创建一个父节点为service,在父节点下面再创建一个子节点,

* 每个子节点都存放当前服务接口地址。

* ## 子节点结构

* /service 持久节点

* ##/8080 value 39.102.56.91:18080 临时节点

* ##/8081 value 39.102.56.91:18081 临时节点

*

* @param args

*/

private ZkClient zkClient = new ZkClient("39.102.56.91:2181");

public static void main(String[] args) {

ZkServerSocket server = new ZkServerSocket(port);

Thread thread = new Thread(server);

thread.start();

}

public ZkServerSocket(int port) {

this.port = port;

}

public void regServer() {

// 1.先创建父节点server 为持久节点

if (!zkClient.exists(parentService)) {

// // 删除旧节点

// zkClient.delete(parentService);

// 2.创建父节点

// 持久节点

zkClient.createPersistent(parentService);

}

String serverKey = parentService + "/server_" + port;

if (!zkClient.exists(serverKey)) {

// // 删除旧节点

// zkClient.delete(serverKey);

// 3.创建子节点value为服务接口地址

// 临时节点

zkClient.createEphemeral(serverKey, "127.0.0.1:" + port);

}

}

@Override

public void run() {

ServerSocket serverSocket = null;

try {

serverSocket = new ServerSocket(port);

System.out.println("Server start port:" + port);

regServer();

Socket socket = null;

while (true) {

socket = serverSocket.accept();

new Thread(new ServerHandler(socket)).start();

}

} catch (IOException e) {

e.printStackTrace();

} finally {

try {

if (serverSocket != null) {

serverSocket.close();

}

} catch (Exception e2) {

}

}

}

}

ZkServerSocket2 Socket服务端 用于模拟集群:

package com.baba.wlb.server;

import org.I0Itec.zkclient.ZkClient;

import java.io.IOException;

import java.net.ServerSocket;

import java.net.Socket;

/**

* @Author wulongbo

* @Date 2021/1/5 9:56

* @Version 1.0

*/

// ServerSocket服务端

public class ZkServerSocket2 implements Runnable {

private static int port = 18080;

private String parentService = "/service";

/**

* 服务器端:<br>

* 1.服务器启动的时候,会将当前服务器信息注册到注册中心。首先先创建一个父节点为service,在父节点下面再创建一个子节点,

* 每个子节点都存放当前服务接口地址。

* ## 子节点结构

* /service 持久节点

* ##/8080 value 39.102.56.91:18080 临时节点

* ##/8081 value 39.102.56.91:18081 临时节点

*

* @param args

*/

private ZkClient zkClient = new ZkClient("39.102.56.91:2181");

public static void main(String[] args) {

ZkServerSocket2 server = new ZkServerSocket2(port);

Thread thread = new Thread(server);

thread.start();

}

public ZkServerSocket2(int port) {

this.port = port;

}

public void regServer() {

// 1.先创建父节点server 为持久节点

if (!zkClient.exists(parentService)) {

// // 删除旧节点

// zkClient.delete(parentService);

// 2.创建父节点

// 持久节点

zkClient.createPersistent(parentService);

}

String serverKey = parentService + "/server_" + port;

if (!zkClient.exists(serverKey)) {

// // 删除旧节点

// zkClient.delete(serverKey);

// 3.创建子节点value为服务接口地址

// 临时节点

zkClient.createEphemeral(serverKey, "127.0.0.1:" + port);

}

}

@Override

public void run() {

ServerSocket serverSocket = null;

try {

serverSocket = new ServerSocket(port);

System.out.println("Server start port:" + port);

regServer();

Socket socket = null;

while (true) {

socket = serverSocket.accept();

new Thread(new ServerHandler(socket)).start();

}

} catch (IOException e) {

e.printStackTrace();

} finally {

try {

if (serverSocket != null) {

serverSocket.close();

}

} catch (Exception e2) {

}

}

}

}

ZkServerClient Socket客户端 :

package com.baba.wlb.client;

import org.I0Itec.zkclient.IZkChildListener;

import org.I0Itec.zkclient.ZkClient;

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import java.io.PrintWriter;

import java.net.Socket;

import java.util.ArrayList;

import java.util.List;

/**

* @Author wulongbo

* @Date 2021/1/5 10:24

* @Version 1.0

*/

public class ZkServerClient {

// 存放服务列表信息

public static List<String> listServer = new ArrayList<String>();

// 客户端:读取service节点,获取下面的子节点value值,本地实现远程调用。

private static String parentService = "/service";

private static ZkClient zkClient = new ZkClient("39.102.56.91:2181");

public static void main(String[] args) {

initServer();

ZkServerClient client = new ZkServerClient();

BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));

while (true) {

String name;

try {

name = bufferedReader.readLine();

if ("exit".equals(name)) {

System.exit(0);

}

client.send(name);

} catch (IOException e) {

e.printStackTrace();

}

}

}

private void send(String name) {

String server = ZkServerClient.getServer();

String[] cfg = server.split(":");

Socket socket = null;

BufferedReader in = null;

PrintWriter out = null;

try {

socket = new Socket(cfg[0], Integer.parseInt(cfg[1]));

in = new BufferedReader(new InputStreamReader(socket.getInputStream()));

out = new PrintWriter(socket.getOutputStream(), true);

out.println(name);

while (true) {

String resp = in.readLine();

if (resp == null)

break;

else if (resp.length() > 0) {

System.out.println("Receive:" + resp);

break;

}

}

} catch (Exception e) {

e.printStackTrace();

} finally {

if (out != null) {

out.close();

}

if (in != null) {

try {

in.close();

} catch (IOException e1) {

e1.printStackTrace();

}

}

if (socket != null) {

try {

socket.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

}

// 注册所有server

private static void initServer() {

// listServer.clear();

// listServer.add("39.102.56.91:18080");

// 从zookeeper上获取服务列表信息

List<String> children = zkClient.getChildren(parentService);

getChildData(zkClient, children);

// 使用Zk时间通知获取最新服务列表信息

zkClient.subscribeChildChanges(parentService, new IZkChildListener() {

@Override

public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception {

System.out.println("注册中心服务里列表信息发生变化...");

getChildData(zkClient, currentChildren);

}

});

}

public static void getChildData(ZkClient zkClient, List<String> children) {

listServer.clear();

children.stream().forEach(

node -> {

String serverAddress = zkClient.readData(parentService + "/" + node);

listServer.add(serverAddress);

}

);

System.out.println("服务接口地址:" + listServer.toString());

}

// 请求总数

private static int reqCount = 1;

// 获取当前server信息

public static String getServer() {

// return listServer.get(0);

int index = reqCount % listServer.size();

String address = listServer.get(index);

System.out.println("客户端请求服务器端:" + address);

reqCount++;

return address;

}

}

启动Socket服务端

分别启动 ZkServerSocketZkServerSocket2
我们可以看到zookeeper上注册了一个持久节点 server和两个临时节点 server_18080server_18081,他们对应的value值分别是:127.0.0.1:18080127.0.0.1:18081
【Java】dubbo实现动态负载均衡

启动Socket客户端

  • 可以发现 服务接口地址:[127.0.0.1:18080, 127.0.0.1:18081]

【Java】dubbo实现动态负载均衡

  • 我们在控制台输入内容: 这是第一次传输,通过取模算法,负载到18081这台服务上

【Java】dubbo实现动态负载均衡

  • 在端口为:18081 的Socket服务端可以收到消息

【Java】dubbo实现动态负载均衡

  • 我们在控制台输入内容: 这是第二次传输

【Java】dubbo实现动态负载均衡

  • 在端口为:18080 的Socket服务端可以收到消息

【Java】dubbo实现动态负载均衡

  • 现在强制停掉 18081 的服务,只剩下 18080 这台,需等待约15s 由于节点发生变化,通过事件通知的方式给到订阅方,订阅方通过事件监听的方式来动态获取服务端节点。

【Java】dubbo实现动态负载均衡

  • 我们在控制台输入内容: 这是第三次传输这是第四次传输,都只会转发给 18080

【Java】dubbo实现动态负载均衡

  • 再次启动18081,复活服务,又可实现动态负载。

【Java】dubbo实现动态负载均衡

总结

本文演示的是dubbo如何实现动态负载均衡的,只要我们对注册中心zookeeper 有一定的了解,便很容易明白!

以上是 【Java】dubbo实现动态负载均衡 的全部内容, 来源链接: utcz.com/a/94414.html

回到顶部