亲宝软件园·资讯

展开

Java Disruptor入门 Java多线程之Disruptor入门

EileenChang 人气:0
想了解Java多线程之Disruptor入门的相关内容吗,EileenChang在本文为您仔细讲解Java Disruptor入门的相关知识和一些Code实例,欢迎阅读和指正,我们先划重点:Java,Disruptor入门,Java,Disruptor,下面大家一起来学习吧。

一、Disruptor简介

Disruptor目前是世界上最快的单机消息队列,由英国外汇交易公司LMAX开发,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。

二、浅聊Disruptor的核心

在这里插入图片描述
  

Disruptor维护了一个环形队列RingBuffer,这个队列本质上是一个首位相连的数组。相比于LinkedBlockdingQueue,RingBuffer的数组结构在查找方面效率更高。此外,LinkedBlockingQueue需要维护一个头节点指针head和一个尾节点指针tail,而RingBuffer只需要维护一个sequence指向下一个可用的位置即可。所以从这两点来说,RingBuffer比LinkedBlockingQueue要快。

三、Disruptor使用

3.1 pom.xml

<dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.3</version>
        </dependency>

3.2 事件Event

Disruptor是基于事件的生产者消费者模型。其RingBuffer中存放的其实是将消息封装成的事件。这里定义了一个LongEvent,表示消息队列中存放的是long类型的数据。

public class LongEvent {
	private long value;

	public void set(long value) {
		this.value = value;
	}

    @Override
    public String toString() {
        return "LongEvent{" +
                "value=" + value +
                '}';
    }
}

3.3 EventFactory

实现EventFactory接口,定义Event工厂,用于填充队列。Event工厂其实是为了提高Disruptor的效率,初始化的时候,会调用Event工厂,对RingBuffer进行内存的提前分配,GC的频率会降低。

import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory<LongEvent> {
	public LongEvent newInstance() {
		return new LongEvent();
	}
}

3.4 EventHandler

实现EventHandler接口,定义EventHandler(消费者),处理容器中的元素。

import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent> {
	public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
		System.out.println("Event: " + event + ", sequence: " + sequence);
	}
}

3.5 使用Disruptor原始API发布消息

import cn.flying.space.disruptor.demo.LongEvent;
import com.lmax.disruptor.RingBuffer;

import java.nio.ByteBuffer;

/**
 * 定义一个生产者,往Disruptor中投递消息
 */
public class LongEventProducer {

    private RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer byteBuffer) {
        // 定位到下一个可存放的位置
        long sequence = ringBuffer.next();
        try {
            // 拿到该位置的event
            LongEvent event = ringBuffer.get(sequence);
            // 设置event的值
            event.set(byteBuffer.getLong(0));
        } finally {
            // 发布
            ringBuffer.publish(sequence);
        }
    }
}

import cn.flying.space.disruptor.demo.LongEvent;
import cn.flying.space.disruptor.demo.LongEventFactory;
import cn.flying.space.disruptor.demo.LongEventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

import java.nio.ByteBuffer;
import java.util.concurrent.Executors;
public class TestMain {
    public static void main(String[] args) throws InterruptedException {
        // 定义event工厂
        LongEventFactory factory = new LongEventFactory();
        // ringBuffer长度
        int bufferSize = 1024;
        // 构造一个Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory());
        // 绑定handler
        disruptor.handleEventsWith(new LongEventHandler());

        // 启动Disruptor
        disruptor.start();
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        LongEventProducer producer = new LongEventProducer(ringBuffer);

        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
        for (long i = 0; true; i++) {
            byteBuffer.clear();
            byteBuffer.putLong(i);
            // 投递消息
            producer.onData(byteBuffer);
            Thread.sleep(1000);
        }
    }
}

3.6 使用Translators发布消息

import cn.flying.space.disruptor.demo.LongEvent;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

import java.nio.ByteBuffer;

public class LongEventProducerUsingTranslator {
    private RingBuffer<LongEvent> ringBuffer;
    public LongEventProducerUsingTranslator(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
        @Override
        public void translateTo(LongEvent longEvent, long l, ByteBuffer byteBuffer) {
            longEvent.set(byteBuffer.getLong(0));
        }
    };

    public void onData(ByteBuffer byteBuffer) {
        ringBuffer.publishEvent(TRANSLATOR, byteBuffer);
    }
}

import cn.flying.space.disruptor.demo.LongEvent;
import cn.flying.space.disruptor.demo.LongEventFactory;
import cn.flying.space.disruptor.demo.LongEventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.nio.ByteBuffer;

/**
 * @author ZhangSheng
 * @date 2021-4-26 14:23
 */
public class TestMain {

    public static void main(String[] args) throws InterruptedException {
        LongEventFactory factory = new LongEventFactory();
        int bufferSize = 1024;
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);
        disruptor.handleEventsWith(new LongEventHandler());

        disruptor.start();
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        LongEventProducerUsingTranslator producer = new LongEventProducerUsingTranslator(ringBuffer);
        ByteBuffer byteBuffer = ByteBuffer.allocate(8);

        for (long i = 0L; true; i++) {
            byteBuffer.putLong(0, i);
            // 发布
            producer.onData(byteBuffer);
            Thread.sleep(1000);
        }
    }
}

加载全部内容

相关教程
猜你喜欢
用户评论