亲宝软件园·资讯

展开

JUC 中提供的限流利器-Semaphore(信号量)

平头哥的技术博文 人气:0
在 JUC 包下,有一个 Semaphore 类,翻译成信号量,Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。Semaphore 跟锁(synchronized、Lock)有点相似,不同的地方是,锁同一时刻只允许一个线程访问某一资源,而 Semaphore 则可以控制同一时刻多个线程访问某一资源。 Semaphore(信号量)并不是 Java 语言特有的,几乎所有的并发语言都有。所以也就存在一个**信号量模型**的概念,如下图所示: ![信号量模型](https://user-gold-cdn.xitu.io/2020/3/31/1712e2f202ff468e?w=455&h=341&f=png&s=11610) 信号量模型比较简单,可以概括为:**一个计数器、一个队列、三个方法**。 计数器:记录当前还可以运行多少个资源访问资源。 队列:待访问资源的线程 **三个方法**: - **init()**:初始化计数器的值,可就是允许多少线程同时访问资源。 - **up()**:计数器加1,有线程归还资源时,如果计数器的值大于或者等于 0 时,从等待队列中唤醒一个线程 - **down()**:计数器减 1,有线程占用资源时,如果此时计数器的值小于 0 ,线程将被阻塞。 这三个方法都是原子性的,由实现方保证原子性。例如在 Java 语言中,JUC 包下的 Semaphore 实现了信号量模型,所以 Semaphore 保证了这三个方法的原子性。 Semaphore 是基于 AbstractQueuedSynchronizer 接口实现信号量模型的。AbstractQueuedSynchronizer 提供了一个基于 FIFO 队列,可以用于构建锁或者其他相关同步装置的基础框架,利用了一个 int 来表示状态,通过类似 acquire 和 release 的方式来操纵状态。关于 AbstractQueuedSynchronizer 更多的介绍,可以点击链接: > http://ifeve.com/introduce-abstractqueuedsynchronizer/ AbstractQueuedSynchronizer 在 Semaphore 类中的实现类如下: ```java abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; Sync(int permits) { setState(permits); } final int getPermits() { return getState(); } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } } ``` 在 Semaphore 类中,实现了两种信号量:**公平的信号量和非公平的信号量**,公平的信号量就是大家排好队,先到先进,非公平的信号量就是不一定先到先进,允许插队。非公平的信号量效率会高一些,所以默认使用的是非公平信号量。具体的可以查看 Semaphore 类实现源码。 Semaphore 类中,主要有以下方法: ```java // 构造方法,参数表示许可证数量,用来创建信号量 public Semaphore(int permits); // 从信号量中获取许可,相当于获取到执行权 public void acquire() throws InterruptedException; // 尝试获取1个许可,不管是否能够获取成功,都立即返回,true表示获取成功,false表示获取失败 public boolean tryAcquire(); // 将许可还给信号量 public void release(); ``` Semaphore 类的实现就了解的差不多了。可能你会有疑问 Semaphore 的应用场景是什么?Semaphore 可以用来限流(流量控制),在一些公共资源有限的场景下,Semaphore 可以派上用场。比如在做日志清洗时,可能有几十个线程在并发清洗,但是将清洗的数据存入到数据库时,可能只给数据库分配了 10 个连接池,这样两边的线程数就不对等了,我们必须保证同时只能有 10 个线程获取数据库链接,否则就会存在大量线程无法链接上数据库。 用 Semaphore 信号量来模拟这操作,代码如下: ```java public class SemaphoreDemo { /** * semaphore 信号量,可以限流 * * 模拟并发数据库操作,同时有三十个请求,但是系统每秒只能处理 5 个 */ private static final int THREAD_COUNT = 30; private static ExecutorService threadPool = Executors .newFixedThreadPool(THREAD_COUNT); // 初始化信号量,个数为 5 private static Semaphore s = new Semaphore(5); public static void main(String[] args) { for (int i = 0; i < THREAD_COUNT; i++) { threadPool.execute(new Runnable() { @Override public void run() { try { // 获取许可 s.acquire(); System.out.println(Thread.currentThread().getName()+" 完成数据库操作 ,"+ new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format( new Date())); // 休眠两秒钟,效果更直观 Thread.sleep(2000); // 释放许可 s.release(); } catch (InterruptedException e) { } } }); } // 关闭连接池 threadPool.shutdown(); } } ``` 运行效果如下: ![图片描述](https://user-gold-cdn.xitu.io/2020/3/31/1712e2f203261093?w=674&h=468&f=png&s=94596) 从结果中,可以看出,每秒只有 5 个线程在执行,这符合我们的预期。 好了,关于 Semaphore 的内容就结束了,更加详细的还请您查阅相关资料和阅读 Semaphore 源码。希望这篇文章对您的学习或者工作有所帮助。 感谢您的阅读,祝好。 ### 最后 > 目前互联网上很多大佬都有 Semaphore(信号量) 相关文章,如有雷同,请多多包涵了。原创不易,码字不易,还希望大家多多支持。若文中有所错误之处,还望提出,谢谢。 ![互联网平头哥](https://user-gold-cdn.xitu.io/2020/2/1/16fffec0e10797f6?w=900&h=500&f=png&s=172745)

加载全部内容

相关教程
猜你喜欢
用户评论