相互排斥

临界截面

int count = 0;
void increment(int n) {
        for(int i = 0; i < n; i++) {
                int x = count;
                x = x + 1;
                count = x;
        }
}

int main(int argc, char* argv[]) {
        pthread_t t1, t2;
        ....
        pthread_create(&t1, ..., increment)
        pthread_create(&t2, ..., increment)
}
  • 当多个线程执行增量方法时会发生什么?

  • 指令可以在哪些命令中执行?

  • Count的期望值是多少?

确定关键部分

  • 这是增量方法的关键部分

int x = count;
x = x + 1;
count = x;
  • 临界区是一次只能有一个线程进入的代码段,程序的结果肯定是正确的。

  • 在某些临界区中,可以同时执行多个线程,但不能保证正确性。

执行顺序和原子性

  • 关键部分的代码

int x = count;
x = x + 1;
count = x;
  • 临界区的伪装配

load &count, r0
set r0, r1
add 1, r1
set r1, r0
store &count, r0
  • 要记住的要点:
    • 编译器并不总是执行您所期望的操作

    • 在语句“count=x”之后,甚至直到几个程序语句之后,也不能100%保证该值已被存储以进行计数

    • 由于编译器的优化,Count的值在寄存器中的存储时间可能比您预期的要长得多。为了处理这些情况,‘Volatile’关键字将确保更早地插入商店

    • 不要指望一条程序语句被转换成一条原子机器指令。大多数人都不是。只有单机指令可以是原子的。

执行顺序和原子性

  • 对于我们的伪汇编,可能的执行顺序是什么?

  • 在两个不同的CPU上有两个线程

t0: load &count, r0
t0: set r0, r1
t1: load &count, r0
t0: add 1, r1
t1: set r0, r1
t0: set r1, r0
t0: store &count, r0
t1: add 1, r1
t1: set r1, r0
t1: store &count, r0
  • 在本例中,count的值不是2,而是1

  • 给定伪汇编程序和两个线程,程序的多少个排列等同于一个线程连续执行另一个线程的串行执行?

  • 为了实现程序的正确性,我们必须消除所有不具有与串行执行等同的结果的排列。

  • 解决方案在以下方面有所不同:
    • 是否需要重试执行

    • 所需的协调/合作数量

    • 程序并行性的基本假设(例如用户线程与内核线程)。

一种好的锁定解决方案的特点

  • 两个进程不能同时位于其临界区内

  • 不得对速度或CPU数量做出任何假设

  • 在其临界区之外运行的任何进程都不能阻止其他进程

  • 任何进程都不应该永远等待才能进入其临界区。

实现原子性/可序列化

  • 编写程序时有两种可能的模型需要考虑:悲观锁定和乐观锁定

  • 悲观锁定:
    • 获得临界区所需的所有资源的锁,执行临界区,然后释放所有锁

  • 乐观锁定:
    • 制作资源的快照或保存更改的日志。执行临界区,然后检查快照或日志以查看操作是否为原子操作。如果是,则提交任何更改,否则将放弃更改。

  • 对于大多数框架和语言,悲观锁定是更常用/更被接受的解决方案。随着内核数量和RAM大小的增加,乐观锁变得越来越流行。

悲观锁的类型

  • 禁用中断

  • 严格的替代

  • 自旋锁/互斥锁

  • 信号量

  • 条件变量/监视器

  • 读取器/写入器锁定

乐观锁的类型

  • 大多数非数据库实现都使用某种软件事务存储器库

何时考虑乐观锁定

  • 如果您的数据结构是稀疏的。
    • 例如:您正在读取和写入1 GB数组中的几个值。

  • 与非序列相比,您的代码的可能的序列等价排列的数量较高。

  • 与重试执行的成本相比,通信成本较高
    • 通过网络进行通信

    • 在CPU计数较高的NUMA系统上进行通信

    • 当您有非常高的CPU计数时

禁用中断

  • 这是使关键部分原子化的最简单解决方案。

  • 禁用中断可防止调度器被调用,因此在禁用和启用中断之间执行的任何代码都将是原子的。

  • 禁用中断是不可取的,因为:
    • 就资源利用率而言,它是最不理想的解决方案

    • 禁用中断是特权操作(只有操作系统可以执行此操作)

    • 如果在禁用中断的情况下程序崩溃,中断将不会恢复,调度程序将不会被调用,整个计算机将基本上停止。

    • 在大多数体系结构中,禁用中断只发生在一个CPU上。

  • 在操作系统内核中禁用中断对于实现互斥非常有用

严格的替代

  • 一个或多个进程轮流进入临界区。这是由外部调度器或使用‘TURN’变量控制的。

  • 一般来说,这是一个非常糟糕的解决方案:基本上,一个进程可以通过在其非关键代码部分变慢而有效地锁定另一个进程。

while(TRUE) {
        while(turn != 0) {}
        critical_section();
        turn = 1;
        non_critical_section();
}
while(TRUE) {
        while(turn != 1) {}
        critical_section();
        turn = 0;
        non_critical_section();
}

旋转锁

  • 旋转锁最多允许一个锁拥有者。如果代码的关键部分同意使用自旋锁,则在进入之前锁定自旋锁,在退出后解锁将使关键部分成为原子性的。

int lock = 0;
void increment(int n) {
        for(int i = 0; i < n; i++) {
                spin_lock(&lock);
                int x = count;
                x = x + 1;
                count = x;
                spin_unlock(&lock);
        }
}
  • 这一关键部分现在是原子的。

  • 现在可能的行刑顺序是什么?

旋转锁-实施

  • 自旋锁有两种状态:已锁定和未锁定。

  • 以下是旋转锁定和旋转解锁的伪算法

spin_lock(int* lock) {
        while(*lock == 1) { }
        *lock = 1;
}

spin_unlock(int* lock) {
        *lock = 0;
}
  • 当lock==1时,其他人持有它。一旦它切换到0,它就被解锁,我们将它设置为1,我们就获得了锁

  • While循环是自旋锁获得其“自旋”名称的地方

  • 要解锁,我们只需将LOCK设置回0

  • Spin_lock()中的临界区是什么?

  • 伪实现会奏效吗?为什么?或者为什么不呢?

旋转锁-实施

  • 伪实现将不起作用。它的关键部分是整个方法。

  • 例如,如果两个线程处于SPIN_LOCK中,则当lock变为0时,两个线程都可能会中断While循环,并将lock设置为1。然后,两个线程都将获得锁。

  • 因此,我们需要做的是将While循环压缩到单个原子操作中。

  • 此原子操作将必须测试lock的值,如果值为零,则将其设置为1

spin_lock(int* lock) {
        while(test_and_set(lock) == 1){}
}

旋转锁-实施

  • 那么,我们如何实现测试和设置呢?
    • 对于这一点,没有C/C++关键字。

    • 我们不能保证任何C/C++语句都会被简化为一条原子指令。

  • 为此,我们需要依赖专门的机器指令。在x86上,这是xchg指令。

  • SPIN_LOCK的x86程序集版本:

lock:
        dd    0
spin_lock:
        mov   eax, 1
loop:
        xchg  eax, [lock]
        test  eax, eax
        jnz   loop
        ret
  • 使用xchg指令,我们的测试和设置操作是原子的

旋转锁-实施

unsigned long test_and_set(unsigned long* lock) {
        unsigned long newval = 0;
        asm volatile("lock: cmpxchgl %2, %0"
                : "+m" (*lock), "+a" (newval)
                : "r" (1)
                : "cc");
        return newval;
}

void spin_lock(unsigned long *lock) {
        while(test_and_setlock) == 1) {}
}

void spin_unlock(unsigned long *lock) {
        *lock = 0;
}

线程-互斥

  • Linux和Minix中的p线程库提供了互斥锁的高效实现。

  • 如果线程不能立即获得锁,它就会进入休眠状态,而不是通过‘旋转’来浪费CPU时间。

int main(int argc, char* argv[]) {
        pthread_mutex_t mutex;
        pthread_mutex_init (&mutex, NULL);

        pthread_mutex_lock(&mutex);

        //critical section

        pthread_mutex_unlock(&mutex);

}

Windows中的Mutexes

  • Windows还为互斥锁提供了高效的实现。

  • WaitForSingleObject锁定互斥锁(以及其他锁原语)

  • ReleaseMutex解锁互斥锁

int main(int argc, char* argv[]) {
        HANDLE mutex = CreateMutex(NULL, FALSE, NULL);

        WaitForSingleObject(mutex, INFINITE);

        //critical section

        ReleaseMutex(mutex);

        CloseHandle(mutex);
}

信号灯--一些历史

  • 信号量的概念早于计算时代。

  • 信号量的概念最早是在铁路行业引入的。你仍然可以看到芝加哥CTA或Metra上使用的这些

  • 最近在绿线上发生的CTA脱轨事件涉及一个信号灯

  • 许多软件锁定技术的灵感来自于铁路、公路和工业自动化

信号量

  • 信号量支持一次进入临界区的N个线程。

  • 信号量是自旋锁的泛化
    • 自旋锁有两种状态:0-解锁,1-锁定

    • 信号量有N种状态:0-锁定,1-N-解锁

  • 对保护类似队列的数据结构很有用(稍后将详细介绍)

  • 用于保护N读取器M写入器数据结构(稍后将详细介绍)

  • 信号量的操作与自旋锁不同:
    • 旋转锁有锁、解锁

    • 信号量具有:
      • Up()-增加信号量的值

      • Down()-在信号量块为零时减少它的值

信号量--实现

  • 信号量可以在保护计数变量的自旋锁的顶部实现

int lock = 0;
int semaphore = 0;

void up() {
        spin_lock(&lock);
        semaphore += 1;
        spin_unlock(&lock);
}

void down() {
        spin_lock(&lock);
        while(semaphore == 0) {
                spin_unlock(&lock);
                spin_lock(&lock);
        }
        semaphore -= 1;
        spin_unlock(&lock);
}

二进制信号量

  • 模拟互斥锁

  • 初始状态为%1。

  • 可能的状态为1:未锁定,0:锁定

  • 示例:

Semaphore *mutex = new Semaphore(1);
int count = 0;
void increment(int n) {
        for(int i = 0; i < n; i++) {
                mutex->down();
                count += 1;
                mutex->up();
        }
}

事件信号量

  • 用于通知事件。

  • 初始值为0

  • 可能的状态为:0-无信号。1-发信号

  • 示例:

Semaphore *event = new Semaphore(0);
Semaphore *done = new Semaphore(0);
int a = 0, b = 0, total = 0;
void producer() {
        start(consumer);
        a = 50;
        b = 50;
        event->up();
        done->down();
        printf("total: %d\n", total);
}

void consumer() {
        event->down();
        total = a + b;
        done->up();
}
  • 生产者启动消费者

  • 生产者建立价值

  • 生产商向消费者发出信号并等待

  • 消费者收到信号

  • 消费者执行操作

  • 消费者信号生产者

  • 制片人醒来并打印结果

信号量--实现

  • 信号量的一个很好的用例是生产者/消费者问题:

Queue *queue = new Queue(5);
void producer() {
        while(1) {
                int item = produce_item();
                queue->Enqueue(item);
        }
}

void consumer() {
        while(1) {
                int item = queue->Dequeue();
                consume_item(item);
        }
}
  • 这里存在一类问题:

  • 消费者如何知道队列中有可用的项目?

  • 生产商如何知道队列未满(5件)?

  • 我们如何保证对队列的访问是原子的--队列和出队操作不会相互干扰?

信号量--实现

  • 有界缓冲区示例
    • 互斥是一个二进制信号量(状态为0-锁定,1-解锁)

    • Empty保护队列为空的状态

    • 已满保护队列已满的情况

Queue *queue = new Queue(5);
Semaphore *mutex = new Semaphore(1);
Semaphore *empty = new Semaphore(5);
Semaphore *full = new Semaphore(0);

void producer() {
        while(1) {
                empty->down();
                mutex->down();
                int item = produce_item();
                queue->Enqueue(item);
                mutex->up();
                full->up();
        }
}

void consumer() {
        while(1) {
                full->down();
                mutex->down();
                int item = queue->Dequeue();
                consume_item(item);
                mutex->up();
                empty->up();
        }
}

信号量-读取器/写入器锁定

  • 在某些情况下,需要允许共享资源上的N读取器和M写入器。

  • 最典型的情况是N读取器和1写入器。在这种情况下,因为多个线程可以读取一个值,或者一个线程可以随时写入该值

  • 使用读取器-写入器锁可以实现比使用二进制信号量或互斥锁高得多的并发性。

  • 读取器/写入器锁定实现

Semaphore *read = new Semaphore(0);
Semaphore *write = new Semaphore(0);
Semaphore *lock = new Semaphore(1);
int readWait = 0, writeWait = 0;
int activeRead = 0, activeWrite = 0;

void StartWrite() {
        lock->down();
        if(activeWrite+activeRead+writeWait == 0) {
                write->up();
                activeWrite += 1;
        } else {
                writeWait += 1;
        }
        lock->up();
        write->down();
}

void EndWrite() {
        lock->down();
        activeWrite -= 1;
        if(writeWait > 0) {
                write->up();
                activeWrite += 1;
                writeWait -= 1;
        } else {
                while(readWait>0) {
                        read->up();
                        activeRead += 1;
                        readWait -= 1;
                }
        }
        lock->up();
}

void BeginRead() {
        lock->down();
        if(activeWrite+writeWait == 0) {
                read->up();
                activeRead++;
        } else {
                readWait += 1;
        }
        lock->up();
        read->down();
}

void EndRead() {
        lock->down();
        activeRead -= 1;
        if(activeRead == 0 && writeWait > 0)) {
                write->up();
                activeWrite += 1;
                writeWait -= 1;
        }
        lock->up();
}

信号量-读取器/写入器锁定

  • 正如您从上一张幻灯片中看到的,读取器/写入器锁的实现可能很复杂。

  • 实施细节将根据锁的条件而有所不同。

  • 一些实施注意事项
    • 谁得到了更高的优先权:读者还是作家?

    • 我们允许用户降级或升级锁吗?即,读锁可以被转换为写锁,写锁可以被转换为读锁吗?

  • 读取器/写入器锁可能有一些模糊而复杂的饥饿/死锁场景(稍后将详细介绍)

信号量-线程

  • int sem_init(sem_t *sem, int pshared, unsigned int value)

  • 扫描电子显微镜是信号灯

  • PShared-信号量可以被切分的位置:
    • 0-进程的线程-必须位于堆中

    • 非0-在进程之间-必须位于共享内存中

  • 值-信号量的初始值。典型案例包括:
    • 0用于事件信号量

    • 1用于二进制信号量

    • N用于正常信号量

int main(int argc, char* argv[]) {
        sem_t sema;
        sem_init(&sema, 0, 1);
        sem_wait(&sema);
        //critical section
        sem_post(&sema);
}

Windows-信号量

  • Windows还提供了高效的信号量实现。

  • Windows通过名称共享信号量。CreateSemaffore中的第四个参数接受一个名称。如果要使用现有的名为信号量的函数,可以调用OpenSemaffore。

int main(int argc, char* argv[]) {
        const int minCount = 0;
        const int maxCount = 20;
        HANDLE sema = CreateSemaphore(NULL, minCount, maxCount, NULL);

        WaitForSingleObject(sema, INFINITE);

        //critical section

        ReleaseSemaphore(sema, 1, NULL);
}

监视器/条件变量

  • 监视器是更高级别的锁定抽象

  • 监视器有4种操作:
    • Enter-锁定监视器

    • 退出-解锁显示器

    • 等待-释放锁并使线程休眠。当Wait()返回时,自动再次获取锁

    • PULSE-告诉其他线程唤醒并重新获得监视器锁

  • 在许多情况下,与旋转锁和信号量相比,监视器允许使用更简单的代码实现更复杂的锁定条件

监测器

  • 监视器锁定通常遵循以下模式:
    1. 输入()

    2. While(继续的条件不为真){Wait();}

    3. 如果条件为真,请立即执行操作

    4. IF(条件已改变){PULSE();}

    5. 退出();

  • 在关键部分开始时,调用Enter(),在结束时,调用Exit()

  • 在调用Enter()之后、紧邻临界区之前,While循环在每次迭代中调用Wait()时检查是否可以继续

  • While循环结束后,我们进入临界区

  • 如果在临界区之后,临界区的条件发生了变化,则调用Pulse()

  • 在可能调用Pulse()之后,调用Exit()以结束锁定并退出临界区

  • 监视器锁的While循环中的条件可以非常简单,也可以非常复杂。

  • 即使条件变得非常复杂,监视器上的操作也保持简单

  • 复杂条件的危险在于,您需要确保While循环中的所有线程在某个时候都会中断While循环。否则,您的程序将冻结。

  • 显示器的性能通常非常好。在某些情况下,自旋锁和信号量的性能将超过监视器

监视器限制的缓冲区

  • 信号量实现:

Queue *queue = new Queue(5);
Semaphore *mutex = new Semaphore(1);
Semaphore *empty = new Semaphore(5);
Semaphore *full = new Semaphore(0);

void producer() {
        while(1) {
                empty->down();
                mutex->down();
                int item = produce_item();
                queue->Enqueue(item);
                mutex->up();
                full->up();
        }
}

void consumer() {
        while(1) {
                full->down();
                mutex->down();
                int item = queue->Dequeue();
                consume_item(item);
                mutex->up();
                empty->up();
        }
}

监视器限制的缓冲区

void consumer() {
        while(1) {
                monitor->Enter();
                while(queue->Size() == 0) {
                        monitor->Wait();
                }
                int item = queue->Dequeue();
                consume_item(item);
                monitor->Pulse();
                monitor->Exit();
        }
}

Queue *queue = new Queue(5);
Monitor *monitor = new Monitor();

void producer() {
        while(1) {
                monitor->Enter();
                while(queue->Size() >= 5) {
                        monitor->Wait();
                }
                int item = produce_item();
                queue->Enqueue(item);
                monitor->Pulse();
                monitor->Exit();
        }
}
  • 在本例中,我们在每次退出之前调用Pulse()。我们这样做是因为队列的大小在临界区被更改,并且可能允许其他线程操作。

信号量读取器/写入器锁

void StartWrite() {
        lock->down();
        if(activeWrite+activeRead+writeWait == 0) {
                write->up();
                activeWrite += 1;
        }  else {
                writeWait += 1;
        }
        lock->up();
        write->down();
}

void EndWrite() {
        lock->down();
        activeWrite -= 1;
        if(writeWait > 0) {
                write->up();
                activeWrite += 1;
                writeWait -= 1;
        } else {
                while(readWait>0) {
                        read->up();
                        activeRead += 1;
                        readWait -= 1;
                }
        }
        lock->up();
}

Semaphore *read = new Semaphore(0);
Semaphore *write = new Semaphore(0);
Semaphore *lock = new Semaphore(1);
int readWait = 0, writeWait = 0;
int activeRead = 0, activeWrite = 0;

void BeginRead() {
        lock->down();
        if(activeWrite+writeWait == 0) {
                read->up();
                activeRead++;
        } else {
                readWait += 1;
        }
        lock->up();
        read->down();
}

void EndRead() {
        lock->down();
        activeRead -= 1;
        if(activeRead == 0 && writeWait > 0)) {
                write->up();
                activeWrite += 1;
                writeWait -= 1;
        }
        lock->up();
}

监视器-读取器/写入器锁定

  • 监视器的实现要简单得多!

Monitor *monitor = new Monitor();
int readWait = 0, writeWait = 0;
int activeRead = 0, activeWrite = 0;

void BeginRead() {
        monitor->enter();
        readWait += 1;
        while(activeWrite+writeWait > 0) {
                monitor->wait();
        }
        readWait -= 1;
        activeRead += 1;
        monitor->pulse();
        monitor->exit();
}

void EndRead() {
        monitor->enter();
        activeRead -= 1;
        monitor->pulse();
        monitor->exit();
}

void StartWrite() {
        monitor->enter();
        writeWait += 1;
        while(activeWrite+activeRead > 0) {
                monitor->wait();
        }
        activeWrite += 1;
        monitor->pulse();
        monitor->exit();
}

void EndWrite() {
        monitor->enter();
        activeWrite -= 1;
        monitor->pulse();
        monitor->exit();
}

多线程--条件变量

  • PTHREADS使用条件变量实现类似监视器的功能。

  • 要使用它们,您需要一个互斥锁和一个条件变量:

int main(int argc, char* argv[]) {
        pthread_mutex_t mutex;
        pthread_cond_t cond;
        pthread_mutex_init(&mutex);
        pthread_cond_init(&cond);

        pthread_mutex_lock(&mutex);
        while(/* condition is not true */ ) {
                pthread_cond_wait(&cond, &mutex);
        }
        //critical section
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&mutex);
}

窗口-条件变量

  • 在Windows Vista、7和Server 2008中可用

  • 条件变量使用CRITICAL_SECTION,就像p线程的条件变量使用互斥锁一样。在这种情况下,对象是一个自旋锁,而不是互斥体。

int main(int argc, char* argv[]) {
        CONDITION_VARIABLE cond;
        CRITICAL_SECTION lock;

        InitializeConditionVariable(&cond);
        InitializeCriticalSection(&lock);

        EnterCriticalSection(&lock);
        while(/*condition is not true*/) {
                SleepConditionVariableCS(&cond, &lock, INFINITE);
        }
        //critical section
        WakeConditionVariable(&cond);
        LeaveCriticalSection(&lock);
}