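Pthreads is the POSIX threads standard. It defines a set of APIs for creating and manipulating threads.

This post is based on Pthread:POSIX 多线程程序设计 (POSIX multithreaded programming).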

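I. Overview

1. Using multiple threads

Why use multiple threads:

  • More efficient communication between tasks
  • Asynchronous event handling
  • While one thread waits on I/O, other threads can use the CPU

When Pthreads is a good fit:

  • The data can be operated on by multiple tasks at the same time
  • The work blocks or involves long-running I/O
  • Asynchronous events must be responded to

The Pthreads API falls into three main groups:

  • Thread management
  • Mutexes
  • Condition variables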

II. API

1. Thread management

Creating threads

pthread_create(thread,attr,start_routine,arg);
pthread_exit(status);
pthread_attr_init(attr);
pthread_attr_destroy(attr);

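pthread_create parameters:

  • thread: returns a unique identifier for the new thread
  • attr: thread attributes; pass NULL for the defaults
  • start_routine: the function the thread executes
  • arg: the single argument passed to start_routine; it must be cast to void*

The maximum number of threads a process can create depends on the operating system. A thread can create other threads, and there is no hierarchy or dependency among them.

Thread attributes are initialized and destroyed with pthread_attr_init and pthread_attr_destroy. Some attributes are discussed below.

Terminating threads

A thread terminates in any of the following ways:

  • The thread returns from its start routine
  • The thread calls pthread_exit
  • Another thread cancels it with pthread_cancel
  • The process calls exec or exit

When main returns, the process exits and all of its threads end. If main calls pthread_exit instead, the other threads keep running.

Example: creating and terminating threads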

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#define NUM_THREADS 5

void *PrintHello(void *threadid) {
    /* The thread number arrives packed into the pointer value.
       long has the same width as a pointer on LP64 systems such as 64-bit Linux. */
    long tid = (long)threadid;
    printf("Hello World! It's me, thread #%ld!\n", tid);
    pthread_exit(NULL);
}

int main (int argc, char *argv[]) {
    pthread_t threads[NUM_THREADS];
    int rc;
    long t;
    for(t=0; t<NUM_THREADS; t++){
        printf("In main: creating thread %ld\n", t);
        rc = pthread_create(&threads[t], NULL, PrintHello, (void *)t);
        if (rc){
            printf("ERROR; return code from pthread_create() is %d\n", rc);
            exit(-1);
        }
    }
    /* Exit only the main thread so the workers can finish */
    pthread_exit(NULL);
}

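Passing arguments to threads

pthread_create passes only one argument to the start routine. To pass several values, define a struct that holds all of them and pass a pointer to that struct.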

struct thread_data{
    int thread_id;
    int sum;
    char *message;
};
struct thread_data thread_data_array[NUM_THREADS];

void *PrintHello(void *threadarg) {
    struct thread_data *my_data;
    ...
    my_data = (struct thread_data *) threadarg;
    taskid = my_data->thread_id;
    sum = my_data->sum;
    hello_msg = my_data->message;
    ...
}

int main (int argc, char *argv[]) {
    ...
    thread_data_array[t].thread_id = t;
    thread_data_array[t].sum = sum;
    thread_data_array[t].message = messages[t];
    rc = pthread_create(&threads[t], NULL, PrintHello,
        (void *) &thread_data_array[t]);
    ...
}

Thread synchronization: join

Related APIs:

pthread_join (threadid,status)
pthread_detach (threadid)
pthread_attr_setdetachstate (attr,detachstate)
pthread_attr_getdetachstate (attr,detachstate)

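pthread_join blocks until the specified thread terminates. If the target thread called pthread_exit(), the joining thread can retrieve its exit status through the status argument.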

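A newly created thread is joinable by default, meaning another thread can wait for it with pthread_join. Setting the detach state through the attr argument creates the thread detached instead, and a detached thread cannot be joined.

If a joinable thread finishes and is never joined, it lingers like a zombie and some of its resources are not reclaimed. pthread_join reclaims them but blocks the caller; when the caller should not block, use pthread_detach: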

// called by the child thread on itself
pthread_detach(pthread_self());
// or called by the parent thread
pthread_detach(thread_id);

After this call, the child thread's resources are released automatically when it finishes.

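Stack management

The POSIX standard does not specify a thread's stack size. The stack's location and size can be queried and set through the thread attributes. Note that pthread_attr_getstackaddr and pthread_attr_setstackaddr are obsolete; POSIX.1-2008 removed them in favor of pthread_attr_getstack and pthread_attr_setstack.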

pthread_attr_getstacksize(attr, stacksize)  
pthread_attr_setstacksize(attr, stacksize)
pthread_attr_getstackaddr(attr, stackaddr)
pthread_attr_setstackaddr(attr, stackaddr)

Example:

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#define NTHREADS 4
#define N 1000
#define MEGEXTRA 1000000
pthread_attr_t attr;

void *dowork(void *threadid) {
    double A[N][N];   /* about 8 MB on the thread's stack; may exceed the default size */
    int i,j;
    long tid;
    size_t mystacksize;
    tid = (long)threadid;
    pthread_attr_getstacksize (&attr, &mystacksize);
    printf("Thread %ld: stack size = %zu bytes \n", tid, mystacksize);
    for (i=0; i<N; i++)
        for (j=0; j<N; j++)
            A[i][j] = ((i*j)/3.452) + (N-i);
    pthread_exit(NULL);
}

int main(int argc, char *argv[]) {
    pthread_t threads[NTHREADS];
    size_t stacksize;
    int rc;
    long t;
    pthread_attr_init(&attr);
    pthread_attr_getstacksize (&attr, &stacksize);
    printf("Default stack size = %zu\n", stacksize);
    /* Room for the array plus some extra for everything else */
    stacksize = sizeof(double)*N*N+MEGEXTRA;
    printf("Amount of stack needed per thread = %zu\n",stacksize);
    pthread_attr_setstacksize (&attr, stacksize);
    printf("Creating threads with stack size = %zu bytes\n",stacksize);
    for(t=0; t<NTHREADS; t++){
        rc = pthread_create(&threads[t], &attr, dowork, (void *)t);
        if (rc){
            printf("ERROR; return code from pthread_create() is %d\n", rc);
            exit(-1);
        }
    }
    printf("Created %ld threads.\n", t);
    pthread_exit(NULL);
}

Other functions

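// returns the calling thread's ID
pthread_self();
// returns 0 if the two IDs differ, nonzero if they are equal
pthread_equal(thread1,thread2);
// init_routine runs exactly once, in whichever thread calls pthread_once first
pthread_once_t once_control = PTHREAD_ONCE_INIT;
pthread_once (&once_control, init_routine);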

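2. Mutexes

Mutexes are the primary way to protect shared data and prevent race conditions.

When several threads contend for the same mutex, all but the owner block in pthread_mutex_lock. Calling pthread_mutex_trylock instead returns immediately with an error when the mutex is already held, rather than blocking.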

pthread_mutex_init (mutex,attr);
pthread_mutex_destroy (mutex);
pthread_mutexattr_init (attr);
pthread_mutexattr_destroy (attr);

A mutex is declared with type pthread_mutex_t and must be initialized before use.

pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutexattr_t attr;

The optional attributes include the protocol, the priority ceiling, and process sharing.

The functions that operate on a mutex are:

pthread_mutex_lock (mutex);
pthread_mutex_trylock (mutex);
pthread_mutex_unlock (mutex);

Example of using a mutex:

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

/*
The following structure contains the necessary information
to allow the function "dotprod" to access its input data and
place its output into the structure.
*/

typedef struct {
    double *a;
    double *b;
    double sum;
    int veclen;
} DOTDATA;

/* Define globally accessible variables and a mutex */
#define NUMTHRDS 4
#define VECLEN 100

DOTDATA dotstr;
pthread_t callThd[NUMTHRDS];
pthread_mutex_t mutexsum;

/*
The function dotprod is activated when the thread is created.
All input to this routine is obtained from a structure
of type DOTDATA and all output from this function is written into
this structure. The benefit of this approach is apparent for the
multi-threaded program: when a thread is created we pass a single
argument to the activated function - typically this argument
is a thread number. All the other information required by the
function is accessed from the globally accessible structure.
*/

void *dotprod(void *arg) {
    /* Define and use local variables for convenience */
    int i, start, end, len ;
    long offset;
    double mysum, *x, *y;
    offset = (long)arg;
    len = dotstr.veclen;
    start = offset*len;
    end = start + len;
    x = dotstr.a;
    y = dotstr.b;
    /*
    Perform the dot product and assign result
    to the appropriate variable in the structure.
    */
    mysum = 0;
    for (i=start; i<end ; i++) {
        mysum += (x[i] * y[i]);
    }
    /*
    Lock a mutex prior to updating the value in the shared
    structure, and unlock it upon updating.
    */
    pthread_mutex_lock (&mutexsum);
    dotstr.sum += mysum;
    pthread_mutex_unlock (&mutexsum);
    pthread_exit((void*) 0);
}

/*
The main program creates threads which do all the work and then
print out result upon completion. Before creating the threads,
the input data is created. Since all threads update a shared structure,
we need a mutex for mutual exclusion. The main thread needs to wait for
all threads to complete, it waits for each one of the threads. We specify
a thread attribute value that allow the main thread to join with the
threads it creates. Note also that we free up handles when they are
no longer needed.
*/

int main (int argc, char *argv[]) {
    long i;
    double *a, *b;
    void *status;
    pthread_attr_t attr;
    /* Assign storage and initialize values */
    a = (double*) malloc (NUMTHRDS*VECLEN*sizeof(double));
    b = (double*) malloc (NUMTHRDS*VECLEN*sizeof(double));
    for (i=0; i<VECLEN*NUMTHRDS; i++) {
        a[i]=1.0;
        b[i]=a[i];
    }
    dotstr.veclen = VECLEN;
    dotstr.a = a;
    dotstr.b = b;
    dotstr.sum=0;
    pthread_mutex_init(&mutexsum, NULL);
    /* Create threads to perform the dotproduct */
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
    for(i=0; i<NUMTHRDS; i++) {
        /*
        Each thread works on a different set of data.
        The offset is specified by 'i'. The size of
        the data for each thread is indicated by VECLEN.
        */
        pthread_create( &callThd[i], &attr, dotprod, (void *)i);
    }
    pthread_attr_destroy(&attr);
    /* Wait on the other threads */
    for(i=0; i<NUMTHRDS; i++){
        pthread_join( callThd[i], &status);
    }
    /* After joining, print out the results and cleanup */
    printf ("Sum = %f \n", dotstr.sum);
    free (a);
    free (b);
    pthread_mutex_destroy(&mutexsum);
    pthread_exit(NULL);
}

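3. Condition variables

A condition variable lets threads synchronize on the actual value of data, and it is normally used together with a mutex. It is essentially a wait queue: a thread waiting for a condition sleeps on it and releases the mutex while asleep, and another thread wakes it once the condition holds.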

pthread_cond_t myconvar = PTHREAD_COND_INITIALIZER;  // static initialization
pthread_cond_init(condition,attr);
pthread_cond_destroy(condition);
pthread_condattr_init(attr);
pthread_condattr_destroy(attr);

Waiting and signaling on a condition variable:

pthread_cond_wait(condition,mutex);  
pthread_cond_signal(condition);
pthread_cond_broadcast(condition);

I already studied condition variables in detail in my OS course, so here is the example program directly:

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define NUM_THREADS 3
#define TCOUNT 10
#define COUNT_LIMIT 12

int count = 0;
int thread_ids[3] = {0,1,2};
pthread_mutex_t count_mutex;
pthread_cond_t count_threshold_cv;

void *inc_count(void *idp) {
    int j,i;
    double result=0.0;
    int *my_id = idp;
    for (i=0; i<TCOUNT; i++) {
        pthread_mutex_lock(&count_mutex);
        count++;
        if (count == COUNT_LIMIT) {
            pthread_cond_signal(&count_threshold_cv);
            printf("inc_count(): thread %d, count = %d Threshold reached.\n", *my_id, count);
        }
        printf("inc_count(): thread %d, count = %d, unlocking mutex\n", *my_id, count);
        pthread_mutex_unlock(&count_mutex);

        /* Do some work so threads can alternate on mutex lock */
        for (j=0; j<1000; j++)
            result = result + (double)random();
    }
    pthread_exit(NULL);
}

void *watch_count(void *idp) {
    int *my_id = idp;
    printf("Starting watch_count(): thread %d\n", *my_id);
    pthread_mutex_lock(&count_mutex);
    /* Use while rather than if: a wakeup does not guarantee the condition holds */
    while (count<COUNT_LIMIT) {
        pthread_cond_wait(&count_threshold_cv, &count_mutex);
        printf("watch_count(): thread %d Condition signal received.\n", *my_id);
    }
    pthread_mutex_unlock(&count_mutex);
    pthread_exit(NULL);
}

int main (int argc, char *argv[]) {
    int i;
    pthread_t threads[3];
    pthread_attr_t attr;
    /* Initialize mutex and condition variable objects */
    pthread_mutex_init(&count_mutex, NULL);
    pthread_cond_init (&count_threshold_cv, NULL);
    /* For portability, explicitly create threads in a joinable state */
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
    pthread_create(&threads[0], &attr, inc_count, (void *)&thread_ids[0]);
    pthread_create(&threads[1], &attr, inc_count, (void *)&thread_ids[1]);
    pthread_create(&threads[2], &attr, watch_count, (void *)&thread_ids[2]);
    /* Wait for all threads to complete */
    for (i=0; i<NUM_THREADS; i++) {
        pthread_join(threads[i], NULL);
    }
    printf ("Main(): Waited on %d threads. Done.\n", NUM_THREADS);
    /* Clean up and exit */
    pthread_attr_destroy(&attr);
    pthread_mutex_destroy(&count_mutex);
    pthread_cond_destroy(&count_threshold_cv);
    pthread_exit(NULL);
}

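III. Topics not covered

Pthreads has some APIs that were not mentioned above:

  • Thread scheduling: Pthreads provides functions for explicitly setting a thread's scheduling policy and priority
  • Thread-specific data
  • Mutex attributes
  • Sharing condition variables across processes
  • Thread cancellation
  • Threads and signals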

IV. Exercises

I was getting familiar with makefiles at the same time, so I wrote one.

all = main array_sum consumer
obj_main = func.o main.o
main : $(obj_main)
	gcc -o main $(obj_main) -lpthread
main.o : main.c
func.o : func.c
array_sum: array_sum.c
	gcc -o array_sum array_sum.c -lpthread
consumer: consumer.c
	gcc -o consumer consumer.c -lpthread
rarraysum:
	./array_sum
rconsumer:
	./consumer
clean :
	-rm *.o $(all)

Dot product

#include<stdio.h>
#include<pthread.h>

extern const int NUM_THREADS;
extern const int VEC_SIZE;
struct args{
    int *a, *b, *sum;
};

void *dot_product_ser(int *a, int *b, int *result){
    int tmp = 0;
    for(int i=0;i<VEC_SIZE;i++){
        tmp += a[i]*b[i];
    }
    *result = tmp;
    return NULL;
}

void *dot_product_par(void *thread_args){
    int my_size = VEC_SIZE/NUM_THREADS;
    int tmp = 0;
    int *a, *b, *result;
    struct args *my_arg = (struct args*)thread_args;
    a = my_arg->a;
    b = my_arg->b;
    result = my_arg->sum;
    for(int i=0;i<my_size;i++){
        tmp += a[i]*b[i];
    }
    *result = tmp;
    pthread_exit(NULL);
}

void initialize(int *a, int *b){
    for(int i=0;i<VEC_SIZE;i++){
        a[i] = 1;
        b[i] = 2;
    }
}

main

#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>

/* Definitions of the constants that func.c declares extern */
const int NUM_THREADS = 5;
const int VEC_SIZE = 10000;
void *dot_product_ser(int *a, int *b, int *result);
void *dot_product_par(void *thread_arg);
void initialize(int *a, int *b);

struct args{
    int *a, *b, *sum;
};

int main(){
    int *a = malloc(VEC_SIZE*sizeof(int));
    int *b = malloc(VEC_SIZE*sizeof(int));
    int *my_sum = malloc(NUM_THREADS*sizeof(int));
    /* step is the chunk length per thread: VEC_SIZE/NUM_THREADS, not the reverse */
    int result_par = 0, step = VEC_SIZE/NUM_THREADS, result_ser = 0;

    initialize(a, b);
    dot_product_ser(a,b,&result_ser);

    //parallel computing
    pthread_t threads[NUM_THREADS];
    struct args thread_arg[NUM_THREADS];
    for(int i=0;i<NUM_THREADS;i++){
        thread_arg[i].a = a+i*step;
        thread_arg[i].b = b+i*step;
        thread_arg[i].sum = &my_sum[i];
        int rt = pthread_create(&threads[i], NULL, dot_product_par,(void*)&thread_arg[i]);
        if(rt){
            printf("Create thread failed and error code is %d\n", rt);
            exit(-1);
        }
    }
    for(int i=0;i<NUM_THREADS;i++){
        pthread_join(threads[i],NULL);
        result_par += my_sum[i];
    }

    //check
    if(result_par==result_ser){
        printf("correct and result is %d\n", result_par);
    }
    else printf("wrong!\n");

    free(a);
    free(b);
    free(my_sum);
    return 0;
}

Array sum

#include<stdlib.h>
#include<stdio.h>
#include<pthread.h>

const int NUM_THREADS = 5;
const int N = 1000000;

/* The total is about 5e11, which overflows int, so use long long */
long long res = 0;
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;

void *array_sum(void *a){
    int *t = (int*)a;
    int n = N/NUM_THREADS;
    long long tmp = 0;
    for(int i=0;i<n;i++){
        tmp += t[i];
    }
    pthread_mutex_lock(&mtx);
    res += tmp;
    pthread_mutex_unlock(&mtx);
    return NULL;
}

int main(){
    int *a = malloc(N*sizeof(int));
    int step = N/NUM_THREADS;
    long long ans = 0;
    /* One handle per thread; an array of N handles would not fit on the stack */
    pthread_t threads[NUM_THREADS];
    printf("array_sum running...\n");

    for(int i=0;i<N;i++){
        a[i] = i-1000;
        ans += a[i];
    }
    for(int i=0;i<NUM_THREADS;i++){
        int rt = pthread_create(&threads[i], NULL, array_sum, (void*)(a+i*step));
        if(rt) {
            printf("ERROR! Create thread %d failed and error code is %d.\n",i,rt);
            exit(-1);
        }
    }
    for(int i=0;i<NUM_THREADS;i++){
        pthread_join(threads[i], NULL);
    }
    printf("res = %lld and ans = %lld\n", res, ans);
    free(a);
    pthread_mutex_destroy(&mtx);
    return 0;
}

Consumer and producer

#include<stdlib.h>
#include<stdio.h>
#include<pthread.h>
#include<unistd.h>

const int NUM_THREADS = 5;
const int BUFFER_SIZE = 30;

char *buffer;
int done = 0;   /* 1 while the buffer holds unread data, 0 when it is empty */
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t filled = PTHREAD_COND_INITIALIZER;
pthread_cond_t empty = PTHREAD_COND_INITIALIZER;

/* Reads one message from each of the NUM_THREADS-1 producers */
void *consumer(void *arg){
    (void)arg;
    sleep(1);
    int cnt = 0;
    while(cnt<NUM_THREADS-1){
        pthread_mutex_lock(&mtx);
        while(done==0){
            pthread_cond_wait(&filled,&mtx);
        }
        printf("%s\n", buffer);
        done = 0;
        pthread_cond_signal(&empty);
        pthread_mutex_unlock(&mtx);
        cnt++;
    }
    printf("All of the data have been received.\n");
    pthread_exit(NULL);
}

/* Waits until the buffer is empty, then writes one message into it */
void *producer(void *tid){
    sleep(2);
    pthread_mutex_lock(&mtx);
    while(done==1){
        pthread_cond_wait(&empty,&mtx);
    }
    snprintf(buffer,BUFFER_SIZE,"data written by thread%d",(int)(long)tid);
    done = 1;
    pthread_cond_signal(&filled);
    pthread_mutex_unlock(&mtx);
    pthread_exit(NULL);
}


int main(){
    pthread_t threads[NUM_THREADS];
    buffer = malloc(sizeof(char)*BUFFER_SIZE);

    for(int i=0;i<NUM_THREADS-1;i++){
        int rc = pthread_create(&threads[i],NULL,producer, (void *)(long)i);
        if(rc){
            printf("Create thread failed and error code is %d\n",rc);
            exit(-1);
        }
    }

    int rc = pthread_create(&threads[NUM_THREADS-1],NULL,consumer,NULL);
    if(rc){
        printf("Create thread failed and error code is %d\n",rc);
        exit(-1);
    }

    for(int i=0;i<NUM_THREADS;i++){
        pthread_join(threads[i],NULL);
    }
    pthread_mutex_destroy(&mtx);
    pthread_cond_destroy(&filled);
    pthread_cond_destroy(&empty);
    free(buffer);
    return 0;
}