前言

本文隶属于专栏《100个问题搞定Java并发》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

本专栏目录结构和参考文献请见100个问题搞定Java并发

正文

信号量为多线程协作提供了更为强大的控制方法。

从广义上说,信号量是对锁的扩展。

无论是内部锁 synchronized 还是重入锁 ReentrantLock ,一次都只允许一个线程访问一个资源,而信号量却可以指定多个线程,同时访问某一个资源

信号量主要提供了以下构造函数

public Semaphore(int permits)

public Semaphore(int permits,boolean fair)  //第二个参数可以指定是否公平在构造信号量对象时,必须要指定信号量的准入数,即同时能申请多少个许可。

当每个线程每次只申请一个许可时,这就相当于指定了同时有多少个线程可以访问某一个资源。

信号量的主要逻辑方法有:

public void acquire()throws InterruptedException

public void acquireUninterruptibly()

public boolean tryAcquire()

public boolean tryAcquire(long timeout,TimeUnit unit)
        throws InterruptedException

public void release()

方法尝试获得一个准入的许可。

若无法获得,则线程会等待,直到有线程释放一个许可或者当前线程被中断。

acquireUninterruptibly() 方法和 acquire() 方法类似,但是不响应中断。

tryAcquire() 方法尝试获得一个许可,如果成功则返回 true ,失败则返回 false ,它不会进行等待,立即返回。

release() 方法用于在线程访问资源结東后释放一个许可,以使其他等待许可的线程可以进行资源访问。

源码(JDK8)

/**
 * 一个计数信号量。 
 *
 * 在概念上,信号量维持一组许可证。 
 *
 * 如果有必要,每个acquire()都会阻塞,直到许可证可用,然后才能使用它。 
 *
 * 每个release()添加许可证,潜在地释放阻塞获取方。 
 *
 * 但是,没有使用实际的许可证对象; Semaphore只保留可用数量的计数,并相应地执行。
 *
 * 信号量通常用于限制线程数,而不是访问某些(物理或逻辑)资源。
 *
 * 在获得项目之前,每个线程必须从信号量获取许可证,以确保某个项目可用。 
 *
 * 当线程完成该项目后,它将返回到池中,并将许可证返回到信号量,允许另一个线程获取该项目。
 *
 * 请注意,当调用acquire()时,不会保持同步锁定,因为这将阻止某个项目返回到池中。 
 *
 * 信号量封装了限制对池的访问所需的同步,与保持池本身一致性所需的任何同步分开。
 *
 * 信号量被初始化为一个,并且被使用,使得它只有至多一个允许可用,可以用作互斥锁。 
 *
 * 这通常被称为二进制信号量 ,因为它只有两个状态:一个许可证可用,或零个许可证可用。 
 *
 * 当以这种方式使用时,二进制信号量具有属性(与许多Lock实现不同),“锁”可以由除所有者之外的线程释放(因为信号量没有所有权概念)。 
 *
 * 这在某些专门的上下文中是有用的,例如死锁恢复。
 *
 * 此类的构造函数可选择接受公平参数。 
 *
 * 当设置为false时,此类不会保证线程获取许可的顺序。 
 *
 * 特别是, 闯入是允许的,也就是说,一个线程调用acquire()可以提前已经等待线程分配的许可证-在等待线程队列的头部逻辑新的线程将自己。 
 *
 * 当公平设置为真时,信号量保证调用acquire方法的线程被选择以按照它们调用这些方法的顺序获得许可(先进先出; FIFO)。 
 *
 * 请注意,FIFO排序必须适用于这些方法中的特定内部执行点。 
 *
 * 因此,一个线程可以在另一个线程之前调用acquire ,但是在另一个线程之后到达排序点,并且类似地从方法返回。 
 *
 * 另请注意, 未定义的tryAcquire方法不符合公平性设置,但将采取任何可用的许可证。
 *
 * 通常,用于控制资源访问的信号量应该被公平地初始化,以确保线程没有被访问资源。 
 *
 * 当使用信号量进行其他类型的同步控制时,非正常排序的吞吐量优势往往超过公平性。
 *
 * 这个类还提供了方便的方法, 一次acquire和release多个许可证。 
 *
 * 当没有公平地使用这些方法时,请注意增加无限期延期的风险。
 *
 * 内存一致性效应:在一个线程中调用“释放”方法的操作,例如release() happen-before 在另一个线程中成功执行“获取”方法 如 acquire() 。
 */
public class Semaphore implements java.io.Serializable

官方示例

package com.shockang.study.java.concurrent.aqs;

import java.util.concurrent.Semaphore;

/**
 * 这是一个使用信号量来控制对一个项目池的访问的类
 */
public class Pool {
    private static final int MAX_AVAILABLE = 100;
    private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

    public Object getItem() throws InterruptedException {
        available.acquire();
        return getNextAvailableItem();
    }

    public void putItem(Object x) {
        if (markAsUnused(x))
            available.release();
    }


    protected Object[] items = {};
    protected boolean[] used = new boolean[MAX_AVAILABLE];

    protected synchronized Object getNextAvailableItem() {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (!used[i]) {
                used[i] = true;
                return items[i];
            }
        }
        return null; // not reached
    }

    protected synchronized boolean markAsUnused(Object item) {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (item == items[i]) {
                if (used[i]) {
                    used[i] = false;
                    return true;
                } else
                    return false;
            }
        }
        return false;
    }
}

实践

简单入门示例

package com.shockang.study.java.concurrent.aqs;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreDemo implements Runnable {
    final Semaphore semp = new Semaphore(5);

    @Override
    public void run() {
        try {
            semp.acquire();
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getId() + ":done!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semp.release();
        }
    }

    public static void main(String[] args) {
        ExecutorService exec = Executors.newFixedThreadPool(20);
        final SemaphoreDemo demo = new SemaphoreDemo();
        for (int i = 0; i < 20; i++) {
            exec.submit(demo);
        }
    }
}

说明

第8行代码声明了一个包含5个许可的信号量。

这就意味着同时可以有5个线程进入代码段第13~15行。

第13~15行为临界区管理代码,程序会限制执行这段代码的线程数。

申请信号量使用 acquire方法操作,在离开时,务必使用 release方法释放信号量(代码第 19 行)。

这就和释放锁是一个道理。

如果不幸发生了信号量的泄露(申请了但没有释放), 那么可以进入临界区的线程数量就会越来越少,直到所有的线程均不可访问。

在本例中, 同时开启20个线程。观察这段程序的输出,你就会发现系统以5个线程一组为单位,依次输出带有线程 ID 的提示文本。

上一篇 下一篇