信号量的实现和应用 编写程序解决生产者—消费者问题 在 Ubuntu 上编写应用程序 pc.c ,解决经典的生产者—消费者问题,完成下面的功能:
建立一个生产者进程, N 个消费者进程 (N > 1);
用文件建立一个共享缓冲区;
生产者进程依次向缓冲区写入整数 0, 1, 2, …, M - 1, M >= 500;
消费者进程从缓冲区读数,每次读一个,并将读出的数字从缓冲区删除,然后将本进程 ID 和数字输出到标准输出;
缓冲区同时最多只能保存 10 个数。
编写 Ubuntu 中运行的 pc.c:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 #include <semaphore.h> #include <stdio.h> #include <stdlib.h> #include <fcntl.h> #include <sys/stat.h> #include <sys/types.h> #include <sys/wait.h> #include <unistd.h> #define BUFFER_SIZE 10 #define M 500 #define N 5 sem_t *empty, *full, *mutex; int fd, buffer_in, buffer_out; void producer() { // produce M items for (int i = 0; i < M; ++i) { sem_wait(empty); sem_wait(mutex); lseek(fd, buffer_in * sizeof(int), SEEK_SET); write(fd, &i, sizeof(i)); sem_post(mutex); sem_post(full); buffer_in = (buffer_in + 1) % BUFFER_SIZE; } } void consumer() { int item; for (int i = 0; i < M/N; ++i) { sem_wait(full); sem_wait(mutex); // Get index of item lseek(fd, BUFFER_SIZE * sizeof(int), SEEK_SET); read(fd, &buffer_out, sizeof(buffer_out)); // Read item from buffer lseek(fd, buffer_out * sizeof(int), SEEK_SET); read(fd, &item, sizeof(item)); // Update index buffer_out = (buffer_out + 1) % BUFFER_SIZE; lseek(fd, BUFFER_SIZE * sizeof(int), SEEK_SET); write(fd, &buffer_out, sizeof(buffer_out)); // Print item printf("%d: %d\n", getpid(), item); fflush(stdout); sem_post(mutex); sem_post(empty); } } int main() { // Create named semaphores empty = sem_open("empty", O_CREAT | O_EXCL, 0666, BUFFER_SIZE); full = sem_open("full", O_CREAT | O_EXCL, 0666, 0); mutex = sem_open("mutex", O_CREAT | O_EXCL, 0666, 1); // Create buffer file fd = open("buffer.txt", O_CREAT | O_TRUNC | O_RDWR, 0666); if (fd == -1) { perror("Open file failed\n"); exit(1); } // Initialize buffer_in = 0; buffer_out = 0; lseek(fd, BUFFER_SIZE * sizeof(int), SEEK_SET); write(fd, &buffer_out, sizeof(buffer_out)); int pid = fork(); if (pid < 0) { perror("Fork failed\n"); exit(1); } else if (pid == 0) { // child process as producer producer(); exit(0); } for (int i = 0; i < N; ++i) { pid = fork(); if (pid == 0) { // child process as consumers consumer(); exit(0); } } // wait for all child processes to finish while (wait(NULL) > 0); // Close buffer file close(fd); // Close and unlink named semaphores sem_close(empty); sem_close(full); sem_close(mutex); sem_unlink("empty"); sem_unlink("full"); sem_unlink("mutex"); return 0; }
在 Ubuntu 中使用 gcc 编译 pc.c 并执行,输出重定向至 a.out:
./pc > a.out
a.out 中部分输出如图 1, 2 所示:
需注意的是,由于需要按序输出,生产者和消费者均需维护上次操作的位置,对于单独的生产者可用局部变量实现,而多个进程的消费者们需使用可共同访问的变量。为存放进程间共享变量,不可在 pc.c 中草率的定义全局变量(与线程不同),否则在 fork() 后一旦变量改变,OS 和硬件会采用 Write-On-Copy 机制复制一份变量并更改,导致进程间的全局变量相互独立,互不影响。为解决该问题,既可通过在进程间共享的文件中某一固定位置存放变量,也可将不同进程的变量通过系统调用映射至同一物理地址。由于该程序将在 linux 0.11 中运行,而其并未实现对应系统调用,故采用方法一 —— 在充当缓冲区的 buffer.txt 末尾存放该变量。
实现信号量 信号量相关系统调用定义如下:
sem_t *sem_open(const char *name, unsigned int value); int sem_wait(sem_t *sem); int sem_post(sem_t *sem); int sem_unlink(const char *name);
在 kernel 目录下创建 sem.c 文件实现相关系统调用,include/linux 目录下创建 sem.h 文件定义相关数据结构:sem.h 中定义的数据结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 #ifndef _SEM_H #define _SEM_H #define SEM_NAME_LEN 14 typedef struct sem_t { char name[SEM_NAME_LEN]; int value; struct task_struct *wait_task; } sem_t; typedef struct sem_node_t { sem_t *sem; struct sem_node_t *next; } sem_node_t; typedef struct sem_link_t { sem_node_t *head; sem_node_t *tail; unsigned int node_num; } sem_link_t; sem_t *sys_sem_open(const char *name, unsigned int value); int sys_sem_wait(sem_t *sem); int sys_sem_post(sem_t *sem); int sys_sem_unlink(const char *name); #endif /* _SEM_H */
采用链表维护信号量集合,将链表封装在结构体 sem_link_t 中。sem.c 中链表相关实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 // Static initialization of unused head node sem_node_t head_node = { .sem = NULL, .next = NULL }; // Static initialization of semaphore link sem_link_t sem_link = { .head = &head_node, .tail = &head_node, .node_num = 0 }; // Create a semaphore sem_t *sem_create(const char *name, int value) { sem_t *sem = (sem_t *)malloc(sizeof(sem_t)); if (sem == NULL) { return NULL; } strcpy(sem->name, name); sem->value = value; sem->wait_task = NULL; return sem; } // Create a semaphore node sem_node_t* sem_node_create(sem_t *sem) { sem_node_t *node = (sem_node_t *)malloc(sizeof(sem_node_t)); if (node == NULL) { return NULL; } node->sem = sem; node->next = NULL; return node; } // Add node to semaphore link void sem_link_add(sem_node_t *node) { sem_link.tail->next = node; sem_link.tail = node; sem_link.node_num++; } // Delete node from semaphore link void sem_link_delete(sem_node_t *node) { sem_node_t *current = sem_link.head->next; sem_node_t *prev = sem_link.head; while (current) { if (current == node) { prev->next = current->next; sem_link.node_num--; free(node); return; } prev = current; current = current->next; } }
sem.c 中信号量相关实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 // Open a semaphore sem_t *sys_sem_open(const char *name,unsigned int value) { char name_buf[SEM_NAME_LEN]; int i = 0, n = 0; // Name cannot be NULL if (name == NULL) return NULL; while (i < SEM_NAME_LEN) { name_buf[i] = get_fs_byte(name + i); if (name_buf[i] == '\0') break; i++; } // Length of name out of range if (i == SEM_NAME_LEN) return NULL; // Check if semaphore already exists sem_node_t *p = sem_link.head->next; while (p) { if (strcmp(name_buf, p->sem->name) == 0) { return p->sem; } p = p->next; } // Create a new semaphore sem_t *sem = sem_create(name_buf, value); // Add node to semaphore link sem_link_add(sem_node_create(sem)); return sem; } // Try to wait on a semaphore int sys_sem_wait(sem_t *sem) { cli(); while(sem->value <= 0) { sleep_on(&(sem->wait_task)); } sem->value--; sti(); return 0; } // Post on a semaphore int sys_sem_post(sem_t *sem) { if (sem == NULL) return -1; // Check if semaphore in sem_link sem_node_t *p = sem_link.head->next; while (p) { if (p->sem == sem) break; p = p->next; } // Not in sem_link if (p == NULL) return -1; cli(); sem->value++; if (sem->value > 0) wake_up(&(sem->wait_task)); sti(); return 0; } // Delete a semaphore int sys_sem_unlink(const char *name) { char name_buf[SEM_NAME_LEN]; int i = 0, n = 0; // Name cannot be NULL if (name == NULL) return -1; while (i < SEM_NAME_LEN) { name_buf[i] = get_fs_byte(name + i); if (name_buf[i] == '\0') break; i++; } // Length of name out of range if (i == SEM_NAME_LEN) return -1; // Check if semaphore already exists sem_node_t *p = sem_link.head->next; while (p) { if (strcmp(name_buf, p->sem->name) == 0) break; p = p->next; } // No semaphore matches the name if(p == NULL) return -1; // Check if semaphore is in use if (p->sem->wait_task) return -1; // Delete semaphore free(p->sem); // Delete semaphore node sem_link_delete(p); return 0; }
在 sem_wait 编写过程中遇到了问题,以下是我的原始代码:
1 2 3 4 5 6 7 8 9 10 11 12 // Try to wait on a semaphore int sys_sem_wait(sem_t *sem) { cli(); while(sem->value <= 0) { sti(); sleep_on(&(sem->wait_task)); cli(); } sem->value--; sti(); return 0; }
首先,我将 cli 理解为关闭所有进程的外部中断,这会导致调用 sleep_on 睡眠并切换至其他进程后依旧处于关中断状态,这显然是不正确的。所以我在源代码调用 sleep_on 前打开中断,返回后关闭中断,并在测试程序中得到了理想的输出,然而某一瞬间我突然意识到该实现存在问题:若在开中断后,调用函数 sleep_on 前发生时钟中断切换至其他进程,并在 sem_post 函数执行过程中唤醒了信号量的等待队列,导致等待队列被提前唤醒,那么可能导致 sleep_on 永远阻塞等待队列。 可将 sleep_on 换为 interruptable_sleep_on,但这会导致 while 的多次循环,且与信号量的定义不符,一时之间陷入了两难境地。 然而,在指导书中有着提示:
1 2 3 4 5 6 7 8 static inline void lock_buffer(struct buffer_head * bh) { cli(); //关中断 while (bh->b_lock) sleep_on(&bh->b_wait); // 将当前进程睡眠在 bh->b_wait bh->b_lock=1; sti(); //开中断 }
该实现显然与我对 cli 的理解相冲突,通过查看 x86 手册发现 cli 命令会改变 EFLAGS 寄存器中的 IF 位,而 EFLAGS 寄存器显然包括在进程的上下文中,故执行 cli 命令只会导致当前进程关闭外部中断,而不会改变其他进程的中断状态。在 sem_wait 关中断后只会保证当前进程不会触发时钟中断,依旧可通过 schedule 手动调度至其他进程并正常触发其时钟中断。
测试程序 在测试程序前,还需将 sem.h 中 sem 的结构体定义添加至文件系统的 usr/include/sem.h 中,并在 usr/include/unistd.h 中更新系统调用编号。 编写 linux 0.11 中运行的 pc.c:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 #define __LIBRARY__ #include <unistd.h> #include <sem.h> #include <stdio.h> #include <stdlib.h> #include <fcntl.h> #include <sys/stat.h> #include <sys/types.h> #include <sys/wait.h> _syscall2(sem_t *, sem_open, const char *, name, unsigned int, value); _syscall1(int, sem_wait, sem_t *, sem); _syscall1(int, sem_post, sem_t *, sem); _syscall1(int, sem_unlink, const char *, name); #define BUFFER_SIZE 10 #define M 500 #define N 5 sem_t *empty, *full, *mutex; int fd, buffer_in, buffer_out; void producer() { /* produce M items */ int i; for (i = 0; i < M; ++i) { sem_wait(empty); sem_wait(mutex); lseek(fd, buffer_in * sizeof(int), SEEK_SET); write(fd, &i, sizeof(i)); sem_post(mutex); sem_post(full); buffer_in = (buffer_in + 1) % BUFFER_SIZE; } } void consumer() { int item, i; for (i = 0; i < M/N; ++i) { sem_wait(full); sem_wait(mutex); /* Get index of item */ lseek(fd, BUFFER_SIZE * sizeof(int), SEEK_SET); read(fd, &buffer_out, sizeof(buffer_out)); /* Read item from buffer */ lseek(fd, buffer_out * sizeof(int), SEEK_SET); read(fd, &item, sizeof(item)); /* Update index */ buffer_out = (buffer_out + 1) % BUFFER_SIZE; lseek(fd, BUFFER_SIZE * sizeof(int), SEEK_SET); write(fd, &buffer_out, sizeof(buffer_out)); /* Print item */ printf("%d: %d\n", getpid(), item); fflush(stdout); sem_post(mutex); sem_post(empty); } } int main() { int i; pid_t pid; /* Create named semaphores */ empty = sem_open("empty", BUFFER_SIZE); full = sem_open("full", 0); mutex = sem_open("mutex", 1); /* Create buffer file */ fd = open("buffer.txt", O_CREAT | O_TRUNC | O_RDWR, 0666); if (fd == -1) { perror("Open file failed\n"); exit(1); } /* Initialize */ buffer_in = 0; buffer_out = 0; lseek(fd, BUFFER_SIZE * sizeof(int), SEEK_SET); write(fd, &buffer_out, sizeof(buffer_out)); pid = fork(); if (pid < 0) { perror("Fork failed\n"); exit(1); } else if (pid == 0) { /* Child process as producer */ producer(); exit(0); } for (i = 0; i < N; ++i) { pid = fork(); if (pid == 0) { /* Child process as consumers */ consumer(); exit(0); } } /* Wait for all child processes to finish */ while (wait(NULL) > 0); /* Close buffer file */ close(fd); /* Close and unlink named semaphores */ sem_unlink("empty"); sem_unlink("full"); sem_unlink("mutex"); return 0; }
需要注意,由于 linux 0.11 中 gcc 的版本过低,不支持很多 C 语言的便捷语法如:
for (int i = 0; i < X; i++) //需提前定义变量 i: int i; int pid = fork() //需提前定义变量 pid: pid_t pid;
注释同样不能使用 “// ···” 而是需要使用 “/* ··· */“。
在 linux 0.11 中使用 gcc 编译 pc.c 并执行,输出重定向至 a.out:
./pc > a.out
a.out 中部分输出如图 3, 4 所示:
相关问题 在 pc.c 中去掉所有与信号量有关的代码,再运行程序,执行效果有变化吗?为什么会这样? 很简单的临界区问题,读入、计算、写回操作的非原子性可能导致错误的调度,一个进程读入的值可能不是最新的值,而是另一进程还未写回,导致提供了未执行的假象。实验的设计者在第一次编写生产者——消费者程序的时候,是这么做的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 Producer() { P(Mutex); //互斥信号量 // 生产一个产品item; P(Empty); //空闲缓存资源 // 将item放到空闲缓存中; V(Full); //产品资源 V(Mutex); } Consumer() { P(Mutex); P(Full); // 从缓存区取出一个赋值给item; V(Empty); // 消费产品item; V(Mutex); }
这样可行吗?如果可行,那么它和标准解法在执行效果上会有什么不同?如果不可行,那么它有什么问题使它不可行? 经典的死锁问题,某一进程通过互斥信号量进入临界区后,若需等待资源陷入阻塞,由于未释放互斥锁导致生产该资源的进程无法进入临界区,任务永不可能继续推进,陷入死锁。