ZeroMQ简介以及使用(二)

编程

接着上次博客写的内容 ZeroMQ简介以及使用(一)

Pipeline pattern

PipeLine模式用于任务分配,通常在多级流水线中,其中一个或几个节点将工作推给许多工作人员,然后依次将结果推向一个或几个收集器。该模式在大多数情况下是可靠的,因为除非节点意外断开连接,否则它不会丢弃消息。它具有可伸缩性,因为节点可以随时加入。

ZeroMQ通过以下两种类型支持PipeLine

  • PUSH 类型
  • PULL 类型

PUSH socket

PUSH 以一组匿名的PULL同行,使用循环算法发送消息。该接收操作没有此Socket类型实现。

PUSHSocket由于已达到所有下游节点的高水位线而进入静音状态时,或者如果根本没有下游节点,则Socket上的任何发送操作都将阻塞,直到静音状态结束或至少一个下游节点为止可以发送;消息不会被丢弃。

特征摘要:

 

 

兼容的对等套接字

PULL

方向

单向

发送/接收模式

仅发送

入网路由策略

不适用

外发路由策略

轮循

静音状态下的动作

PULL socket

PULL类型与一组匿名PUSH对等方对话,并使用公平队列算法接收消息。

发送操作没有此套接字类型实现。

特征摘要:

 

 

兼容的对等套接字

PUSH

方向

单向

发送/接收模式

仅接收

入网路由策略

公平排队

外发路由策略

不适用

静音状态下的动作

代码实例

PUSH:

package ndm2.ndm2;

import java.io.IOException;

import java.util.Random;

import org.zeromq.SocketType;

import org.zeromq.ZContext;

import org.zeromq.ZMQ;

import org.zeromq.ZMQ.Socket;

public class ZMQDemoPush {

public static void main(String[] args) throws InterruptedException, IOException {

ZContext context = new ZContext();

ZMQ.Socket sender = context.createSocket(SocketType.PUSH);

sender.bind("tcp://localhost:5557");

for (int i = 0; i < 100; ++i) {

sender.send("this is a test", 0);

Thread.sleep(1000); // Give 0MQ time to deliver

}

}

}

pull:

package ndm2.ndm2;

import java.io.IOException;

import java.util.Random;

import org.zeromq.SocketType;

import org.zeromq.ZContext;

import org.zeromq.ZMQ;

import org.zeromq.ZMQ.Socket;

public class ZMQDemoPull {

public static void main(String[] args) throws InterruptedException, IOException {

ZContext context = new ZContext();

ZMQ.Socket receiver = context.createSocket(SocketType.PULL);

receiver.connect("tcp://localhost:5557");

while (!Thread.currentThread().isInterrupted()) {

String string = new String(receiver.recv(0), ZMQ.CHARSET).trim();

System.out.println(string + ".");

}

}

}

测试结果

 

Exclusive pair 模式

PAIR

PAIR只能一次连接到单个对等方。对通过PAIR Socket发送的消息不执行消息路由或筛选。

PAIR由于已达到所连接对等方的高水位线而进入静音状态时,或者如果未连接任何对等方,则Socket上的任何 发送操作都将阻塞,直到对等方可用于发送;消息不会被丢弃。

尽管PAIR 可以用于除inproc之外的其他传输,但是它们无法自动重新连接以及新的传入连接将被终止的事实,而任何先前的连接(包括处于关闭状态的连接)都会在大多数情况下不适合TCP。

代码示例

  private static class Step1 extends Thread {

private ZContext context;

private Step1(ZContext context) {

this.context = context;

}

@Override

public void run() {

Socket xmitter = context.createSocket(SocketType.PAIR);

xmitter.connect("inproc://step2");

System.out.println("Step 1 就绪, 传递 step 2");

xmitter.send("我是1号位", 0);

xmitter.close();

}

}

private static class Step2 extends Thread{

private ZContext context;

private Step2(ZContext context){

this.context = context;

}

@Override

public void run() {

Socket receiver = context.createSocket(SocketType.PAIR);

receiver.bind("inproc://step2");

String str = new String(receiver.recv(0));

System.out.println(str);

receiver.close();

Socket xmitter = context.createSocket(SocketType.PAIR);

xmitter.connect("inproc://step3");

System.out.println("Step 2 就绪, 传递 step 3");

xmitter.send("我是2号位", 0);

xmitter.close();

}

}

private static class Step3 extends Thread {

private ZContext context;

private Step3(ZContext context) {

this.context = context;

}

@Override

public void run() {

Socket receiver = context.createSocket(SocketType.PAIR);

receiver.bind("inproc://step3");

String str = new String(receiver.recv(0));

System.out.println(str);

receiver.close();

System.out.println("Step 3 就绪");

}

}

public static void main(String[] args) throws InterruptedException{

ZContext context = new ZContext();

Thread step1 = new Step1(context);

step1.start();

Thread step2 = new Step2(context);

step2.start();

Thread step3 = new Step3(context);

step3.start();

step1.join();

step2.join();

step3.join();

System.out.println("成功!");

}

 

后面几个模式由于都没有实现所以就没有实例 大致内容如下:

Client-server 模式

注意:此模式仍处于草稿状态,因此使用的zeromq库可能不支持该模式!

客户端-服务器模式用于允许单个SERVER服务器与一个或多个CLIENT客户端对话。客户端始终启动对话,此后,任何一个对等方都可以异步向另一方发送消息。

Client

CLIENT类型与一个或多个SERVER对等方对话。如果连接到多个对等方,它将以循环方式在这些对等方之间分散发送的消息。阅读时,它会依次从每个同位体中公平读取。它是可靠的,因为在正常情况下它不会丢弃消息。

如果CLIENT套接字已建立连接,则发送操作将接受消息,将消息排队,并在网络允许的速度下尽快发送消息。输出缓冲区限制由套接字的高水位线定义。如果传出缓冲区已满,或者没有连接的对等方,则默认情况下,发送操作将被阻止。CLIENT套接字不会丢弃消息。

特征摘要:

 

 

兼容的对等套接字

服务器

方向

双向的

发送/接收模式

无限制

外发路由策略

轮循

入网路由策略

公平排队

静音状态下的动作

SERVER

SERVER 类型与零个或多个CLIENT对等方对话。每个传出消息都发送到特定的对等客户端。SERVER套接字只能回复传入的消息:CLIENT对等方必须始终启动对话。

每个收到的消息都有一个routing_id,它是一个32位无符号整数。要将消息发送到给定的CLIENT对等方,应用程序必须在消息上设置对等方的 routing_id

如果未指定routing_id,或者未引用已连接的客户端对等方,则发送调用将失败。如果客户端对等方的传出缓冲区已满,则发送调用将阻塞。在任何情况下,SERVER套接字都不会丢弃消息。

特征摘要:

 

 

兼容的对等套接字

客户

方向

双向的

发送/接收模式

无限制

外发路由策略

看到文字

入网路由策略

公平排队

静音状态下的动作

失败

 

Radio-dish 模式

注意:此模式仍处于草稿状态,因此您使用的zeromq库可能不支持该模式!

 

Radio模式用于以扇出方式将数据从单个发布者分发到多个订阅者。

Radio-dish正在使用组(相对于Pub-sub主题),Dish套接字可以加入一个组,Radio套接字发送的每个消息都属于一个组。

组是限制为16个字符长度(包括null)的以null终止的字符串。目的是将长度增加到40个字符(包括null)。组的编码应为UTF8。

使用精确匹配(vs PubSub的前缀匹配)来匹配组

Radio

发布者使用RADIO 分发数据。每个消息都属于一个组。邮件将分发给组中的所有成员。该接收 操作没有此套接字类型实现。

当RADIO 由于已达到订户的最高水位而进入静音状态时,将发送给有问题的订户的任何消息将被丢弃,直到静音状态结束。该 发送操作不会阻塞此套接字类型。

特征摘要:

 

 

兼容的对等套接字

方向

单向

发送/接收模式

仅发送

入网路由策略

不适用

外发路由策略

扇出

静音状态下的动作

下降

DISH 

订户使用DISH 来订阅RADIO分发的组。最初,DISH套接字未订阅任何组。该发送 操作不此套接字类型实现。

特征摘要:

 

 

兼容的对等套接字

无线电

方向

单向

发送/接收模式

仅接收

入网路由策略

公平排队

外发路由策略

不适用

 

以上是 ZeroMQ简介以及使用(二) 的全部内容, 来源链接: utcz.com/z/514768.html

回到顶部