博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
多线程 Threading Multiprocessing(Python)
阅读量:5103 次
发布时间:2019-06-13

本文共 11761 字,大约阅读时间需要 39 分钟。

多线程是加速程序计算的有效方式,Python的多线程模块threading上手快速简单,学习动手操作了一遍,这里记录一下。

1 Threading

1.1 添加线程

import threading#获取已激活的线程数print(threading.active_count())  #1#查看所有线程信息print(threading.enumerate())  #[<_MainThread(MainThread, started 18496)>]#查看现在正在运行的线程print(threading.current_thread()) #<_MainThread(MainThread, started 18496)>import threadingdef thread_job():    print('This is a thread of %s' % threading.current_thread())def runMain():    thread = threading.Thread(target=thread_job,) #定义线程    thread.start()  #线程开始工作if __name__ == '__main__':    runMain()#输出This is a thread of 

1.2 join功能

不加join功能,线程任务还未完成便输出all done。

import threadingimport timedef thread_job():    print('T1 start.\n')    for i in range(10):        time.sleep(0.1)  #任务间隔0.1秒    print('T1 finish.\n')add_thread = threading.Thread(target=thread_job, name='T1')add_thread.start()print('all done.\n')#输出T1 start.all done.T1 finish.

若要遵循顺序,在启动线程后调用join , 使用join控制多个线程的执行顺序,效果如下。

import threadingimport timedef thread_job():    print('T1 start.\n')    for i in range(10):        time.sleep(0.1)  #任务间隔0.1秒    print('T1 finish.\n')add_thread = threading.Thread(target=thread_job, name='T1')add_thread.start()add_thread.join()print('all done.\n')#输出T1 start.T1 finish.all done.

1.3 存储进程结果Queue

将数据列表中的数据传入,使用四个线程处理,将结果保存在Queue中,线程执行完后,从Queue中获取存储的结果

#导入线程 队列的标准模块import threadingimport timefrom queue import Queue

定义一个被多线程调用的函数:函数的参数时一个列表l和一个队列q,函数的功能是对列表的每个元素进行平方计算,将结果保存在队列中

def job(l,q):    for i in range(len(l)):        l[i] = l[i]**2    q.put(l)

定义一个多线程函数:在多线程函数中定义一个Queue ,用来保存返回值 ,代替return ,定义一个多线程列表 ,初始化一个多维数据列表

def mulithreading():    q = Queue() #q中存放返回值 代替return的返回值    threads = []    data = [[1,2,3],[3,4,5],[4,4,4],[5,5,5]]

在多线程函数中定义四个线程,启动线程,将每个线程添加到多线程的列表中

for i in range(4): #定义四个线程        t = threading.Thread(target=job,args=(data[i],q))        t.start()        threads.append(t) #把每个线程append到线程列表中

分别join四个线程到主线程

for thread in threads:        thread.join()

定义一个空列表results 将四个线程运行后保存在队列中的结果返回给results

results = []    for _ in range(4):        results.append(q.get()) #q.get()按顺序从q中拿出一个值    print(results)

完整代码:

#导入线程 队列的标准模块import threadingimport timefrom queue import Queue#定义一个被多线程调用的函数def job(l,q):    for i in range(len(l)):        l[i] = l[i]**2    q.put(l)#定义一个多线程函数def mulithreading():    q = Queue() #q中存放返回值 代替return的返回值    threads = []    data = [[1,2,3],[3,4,5],[4,4,4],[5,5,5]]    for i in range(4): #定义四个线程        t = threading.Thread(target=job,args=(data[i],q))        t.start()        threads.append(t) #把每个线程append到线程列表中    #分别join四个线程到主线程    for thread in threads:        thread.join()    #定义一个空列表results 将四个线程运行后保存在队列中的结果返回给results    results = []    for _ in range(4):        results.append(q.get()) #q.get()按顺序从q中拿出一个值    print(results)if __name__ == '__main__':    mulithreading()#输出[[1, 4, 9], [9, 16, 25], [16, 16, 16], [25, 25, 25]]

1.4 GIL 不一定有效率

python 的多线程 threading 有时候并不是特别理想. 最主要的原因是就是, Python 的设计上, 有一个必要的环节, 就是 Global Interpreter Lock (GIL). 这个东西让 Python 还是一次性只能处理一个东西.

import threadingfrom queue import Queueimport copyimport timedef job(l, q):    res = sum(l)    q.put(res)def multithreading(l):    q = Queue()    threads = []    for i in range(4):        t = threading.Thread(target=job, args=(copy.copy(l), q), name='T%i' % i)        t.start()        threads.append(t)    [t.join() for t in threads]    total = 0    for _ in range(4):        total += q.get()    print(total)def normal(l):    total = sum(l)    print(total)if __name__ == '__main__':    l = list(range(10000000))    s_t = time.time()    normal(l*4)    print('normal: ',time.time()-s_t)    s_t = time.time()    multithreading(l)    print('multithreading: ', time.time()-s_t)#输出199999980000000normal:  1.7343778610229492199999980000000multithreading:  2.218825340270996

程序 threading 和 Normal 运行了一样多次的运算. 但是我们发现 threading 却没有快多少, 按理来说, 我们预期会要快3-4倍, 因为有建立4个线程, 但是并没有. 这就是其中的 GIL 在作怪.

1.5 线程锁

不使用锁

import threadingdef job1():  #全局变量A的值每次加1,循环10次    global A    for i in range(10):        A += 1        print('job1',A)def job2(): #全局变量A的值每次加10,循环10次    global A    for i in range(10):        A += 10        print('job2',A)if __name__ == '__main__':    A = 0    t1 = threading.Thread(target=job1)    t2 = threading.Thread(target=job2)    t1.start()    t2.start()    t1.join()    t2.join()#输出 打印结果非常混乱job1 1job1 2job1 3job1 4job2 14job1 15job2 25job1 26job2 36job1 37job2 47job1 48job2 58job1 59job2 69job1 70job2 80job2 90job2 100job2 110

使用锁

lock在不同线程使用同一共享内存时,能够确保线程之间互不影响,使用lock的方法是, 在每个线程执行运算修改共享内存之前,执行lock.acquire()将共享内存上锁, 确保当前线程执行时,内存不会被其他线程访问,执行运算完毕后,使用lock.release()将锁打开, 保证其他的线程可以使用该共享内存。

import threadingdef job1():    global A;lock = threading.Lock()    lock.acquire()    for i in range(10):        A += 1        print('job1',A)    lock.release()def job2():    global A;lock = threading.Lock()    lock.acquire()    for i in range(10):        A += 10        print('job2',A)    lock.release()if __name__ == '__main__':    A = 0    t1 = threading.Thread(target=job1)    t2 = threading.Thread(target=job2)    t1.start()    t2.start()    t1.join()    t2.join()#输出  使用lock后 执行完一个线程后再执行另一个线程。使用lock和不使用lock,最后打印输出的结果是不同的。job1 1job1 2job1 3job1 4job1 5job1 6job1 7job1 8job1 9job1 10job2 20job2 30job2 40job2 50job2 60job2 70job2 80job2 90job2 100job2 110

2 Multiprocessing

多进程 Multiprocessing 和多线程 threading 类似, 都是在 python 中用来并行运算的。不过既然有了 threading, 为什么 Python 还要出一个 multiprocessing 呢? 因为要用来弥补 threading 的一些劣势, 比如在 threading 教程中提到的GIL, python 把 multiprocessing 和 threading 的使用方法做的几乎差不多,使用多线程发挥电脑多核系统的威力。

2.1添加Process

#导入线程进程标准模块import multiprocessing as mpimport threading as td#定义一个被线程和进程调用的函数def job(a,d):    print('AA')#创建线程和进程t1=td.Thread(target=job,args=(1,2))p1=mp.Process(target=job,args=(1,2))#启动线程和进程t1.start()p1.start()#连接线程和进程t1.join()p1.join()#可以看出线程和进程的使用方式相似

完整代码

#导入线程进程标准模块import multiprocessing as mpimport threading as td#定义一个被进程调用的函数def job(a,d):    print('AA')if __name__ == '__main__':    p1 = mp.Process(target=job, args=(1, 2)) #创建进程    p1.start() #启动进程    p1.join()  #连接进程#输出AA

2.2 存储进程输出 Queue

Queue的功能是将每个核或线程的运算结果放在队里中, 等到每个线程或核运行完毕后再从队列中取出结果, 继续加载运算。因为多线程调用的函数不能有返回值, 所以使用Queue存储多个线程运算的结果。

定义一个被多线程调用的函数,q 就像一个队列,用来保存每次函数运行的结果

#定义一个多线程调用函数def job(q): #注:该函数没有返回值    res = 0    for i in range(1000):        res += i+i**2+i**3    q.put(res) #queue

定义两个线程函数,用来处理同一个任务, args 的参数只要一个值的时候,参数后面需要加一个逗号,表示args是可迭代的,后面可能还有别的参数,不加逗号会出错

p1 = mp.Process(target=job,args=(q,))p2 = mp.Process(target=job,args=(q,))

完整代码实现

import multiprocessing as mp#定义一个多线程调用函数def job(q): #注:该函数没有返回值    res = 0    for i in range(1000):        res += i+i**2+i**3    q.put(res) #queueif __name__ == '__main__':    q=mp.Queue()  #定义一个多线程队列 存储结果    p1 = mp.Process(target=job, args=(q,))    p2 = mp.Process(target=job, args=(q,))    p1.start() #启动线程  分两批处理    p2.start()    p1.join() #连接线程    p2.join()    res1=q.get() #分两批输出 将结果分别保存    res2=q.get()    print(res1+res2)#输出499667166000

2.3效率对比

对比下多进程,多线程和什么都不做时的消耗时间,看看哪种方式更有效率。

import multiprocessing as mpdef job(q):    res=0    for i in range(1000000):        res += i + i**2 + i**3    q.put(res)#由于多进程是多核运算 多进程代码命名为multicore()def multicore():    q = mp.Queue()    p1 = mp.Process(target=job, args=(q,))    p2 = mp.Process(target=job, args=(q,))    p1.start()    p2.start()    p1.join()    p2.join()    res1 = q.get()    res2 = q.get()    print('multicore:',res1 + res2)#创建多线程import threading as tddef multithread():    q = mp.Queue() # thread可放入process同样的queue中    t1 = td.Thread(target=job, args=(q,))    t2 = td.Thread(target=job, args=(q,))    t1.start()    t2.start()    t1.join()    t2.join()    res1 = q.get()    res2 = q.get()    print('multithread:', res1 + res2)#创建普通函数def normal():    res = 0    for _ in range(2):        for i in range(1000000):            res += i + i**2 + i**3    print('normal:', res)import timeif __name__ == '__main__':    st = time.time()    normal()    st1 = time.time()    print('normal time:', st1 - st)    multithread()    st2 = time.time()    print('multithread time:', st2 - st1)    multicore()    print('multicore time:', time.time() - st2)#输出normal: 499999666667166666000000normal time: 1.6875250339508057multithread: 499999666667166666000000multithread time: 3.1562907695770264multicore: 499999666667166666000000multicore time: 1.0937612056732178

这次运行时间依然是:多进程 < 普通 < 多线程。 发现多核/多进程最快,说明在同时间运行了多个任务。 而多线程的运行时间居然比什么都不做的程序还要慢一点,说明多线程还是有短板。

2.4 进程池Pool

进程池就是将所要运行的东西,放到池子里,Python会自行解决多进程的问题

2.4.1 进程池Pool()和map()

#定义一个Poolpool = mp.Pool()

有了池子之后,就可以让池子对应某一个函数,向池子里丢数据,池子就会返回函数返回的值。 Pool和之前的Process的不同点是丢向Pool的函数有返回值,而Process的没有返回值。

接下来用map()获取结果,在map()中需要放入函数和需要迭代运算的值,然后它会自动分配给CPU核,返回结果 res = pool.map(job, range(10))

import multiprocessing as mpdef job(x):    return x*xdef multicore():    pool = mp.Pool()    res = pool.map(job,range(10))    print(res)if __name__ == '__main__':    multicore()#输出[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

2.4.2 自定义核数量

怎么知道Pool是否真的调用了多个核呢?可以把迭代次数增大些,然后打开CPU负载看下CPU运行情况
打开CPU负载:活动监视器 > CPU > CPU负载(单击一下即可)
Pool默认大小是CPU的核数,我们也可以通过在Pool中传入processes参数即可自定义需要的核数量,

def multicore():    pool = mp.Pool(processes=3) # 定义CPU核数量为3    res = pool.map(job, range(10))    print(res)

2.4.3 apply_async()

Pool除了map()外,还有可以返回结果的方式,就是apply_async()。apply_async()中只能传递一个值,它只会放入一个核进行运算,但是传入值时要注意是可迭代的,所以在传入值后需要加逗号, 同时需要用get()方法获取返回值

def multicore():    pool = mp.Pool()    res = pool.map(job, range(10))    print(res)    res = pool.apply_async(job, (2,))    # 用get获得结果    print(res.get())#运行结果[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]4

2.4.4 apply_async()输出多个结果

在apply_async()中多传入几个值
res = pool.apply_async(job, (2,3,4,)) #报错 TypeError: job() takes exactly 1 argument (3 given) 即apply_async()只能输入一组参数。
将apply_async() 放入迭代器中,定义一个新的multi_res
multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
取出值时需要一个一个取出来
print([res.get() for res in multi_res])
合并代码

def multicore():    pool = mp.Pool()    res = pool.map(job, range(10))    print(res)    res = pool.apply_async(job, (2,))    # 用get获得结果    print(res.get())    # 迭代器,i=0时apply一次,i=1时apply一次等等    multi_res = [pool.apply_async(job, (i,)) for i in range(10)]    # 从迭代器中取出    print([res.get() for res in multi_res])#运行结果[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]  # map()4[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # multi_res
  • Pool默认调用是CPU的核数,传入processes参数可自定义CPU核数
  • map() 放入迭代参数,返回多个结果
  • apply_async()只能放入一组参数,并返回一个结果,如果想得到map()的效果需要通过迭代

2.5 共享内存 shared memory

2.5.1 Shared Value

使用Value数据存储在一个共享的内存表中。

import multiprocessing as mpvalue1 = mp.Value('i', 0)value2 = mp.Value('d', 3.14)

其中d和i参数用来设置数据类型的,d表示一个双精浮点类型,i表示一个带符号的整型

2.5.2 Shared Array
在Python的mutiprocessing中,有还有一个Array类,可以和共享内存交互,来实现在进程之间共享数据
array = mp.Array('i', [1, 2, 3, 4])
这里的Array和numpy中的不同,它只能是一维的,不能是多维的。同样和Value 一样,需要定义数据形式,否则会报错。

2.6 进程锁

2.6.1 不加锁

import multiprocessing as mpdef job(x):    return x*xdef multicore():    pool = mp.Pool()    res = pool.map(job, range(10))    print(res)    res = pool.apply_async(job, (2,))    # 用get获得结果    print(res.get())    # 迭代器,i=0时apply一次,i=1时apply一次等等    multi_res = [pool.apply_async(job, (i,)) for i in range(10)]    # 从迭代器中取出    print([res.get() for res in multi_res])if __name__ == '__main__':    multicore()#输出145891213161720

上面代码中定义了一个共享变量v,两个进程都可以对它进行操作。 在job()中我们想让v每隔0.1秒输出一次累加num的结果,但是在两个进程p1和p2 中设定了不同的累加值。所以来看下这两个进程是否会出现冲突。

2.6.2 加锁

import multiprocessing as mpimport timedef job(v, num, l):    l.acquire() # 锁住    for _ in range(5):        time.sleep(0.1)        v.value += num # 获取共享内存        print(v.value)    l.release() # 释放def multicore():    l = mp.Lock() # 定义一个进程锁    v = mp.Value('i', 0) # 定义共享内存    p1 = mp.Process(target=job, args=(v,1,l)) # 需要将lock传入    p2 = mp.Process(target=job, args=(v,3,l))    p1.start()    p2.start()    p1.join()    p2.join()if __name__ == '__main__':    multicore()#运行一下,看看是否还会出现抢占资源的情况12345811141720

运行结果显示,进程锁保证了进程p1的完整运行,然后才进行了进程p2的运行

转载于:https://www.cnblogs.com/eugene0/p/11546028.html

你可能感兴趣的文章
设计模式之装饰者模式
查看>>
一道不知道哪里来的容斥题
查看>>
Blender Python UV 学习
查看>>
window添加右键菜单
查看>>
入手腾龙SP AF90mm MACRO
查看>>
python学习4 常用内置模块
查看>>
Window7上搭建symfony开发环境(PEAR)
查看>>
ResolveUrl的用法
查看>>
Linux内核态、用户态简介与IntelCPU特权级别--Ring0-3
查看>>
第23月第24天 git命令 .git-credentials git rm --cached git stash clear
查看>>
java SE :标准输入/输出
查看>>
一些方便系统诊断的bash函数
查看>>
<转>关于MFC的多线程类 CSemaphore,CMutex,CCriticalSection,CEvent
查看>>
jquery中ajax返回值无法传递到上层函数
查看>>
css3之transform-origin
查看>>
[转]JavaScript快速检测浏览器对CSS3特性的支持
查看>>
Master选举原理
查看>>
[ JAVA编程 ] double类型计算精度丢失问题及解决方法
查看>>
小别离
查看>>
微信小程序-发起 HTTPS 请求
查看>>