首页 > 技术文章 > 5、并发控制

Lioker 2019-05-12 18:35 原文

 

一、并发与竞态

并发是指一段时间内有多个程序执行,但任一个时刻点上只有一个程序在运行

并发就会导致一个问题:假设程序A对一个文件写入3000个字符“a”,而另一个程序B对这个文件写入3000个“b”,第三个程序C读取这个文件,会导致读取数据不一定是什么

因为可能在一段时间内先执行了A;当A执行到一半CPU切换到执行B了,这时就会导致数据混乱

 

解决这个问题的途径是保证对共享资源的互斥访问。如程序A向文件中写入字符,那么B就无法访问这个文件

访问共享资源的代码区称为临界区,它需要使用互斥机制保护,途径有中断屏蔽、原子操作、自旋锁、信号量和互斥体等

 

在第二章点亮LED 中简述过的readb()类函数就使用了内存屏蔽指令

 

 

二、中断屏蔽

CPU一般都具备屏蔽中断和打开中断的功能,这项功能可以保证正在执行的内核执行路径不被中断处理程序所抢占,防止某些竞态条件的发生。具体而言,中断屏蔽使得中断与进程之间的并发不再发生,而且,由于Linux内核的进程调度等操作都依赖中断来实现,内核抢占进程之间的并发也得以避免了

使用方法如下:

local_irq_disable() /* 屏蔽中断 */
...
/* 临界区*/
...
local_irq_enable() /* 开中断*/

 

为了保证系统运行,中断屏蔽的时间尽量要少

 

 

三、原子操作

原子操作可以保证对一个整型数据的修改是排他性的。原子操作系列函数使用方法如下:

atomic_t v = ATOMIC_INIT(0);    /* 定义原子变量v = 0 */

atomic_read(&v);    /* 读取原子变量值 */

atomic_add(i, &v);    /* 原子变量 + i */
atomic_sub(i, &v);    /* 原子变量 - i */

atomic_inc(&v);    /* 原子变量 + 1 */
atomic_dec(&v);    /* 原子变量 - 1 */

int atomic_inc_and_test(&v);    /* 原子变量自加并测试是否为0,为0返回1 */
int atomic_dec_and_test(&v);    /* 原子变量自减并测试是否为0 */
int atomic_sub_and_test(i, &v);    /* 原子变量 - i后测试是否为0 */

 

示例如下: 

 1 /* 使用原子变量使设备只能被一个进程打开 */
 2 static atomic_t v = ATOMIC_INIT(1);
 3 
 4 static int key_open(struct inode *nodep, struct file *filp)
 5 {
 6     if (!atomic_dec_and_test(&v)) /* 已经有进程打开 */ {
 7         atomic_inc(&v);        /* 重新加1 */
 8         return -EBUSY;
 9     }
10     ...
11 }
12 
13 static int key_release(struct inode *nodep, struct file *filp)
14 {
15     atomic_inc(&v);
16     ...
17 }

 

 

四、自旋锁

自旋锁(Spin Lock)是一种典型的对临界资源进行互斥访问的手段,工作原理是测试并设置(Test-And-Set)某个内存变量,如果此变量符合条件则退出

它有以下几个特点:

1. 自旋锁是忙等锁。若锁不可用,CPU会一直循环执行检测,相当于while(1)。因此适用于临界区代码小的情况

2. 可能导致系统死锁。比如递归调用自旋锁

3. 锁定期间不能调用可能导致进程调度的函数。如copy_to_user()和copy_from_user()

 

自旋锁系列函数使用方法如下:

spinlock_t lock;        /* 定义一个自旋锁*/
spin_lock_init(&lock);    /* 初始化一个自旋锁*/
spin_lock(&lock);        /* 获取自旋锁,保护临界区 */

/* 临界区*/

spin_unlock(&lock);        /* 解锁*/

 

示例如下:

 1 /* 使用自旋锁使设备只能被一个进程打开 */
 2 int count = 0;
 3 
 4 static int key_open(struct inode *nodep, struct file *filp)
 5 {
 6     spin_lock(&lock);
 7     if (count) /* 已经有进程打开 */ {
 8         spin_unlock(&lock);
 9         return -EBUSY;
10     }
11     ++count;
12     spin_unlock(&lock);
13     ...
14 }
15 
16 static int key_release(struct inode *nodep, struct file *filp)
17 {
18     spin_lock(&lock);
19     --count;
20     spin_unlock(&lock);
21     ...
22 }
23 
24 static int keys_init(void)
25 {
26     spinlock_t lock;
27     spin_lock_init(&lock);
28     ...
29 }

 

 

五、信号量

信号量使用方式类似于原子操作,系列函数使用方法如下:

struct semaphore sem;    /* 定义一个信号量 */

/* 初始化信号量 = val */
void sema_init(struct semaphore *sem, int val);

/* 获得信号量,它会导致休眠,因此不能在中断上下文中使用 */
void down(struct semaphore *sem);

/* 获得信号量,进入休眠状态的进程能被信号打断 */
int down_interruptible(struct semaphore *sem);

/* 尝试获得信号量sem,如果能够立刻获得,它就获得该信号量并返回0,否则返回非0值
 * 它不会导致调用者睡眠,可以在中断上下文中使用
 */
int down_trylock(struct semaphore *sem);

/* 释放信号量 */
void up(struct semaphore *sem);

DEFINE_SEMAPHORE(sem)可以初始化信号量并赋值为1。如果是在Linux2.6.36版本以下,应使用DECLARE_MUTEX(sem)

 

示例如下:

 1 /* 使用信号量使设备只能被一个进程打开 */
 2 static DEFINE_SEMAPHORE(lock);
 3 
 4 static int key_open(struct inode *nodep, struct file *filp)
 5 {
 6     if (!down_trylock(&lock)) /* 已经有进程打开 */ {
 7         return -EBUSY;
 8     }
 9     ...
10 }
11 
12 static int key_release(struct inode *nodep, struct file *filp)
13 {
14     up(&lock);
15     ...
16 }

 

 

六、互斥体

互斥体系列函数使用方法如下:

struct mutex lock;   /* 定义一个互斥体 */

mutex_init(&lock);   /* 初始化互斥体 */

/* 获取互斥体 */
void mutex_lock(struct mutex *lock);
int mutex_lock_interruptible(struct mutex *lock);
int mutex_trylock(struct mutex *lock);

/* 释放互斥体 */
void mutex_unlock(struct mutex *lock);

 

key源代码(本示例通过互斥体防止copy_to_user()函数竞态):

  1 #include <linux/module.h>
  2 #include <linux/fs.h>
  3 #include <linux/init.h>
  4 #include <linux/cdev.h>
  5 #include <linux/slab.h>
  6 #include <linux/device.h>
  7 #include <linux/irq.h>
  8 #include <linux/interrupt.h>
  9 #include <linux/wait.h>
 10 #include <linux/timer.h>
 11 #include <linux/gpio.h>
 12 #include <linux/sched.h>
 13 #include <linux/mutex.h>
 14 
 15 #include <asm/uaccess.h>
 16 #include <asm/irq.h>
 17 #include <asm/io.h>
 18 
 19 #include <mach/gpio.h>
 20 
 21 #define KEY_MAJOR        255
 22 
 23 struct pin_desc {
 24     int gpio;
 25     int val;
 26     char *name;
 27 };
 28 
 29 struct key_device {
 30     struct cdev cdev;
 31     wait_queue_head_t r_head;
 32     wait_queue_head_t w_head;
 33     struct mutex lock;
 34 };
 35 
 36 static struct pin_desc desc[4] = {
 37     { EXYNOS4_GPX3(2), 0x01, "KEY0" },
 38     { EXYNOS4_GPX3(3), 0x02, "KEY1" },
 39     { EXYNOS4_GPX3(4), 0x03, "KEY2" },
 40     { EXYNOS4_GPX3(5), 0x04, "KEY3" },
 41 };
 42 
 43 static int g_major = KEY_MAJOR;
 44 module_param(g_major, int, S_IRUGO);
 45 
 46 static struct key_device*    dev;
 47 static struct class*        scls;
 48 static struct device*        sdev;
 49 static unsigned char        key_val;
 50 
 51 static irqreturn_t key_interrupt(int irq, void *dev_id)
 52 {
 53     struct pin_desc *pindesc = (struct pin_desc *)dev_id;
 54     unsigned int tmp;
 55 
 56     tmp = gpio_get_value(pindesc->gpio);
 57 
 58     /* active low */
 59     printk(KERN_DEBUG "KEY %d: %08x\n", pindesc->val, tmp);
 60 
 61     if (tmp)
 62         key_val = pindesc->val;
 63     else
 64         key_val = pindesc->val | 0x80;
 65 
 66     set_current_state(TASK_RUNNING);
 67 
 68     return IRQ_HANDLED;
 69 }
 70 
 71 static ssize_t key_read(struct file *filp, char __user *buf, size_t len, loff_t * loff)
 72 {
 73     struct key_device *dev = filp->private_data;
 74 
 75     // 声明等待队列
 76     DECLARE_WAITQUEUE(rwait, current);    
 77     mutex_lock(&dev->lock);        // 上锁
 78     add_wait_queue(&dev->r_head, &rwait);
 79 
 80     // 休眠
 81     __set_current_state(TASK_INTERRUPTIBLE);
 82     mutex_unlock(&dev->lock);    // 解锁
 83     schedule();
 84 
 85     // 有数据
 86     mutex_lock(&dev->lock);        // 上锁
 87     copy_to_user(buf, &key_val, 1);
 88     mutex_unlock(&dev->lock);    // 解锁
 89 
 90     remove_wait_queue(&dev->r_head, &rwait);
 91     set_current_state(TASK_RUNNING);
 92 
 93     return 1;
 94 }
 95 
 96 static int key_open(struct inode *nodep, struct file *filp)
 97 {
 98     struct key_device *dev = container_of(nodep->i_cdev, struct key_device, cdev);
 99     // 放入私有数据中
100     filp->private_data = dev;
101 
102     int irq;
103     int i, err = 0;
104 
105     for (i = 0; i < ARRAY_SIZE(desc); i++) {
106         if (!desc[i].gpio)
107             continue;
108 
109         irq = gpio_to_irq(desc[i].gpio);
110         err = request_irq(irq, key_interrupt, IRQ_TYPE_EDGE_BOTH, 
111                 desc[i].name, (void *)&desc[i]);
112         if (err)
113             break;
114     }
115     
116     if (err) {
117         i--;
118         for (; i >= 0; i--) {
119             if (!desc[i].gpio)
120                 continue;
121 
122             irq = gpio_to_irq(desc[i].gpio);
123             free_irq(irq, (void *)&desc[i]);
124         }
125         return -EBUSY;
126     }
127     
128     init_waitqueue_head(&dev->r_head);
129     mutex_init(&dev->lock);        // 初始化
130 
131     return 0;
132 }
133 
134 static int key_release(struct inode *nodep, struct file *filp)
135 {
136     // 释放中断
137     int irq, i;
138 
139     for (i = 0; i < ARRAY_SIZE(desc); i++) {
140         if (!desc[i].gpio)
141             continue;
142 
143         irq = gpio_to_irq(desc[i].gpio);
144         free_irq(irq, (void *)&desc[i]);
145     }
146 
147     return 0;
148 }
149 
150 static struct file_operations key_fops = {
151     .owner    = THIS_MODULE,
152     .read    = key_read,
153     .open        = key_open,
154     .release    = key_release,
155 };
156 
157 static int keys_init(void)
158 {
159     int ret;
160     int devt;
161     if (g_major) {
162         devt = MKDEV(g_major, 0);
163         ret = register_chrdev_region(devt, 1, "key");
164     }
165     else {
166         ret = alloc_chrdev_region(&devt, 0, 1, "key");
167         g_major = MAJOR(devt);
168     }
169 
170     if (ret)
171         return ret;
172 
173     dev = kzalloc(sizeof(struct key_device), GFP_KERNEL);
174     if (!dev) {
175         ret = -ENOMEM;
176         goto fail_alloc;
177     }
178 
179     cdev_init(&dev->cdev, &key_fops);
180     ret = cdev_add(&dev->cdev, devt, 1);
181     if (ret)
182         return ret;
183 
184     scls = class_create(THIS_MODULE, "key");
185     sdev = device_create(scls, NULL, devt, NULL, "key");
186 
187     return 0;
188 
189 fail_alloc:
190     unregister_chrdev_region(devt, 1);
191 
192     return ret;
193 }
194 
195 static void keys_exit(void)
196 {
197     dev_t devt = MKDEV(g_major, 0);
198 
199     device_destroy(scls, devt);
200     class_destroy(scls);
201 
202     cdev_del(&(dev->cdev));
203     kfree(dev);
204 
205     unregister_chrdev_region(devt, 1);
206 }
207 
208 module_init(keys_init);
209 module_exit(keys_exit);
210 
211 MODULE_LICENSE("GPL");
View Code

Makefile:

 1 KERN_DIR = /work/tiny4412/tools/linux-3.5
 2 
 3 all:
 4     make -C $(KERN_DIR) M=`pwd` modules 
 5 
 6 clean:
 7     make -C $(KERN_DIR) M=`pwd` modules clean
 8     rm -rf modules.order
 9 
10 obj-m    += key.o
View Code

测试文件:

 1 #include <stdio.h>
 2 #include <unistd.h>
 3 #include <sys/types.h>
 4 #include <sys/stat.h>
 5 #include <fcntl.h>
 6 #include <string.h>
 7 
 8 int main(int argc, char** argv)
 9 {
10     int fd;
11     fd = open("/dev/key", O_RDWR);
12     if (fd < 0) {
13         printf("can't open /dev/key\n");
14         return -1;
15     }
16 
17     unsigned char key_val;
18 
19     while (1) {
20         read(fd, &key_val, 1);
21         printf("key_val = 0x%x\n", key_val);
22     }
23     
24     close(fd);
25 
26     return 0;
27 }
View Code

 

 

七、互斥体和自旋锁的选择

通过比较可知,互斥体的作用于自旋锁类似。那么什么时候使用互斥体,什么时候使用自旋锁呢?

1. 若临界区较小,宜使用自旋锁;若临界区较大,宜使用互斥体

2. 互斥体所保护的临界区可以出现包含阻塞(进程切换)的代码,但是自旋锁不可以

3. 互斥体存在于进程上下文;如果要在中断中使用,宜使用自旋锁

 

下一章  6、异步通知

 

推荐阅读