操作系统导论-并发

一.并发

操作系统为了进程能有多个执行点,为进程提供了一种抽象:线程。线程与进程类似,一个进程中的所有线程共享地址空间,但有自己独立的栈。

image-20230106183045452

1.并发问题

线程的执行顺序也需要操作系统来进行调度。由于线程共享相同的地址空间,不可控的调度可能产生一些问题,这些问题就是并发问题。

并发问题产生的根本原因在于,对于共享数据的操作,可能需要多条指令才能完成,在完成操作的过程中,如果发生了线程的切换,对共享数据进行修改,就会产生问题。

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/*一个将指定位置的值+1的指令*/
mov 0x8049a1c, %eax
add $0x1, %eax
mov %eax, 0x8049a1c
/*一个可能的并发问题,两个线程都将同一变量+1*/
//进程A运行
mov 0x8049a1c, %eax //A取出值,假设为0
add $0x1, %eax //A计算得到1
//进程切换到B
mov %eax, 0x8049a1c //B取出值,还是0
add $0x1, %eax
mov %eax, 0x8049a1c //B计算并将1写入内存
//切换为A
mov %eax, 0x8049a1c //A将1写入内存

以上的例子假设变量为0,A和B都对变量+1,但结果是1,只加了1次,这是因为+1的操作不是原子进行的,不能保证正确完成。因此,操作系统和硬件必须提供一些方法,使调度可控,保证这些对共享数据的操作能够正确完成。

除了以上的并发问题,还有一些其他类型的并发问题,例如线程等待,父线程需要等待子线程完成才能继续运行,操作系统和硬件也需要提供方法,确保调度的正确。

2.并发术语

  • 临界区(critical section)是访问共享资源的一段代码,资源通常是一个变量或数据结构。
  • 竞态条件(race condition)出现在多个执行线程大致同时进入临界区时,它们都试图更新共享 的数据结构,导致了令人惊讶的(也许是不希望的)结果。
  • 不确定性(indeterminate)程序由一个或多个竞态条件组成,程序的输出因运行而异,具体取 决于哪些线程在何时运行。这导致结果不是确定的(deterministic),而我们通常期望计算机系统给出确 定的结果。
  • 为了避免这些问题,线程应该使用某种互斥(mutual exclusion)原语。这样做可以保证只有一 个线程进入临界区,从而避免出现竞态,并产生确定的程序输出。

二.锁

操作系统为线程提供了锁,来处理并发问题。

1.锁的基本思想

锁是一个变量,主要的操作是获取锁和释放锁,如果一个线程获取了锁,其他线程就无法获取锁。在临界区周围使用锁,就保证了临界区只有一个线程可以进入。

1
2
3
4
lock_t mutex; //全局锁
lock(&mutex);
balance = balance + 1;
unlock(&mutex);

要实现一个锁,必须借助硬件原语来完成(原语指硬件原子性完成的指令),如果获取锁和释放锁不借助硬件,还是会有调度问题。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
typedef struct lock_t { int flag; } lock_t; 
//初始化锁为0,表示无线程持有
void init(lock_t *mutex) {
mutex->flag = 0;
}
//上锁,将值改为1
void lock(lock_t *mutex) {
while (mutex->flag == 1) // TEST the flag
; // spin-wait (do nothing)
mutex->flag = 1; // now SET it!
}
//释放锁
void unlock(lock_t *mutex) {
mutex->flag = 0;
}

如果锁也是一个共享变量,上锁只是从将值改为1,那么一个线程获取锁,还没将值改为1,另一个线程运行,读到的值仍为0,这个线程就也可以获取锁了。因此,必须要有硬件指令保证锁能原子性的获取。

2.锁的评价指标

  • 互斥:锁必须有效,阻止多个线程进入临界区
  • 公平性:线程有同等机会获取锁
  • 性能:使用锁的性能开销

3.锁的实现

锁的实现有以下几种方式:

控制中断

锁不设置值,直接关闭中断,这样上锁后,就不会发生调度。控制中断可以直接用关中断指令来完成。

关中断的优点是简单,但缺点很多:

  • 关中断是特权指令,不能信任用户程序,允许用户程序使用特权指令
  • 中断丢失:关中断时,一些IO中断等可能会丢失。如果一个进程在等待IO,而IO中断丢失了,进程就无法被唤醒
  • 不支持多处理器:关闭一个处理器的中断,但另一个线程可能在其他处理器运行

因为以上缺点,只有操作系统在一些特殊情况才使用关中断来实现锁。

测试并设置

锁可以使用测试并设置指令来实现,该指令原子的完成返回旧值,设定新值的操作。

1
2
3
4
5
6
7
int TestAndSet(int *old_ptr, int new);	//C描述
//获取锁
void lock(lock_t *lock) {
while (TestAndSet(&lock->flag, 1) == 1)
; // spin-wait (do nothing)
}
}

由于无法获取锁时,线程会在while循环等待,这种锁也被称为自旋锁。自旋锁的效率很低,如果有多个线程要获取锁,每个线程都会自旋一个时间片后才切换到其他线程,自旋的时间被浪费了。

比较并交换

另一个实现可以实现锁的硬件原语是比较并交换,如果值和期望的值相等,就修改该值,原子性的完成。

1
2
3
4
5
6
int CompareAndSwap(int *ptr, int expected, int new);
//获取锁
void lock(lock_t *lock) {
while (CompareAndSwap(&lock->flag, 0, 1) == 1)
; // spin
}

获取并增加

最后一个硬件原语是获取并增加指令,它能原子地返回特定地址的旧值,并且让该值自增一。

该原语实现的锁使用两个值,一个值ticket是当前线程排队获取锁的号,另一个值turn是当前持有锁的线程的号,获取锁时将ticket+1,释放锁时将turn+1,当一个线程的ticket与turn相等时,就可以获取锁,进入临界区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
int FetchAndAdd(int *ptr);
typedef struct lock_t {
int ticket;
int turn;
} lock_t;

void lock_init(lock_t *lock) {
lock->ticket = 0;
lock->turn = 0;
}

void lock(lock_t *lock) {
int myturn = FetchAndAdd(&lock->ticket);
while (lock->turn != myturn)
; // spin
}

休眠队列

为了解决自旋锁浪费资源的问题,可以采取一些方法避免自旋。一种方式是让线程主动让出cpu,另一种方式是一旦线程不能获取锁,就先进入休眠队列休眠,当一个线程释放锁,就从队列中唤醒一个线程运行。

第一种方式,每个线程还是可能会被调度到,然后再放弃cpu,接下来又可能被调度到,再放弃cpu,这也是浪费时间的,无意义的上下文切换开销很大。

第二种方式,一个可能的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
typedef struct lock_t { 
int flag;
int guard;
queue_t *q;
} lock_t;

void lock_init(lock_t *m) {
m->flag = 0;
m->guard = 0;
queue_init(m->q);
}

void lock(lock_t *m) {
while (TestAndSet(&m->guard, 1) == 1)
; //acquire guard lock by spinning
if (m->flag == 0) {
m->flag = 1; // lock is acquired
m->guard = 0;
} else {
queue_add(m->q, gettid());
m->guard = 0;
park();
}
}

void unlock(lock_t *m) {
while (TestAndSet(&m->guard, 1) == 1)
; //acquire guard lock by spinning
if (queue_empty(m->q))
m->flag = 0; // let go of lock; no one wants it
else
unpark(queue_remove(m->q)); // hold lock (for next thread!)
m->guard = 0;
}

flag表示锁是否被占用,而guard锁保证了对flag和队列的操作是互斥的。guard锁虽然自旋,但guard锁保护的获取锁和释放锁的临界区很小,因此不会浪费太多资源。

以上这个实现有一点小问题,如果有一个线程将要park休眠但还没休眠,另一个线程就释放了锁,这个线程就会在休眠后无人唤醒,解决方法是提前表明自己马上要 park。如果刚好另一个线程被调度,并且调用了 unpark,那么后续的 park 调用就会直接返回,而不是一直睡眠。

1
2
3
queue_add(m->q, gettid()); 
setpark(); // new code
m->guard = 0;

不同的硬件提供了不同的硬件原语,不同的操作系统也有不同的对锁的实现,细节和策略都有所不同。

三.条件变量

有些情况下,线程需要检查某一条件满足之后,才会继续运行。例如,父线程可能需要检查子线程是否执行完毕,子线程完毕后继续执行。条件变量的设计就是为了解决这种等待/唤醒的线程等待问题。

条件变量是一个显式队列,当某些执行状态(即条件,condition)不满足时,线程可以把自己加入队列,等待(waiting)该条件。另外某个线程,当它改变了上述状态时,就可以唤醒一个或者多个等待线程(通过在该条件上发信号),让它们继续执行。

1.条件变量的使用

条件变量有两种相关操作:wait()和 signal()。线程要睡 眠的时候,调用 wait()。当线程想唤醒等待在某个条件变量上的睡眠线程时,调用 signal()。以下是使用条件变量实现父线程等待子线程结束后继续执行的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
int done = 0; 
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t c = PTHREAD_COND_INITIALIZER;

void thr_exit() {
Pthread_mutex_lock(&m);
done = 1;
Pthread_cond_signal(&c);
Pthread_mutex_unlock(&m);
}

void *child(void *arg) {
printf("child\n");
thr_exit();
return NULL;
}

void thr_join() {
Pthread_mutex_lock(&m);
while (done == 0)
Pthread_cond_wait(&c, &m);
Pthread_mutex_unlock(&m);
}

int main(int argc, char *argv[]) {
printf("parent: begin\n");
pthread_t p;
Pthread_create(&p, NULL, child, NULL);
thr_join();
printf("parent: end\n");
return 0;
}

针对以上的例子中的一些细节作一些说明:

  • 尽管使用了条件变量,还是需要一个变量done来进行同步,该变量是有必要的,如果父线程创建子线程后,子线程立即运行并执行完毕,父线程就根据done判断不应该休眠,否则将无法被唤醒。
  • 使用while判断done是否为1,在此不是必要的,但最好使用while,接下来介绍的生产者消费者问题中会说明为什么最好使用while。

  • 在线程休眠和唤醒的操作前后都进行了上锁,因为休眠队列也是共享数据,如果不上锁可能会产生竞态条件,例如父线程将要睡眠,子线程调度运行并执行完毕,唤醒父线程,然而父线程还没有休眠,再调度回父线程,父线程休眠就无法被唤醒了。

  • wait()调用时传入了锁,当线程休眠时,需要释放掉锁,让其他线程可以获取锁,当被信号唤醒时,该线程将重新获取锁,然后从wait()调用返回。如果不释放锁,会导致死锁,当前线程休眠,另一个线程被唤醒却获取不到锁,导致无法继续运行。

2.生产者消费者问题

生产者/消费者(producer/consumer)问题,也叫作有界缓冲区(bounded buffer)问题。 假设有一个或多个生产者线程和一个或多个消费者线程。生产者把生成的数据项放入缓冲区;消费者从缓冲区取走数据项,以某种方式消费。生产者消费者需要同步,如果缓冲区为空,消费者就应该睡眠,唤醒生产者;如果缓冲区已满,生产者就应该睡眠,唤醒消费者。

以下是使用条件变量解决的生产者消费者问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
cond_t empty, fill; 
mutex_t mutex;

void *producer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex); // p1
while (count == MAX) // p2
Pthread_cond_wait(&empty, &mutex); // p3
put(i); // p4
Pthread_cond_signal(&fill); // p5
Pthread_mutex_unlock(&mutex); // p6
}
}

void *consumer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex); // c1
while (count == 0) // c2
Pthread_cond_wait(&fill, &mutex); // c3
int tmp = get(); // c4
Pthread_cond_signal(&empty); // c5
Pthread_mutex_unlock(&mutex); // c6
printf("%d\n", tmp);
}
}

对实现中的细节进行说明:

  • 使用两个条件变量,即两个队列,因为需要明确唤醒的对象类型,例如缓冲区空时,消费者必须唤醒生产者,而不是另一个消费者,因此需要两个条件变量让消费者和生产者在不同的队列休眠。
  • 使用while语句和count变量判断是否休眠,这是因为需要有一个变量进行同步,而且可能存在不止一个修改该变量的线程。例如,一个线程被唤醒但还未运行,另一个新线程插入并消费了缓冲区数据,然后刚被唤醒的线程才开始运行,就必须通过判断count确认缓冲区不为空,否则就产生了错误。这也是为什么使用while总是更好的原因,如果使用if,被唤醒的变量就不会再判断count是否为0了。

3.覆盖条件

条件变量是通过休眠/唤醒来进行线程等待和同步的,然而有一些情况,在等待的线程并不是完全相同的,而是有差别的,这时唤醒哪个线程就会对程序的运行存在影响。

考虑一个内存分配程序,假设有三个线程,一个线程正在使用50的内存空间,另两个线程分别要求分配10和100的内存空间,并因空间不足而休眠等待。这时运行的线程执行完毕,释放了50的空间,并唤醒一个线程,如果唤醒100的线程,内存空间还是不够,该线程继续休眠,需要10内存空间的线程可以运行却没有被唤醒。

解决以上问题的方案是唤醒所有等待的线程,确保所有应该唤醒的线程都被唤醒。这种方式下的条件变量被称为覆盖条件。显然因为唤醒了不必要的线程,产生了额外的开销,但在有些情况下,使用覆盖条件是必须的。

四.信号量

信号量是另一个解决并发问题的原语,是由Dijkstra 及其同事发明的。信号量可以用作锁和条件变量。

1.信号量

信号量是有一个整数值的对象,可以用两个函数来操作它。在 POSIX 标准中,是 sem_wait()和 sem_post()①。因为信号量的初始值能够决定其行为,所以首先要初始化信号量, 才能调用其他函数与之交互。

如果信号量的值大于等于1,sem_wait会立即返回,否则将让线程挂起,直到被唤醒。无论是否挂起,sem_wait最后都会将信号量的值-1。sem_post则直接增加信号量的值,如果有等待线程,唤醒其中一个。

2.二值信号量(锁)

信号量可以用作锁,使用 sem_wait()/sem_post()环绕临界区就可以了。第一个线程进入临界区应该可以获取锁,但另一个就不能了,因此显然信号量应该初始化为1,第一个线程进入临界区并将信号量-1变为0,第二个线程进入时因为信号量为0而进入休眠,并将信号量-1为-1,,第一个线程结束时,信号量被加到0,然后第二个线程被唤醒,此时直接从sem_wait返回,不再判断信号量是否大于1,最后执行结束,将信号量+1,信号量就又回到了1。

1
2
3
4
5
6
sem_t m; 
sem_init(&m, 0, 1);

sem_wait(&m);
// critical section here
sem_post(&m);

3.信号量用作条件变量

信号量也可以用作条件变量,只需要将值初始化为0。这样父线程将会休眠,等待子线程运行结束后将其唤醒,如果子线程在父线程休眠前先运行了,可以根据done的值直接运行而不调用sem_wait,在结束后调用sem_post,将信号量加为1,然后父线程就可以运行了。

但是回到条件变量的使用,条件变量在让线程休眠时,是需要释放锁的,如果不释放锁,就会导致死锁,而信号量没有这个功能,但是锁又是必须使用的,否则一个刚被唤醒的线程,和一个新插入的线程可能同时进入临界区。解决该问题的方法是调整锁的作用域:

1
2
3
4
5
6
sem_wait(&full); 
sem_wait(&mutex);
int tmp = get();
sem_post(&mutex);
sem_post(&empty);
printf("%d\n", tmp);

这样就可以避免以上问题了,但是在条件变量的使用时,这个锁是为了保护休眠队列的:

休眠队列也是共享数据,如果不上锁可能会产生竞态条件,例如父线程将要睡眠,子线程调度运行并执行完毕,唤醒父线程,然而父线程还没有休眠,再调度回父线程,父线程休眠就无法被唤醒了。

把锁移动到休眠的内部,只保护了缓冲区,不再保护队列了。但是不会有使用条件变量发生的上面这个问题,因为子线程执行完毕会将信号量+1,父线程就不会休眠了。

4.生产者/消费者(有界缓冲区)问题

以下是信号量实现的生产者/消费者(有界缓冲区)问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
sem_t empty; 
sem_t full;
sem_t mutex;

void *producer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
sem_wait(&empty); // line p1
sem_wait(&mutex); // line p1.5 (MOVED MUTEX HERE...)
put(i); // line p2
sem_post(&mutex); // line p2.5 (... AND HERE)
sem_post(&full); // line p3
}
}

void *consumer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
sem_wait(&full); // line c1
sem_wait(&mutex); // line c1.5 (MOVED MUTEX HERE...)
int tmp = get(); // line c2
sem_post(&mutex); // line c2.5 (... AND HERE)
sem_post(&empty); // line c3
printf("%d\n", tmp);
}
}

int main(int argc, char *argv[]) {
// ...
sem_init(&empty, 0, MAX); // MAX buffers are empty to begin with...
sem_init(&full, 0, 0); // ... and 0 are full
sem_init(&mutex, 0, 1); // mutex=1 because it is a lock
// ...
}

所有细节的问题都已经在以上的说明中提到了。

五.常见并发问题

常见的并发问题可以划分为死锁缺陷和非死锁缺陷。

1.非死锁缺陷

非死锁缺陷占并发问题的大多数,非死锁缺陷这里主要讨论两种:违反原子性(atomicity violation)缺陷和错误顺序(order violation)缺陷。

违反原子性缺陷就是对共享数据没有保护导致并发访问错误的缺陷,解决方法就是对临界区上锁。这是最简单的一种缺陷。

违反顺序缺陷是另一种问题下发生的,就是介绍条件变量时提到的等待问题,如果一个线程需要等待另一个线程执行完毕才能正确执行,却没有保证这种顺序,就可能产生问题。解决方式也很简单,使用条件变量或者信号量,保证线程执行的顺序。

2.死锁缺陷

死锁缺陷是多个线程互相等待的缺陷。

image-20230111180716598

产生死锁的条件 死锁的产生需要如下 4 个条件:

  • 互斥:线程对于需要的资源进行互斥的访问(例如一个线程抢到锁)。
  • 持有并等待:线程持有了资源(例如已将持有的锁),同时又在等待其他资源(例如,需要获得的锁)。
  • 非抢占:线程获得的资源(例如锁),不能被抢占。
  • 循环等待:线程之间存在一个环路,环路上每个线程都额外持有一个资源,而这 个资源又是下一个线程要申请的。

下面说明如何预防死锁:

  • 考虑从循环等待解决问题,可以通过保证加锁的顺序,避免循环等待。严格限定加锁顺序称为全序,但是有时锁的依赖很复杂,涉及多个锁,全序很难做到,这时可以使用偏序进行加锁。

  • 从持有并等待的角度解决问题,可以通过保证原子性获取所有锁,来避免持有并等待。使用这个方式必须提前知道所有要获取的锁,一方面其不适用于封装,另一方面提前获取所有锁可能降低了效率。

1
2
3
4
5
lock(prevention); 
lock(L1);
lock(L2);
...
unlock(prevention);
  • 从非抢占的角度考虑也可以预防死锁,如果另一个线程不能抢占自己的锁,导致了死锁,可以在不能获取锁时,把自己已经获取的锁释放掉,从而打破死锁的情况。然而如果另一线程采用不同的加锁顺序,持续的重复这一过程,又抢锁失败,会导致活锁。
1
2
3
4
5
6
top: 
lock(L1);
if (trylock(L2) == -1) {
unlock(L1);
goto top;
}
  • 从互斥考虑预防死锁的方法就是完全避免互斥,使用无等待(wait-free)数据结构,通过直接使用硬件原语避免死锁。

除了以上的预防方法,另一种方式是用调度避免死锁(然而锁的存在就是为了避免不确定的调度)。如果了解全局的信息,包 括不同线程在运行中对锁的需求情况,就可以让后续的调度能够避免产生死锁。

最后一种常用的策略就是允许死锁偶尔发生,检查到死锁时再采取行动。如果死锁很少见,这种不是办法的办法也是很实用的。