學(xué)習(xí)過(guò)操作系統(tǒng)的小伙伴應(yīng)該對(duì)資源的控制有所了解——資源是一定的,不能讓所有進(jìn)程無(wú)控制地進(jìn)行搶占。這就要求我們對(duì)進(jìn)行進(jìn)行并發(fā)控制。那么python是如何實(shí)現(xiàn)多進(jìn)程并發(fā)控制的呢?主要是使用互斥鎖和新號(hào)量來(lái)控制進(jìn)程資源調(diào)度。接下來(lái)小編就來(lái)介紹怎么使用python實(shí)現(xiàn)多進(jìn)程并發(fā)控制吧。
一、了解鎖
應(yīng)用場(chǎng)景舉例描述: Lock 互斥鎖:舉例說(shuō)明–有三個(gè)同事同時(shí)需要上廁所,但是只有一間廁所,將同事比作進(jìn)程,多個(gè)進(jìn)程并打搶占一個(gè)廁所,我們要保證順序優(yōu)先, 一個(gè)個(gè)來(lái),那么就必須串行,先到先得,有人使用了,就鎖住,結(jié)束后剩余的人繼續(xù)爭(zhēng)搶
1、利用Lock來(lái)處理
模擬三個(gè)同事?lián)屨紟?/p>
from multiprocessing import Process
from multiprocessing import Lock
import time
import random
def task1(p, lock):
# 上鎖
lock.acquire()
print(f'{p} 開(kāi)始排泄')
time.sleep(random.randint(1, 3))
print(f'{p} 排泄結(jié)束')
# 解鎖
lock.release()
def task2(p, lock):
lock.acquire()
print(f'{p} 開(kāi)始排泄')
time.sleep(random.randint(1, 3))
print(f'{p} 排泄結(jié)束')
lock.release()
def task3(p, lock):
lock.acquire()
print(f'{p} 開(kāi)始排泄')
time.sleep(random.randint(1, 3))
print(f'{p} 排泄結(jié)束')
lock.release()
if __name__ == '__main__':
# 實(shí)例化一個(gè)鎖對(duì)象
mutex = Lock()
# 將鎖以參數(shù)的形式傳入進(jìn)程對(duì)象
p1 = Process(target=task1, args=('task1', mutex,))
p2 = Process(target=task2, args=('task2', mutex,))
p3 = Process(target=task3, args=('task3', mutex,))
p1.start()
p2.start()
p3.start()
執(zhí)行結(jié)果:
# 輸出結(jié)果:三個(gè)進(jìn)程開(kāi)始爭(zhēng)搶互斥鎖,先搶到的先執(zhí)行,執(zhí)行結(jié)束后,釋放掉鎖,剩下的繼續(xù)爭(zhēng)搶
task1 開(kāi)始排泄
task1 排泄結(jié)束
task2 開(kāi)始排泄
task2 排泄結(jié)束
task3 開(kāi)始排泄
task3 排泄結(jié)束
1、 注意:
- 互斥鎖在函數(shù)中,lock.acquire()上鎖一次就要lock.release()解鎖一次,在上鎖與解鎖之間寫(xiě)需要執(zhí)行的代碼。
- 如果連續(xù)上鎖兩次以上,就會(huì)出現(xiàn)死鎖現(xiàn)象,代碼將不繼續(xù)執(zhí)行下去。必須是鎖一次解一次。
2、 lock和join比較:
- 共同點(diǎn)------都可以把并行變成串行,保證了執(zhí)行順序
- 不同點(diǎn)------join是人為設(shè)定了順序,lock是讓其爭(zhēng)搶順序,保證了公平性
2、利用反射,來(lái)優(yōu)化上面的代碼
上面的代碼雖然起到了先進(jìn)先出,一進(jìn)一出的效果,但是并不完美??偹苤?,我們上廁所是誰(shuí)先搶到誰(shuí)先上,并不是說(shuō)按照代碼里start()順序執(zhí)行。應(yīng)該由先搶占到的進(jìn)程限制性才更合理。
from multiprocessing import Process
from multiprocessing import Lock
import time
import random
import sys
def task1(p, lock):
# 上鎖
lock.acquire()
print(f'{p} 開(kāi)始打印')
time.sleep(random.randint(1, 3))
print(f'{p} 打印結(jié)束')
# 解鎖
lock.release()
def task2(p, lock):
lock.acquire()
print(f'{p} 開(kāi)始打印')
time.sleep(random.randint(1, 3))
print(f'{p} 打印結(jié)束')
lock.release()
def task3(p, lock):
lock.acquire()
print(f'{p} 開(kāi)始打印')
time.sleep(random.randint(1, 3))
print(f'{p} 打印結(jié)束')
lock.release()
if __name__ == '__main__':
slock = Lock()
for i in range(1,4):
p = Process(target=getattr(sys.modules[__name__], f'task{i}'), args=(f'task{i}', slock))
p.start()
輸出結(jié)果:
task2 開(kāi)始打印
task2 打印結(jié)束
task3 開(kāi)始打印
task3 打印結(jié)束
task1 開(kāi)始打印
task1 打印結(jié)束
二、進(jìn)程并發(fā)控制 semaphore
semaphore(信號(hào)量):用來(lái)控制對(duì)共享資源的訪問(wèn)數(shù)量,可以控制同一時(shí)刻并發(fā)的進(jìn)程數(shù)
信號(hào)量: 也是一把鎖,但是不保證數(shù)據(jù)安全性,同時(shí)開(kāi)啟多個(gè)線程,但是規(guī)定了同時(shí)并發(fā)執(zhí)行的上限,后面走多少,進(jìn)多少。(用于控制并發(fā)數(shù)量)
1.多進(jìn)程控制示例(1)
# 舉例說(shuō)明:一間廁所有5個(gè)坑位,最多只能同時(shí)有5個(gè)人上廁所,當(dāng)前時(shí)刻有20個(gè)人想上廁所,但是只能讓5個(gè)人進(jìn)去,然后出來(lái)多少個(gè),才能進(jìn)去多少個(gè)上廁所
# 從一個(gè)模塊引用多個(gè)功能的時(shí)候,用逗號(hào)隔開(kāi)
from threading import Semaphore, Thread, currentThread
import time
import random
sem = Semaphore(3) # 并發(fā)執(zhí)行數(shù)設(shè)置為5
def task():
sem.acquire()
print(f'{currentThread().name}')
time.sleep(random.randint(1,3))
sem.release()
if __name__ == '__main__':
for i in range(20):
t = Thread(target=task)
t.start()
執(zhí)行結(jié)果:首次并發(fā)量是3,后面先搶到鎖先執(zhí)行
Thread-1
Thread-2
Thread-3Thread-4
Thread-5Thread-6
Thread-7Thread-8
Process finished with exit code 0
2.多進(jìn)程控制示例(2)
import multiprocessing
import time
def worker(s, i):
s.acquire()
print(time.strftime('%Y-%m-%d %H:%M:%S'), multiprocessing.current_process().name + " 搶占并獲得鎖,運(yùn)行")
time.sleep(i)
print(time.strftime('%Y-%m-%d %H:%M:%S'), multiprocessing.current_process().name + " 運(yùn)行結(jié)束,釋放鎖")
s.release()
if __name__ == '__main__':
s = multiprocessing.Semaphore(2)
for i in range(8):
p = multiprocessing.Process(target=worker, args=(s, 1))
p.start()
執(zhí)行結(jié)果:
在執(zhí)行結(jié)果輸出的終端,每阻塞一次,按下回車(chē)鍵,可以更加清晰的看出進(jìn)程的并發(fā)執(zhí)行。
由下面執(zhí)行結(jié)果可以看出,同一時(shí)刻,有兩個(gè)進(jìn)程在執(zhí)行
2021-05-18 22:50:37 Process-1 搶占并獲得鎖,運(yùn)行
2021-05-18 22:50:37 Process-2 搶占并獲得鎖,運(yùn)行2021-05-18 22:50:38 Process-1 運(yùn)行結(jié)束,釋放鎖
2021-05-18 22:50:38 Process-3 搶占并獲得鎖,運(yùn)行
2021-05-18 22:50:38 Process-2 運(yùn)行結(jié)束,釋放鎖
2021-05-18 22:50:38 Process-4 搶占并獲得鎖,運(yùn)行2021-05-18 22:50:39 Process-3 運(yùn)行結(jié)束,釋放鎖
2021-05-18 22:50:39 Process-5 搶占并獲得鎖,運(yùn)行
2021-05-18 22:50:39 Process-4 運(yùn)行結(jié)束,釋放鎖
2021-05-18 22:50:39 Process-6 搶占并獲得鎖,運(yùn)行2021-05-18 22:50:40 Process-5 運(yùn)行結(jié)束,釋放鎖
2021-05-18 22:50:40 Process-7 搶占并獲得鎖,運(yùn)行
2021-05-18 22:50:40 Process-6 運(yùn)行結(jié)束,釋放鎖
2021-05-18 22:50:40 Process-8 搶占并獲得鎖,運(yùn)行2021-05-18 22:50:41 Process-7 運(yùn)行結(jié)束,釋放鎖
2021-05-18 22:50:41 Process-8 運(yùn)行結(jié)束,釋放鎖Process finished with exit code 0
三、進(jìn)程同步之LOCK
多個(gè)進(jìn)程并發(fā)執(zhí)行,提高資源利用率,從而提高效率,但是有時(shí)候我們需要在某一時(shí)刻只能有一個(gè)進(jìn)程訪問(wèn)某個(gè)共享資源的話,就需要使用鎖LOCK
1.不加LOCK的示例
import multiprocessing
import time
def task1():
n = 4
while n > 1:
print(f'{time.strftime("%Y-%M-%d %H:%M:%S")} task1 輸出信息')
time.sleep(1)
n -= 1
def task2():
n = 4
while n > 1:
print(f'{time.strftime("%Y-%M-%d %H:%M:%S")} task2 輸出信息')
time.sleep(1)
n -= 1
def task3():
n = 4
while n > 1:
print(f'{time.strftime("%Y-%M-%d %H:%M:%S")} task3 輸出信息')
time.sleep(1)
n -= 1
if __name__ == '__main__':
p1 = multiprocessing.Process(target=task1)
p2 = multiprocessing.Process(target=task2)
p3 = multiprocessing.Process(target=task3)
p1.start()
p2.start()
p3.start()
執(zhí)行結(jié)果:
2021-59-18 22:59:46 task1 輸出信息
2021-59-18 22:59:46 task2 輸出信息
2021-59-18 22:59:46 task3 輸出信息2021-59-18 22:59:47 task1 輸出信息
2021-59-18 22:59:47 task2 輸出信息
2021-59-18 22:59:47 task3 輸出信息2021-59-18 22:59:48 task1 輸出信息
2021-59-18 22:59:48 task2 輸出信息
2021-59-18 22:59:48 task3 輸出信息Process finished with exit code 0
2.加上LOCK的示例
有兩種加鎖方式:首先將 lock = multiprocessing.Lock() 生成鎖對(duì)象lock
- with lock: with會(huì)在執(zhí)行前啟動(dòng)lock,在執(zhí)行結(jié)束后關(guān)閉lock
- lock.acquire() … lock.release() : 注意,這倆必須是一個(gè)接一個(gè)的對(duì)應(yīng)關(guān)系
import multiprocessing
import time
def task1(lock):
with lock:
n = 4
while n > 1:
print(f'{time.strftime("%Y-%M-%d %H:%M:%S")} task1 輸出信息')
time.sleep(1)
n -= 1
def task2(lock):
lock.acquire()
n = 4
while n > 1:
print(f'{time.strftime("%Y-%M-%d %H:%M:%S")} task2 輸出信息')
time.sleep(1)
n -= 1
lock.release()
def task3(lock):
lock.acquire()
n = 4
while n > 1:
print(f'{time.strftime("%Y-%M-%d %H:%M:%S")} task3 輸出信息')
time.sleep(1)
n -= 1
lock.release()
if __name__ == '__main__':
lock = multiprocessing.Lock()
p1 = multiprocessing.Process(target=task1, args=(lock,))
p2 = multiprocessing.Process(target=task2, args=(lock,))
p3 = multiprocessing.Process(target=task3, args=(lock,))
p1.start()
p2.start()
p3.start()
執(zhí)行結(jié)果
2021-11-18 23:11:37 task1 輸出信息
2021-11-18 23:11:38 task1 輸出信息
2021-11-18 23:11:39 task1 輸出信息
2021-11-18 23:11:40 task2 輸出信息
2021-11-18 23:11:41 task2 輸出信息
2021-11-18 23:11:42 task2 輸出信息
2021-11-18 23:11:43 task3 輸出信息
2021-11-18 23:11:44 task3 輸出信息
2021-11-18 23:11:45 task3 輸出信息
Process finished with exit code 0
到此這篇python實(shí)現(xiàn)多進(jìn)程并發(fā)控制的文章就介紹到這了,更多python進(jìn)程相關(guān)的學(xué)習(xí)內(nèi)容請(qǐng)搜索W3Cschool以前的文章或繼續(xù)瀏覽下面的相關(guān)文章。