## 多进程实现TCP服务端并发 ```python import socket from multiprocessing import Process def get_server(): server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server.bind(('0.0.0.0', 8889)) server.listen(5) return server def run(conn): try: while True: redata = conn.recv(1024) print(redata.decode('utf8')) conn.sendall(redata.decode('utf8').upper().encode('utf8')) except BaseException: print('客户端端口连接') if __name__ == '__main__': while True: conn, addr = get_server().accept() p = Process(target=run, args=(conn,)) p.start() ```   ## 互斥锁代码实操 ```python import json import time import random from multiprocessing import Process, RLock def check_ticket(name): with open(r'data.json', 'r', encoding='utf-8') as f: redata = json.load(f) print(f'{name}剩余:{redata.get("ticket_num")}张') def buy(name): with open(r'data.json', 'r', encoding='utf-8') as f: redata = json.load(f) if redata.get('ticket_num') > 0: time.sleep(random.randint(1, 3)) redata['ticket_num'] -= 1 with open(r'data.json', 'w', encoding='utf-8') as f: json.dump(redata, f, ensure_ascii=False) print(f'{name}抢票成功') else: print(f'{name}抢票失败') def run(name, lock): check_ticket(name) lock.acquire() buy(name) lock.release() if __name__ == '__main__': lock = RLock() for num in range(10): p = Process(target=run, args=(num, lock)) p.start() ``` ## 线程理论 ```python 进程 进程其实是资源单位 表示一块内存空间 进程是cpu分配资源的最小单位 线程 线程才是执行单位 表示真正的代码指令 线程是cpu运行的最小单位 我们可以将进程比喻成车间 线程是车间里面的流水线 一个进程内部至少含有一个线程 ``` 1. 一个进程内可以开设多个线程 2. 同一个进程下的多个线程是共享资源的的 3. 创建进程与线程的区别 创建进程的消耗要远远大于线程 ## 创建线程的两种方式 ```python import threading def task(num): print(f'线程{num}') for num in range(10): t = threading.Thread(target=task, args=(num,)) t.start() import threading class Mythread(threading.Thread): def __init__(self, num): self.num = num super().__init__(args=(num,)) # super().__init__() def run(self) -> None: print(f'我是线程{self.num}') self.a() def a(self): print(f'我是线程{self.num}') for num in range(10): t = Mythread(num) t.start() ``` ## 线程的诸多特性 ```python 1.join方式 主线程等待这个子线程运行完毕后再向下执行 2.同一个进程内多个线程数据共享 import threading num = 666 def task(): global num num = 888 t = threading.Thread(target=task) t.start() import time time.sleep(1) print(num) # 888 3.current_thread() # 虚拟线程对象 4.active_count() # 当前进程存活线程数 ``` ## GIL全局解释器锁 ```python # 官方文档对GIL的解释 In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces. ``` 1. 再Cpython解释器中存在全局解释器锁简称GIL python解释器有很多类型CPython Jpython PYPython(我们一般常用的是CPython解释器) 2. GIL本质上也是一把互斥锁 用来阻止同一个进程中多个线程同时执行 3. GIL的存在是因为CPython解释器中内存管理不是线程安全的(垃圾回收机制) Python为了可以更加有效地利用多核处理器的性能,会支持多线程的操作,但是线程是非独立的,所以同一进程里面的线程数据是共享的,当各个线程去访问资源时会出现竞争的状态,也就是说数据可能会同时被多个线程占用,这样会造成线程的不安全问题,而解决多线程之间数据完整性和状态同步最直接的方式就是加锁。 每一个运行的进程都有个一 垃圾回收线程 垃圾回收机制 引用计数 标记清楚 分代回收  ## 验证GIL锁的存在 ```python from threading import Thread num = 100 def task(): global num num -= 1 t_list = [] for i in range(100): t = Thread(target=task) t.start() t_list.append(t) for t in t_list: t.join() print(num) ``` ## GIL与普通互斥锁区别 ```python 既然CPython解释器中有GIL 那么以后写代码是不是就不需要操作锁了!!! """ GIL只能够保证进程内多线程数据不会被垃圾回收机制弄乱并不能确保程序里面的数据是否安全 """ import time from threading import Thread,Lock num = 100 def task(mutex): global num mutex.acquire() count = num time.sleep(0.1) num = count - 1 mutex.release() mutex = Lock() t_list = [] for i in range(100): t = Thread(target=task,args=(mutex,)) t.start() t_list.append(t) for t in t_list: t.join() print(num) ``` ## python多线程是否有用 ```python 需要分情况 情况1 单个cpu 多个cpu 情况2 IO密集型(代码有IO操作) 计算密集型(代码没有IO操作) 1.单个cpu IO密集型 多进程 申请额外的空间 消耗更多的资源 多线程 消耗资源相对较少 通过多道技术 ps:多线程有优势 计算密集型 多进程 申请额外的空间 消耗更多的资源(总耗时+申请空间+拷贝代码+切换) 多线程 消耗资源相对较少 通过多道技术(总耗时+切换) ps:多线程有优势 2.多个cpu IO密集型 多进程 总耗时(单个进程的耗时+IO+申请空间+拷贝代码) 多线程 总耗时(单个进程的耗时+IO) 多线程有优势 计算密集型 多进程 总耗时(单个进程的耗时) 多线程 总耗时(多个进程的综合) from threading import Thread from multiprocessing import Process import os import time def work(): # 计算密集型 res = 1 for i in range(1, 100000): res *= i if __name__ == '__main__': # print(os.cpu_count()) # 12 查看当前计算机CPU个数 start_time = time.time() # p_list = [] # for i in range(12): # 一次性创建12个进程 # p = Process(target=work) # p.start() # p_list.append(p) # for p in p_list: # 确保所有的进程全部运行完毕 # p.join() t_list = [] for i in range(12): t = Thread(target=work) t.start() t_list.append(t) for t in t_list: t.join() print('总耗时:%s' % (time.time() - start_time)) # 获取总的耗时 """ 计算密集型 多进程:5.665567398071289 多线程:30.233906745910645 """ def work(): time.sleep(2) # 模拟纯IO操作 if __name__ == '__main__': start_time = time.time() # t_list = [] # for i in range(100): # t = Thread(target=work) # t.start() # for t in t_list: # t.join() p_list = [] for i in range(100): p = Process(target=work) p.start() for p in p_list: p.join() print('总耗时:%s' % (time.time() - start_time)) """ IO密集型 多线程:0.0149583816528320 多进程:0.6402878761291504 """ ``` ## 死锁现象 ```python acquire() release() from threading import Thread,Lock import time mutexA = Lock() # 产生一把锁 mutexB = Lock() # 产生一把锁 class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print(f'{self.name}抢到了A锁') mutexB.acquire() print(f'{self.name}抢到了B锁') mutexB.release() print(f'{self.name}释放了B锁') mutexA.release() print(f'{self.name}释放了A锁') def func2(self): mutexB.acquire() print(f'{self.name}抢到了B锁') time.sleep(1) mutexA.acquire() print(f'{self.name}抢到了A锁') mutexA.release() print(f'{self.name}释放了A锁') mutexB.release() print(f'{self.name}释放了B锁') for i in range(10): obj = MyThread() obj.start() ``` ## 信号量 ```python 再python并发编程中信号量相当于多把锁(公共厕所) from threading import Semaphore, Thread sp = Semaphore(5) import time class MyThread(Thread): def run(self): with sp: print('hahahaha') time.sleep(3) for i in range(20): t = MyThread() t.start() ``` ## event事件 ```python from threading import Thread, Event import time ev = Event() def wait(ev): print('等待信号执行') ev.wait() print('收到了开始执行') def run(): print('准备执行') time.sleep(3) ev.set() t2 = Thread(target=run) t2.start() for i in range(20): t = Thread(target=wait, args=(ev,)) t.start() ``` ## 进程池与线程池 ```python 进程和线程能否无限制的创建 不可以 因为硬件的发展跟不上软件 有物理极限 如果我们再编写代码的过程中无限制的创建进程或者线程可能会导致计算机崩溃 池 降低程序的执行效率 但是保证了计算机硬件安全 进程池 提前创建好固定数量的进程供后续程序的调用 超出则等待 线程池 提前创建好固定数量的线程供后续程序的调用 超出则等待 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import os import time # pool = ProcessPoolExecutor(os.cpu_count()) # 根据不同机器的cpu个数动态创建os.cpu_count() # 不填默认为cpu个数 pool = ThreadPoolExecutor() # 默认为cpu个数加4如果大于32则为32 def done(res): print(res.result()) def run(num): print(f'进程{num}') time.sleep(1.5) return num if __name__ == '__main__': for i in range(30): fature = pool.submit(run, i) fature.add_done_callback(done) # 执行完毕后反馈机制 ``` ## 协程 1. 进程:资源单位 2. 线程:执行单位 3. 协程:单线程下实现并发(效率极高) 在代码层面欺骗cpu 让cpu觉得我们的代码里面没有IO操作 实际上IO操作被我们自己写的代码检测 一旦有 立刻让代码执行别的 (该技术完全是程序员自己弄出来的 名字也是程序员自己起的) 核心:自己写代码完成切换+保存状态 ```python from gevent import monkey monkey.patch_all() # 只要放在join前面就行 from gevent import spawn import time def func1(num): print('开始第一个函数', num) time.sleep(3) print('第一个sleep 3秒后', num) def func2(num): print('开始第二个函数', num) time.sleep(2) print('第二个函数sleep 3秒后', num) # print也是io操作 p1 = spawn(func1, 1) p2 = spawn(func2, 2) # 不用要下面也行速度最快好像 p1.start() # 建议把他们放到最后最大化减少整体时间 p2.start() # 只有这个没有join是主线程不等待spwn函数,有了下面的则会等待 # p1.join() # 跟线程进程稍微优点区别这个是让函数执行完毕就行,没有先后顺序 # p2.join() time.sleep(5) ``` ## 协程实现并发 ```python # 协程主线程执行完毕子线程立即结束不会等待子线程执行完毕默认情况 import socket from gevent import monkey;monkey.patch_all() # 固定编写 用于检测所有的IO操作(猴子补丁) from gevent import spawn def communication(sock): while True: data = sock.recv(1024) print(data.decode('utf8')) sock.send(data.upper()) def get_server(): server = socket.socket() server.bind(('127.0.0.1', 8080)) server.listen(5) while True: sock, addr = server.accept() # IO操作 spawn(communication, sock) s1 = spawn(get_server) s1.join() 如何不断的提升程序的运行效率 多进程下开多线程 多线程下开协程 ``` Last modification:November 30th, 2022 at 09:45 pm © 允许规范转载 Support 如果觉得我的文章对你有用,请随意赞赏 ×Close Appreciate the author Sweeping payments
Comment here is closed