信号量的实现和应用

编写程序解决生产者—消费者问题

在 Ubuntu 上编写应用程序 pc.c ,解决经典的生产者—消费者问题,完成下面的功能:

  • 建立一个生产者进程, N 个消费者进程 (N > 1);
  • 用文件建立一个共享缓冲区;
  • 生产者进程依次向缓冲区写入整数 0, 1, 2, …, M - 1, M >= 500;
  • 消费者进程从缓冲区读数,每次读一个,并将读出的数字从缓冲区删除,然后将本进程 ID 和数字输出到标准输出;
  • 缓冲区同时最多只能保存 10 个数。

编写 Ubuntu 中运行的 pc.c:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define BUFFER_SIZE 10
#define M 500
#define N 5

sem_t *empty, *full, *mutex;
int fd, buffer_in, buffer_out;

void producer() {
// produce M items
for (int i = 0; i < M; ++i) {
sem_wait(empty);
sem_wait(mutex);

lseek(fd, buffer_in * sizeof(int), SEEK_SET);
write(fd, &i, sizeof(i));

sem_post(mutex);
sem_post(full);

buffer_in = (buffer_in + 1) % BUFFER_SIZE;
}
}

void consumer() {
int item;
for (int i = 0; i < M/N; ++i) {
sem_wait(full);
sem_wait(mutex);

// Get index of item
lseek(fd, BUFFER_SIZE * sizeof(int), SEEK_SET);
read(fd, &buffer_out, sizeof(buffer_out));

// Read item from buffer
lseek(fd, buffer_out * sizeof(int), SEEK_SET);
read(fd, &item, sizeof(item));

// Update index
buffer_out = (buffer_out + 1) % BUFFER_SIZE;
lseek(fd, BUFFER_SIZE * sizeof(int), SEEK_SET);
write(fd, &buffer_out, sizeof(buffer_out));

// Print item
printf("%d: %d\n", getpid(), item);
fflush(stdout);

sem_post(mutex);
sem_post(empty);
}
}

int main() {
// Create named semaphores
empty = sem_open("empty", O_CREAT | O_EXCL, 0666, BUFFER_SIZE);
full = sem_open("full", O_CREAT | O_EXCL, 0666, 0);
mutex = sem_open("mutex", O_CREAT | O_EXCL, 0666, 1);

// Create buffer file
fd = open("buffer.txt", O_CREAT | O_TRUNC | O_RDWR, 0666);
if (fd == -1) {
perror("Open file failed\n");
exit(1);
}

// Initialize
buffer_in = 0;
buffer_out = 0;
lseek(fd, BUFFER_SIZE * sizeof(int), SEEK_SET);
write(fd, &buffer_out, sizeof(buffer_out));

int pid = fork();
if (pid < 0) {
perror("Fork failed\n");
exit(1);
} else if (pid == 0) {
// child process as producer
producer();
exit(0);
}

for (int i = 0; i < N; ++i) {
pid = fork();
if (pid == 0) {
// child process as consumers
consumer();
exit(0);
}
}
// wait for all child processes to finish
while (wait(NULL) > 0);

// Close buffer file
close(fd);

// Close and unlink named semaphores
sem_close(empty);
sem_close(full);
sem_close(mutex);
sem_unlink("empty");
sem_unlink("full");
sem_unlink("mutex");

return 0;
}

在 Ubuntu 中使用 gcc 编译 pc.c 并执行,输出重定向至 a.out:

./pc > a.out

a.out 中部分输出如图 1, 2 所示:
图1
图2

需注意的是,由于需要按序输出,生产者和消费者均需维护上次操作的位置,对于单独的生产者可用局部变量实现,而多个进程的消费者们需使用可共同访问的变量。为存放进程间共享变量,不可在 pc.c 中草率的定义全局变量(与线程不同),否则在 fork() 后一旦变量改变,OS 和硬件会采用 Write-On-Copy 机制复制一份变量并更改,导致进程间的全局变量相互独立,互不影响。为解决该问题,既可通过在进程间共享的文件中某一固定位置存放变量,也可将不同进程的变量通过系统调用映射至同一物理地址。由于该程序将在 linux 0.11 中运行,而其并未实现对应系统调用,故采用方法一 —— 在充当缓冲区的 buffer.txt 末尾存放该变量。

实现信号量

信号量相关系统调用定义如下:

sem_t *sem_open(const char *name, unsigned int value);
int sem_wait(sem_t *sem);
int sem_post(sem_t *sem);
int sem_unlink(const char *name);

在 kernel 目录下创建 sem.c 文件实现相关系统调用,include/linux 目录下创建 sem.h 文件定义相关数据结构:
sem.h 中定义的数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#ifndef _SEM_H
#define _SEM_H

#define SEM_NAME_LEN 14

typedef struct sem_t {
char name[SEM_NAME_LEN];
int value;
struct task_struct *wait_task;
} sem_t;

typedef struct sem_node_t {
sem_t *sem;
struct sem_node_t *next;
} sem_node_t;

typedef struct sem_link_t {
sem_node_t *head;
sem_node_t *tail;
unsigned int node_num;
} sem_link_t;

sem_t *sys_sem_open(const char *name, unsigned int value);
int sys_sem_wait(sem_t *sem);
int sys_sem_post(sem_t *sem);
int sys_sem_unlink(const char *name);

#endif /* _SEM_H */

采用链表维护信号量集合,将链表封装在结构体 sem_link_t 中。
sem.c 中链表相关实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// Static initialization of unused head node
sem_node_t head_node = {
.sem = NULL,
.next = NULL
};

// Static initialization of semaphore link
sem_link_t sem_link = {
.head = &head_node,
.tail = &head_node,
.node_num = 0
};

// Create a semaphore
sem_t *sem_create(const char *name, int value) {
sem_t *sem = (sem_t *)malloc(sizeof(sem_t));
if (sem == NULL) {
return NULL;
}
strcpy(sem->name, name);
sem->value = value;
sem->wait_task = NULL;
return sem;
}

// Create a semaphore node
sem_node_t* sem_node_create(sem_t *sem) {
sem_node_t *node = (sem_node_t *)malloc(sizeof(sem_node_t));
if (node == NULL) {
return NULL;
}
node->sem = sem;
node->next = NULL;
return node;
}

// Add node to semaphore link
void sem_link_add(sem_node_t *node) {
sem_link.tail->next = node;
sem_link.tail = node;
sem_link.node_num++;
}

// Delete node from semaphore link
void sem_link_delete(sem_node_t *node) {
sem_node_t *current = sem_link.head->next;
sem_node_t *prev = sem_link.head;
while (current) {
if (current == node) {
prev->next = current->next;
sem_link.node_num--;
free(node);
return;
}
prev = current;
current = current->next;
}
}

sem.c 中信号量相关实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// Open a semaphore
sem_t *sys_sem_open(const char *name,unsigned int value) {
char name_buf[SEM_NAME_LEN];
int i = 0, n = 0;

// Name cannot be NULL
if (name == NULL)
return NULL;

while (i < SEM_NAME_LEN) {
name_buf[i] = get_fs_byte(name + i);
if (name_buf[i] == '\0') break;
i++;
}

// Length of name out of range
if (i == SEM_NAME_LEN)
return NULL;

// Check if semaphore already exists
sem_node_t *p = sem_link.head->next;
while (p) {
if (strcmp(name_buf, p->sem->name) == 0) {
return p->sem;
}
p = p->next;
}

// Create a new semaphore
sem_t *sem = sem_create(name_buf, value);
// Add node to semaphore link
sem_link_add(sem_node_create(sem));
return sem;
}

// Try to wait on a semaphore
int sys_sem_wait(sem_t *sem) {
cli();
while(sem->value <= 0) {
sleep_on(&(sem->wait_task));
}
sem->value--;
sti();
return 0;
}

// Post on a semaphore
int sys_sem_post(sem_t *sem) {
if (sem == NULL)
return -1;

// Check if semaphore in sem_link
sem_node_t *p = sem_link.head->next;
while (p) {
if (p->sem == sem) break;
p = p->next;
}
// Not in sem_link
if (p == NULL)
return -1;

cli();
sem->value++;
if (sem->value > 0)
wake_up(&(sem->wait_task));
sti();
return 0;
}

// Delete a semaphore
int sys_sem_unlink(const char *name) {
char name_buf[SEM_NAME_LEN];
int i = 0, n = 0;

// Name cannot be NULL
if (name == NULL)
return -1;

while (i < SEM_NAME_LEN) {
name_buf[i] = get_fs_byte(name + i);
if (name_buf[i] == '\0') break;
i++;
}

// Length of name out of range
if (i == SEM_NAME_LEN)
return -1;

// Check if semaphore already exists
sem_node_t *p = sem_link.head->next;
while (p) {
if (strcmp(name_buf, p->sem->name) == 0)
break;
p = p->next;
}

// No semaphore matches the name
if(p == NULL)
return -1;

// Check if semaphore is in use
if (p->sem->wait_task)
return -1;

// Delete semaphore
free(p->sem);
// Delete semaphore node
sem_link_delete(p);
return 0;
}

在 sem_wait 编写过程中遇到了问题,以下是我的原始代码:

1
2
3
4
5
6
7
8
9
10
11
12
// Try to wait on a semaphore
int sys_sem_wait(sem_t *sem) {
cli();
while(sem->value <= 0) {
sti();
sleep_on(&(sem->wait_task));
cli();
}
sem->value--;
sti();
return 0;
}

首先,我将 cli 理解为关闭所有进程的外部中断,这会导致调用 sleep_on 睡眠并切换至其他进程后依旧处于关中断状态,这显然是不正确的。所以我在源代码调用 sleep_on 前打开中断,返回后关闭中断,并在测试程序中得到了理想的输出,然而某一瞬间我突然意识到该实现存在问题:若在开中断后,调用函数 sleep_on 前发生时钟中断切换至其他进程,并在 sem_post 函数执行过程中唤醒了信号量的等待队列,导致等待队列被提前唤醒,那么可能导致 sleep_on 永远阻塞等待队列。 可将 sleep_on 换为 interruptable_sleep_on,但这会导致 while 的多次循环,且与信号量的定义不符,一时之间陷入了两难境地。
然而,在指导书中有着提示:

1
2
3
4
5
6
7
8
static inline void lock_buffer(struct buffer_head * bh)
{
cli(); //关中断
while (bh->b_lock)
sleep_on(&bh->b_wait); // 将当前进程睡眠在 bh->b_wait
bh->b_lock=1;
sti(); //开中断
}

该实现显然与我对 cli 的理解相冲突,通过查看 x86 手册发现 cli 命令会改变 EFLAGS 寄存器中的 IF 位,而 EFLAGS 寄存器显然包括在进程的上下文中,故执行 cli 命令只会导致当前进程关闭外部中断,而不会改变其他进程的中断状态。在 sem_wait 关中断后只会保证当前进程不会触发时钟中断,依旧可通过 schedule 手动调度至其他进程并正常触发其时钟中断。

测试程序

在测试程序前,还需将 sem.h 中 sem 的结构体定义添加至文件系统的 usr/include/sem.h 中,并在 usr/include/unistd.h 中更新系统调用编号。
编写 linux 0.11 中运行的 pc.c:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#define __LIBRARY__
#include <unistd.h>
#include <sem.h>
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>

_syscall2(sem_t *, sem_open, const char *, name, unsigned int, value);
_syscall1(int, sem_wait, sem_t *, sem);
_syscall1(int, sem_post, sem_t *, sem);
_syscall1(int, sem_unlink, const char *, name);

#define BUFFER_SIZE 10
#define M 500
#define N 5

sem_t *empty, *full, *mutex;
int fd, buffer_in, buffer_out;

void producer() {
/* produce M items */
int i;
for (i = 0; i < M; ++i) {
sem_wait(empty);
sem_wait(mutex);

lseek(fd, buffer_in * sizeof(int), SEEK_SET);
write(fd, &i, sizeof(i));

sem_post(mutex);
sem_post(full);

buffer_in = (buffer_in + 1) % BUFFER_SIZE;
}
}

void consumer() {
int item, i;
for (i = 0; i < M/N; ++i) {
sem_wait(full);
sem_wait(mutex);

/* Get index of item */
lseek(fd, BUFFER_SIZE * sizeof(int), SEEK_SET);
read(fd, &buffer_out, sizeof(buffer_out));

/* Read item from buffer */
lseek(fd, buffer_out * sizeof(int), SEEK_SET);
read(fd, &item, sizeof(item));

/* Update index */
buffer_out = (buffer_out + 1) % BUFFER_SIZE;
lseek(fd, BUFFER_SIZE * sizeof(int), SEEK_SET);
write(fd, &buffer_out, sizeof(buffer_out));

/* Print item */
printf("%d: %d\n", getpid(), item);
fflush(stdout);

sem_post(mutex);
sem_post(empty);
}
}

int main() {
int i;
pid_t pid;

/* Create named semaphores */
empty = sem_open("empty", BUFFER_SIZE);
full = sem_open("full", 0);
mutex = sem_open("mutex", 1);

/* Create buffer file */
fd = open("buffer.txt", O_CREAT | O_TRUNC | O_RDWR, 0666);
if (fd == -1) {
perror("Open file failed\n");
exit(1);
}

/* Initialize */
buffer_in = 0;
buffer_out = 0;
lseek(fd, BUFFER_SIZE * sizeof(int), SEEK_SET);
write(fd, &buffer_out, sizeof(buffer_out));

pid = fork();
if (pid < 0) {
perror("Fork failed\n");
exit(1);
} else if (pid == 0) {
/* Child process as producer */
producer();
exit(0);
}

for (i = 0; i < N; ++i) {
pid = fork();
if (pid == 0) {
/* Child process as consumers */
consumer();
exit(0);
}
}

/* Wait for all child processes to finish */
while (wait(NULL) > 0);

/* Close buffer file */
close(fd);

/* Close and unlink named semaphores */
sem_unlink("empty");
sem_unlink("full");
sem_unlink("mutex");

return 0;
}

需要注意,由于 linux 0.11 中 gcc 的版本过低,不支持很多 C 语言的便捷语法如:

for (int i = 0; i < X; i++) //需提前定义变量 i: int i;
int pid = fork() //需提前定义变量 pid: pid_t pid;

注释同样不能使用 “// ···” 而是需要使用 “/* ··· */“。

在 linux 0.11 中使用 gcc 编译 pc.c 并执行,输出重定向至 a.out:

./pc > a.out

a.out 中部分输出如图 3, 4 所示:
图3
图4

相关问题

在 pc.c 中去掉所有与信号量有关的代码,再运行程序,执行效果有变化吗?为什么会这样?
很简单的临界区问题,读入、计算、写回操作的非原子性可能导致错误的调度,一个进程读入的值可能不是最新的值,而是另一进程还未写回,导致提供了未执行的假象。
实验的设计者在第一次编写生产者——消费者程序的时候,是这么做的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Producer()
{
P(Mutex); //互斥信号量
// 生产一个产品item;
P(Empty); //空闲缓存资源
// 将item放到空闲缓存中;
V(Full); //产品资源
V(Mutex);
}

Consumer()
{
P(Mutex);
P(Full);
// 从缓存区取出一个赋值给item;
V(Empty);
// 消费产品item;
V(Mutex);
}

这样可行吗?如果可行,那么它和标准解法在执行效果上会有什么不同?如果不可行,那么它有什么问题使它不可行?
经典的死锁问题,某一进程通过互斥信号量进入临界区后,若需等待资源陷入阻塞,由于未释放互斥锁导致生产该资源的进程无法进入临界区,任务永不可能继续推进,陷入死锁。