Disruptor(5)使用场景

编程

handleEventsWith & handleEventsWithWorkerPool

在disruptor框架调用start方法之前,需要将消息的消费者指定给disruptor框架。

  1. disruptor.handleEventsWith(EventHandler... handlers),将多个EventHandler的实现类传入方法,封装成一个EventHandlerGroup
  2. disruptor.handleEventsWithWorkerPool(WorkHandler... handlers),将多个WorkHandler的实现类传入方法,封装成一个EventHandlerGroup

不同点

  1. handleEventsWith方法的EventHandlerGroup中的每个消费者都会对同一条消息m进行消费,各个消费者之间不存在竞争。
  2. handleEventsWithWorkerPool方法返回的EventHandlerGroup,Group的消费者对于同一条消息m不重复消费;如果c0消费了消息m,则c1不再消费消息m。

对于独立消费的消费者,应当实现EventHandler接口。对于不重复消费的消费者,应当实现WorkHandler接口。
从代码层面而言, 有不同的具体实现来支持不同的模式

  1. ConsumerInfo
  2. EventProcessor

消费场景

此处的测试代码的对接口WorkHandler 进行了改造。

package com.lmax.disruptor.noob;

import java.time.Instant;

import java.time.format.DateTimeFormatter;

public class CompareTest {

public static int THREAD = 2; // 线程数量

public static int PER = 1; // 单个线程生产数量

public static int TOTAL_COUNT = THREAD * PER; // 数据总量

public static int SIZE = 4; // 最大容量

public static void main(String[] args) {

println("线程数:" + THREAD + " 单线程生产量: " + PER + " 容量:" + SIZE + " 数据总量:" + TOTAL_COUNT);

DisruptorTest.execute();

}

public static void println(String msg) {

System.out.println(DateTimeFormatter.ISO_INSTANT.format(Instant.now()) + "[" + Thread.currentThread().getName() + "] " + msg);

}

}

---------

import java.util.concurrent.ThreadFactory;

import com.lmax.disruptor.RingBuffer;

import com.lmax.disruptor.dsl.Disruptor;

public class DisruptorTest {

public static void execute() {

Disruptor<DataEvent> disruptor = new Disruptor<DataEvent>(new DataEventFactory(), CompareTest.SIZE,

new ThreadFactory() {

AtomicInteger count = new AtomicInteger(0);

@Override

public Thread newThread(Runnable eventProcessor) {

CompareTest.println("EventProcessor wrapper");// 对事件处理总线的封装

Thread thread = new Thread(eventProcessor);

thread.setName("EventProcessor" + count.incrementAndGet());

return thread;

}

});

/**

* 创建EventProcessors<Runnable>.

* 子过程Disruptor.checkNotStarted()事件处理handler必须在启动之前绑定.

*/

disruptor.handleEventsWith(new DataEventHandler("dataEventHandler1"),new DataEventHandler("dataEventHandler2"));

// disruptor.handleEventsWithWorkerPool(new DataWorkHandler("dataWorkHandler1"),new DataWorkHandler("dataWorkHandler2"));

disruptor.start();

CompareTest.println("disruptor start success!");

RingBuffer<DataEvent> ringBuffer = disruptor.getRingBuffer();

DataProducer producer = new DataProducer(ringBuffer);

DataEventProducerWithTranslator translator = new DataEventProducerWithTranslator(ringBuffer);

long start = System.currentTimeMillis();

for (int l = 0; l < CompareTest.THREAD; l++) {

new Thread(() -> {

for (int m = 0; m < CompareTest.PER; m++) {

producer.onData(start);

// translator.onData(start); 推荐用这种方式做。

}

}).start();

}

}

}

----------

import java.util.concurrent.atomic.AtomicLong;

import com.lmax.disruptor.EventHandler;

public class DataEventHandler implements EventHandler<DataEvent> {

public AtomicLong count = new AtomicLong(0);

public String name = null;

public DataEventHandler(String name) {

this.name = name;

}

@Override

public void onEvent(DataEvent event, long sequence, boolean endOfBatch) throws Exception {

Thread.sleep(name.contentEquals("dataEventHandler1") ? 1000 : 100);

CompareTest.println("handlerName: " + name + " 处理的sequence:" + sequence

+ " count:" + count.incrementAndGet() + " Disruptor 总耗时:"

+ (System.currentTimeMillis() - event.getStartTime()));

}

}

----------

import java.util.concurrent.atomic.AtomicLong;

import com.lmax.disruptor.WorkHandler;

public class DataWorkHandler implements WorkHandler<DataEvent> {

public AtomicLong count = new AtomicLong(0);

public String name = null;

public DataWorkHandler(String name) {

this.name = name;

}

@Override

public void onEvent(DataEvent event, long sequence) throws Exception {

Thread.sleep(name.contentEquals("dataWorkHandler2") ? 100 :1000);

CompareTest.println("handlerName: " + name + " 处理的sequence:" + sequence + " count:" + count.incrementAndGet()

+ " Disruptor 总耗时:" + (System.currentTimeMillis() - event.getStartTime()));

}

}

  1. handleEventsWith 同一消息被不同handler独立消费。 此时handler处理是无序的。
    disruptor.handleEventsWith(new DataEventHandler("dataEventHandler1"), new DataEventHandler("dataEventHandler2"));
  2. 依赖串行.  对于同一消息前handler处理完结,后handler才处理。
    disruptor.handleEventsWith(new DataEventHandler("dataEventHandler1")).then(new DataEventHandler("dataEventHandler2")).then(new DataEventHandler("dataEventHandler3"));
  3. handleEventsWithWorkerPool 不重复消费。 
    disruptor.handleEventsWithWorkerPool(new DataWorkHandler("dataWorkHandler1"), new DataWorkHandler("dataWorkHandler2"));
  4. 组合方式

    disruptor.handleEventsWithWorkerPool(new DataWorkHandler("dataWorkHandler3"),new DataWorkHandler("dataWorkHandler4")).then(new DataEventHandler("dataEventHandler1"),

    new DataEventHandler("dataEventHandler2"));

     

    disruptor.handleEventsWith(new DataEventHandler("dataEventHandler1"),

    new DataEventHandler("dataEventHandler2")).thenHandleEventsWithWorkerPool(new DataWorkHandler("dataWorkHandler3"),new DataWorkHandler("dataWorkHandler4"));

 

以上是 Disruptor(5)使用场景 的全部内容, 来源链接: utcz.com/z/511286.html

回到顶部