Disruptor介绍

编程

  • Disruptor是什么?

    Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者事件监听模式的实现。

    获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作。

 

  • Disruptor能做什么?

    从功能上来看,Disruptor 是实现了“队列”的功能,而且是一个有界队列。那么它的应用场景自然就是“生产者-消费者”模型的应用场合了。

    可以拿 JDK 的 BlockingQueue 做一个简单对比,以便更好地认识 Disruptor 是什么。

    我们知道 BlockingQueue 是一个 FIFO 队列,生产者(Producer)往队列里发布(publish)一项事件(或称之为“消息”也可以)时,消费者(Consumer)能获得通知;如果没有事件时,消费者被堵塞,直到生产者发布了新的事件。

    这些都是 Disruptor 能做到的,与之不同的是,Disruptor 能做更多:

    • 同一个“事件”可以有多个消费者,消费者之间既可以并行处理,也可以相互依赖形成处理的先后次序(形成一个依赖图);
    • 预分配用于存储事件内容的内存空间;
    • 针对极高的性能目标而实现的极度优化和无锁的设计;

    以上的描述虽然简单地指出了 Disruptor 是什么,但对于它“能做什么”还不是那么直截了当。一般性地来说,当你需要在两个独立的处理过程(两个线程)之间交换数据时,就可以使用 Disruptor 。当然使用队列(如上面提到的 BlockingQueue)也可以,只不过 Disruptor 做得更好。

 

  • Disruptor怎么用?

  1. 定义事件
    事件(Event)就是通过 Disruptor 进行交换的数据类型。

    package com.xuele.disruptor.event;

    public class HelloEvent {

    private String eventValue;

    public String getEventValue() {

    return eventValue;

    }

    public void setEventValue(String eventValue) {

    this.eventValue = eventValue;

    }

    }

     

  2. 定义事件工厂

    事件工厂(Event Factory)定义了如何实例化前面第1步中定义的事件(Event),需要实现接口 com.lmax.disruptor.EventFactory<T>。

    Disruptor 通过 EventFactory 在 RingBuffer 中预创建 Event 的实例。

    一个 Event 实例实际上被用作一个“数据槽”,发布者发布前,先从 RingBuffer 获得一个 Event 的实例,然后往 Event 实例中填充数据,之后再发布到 RingBuffer 中,之后由 Consumer 获得该 Event 实例并从中读取数据。

    package com.xuele.disruptor.factory;

    import com.lmax.disruptor.EventFactory;

    import com.xuele.disruptor.event.HelloEvent;

    public class HelloEventFactory implements EventFactory<HelloEvent> {

    public HelloEvent newInstance() {

    return new HelloEvent();

    }

    }

     

  3. 定义事件处理的具体实现
    通过实现接口 com.lmax.disruptor.EventHandler<T> 定义事件处理的具体实现。

    package com.xuele.disruptor.handle;

    import com.lmax.disruptor.EventHandler;

    import com.xuele.disruptor.event.HelloEvent;

    public class HelloEventHandle implements EventHandler<HelloEvent> {

    public void onEvent(HelloEvent arg0, long arg1, boolean arg2) throws Exception {

    System.out.println(arg0.getEventValue());

    }

    }

     

  4. 定义用于事件处理的线程池
    Disruptor 通过 java.util.concurrent.ExecutorService 提供的线程来触发 Consumer 的事件处理。例如:

    ExecutorService executor = Executors.newCachedThreadPool();

     

  5. 指定等待策略

    Disruptor 定义了 com.lmax.disruptor.WaitStrategy 接口用于抽象 Consumer 如何等待新事件,这是策略模式的应用。

    Disruptor 提供了多个 WaitStrategy 的实现,每种策略都具有不同性能和优缺点,根据实际运行环境的 CPU 的硬件特点选择恰当的策略,并配合特定的 JVM 的配置参数,能够实现不同的性能提升。

    例如,BlockingWaitStrategy、SleepingWaitStrategy、YieldingWaitStrategy 等,其中,

    BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现;

    SleepingWaitStrategy 的性能表现跟 BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景;

    YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于 CPU 逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性。

    WaitStrategy  blockingWaitStrategy = new BlockingWaitStrategy();

    WaitStrategy sleepingWaitStrategy = new SleepingWaitStrategy();

    WaitStrategy yieldingWaitStrategy = new YieldingWaitStrategy();

     

  6. 启动 Disruptor

    EventFactory<HelloEvent> helloEventFactory = new HelloEventFactory();

    int ringBufferSize = 1024*1024;

    Disruptor<HelloEvent> disruptor = new Disruptor<HelloEvent>(helloEventFactory ,

    ringBufferSize ,executor , ProducerType.SINGLE,blockingWaitStrategy );

    EventHandler<HelloEvent> eventHandle = new HelloEventHandle();

    disruptor.handleEventsWith(eventHandle);

    disruptor.start();

     

  7. 发布事件

    Disruptor 的事件发布过程是一个两阶段提交的过程:

      第一步:先从 RingBuffer 获取下一个可以写入的事件的序号;

      第二步:获取对应的事件对象,将数据写入事件对象;

      第三部:将事件提交到 RingBuffer;

    事件只有在提交之后才会通知 EventProcessor 进行处理;

    package com.xuele.disruptor.producer;

    import java.nio.ByteBuffer;

    import com.lmax.disruptor.RingBuffer;

    import com.xuele.disruptor.event.HelloEvent;

    public class HelloEventProducer {

    private final RingBuffer<HelloEvent> ringBuffer;

    public HelloEventProducer(RingBuffer<HelloEvent> ringBuffer){

    this.ringBuffer = ringBuffer;

    }

    /**

    * onData用来发布事件,每调用一次就发布一次事件

    * 它的参数会用过事件传递给消费者

    */

    public void onData(ByteBuffer bb){

    //1.可以把ringBuffer看做一个事件队列,那么next就是得到下面一个事件槽

    long sequence = ringBuffer.next();

    try {

    //2.用上面的索引取出一个空的事件用于填充(获取该序号对应的事件对象)

    HelloEvent event = ringBuffer.get(sequence);

    //3.获取要通过事件传递的业务数据

    event.setEventValue(bb.getLong(0)+"");

    } finally {

    //4.发布事件

    //注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须得到调用;

    // 如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。

    ringBuffer.publish(sequence);

    }

    }

    }

     注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须得到调用;如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。

    Disruptor 还提供另外一种形式的调用来简化以上操作,并确保 publish 总是得到调用。

    static class Translator implements EventTranslatorOneArg<LongEvent, Long>{

    @Override

    public void translateTo(LongEvent event, long sequence, Long data) {

    event.set(data);

    }

    }

    public static Translator TRANSLATOR = new Translator();

    public static void publishEvent2(Disruptor<LongEvent> disruptor) {

    // 发布事件;

    RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

    long data = getEventData();//获取要通过事件传递的业务数据;

    ringBuffer.publishEvent(TRANSLATOR, data);

    }

    此外,Disruptor 要求 RingBuffer.publish 必须得到调用的潜台词就是,如果发生异常也一样要调用 publish ,那么,很显然这个时候需要调用者在事件处理的实现上来判断事件携带的数据是否是正确的或者完整的,这是实现者应该要注意的事情。

     

  8. 关闭 Disruptor

    disruptor.shutdown();//关闭 disruptor,方法会堵塞,直至所有的事件都得到处理;

    executor.shutdown();//关闭 disruptor 使用的线程池;如果需要的话,必须手动关闭, disruptor 在 shutdown 时不会自动关闭;

    完整的main函数代码

    package com.xuele.disruptor;

    import java.nio.ByteBuffer;

    import java.util.concurrent.ExecutorService;

    import java.util.concurrent.Executors;

    import com.lmax.disruptor.BlockingWaitStrategy;

    import com.lmax.disruptor.EventFactory;

    import com.lmax.disruptor.EventHandler;

    import com.lmax.disruptor.RingBuffer;

    import com.lmax.disruptor.SleepingWaitStrategy;

    import com.lmax.disruptor.WaitStrategy;

    import com.lmax.disruptor.YieldingWaitStrategy;

    import com.lmax.disruptor.dsl.Disruptor;

    import com.lmax.disruptor.dsl.ProducerType;

    import com.xuele.disruptor.event.HelloEvent;

    import com.xuele.disruptor.factory.HelloEventFactory;

    import com.xuele.disruptor.handle.HelloEventHandle;

    import com.xuele.disruptor.producer.HelloEventProducer;

    /**

    * Hello world!

    *

    */

    public class DisruptorTest

    {

    public static void main( String[] args )

    {

    ExecutorService executor = Executors.newCachedThreadPool();

    WaitStrategy blockingWaitStrategy = new BlockingWaitStrategy();

    WaitStrategy sleepingWaitStrategy = new SleepingWaitStrategy();

    WaitStrategy yieldingWaitStrategy = new YieldingWaitStrategy();

    EventFactory<HelloEvent> helloEventFactory = new HelloEventFactory();

    int ringBufferSize = 1024*1024;

    Disruptor<HelloEvent> disruptor = new Disruptor<HelloEvent>(helloEventFactory ,

    ringBufferSize ,executor , ProducerType.SINGLE,blockingWaitStrategy );

    EventHandler<HelloEvent> eventHandle = new HelloEventHandle();

    disruptor.handleEventsWith(eventHandle);

    disruptor.start();

    RingBuffer<HelloEvent> ringBuffer = disruptor.getRingBuffer();

    HelloEventProducer producer = new HelloEventProducer(ringBuffer);

    ByteBuffer byteBuffer = ByteBuffer.allocate(8);

    for(long l = 0; l<100; l++){

    byteBuffer.putLong(0, l);

    producer.onData(byteBuffer);

    }

    disruptor.shutdown();//关闭 disruptor,方法会堵塞,直至所有的事件都得到处理;

    executor.shutdown();//关闭 disruptor 使用的线程池;如果需要的话,必须手动关闭, disruptor 在 shutdown 时不会自动关闭;

    }

    }

     

 

  • Disruptor原理剖析

    核心概念

    先从了解 Disruptor 的核心概念开始,来了解它是如何运作的。下面介绍的概念模型,既是领域对象,也是映射到代码实现上的核心对象。

    • Ring Buffer
      如其名,环形的缓冲区。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。
    • Sequence  Disruptor
      通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。
      (注:这是 Disruptor 实现高性能的关键点之一,网上关于伪共享问题的介绍已经汗牛充栋,在此不再赘述)。
    • Sequencer 
      Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
    • Sequence Barrier
      用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。 Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。
    • Wait Strategy
      定义 Consumer 如何进行等待下一个事件的策略。 (注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)
    • Event
      在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。
    • EventProcessor
      EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。
    • EventHandler
      Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。
    • Produce

      即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。

    • 类图

    • 执行时序图

    • 内存预分配

      RingBuffer使用数组Object[] entries作为存储元素,初始化RingBuffer时,会将所有的entries的每个元素指定为特定的Event,这时候event中的detail属性是null;后面生产者向RingBuffer中写入消息时,RingBuffer不是直接将enties[7]指向其他的event对象,而是先获取event对象,然后更改event对象的detail属性;消费者在消费时,也是从RingBuffer中读取出event,然后取出其detail属性。可以看出,生产/消费过程中,RingBuffer的entities[7]元素并未发生任何变化,未产生临时对象,entities及其元素对象一直存活,知道RingBuffer消亡。故而可以最小化GC的频率,提升性能。

      创建entries数组的源码

      private void fill(EventFactory<E> eventFactory)

      {

      for (int i = 0; i < bufferSize; i++)

      {

      entries[BUFFER_PAD + i] = eventFactory.newInstance(); //使用工厂方法初始化enties元素

      }

      }

       

    • 消除‘伪共享’

      如果两个不同的并发变量位于同一个缓存行,则在并发情况下,会互相影响到彼此的缓存有效性,进而影响到性能,这叫着‘伪共享’。为了避开‘伪共享’,Disruptor3.0在Sequence.java中使用多个long变量填充,从而确保一个序号独占一个缓存行。关于缓存行和‘伪共享’请参考:伪共享(False Sharing)。

      private static class Padding

      {

      /** Set to -1 as sequence starting point */

      public long nextValue = Sequence.INITIAL_VALUE, cachedValue = Sequence.INITIAL_VALUE, p2, p3, p4, p5, p6, p7;

      }



       

    • 序号栅栏和序号配合使用来消除锁和CAS

      Disruptor3.0中,序号栅栏(SequenceBarrier)和序号(Sequence)搭配使用,协调和管理消费者与生产者的工作节奏,避免了锁和CAS的使用。在Disruptor3.0中,各个消费者和生产者持有自己的序号,这些序号的变化必须满足如下基本条件:

      • 消费者序号数值必须小于生产者序号数值;
      • 消费者序号数值必须小于其前置(依赖关系)消费者的序号数值;
      • 生产者序号数值不能大于消费者中最小的序号数值,以避免生产者速度过快,将还未来得及消费的消息覆盖。

      上述前两点是在SequenceBarrier的waitFor()方法中完成的,源码如下:

      public long waitFor(final long sequence) //sequence参数是该消费者期望获取的下一个序号值

      throws AlertException, InterruptedException, TimeoutException

      {

      checkAlert();

           //根据配置的waitStrategy策略,等待期望的下一序号值变得可用

           //这里并不保证返回值availableSequence一定等于 given sequence,他们的大小关系取决于采用的WaitStrategy。

           //eg. 1、YieldingWaitStrategy在自旋100次尝试后,会直接返回dependentSequence的最小seq,这时并不保证返回值>=given sequence

           // 2、BlockingWaitStrategy则会阻塞等待given sequence可用为止,可用并不是说availableSequence == given sequence,而应当是指 >=

      long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

      //如果当前可用的序号小于期望获取的下一个序号,则返回availableSequence,这将导致调用者EventProcessor继续wait

      if (availableSequence < sequence)

      {

      return availableSequence;

      }

           //这一句是‘批处理’的精妙所在,放在后面讲

      return sequencer.getHighestPublishedSequence(sequence, availableSequence);

      }

      上面第三点是针对生产者建立的Barrier,逻辑判定发生在生产者从ringBuffer获取下一个可用的entry时,RingBuffer会将获取下一个可用的entry委托给Sequencer。

      @Override

      public long next(int n)

      {

      if (n < 1)

      {

      throw new IllegalArgumentException("n must be > 0");

      }

      long nextValue = this.nextValue;

      long nextSequence = nextValue + n;//生产者当前序号值+期望获取的序号数量后达到的序号值

      long wrapPoint = nextSequence - bufferSize;//减掉RingBuffer的总的buffer值,用于判断是否出现‘覆盖’

      long cachedGatingSequence = this.cachedValue;

      //从后面代码分析可得:cachedValue就是缓存的消费者中最小序号值,他不是当前最新的‘消费者中最小序号值’,而是上次程序进入到下面的if判定代码段是,被赋值的当时的‘消费者中最小序号值’

      //这样做的好处在于:在判定是否出现覆盖的时候,不用每次都调用getMininumSequence计算‘消费者中的最小序号值’,从而节约开销。只要确保当生产者的节奏大于了缓存的cachedGateingSequence一个bufferSize时,从新获取一下 getMinimumSequence()即可。

      //(wrapPoint > cachedGatingSequence) : 当生产者已经超过上一次缓存的‘消费者中最小序号值’(cachedGatingSequence)一个‘Ring’大小(bufferSize),需要重新获取cachedGatingSequence,避免当生产者一直在生产,但是消费者不再消费的情况下,出现‘覆盖’

      //(cachedGatingSequence > nextValue) : 生产者和消费者均为顺序递增的,且生产者的seq“先于”消费者的seq,注意是‘先于’而不是‘大于’。当nextValue>Long.MAXVALUE时,nextValue+1就会变成负数,wrapPoint也会变成负数,这时候必然会是:cachedGatingSequence > nextValue

      //这个变化的过程会持续bufferSize个序号,这个区间,由于getMinimumSequence()得到的虽然是名义上的‘消费者中最小序号值’,但是不代表是走在‘最后面’的消费者

      if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)

      {

      cursor.setVolatile(nextValue); // StoreLoad fence

      long minSequence;

      //生产者停下来,等待消费者消费,知道‘覆盖’现象清除。

      while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))

      {

      LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?

      }

      this.cachedValue = minSequence;

      }

      this.nextValue = nextSequence;

      return nextSequence;

      }

       

    • 批处理效应

      当生产者节奏快于消费者,消费者可以通过‘批处理效应’快速追赶,即:消费者可以一次性从RingBuffer中获取多个已经准备好的enties,从而提高效率。代码实现如下:

      SequenceBarrier的waitFor()方法:

      public long waitFor(final long sequence)

      throws AlertException, InterruptedException, TimeoutException

      {

      checkAlert();

      long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

      if (availableSequence < sequence)

      {

      return availableSequence;

      }

      //获取消费者可以消费的最大的可用序号,支持批处理效应,提升处理效率。

      //当availableSequence > sequence时,需要遍历 sequence --> availableSequence,找到最前一个准备就绪,可以被消费的event对应的seq。

      //最小值为:sequence-1

      return sequencer.getHighestPublishedSequence(sequence, availableSequence);

      }

       

 

 

 

 

 

 

 

 

 

 

以上是 Disruptor介绍 的全部内容, 来源链接: utcz.com/z/517653.html

回到顶部