disruptor并发框架的使用

编程

1、导包

<dependency>

<groupId>org.apache.camel</groupId>

<artifactId>camel-disruptor-starter</artifactId>

<version>2.25.0</version>

</dependency>

2、一个数据封装类

/**

* 数据封装类

* Created by liunanhua on 2018/7/19.

*/

public class LongEvent {

private long value;

public long getValue() {

return value;

}

public void setValue(long value) {

this.value = value;

}

}

数据封装类工厂

/**

* LongEven工厂

* Created by liunanhua on 2018/7/19.

*/

public class LongEventFactory implements EventFactory<LongEvent> {

@Override

public LongEvent newInstance() {

return new LongEvent();

}

}

3、一个生产者

import com.lmax.disruptor.EventTranslatorOneArg;

import java.text.MessageFormat;

/**

* 生产者

* 事件转换

* Created by liunanhua on 2018/7/19.

*/

public class LongEventTranslator implements EventTranslatorOneArg<LongEvent, Long> {

@Override

public void translateTo(LongEvent event, long sequence, Long data) {

System.out.println(MessageFormat.format("生产者生产数据data = {0}, seq = {1}", data, sequence));

event.setValue(data);

}

}

4、一个或多个消费者  

import com.lmax.disruptor.EventHandler;

import com.lmax.disruptor.WorkHandler;

/**

* 消费者

* EventHandler和WorkHandler的区别是前者不需要池化,后者需要池化。

* Created by liunanhua on 2018/7/19.

*/

public class LongEventConsumer implements EventHandler<LongEvent>, WorkHandler<LongEvent> {

@Override

public void onEvent(LongEvent longEvent, long sequence, boolean endOfBatch) {

this.onEvent(longEvent);

}

@Override

public void onEvent(LongEvent longEvent) {

long threadId = Thread.currentThread().getId(); // 获取当前线程id

System.out.println("消费者[" + threadId +"]消费数据data = " + longEvent.getValue());

}

}

5、使用方法

import com.lmax.disruptor.RingBuffer;

import com.lmax.disruptor.YieldingWaitStrategy;

import com.lmax.disruptor.dsl.Disruptor;

import com.lmax.disruptor.dsl.ProducerType;

import java.nio.ByteBuffer;

import java.util.concurrent.ThreadFactory;

/**

* 测试类

* Created by liunanhua on 2018/7/19.

*/

public class DisruptorTest {

public static void main(String[] args) throws Exception {

//RingBuffer大小,必须是2的N次方

int ringBufferSize = 1024 * 8;

/**

*BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现

* WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();

*SleepingWaitStrategy 的性能表现跟BlockingWaitStrategy差不多,对CPU的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景

* WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();

*YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于CPU逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性

* WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();

*/

//创建disruptor

Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(new LongEventFactory(),

ringBufferSize,

new ThreadFactory() {

@Override

public Thread newThread(Runnable r) {

return new Thread(r);

}

},

ProducerType.SINGLE,

new YieldingWaitStrategy());

//处理业务,连接消费事件方法,可多个消费者

//disruptor.handleEventsWith(new LongEventConsumer());

LongEventConsumer[] consumers = new LongEventConsumer[10];

for (int i = 0; i < consumers.length; i++) {

consumers[i] = new LongEventConsumer();

}

//使用handleEventsWithWorkerPool就可以完成不重复消费,使用handleEventsWith会重复消费。

disruptor.handleEventsWithWorkerPool(consumers);

//启动

RingBuffer<LongEvent> ringBuffer = disruptor.start();

ByteBuffer byteBuffer = ByteBuffer.allocate(8);

for (long l = 0; l < 10; l++) {

//生产数据

long data = (long)(Math.random() * 1000d);

byteBuffer.putLong(0, data);

new LongEventProducer(ringBuffer).produceData(byteBuffer);//生产数据

}

disruptor.shutdown();//关闭 disruptor,方法会堵塞,直至所有的事件都得到处理;

}

}

其他

事件转换器中生产数据

import com.lmax.disruptor.EventTranslatorOneArg;

import java.text.MessageFormat;

/**

* 生产者

* 事件转换

* Created by liunanhua on 2018/7/19.

*/

public class LongEventTranslator implements EventTranslatorOneArg<LongEvent, Long> {

@Override

public void translateTo(LongEvent event, long sequence, Long data) {

System.out.println(MessageFormat.format("生产者生产数据data = {0}, seq = {1}", data, sequence));

event.setValue(data);

}

}

测试

import com.lmax.disruptor.RingBuffer;

import com.lmax.disruptor.YieldingWaitStrategy;

import com.lmax.disruptor.dsl.Disruptor;

import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.ThreadFactory;

/**

* 测试类

* Created by liunanhua on 2018/7/19.

*/

public class DisruptorTest {

public static void main(String[] args) {

//RingBuffer大小,必须是2的N次方

int ringBufferSize = 1024 * 8;

//创建disruptor

Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(new LongEventFactory(),

ringBufferSize,

new ThreadFactory() {

@Override

public Thread newThread(Runnable r) {

return new Thread(r);

}

},

ProducerType.SINGLE,

new YieldingWaitStrategy());

//处理业务,连接消费事件方法,可多个消费者

//使用handleEventsWithWorkerPool就可以完成不重复消费,使用handleEventsWith会重复消费。

disruptor.handleEventsWithWorkerPool(consumers);

//启动

RingBuffer<LongEvent> ringBuffer = disruptor.start();

for (long l = 0; l < 10; l++) {

//生产数据

long data = (long)(Math.random() * 1000d);

ringBuffer.publishEvent(new LongEventTranslator(), data);

}

disruptor.shutdown();//关闭 disruptor,方法会堵塞,直至所有的事件都得到处理;

}

}

 

以上是 disruptor并发框架的使用 的全部内容, 来源链接: utcz.com/z/513151.html

回到顶部