多线程是加速程序计算的有效方式,Python的多线程模块threading上手快速简单,学习动手操作了一遍,这里记录一下。
1 Threading
1.1 添加线程
import threading#获取已激活的线程数print(threading.active_count()) #1#查看所有线程信息print(threading.enumerate()) #[<_MainThread(MainThread, started 18496)>]#查看现在正在运行的线程print(threading.current_thread()) #<_MainThread(MainThread, started 18496)>import threadingdef thread_job(): print('This is a thread of %s' % threading.current_thread())def runMain(): thread = threading.Thread(target=thread_job,) #定义线程 thread.start() #线程开始工作if __name__ == '__main__': runMain()#输出This is a thread of
1.2 join功能
不加join功能,线程任务还未完成便输出all done。
import threadingimport timedef thread_job(): print('T1 start.\n') for i in range(10): time.sleep(0.1) #任务间隔0.1秒 print('T1 finish.\n')add_thread = threading.Thread(target=thread_job, name='T1')add_thread.start()print('all done.\n')#输出T1 start.all done.T1 finish.
若要遵循顺序,在启动线程后调用join , 使用join控制多个线程的执行顺序,效果如下。
import threadingimport timedef thread_job(): print('T1 start.\n') for i in range(10): time.sleep(0.1) #任务间隔0.1秒 print('T1 finish.\n')add_thread = threading.Thread(target=thread_job, name='T1')add_thread.start()add_thread.join()print('all done.\n')#输出T1 start.T1 finish.all done.
1.3 存储进程结果Queue
将数据列表中的数据传入,使用四个线程处理,将结果保存在Queue中,线程执行完后,从Queue中获取存储的结果
#导入线程 队列的标准模块import threadingimport timefrom queue import Queue
定义一个被多线程调用的函数:函数的参数时一个列表l和一个队列q,函数的功能是对列表的每个元素进行平方计算,将结果保存在队列中
def job(l,q): for i in range(len(l)): l[i] = l[i]**2 q.put(l)
定义一个多线程函数:在多线程函数中定义一个Queue ,用来保存返回值 ,代替return ,定义一个多线程列表 ,初始化一个多维数据列表
def mulithreading(): q = Queue() #q中存放返回值 代替return的返回值 threads = [] data = [[1,2,3],[3,4,5],[4,4,4],[5,5,5]]
在多线程函数中定义四个线程,启动线程,将每个线程添加到多线程的列表中
for i in range(4): #定义四个线程 t = threading.Thread(target=job,args=(data[i],q)) t.start() threads.append(t) #把每个线程append到线程列表中
分别join四个线程到主线程
for thread in threads: thread.join()
定义一个空列表results 将四个线程运行后保存在队列中的结果返回给results
results = [] for _ in range(4): results.append(q.get()) #q.get()按顺序从q中拿出一个值 print(results)
完整代码:
#导入线程 队列的标准模块import threadingimport timefrom queue import Queue#定义一个被多线程调用的函数def job(l,q): for i in range(len(l)): l[i] = l[i]**2 q.put(l)#定义一个多线程函数def mulithreading(): q = Queue() #q中存放返回值 代替return的返回值 threads = [] data = [[1,2,3],[3,4,5],[4,4,4],[5,5,5]] for i in range(4): #定义四个线程 t = threading.Thread(target=job,args=(data[i],q)) t.start() threads.append(t) #把每个线程append到线程列表中 #分别join四个线程到主线程 for thread in threads: thread.join() #定义一个空列表results 将四个线程运行后保存在队列中的结果返回给results results = [] for _ in range(4): results.append(q.get()) #q.get()按顺序从q中拿出一个值 print(results)if __name__ == '__main__': mulithreading()#输出[[1, 4, 9], [9, 16, 25], [16, 16, 16], [25, 25, 25]]
1.4 GIL 不一定有效率
python 的多线程 threading 有时候并不是特别理想. 最主要的原因是就是, Python 的设计上, 有一个必要的环节, 就是 Global Interpreter Lock (GIL). 这个东西让 Python 还是一次性只能处理一个东西.
import threadingfrom queue import Queueimport copyimport timedef job(l, q): res = sum(l) q.put(res)def multithreading(l): q = Queue() threads = [] for i in range(4): t = threading.Thread(target=job, args=(copy.copy(l), q), name='T%i' % i) t.start() threads.append(t) [t.join() for t in threads] total = 0 for _ in range(4): total += q.get() print(total)def normal(l): total = sum(l) print(total)if __name__ == '__main__': l = list(range(10000000)) s_t = time.time() normal(l*4) print('normal: ',time.time()-s_t) s_t = time.time() multithreading(l) print('multithreading: ', time.time()-s_t)#输出199999980000000normal: 1.7343778610229492199999980000000multithreading: 2.218825340270996
程序 threading 和 Normal 运行了一样多次的运算. 但是我们发现 threading 却没有快多少, 按理来说, 我们预期会要快3-4倍, 因为有建立4个线程, 但是并没有. 这就是其中的 GIL 在作怪.
1.5 线程锁
不使用锁
import threadingdef job1(): #全局变量A的值每次加1,循环10次 global A for i in range(10): A += 1 print('job1',A)def job2(): #全局变量A的值每次加10,循环10次 global A for i in range(10): A += 10 print('job2',A)if __name__ == '__main__': A = 0 t1 = threading.Thread(target=job1) t2 = threading.Thread(target=job2) t1.start() t2.start() t1.join() t2.join()#输出 打印结果非常混乱job1 1job1 2job1 3job1 4job2 14job1 15job2 25job1 26job2 36job1 37job2 47job1 48job2 58job1 59job2 69job1 70job2 80job2 90job2 100job2 110
使用锁
lock在不同线程使用同一共享内存时,能够确保线程之间互不影响,使用lock的方法是, 在每个线程执行运算修改共享内存之前,执行lock.acquire()将共享内存上锁, 确保当前线程执行时,内存不会被其他线程访问,执行运算完毕后,使用lock.release()将锁打开, 保证其他的线程可以使用该共享内存。import threadingdef job1(): global A;lock = threading.Lock() lock.acquire() for i in range(10): A += 1 print('job1',A) lock.release()def job2(): global A;lock = threading.Lock() lock.acquire() for i in range(10): A += 10 print('job2',A) lock.release()if __name__ == '__main__': A = 0 t1 = threading.Thread(target=job1) t2 = threading.Thread(target=job2) t1.start() t2.start() t1.join() t2.join()#输出 使用lock后 执行完一个线程后再执行另一个线程。使用lock和不使用lock,最后打印输出的结果是不同的。job1 1job1 2job1 3job1 4job1 5job1 6job1 7job1 8job1 9job1 10job2 20job2 30job2 40job2 50job2 60job2 70job2 80job2 90job2 100job2 110
2 Multiprocessing
多进程 Multiprocessing 和多线程 threading 类似, 都是在 python 中用来并行运算的。不过既然有了 threading, 为什么 Python 还要出一个 multiprocessing 呢? 因为要用来弥补 threading 的一些劣势, 比如在 threading 教程中提到的GIL, python 把 multiprocessing 和 threading 的使用方法做的几乎差不多,使用多线程发挥电脑多核系统的威力。
2.1添加Process
#导入线程进程标准模块import multiprocessing as mpimport threading as td#定义一个被线程和进程调用的函数def job(a,d): print('AA')#创建线程和进程t1=td.Thread(target=job,args=(1,2))p1=mp.Process(target=job,args=(1,2))#启动线程和进程t1.start()p1.start()#连接线程和进程t1.join()p1.join()#可以看出线程和进程的使用方式相似
完整代码
#导入线程进程标准模块import multiprocessing as mpimport threading as td#定义一个被进程调用的函数def job(a,d): print('AA')if __name__ == '__main__': p1 = mp.Process(target=job, args=(1, 2)) #创建进程 p1.start() #启动进程 p1.join() #连接进程#输出AA
2.2 存储进程输出 Queue
Queue的功能是将每个核或线程的运算结果放在队里中, 等到每个线程或核运行完毕后再从队列中取出结果, 继续加载运算。因为多线程调用的函数不能有返回值, 所以使用Queue存储多个线程运算的结果。
定义一个被多线程调用的函数,q 就像一个队列,用来保存每次函数运行的结果#定义一个多线程调用函数def job(q): #注:该函数没有返回值 res = 0 for i in range(1000): res += i+i**2+i**3 q.put(res) #queue
定义两个线程函数,用来处理同一个任务, args 的参数只要一个值的时候,参数后面需要加一个逗号,表示args是可迭代的,后面可能还有别的参数,不加逗号会出错
p1 = mp.Process(target=job,args=(q,))p2 = mp.Process(target=job,args=(q,))
完整代码实现
import multiprocessing as mp#定义一个多线程调用函数def job(q): #注:该函数没有返回值 res = 0 for i in range(1000): res += i+i**2+i**3 q.put(res) #queueif __name__ == '__main__': q=mp.Queue() #定义一个多线程队列 存储结果 p1 = mp.Process(target=job, args=(q,)) p2 = mp.Process(target=job, args=(q,)) p1.start() #启动线程 分两批处理 p2.start() p1.join() #连接线程 p2.join() res1=q.get() #分两批输出 将结果分别保存 res2=q.get() print(res1+res2)#输出499667166000
2.3效率对比
对比下多进程,多线程和什么都不做时的消耗时间,看看哪种方式更有效率。
import multiprocessing as mpdef job(q): res=0 for i in range(1000000): res += i + i**2 + i**3 q.put(res)#由于多进程是多核运算 多进程代码命名为multicore()def multicore(): q = mp.Queue() p1 = mp.Process(target=job, args=(q,)) p2 = mp.Process(target=job, args=(q,)) p1.start() p2.start() p1.join() p2.join() res1 = q.get() res2 = q.get() print('multicore:',res1 + res2)#创建多线程import threading as tddef multithread(): q = mp.Queue() # thread可放入process同样的queue中 t1 = td.Thread(target=job, args=(q,)) t2 = td.Thread(target=job, args=(q,)) t1.start() t2.start() t1.join() t2.join() res1 = q.get() res2 = q.get() print('multithread:', res1 + res2)#创建普通函数def normal(): res = 0 for _ in range(2): for i in range(1000000): res += i + i**2 + i**3 print('normal:', res)import timeif __name__ == '__main__': st = time.time() normal() st1 = time.time() print('normal time:', st1 - st) multithread() st2 = time.time() print('multithread time:', st2 - st1) multicore() print('multicore time:', time.time() - st2)#输出normal: 499999666667166666000000normal time: 1.6875250339508057multithread: 499999666667166666000000multithread time: 3.1562907695770264multicore: 499999666667166666000000multicore time: 1.0937612056732178
这次运行时间依然是:多进程 < 普通 < 多线程。 发现多核/多进程最快,说明在同时间运行了多个任务。 而多线程的运行时间居然比什么都不做的程序还要慢一点,说明多线程还是有短板。
2.4 进程池Pool
进程池就是将所要运行的东西,放到池子里,Python会自行解决多进程的问题
2.4.1 进程池Pool()和map()#定义一个Poolpool = mp.Pool()
有了池子之后,就可以让池子对应某一个函数,向池子里丢数据,池子就会返回函数返回的值。 Pool和之前的Process的不同点是丢向Pool的函数有返回值,而Process的没有返回值。
接下来用map()获取结果,在map()中需要放入函数和需要迭代运算的值,然后它会自动分配给CPU核,返回结果res = pool.map(job, range(10))
import multiprocessing as mpdef job(x): return x*xdef multicore(): pool = mp.Pool() res = pool.map(job,range(10)) print(res)if __name__ == '__main__': multicore()#输出[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
2.4.2 自定义核数量
怎么知道Pool是否真的调用了多个核呢?可以把迭代次数增大些,然后打开CPU负载看下CPU运行情况 打开CPU负载:活动监视器 > CPU > CPU负载(单击一下即可) Pool默认大小是CPU的核数,我们也可以通过在Pool中传入processes参数即可自定义需要的核数量,def multicore(): pool = mp.Pool(processes=3) # 定义CPU核数量为3 res = pool.map(job, range(10)) print(res)
2.4.3 apply_async()
Pool除了map()外,还有可以返回结果的方式,就是apply_async()。apply_async()中只能传递一个值,它只会放入一个核进行运算,但是传入值时要注意是可迭代的,所以在传入值后需要加逗号, 同时需要用get()方法获取返回值def multicore(): pool = mp.Pool() res = pool.map(job, range(10)) print(res) res = pool.apply_async(job, (2,)) # 用get获得结果 print(res.get())#运行结果[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]4
2.4.4 apply_async()输出多个结果
在apply_async()中多传入几个值res = pool.apply_async(job, (2,3,4,)) #报错 TypeError: job() takes exactly 1 argument (3 given) 即apply_async()只能输入一组参数。
将apply_async() 放入迭代器中,定义一个新的multi_resmulti_res = [pool.apply_async(job, (i,)) for i in range(10)]
取出值时需要一个一个取出来print([res.get() for res in multi_res])
合并代码 def multicore(): pool = mp.Pool() res = pool.map(job, range(10)) print(res) res = pool.apply_async(job, (2,)) # 用get获得结果 print(res.get()) # 迭代器,i=0时apply一次,i=1时apply一次等等 multi_res = [pool.apply_async(job, (i,)) for i in range(10)] # 从迭代器中取出 print([res.get() for res in multi_res])#运行结果[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # map()4[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # multi_res
- Pool默认调用是CPU的核数,传入processes参数可自定义CPU核数
- map() 放入迭代参数,返回多个结果
- apply_async()只能放入一组参数,并返回一个结果,如果想得到map()的效果需要通过迭代
2.5 共享内存 shared memory
2.5.1 Shared Value
使用Value数据存储在一个共享的内存表中。import multiprocessing as mpvalue1 = mp.Value('i', 0)value2 = mp.Value('d', 3.14)
其中d和i参数用来设置数据类型的,d表示一个双精浮点类型,i表示一个带符号的整型
2.5.2 Shared Array 在Python的mutiprocessing中,有还有一个Array类,可以和共享内存交互,来实现在进程之间共享数据array = mp.Array('i', [1, 2, 3, 4])
这里的Array和numpy中的不同,它只能是一维的,不能是多维的。同样和Value 一样,需要定义数据形式,否则会报错。 2.6 进程锁
2.6.1 不加锁
import multiprocessing as mpdef job(x): return x*xdef multicore(): pool = mp.Pool() res = pool.map(job, range(10)) print(res) res = pool.apply_async(job, (2,)) # 用get获得结果 print(res.get()) # 迭代器,i=0时apply一次,i=1时apply一次等等 multi_res = [pool.apply_async(job, (i,)) for i in range(10)] # 从迭代器中取出 print([res.get() for res in multi_res])if __name__ == '__main__': multicore()#输出145891213161720
上面代码中定义了一个共享变量v,两个进程都可以对它进行操作。 在job()中我们想让v每隔0.1秒输出一次累加num的结果,但是在两个进程p1和p2 中设定了不同的累加值。所以来看下这两个进程是否会出现冲突。
2.6.2 加锁import multiprocessing as mpimport timedef job(v, num, l): l.acquire() # 锁住 for _ in range(5): time.sleep(0.1) v.value += num # 获取共享内存 print(v.value) l.release() # 释放def multicore(): l = mp.Lock() # 定义一个进程锁 v = mp.Value('i', 0) # 定义共享内存 p1 = mp.Process(target=job, args=(v,1,l)) # 需要将lock传入 p2 = mp.Process(target=job, args=(v,3,l)) p1.start() p2.start() p1.join() p2.join()if __name__ == '__main__': multicore()#运行一下,看看是否还会出现抢占资源的情况12345811141720
运行结果显示,进程锁保证了进程p1的完整运行,然后才进行了进程p2的运行