【本文導讀】文中有許多不妥之處,敬請批評指正!python編寫的服務端,有八種實現並發的方式,如阻塞(對等)套接字實現並發、非阻塞套接字實現並發、epoll實現並發、多進程實現並發、多線程實現並發、進程池實現並發、線程池實現並發、協程實現並發等。

一、什麼是並發?

1、套接字:是python與操作系統兩者間的一個介面,通過設置對等的IP與埠,實現數據的發送與接收。套接字有三種:服務端監聽套接字、服務端對等套接字、客戶端(對等)套接字,其中服務端對等套接字與客戶端套接字實現數據的傳輸。

2、阻塞:套接字傳輸數據時,會出現accept與recv兩種阻塞,服務端在沒有新的套接字來之前,不能處理已經建立連接的套接字的請求,此時出現accept阻塞;服務端在沒有接受到客戶端請求數據之前,不能與其他客戶端建立連接,此時出現recv阻塞,客戶端也會出現類似recv阻塞。

3、進程與並行:多進程實現並行,遵循輪巡調度機制,由多個cpu同時運行,有效解決阻塞問題。並行屬於並發。

4、線程與並發:多線程實現並發,遵循輪巡調度機制,一個cpu在一定時間內的並行,由python解釋器調度。

二、實現並發的八種方式

(一)阻塞(對等)套接字實現並發的服務端

1、阻塞(對等)套接字實現並發的服務端

# 阻塞(對等)套接字實現並發的服務端
import socket # 導入模塊socket(API介面)
server = socket.socket() # 創建服務端
server.bind((0.0.0.0,8881)) # 綁定ip與埠
server.listen(5) # 開始監聽
while True:
conn, address = server.accept() # 獲取服務端對等套接字
while True: # 向客戶端接收發送數據
client_data = conn.recv(1000)
if client_data:
print("客戶端發來的數據",client_data.decode())
conn.send(client_data)
else:
con.close() # 關閉服務端
break

(二)非阻塞套接字實現並發的服務端

1、非阻塞套接字實現並發的服務端:

# 非阻塞套接字實現並發的服務端
import socket # 導入模塊socket
server = socket.socket() # 創建服務端
server.setblocking(False) # 套接字設成非阻塞
server.bind((0.0.0.0,8882)) # 綁定ip與埠
server.listen(5) # 開始監聽
conn_list = [] # 套接字列表
while True:
try:
conn,address = server.accept() # 獲取服務端對等套接字
conn.setblocking(False) # 套接字設置成非阻塞
conn_list.append(conn) # 套接字放入列表
except BlockingIOError as error: # 捕獲異常
pass
for i in conn_list:
try: # 向客戶端接收發送數據
recv_data = conn.recv(1024)
if recv_data:
print("客戶端發來的數據",recv_data.decode())
conn.send(recv_data)
else:
conn.close() # 關閉服務端
conn_list.remove(conn) # 移除套接字
except BlockingIOError as error: #捕獲異常
pass

(三)epoll實現並發的服務端

1、通過epoll實現並發的服務端

# 通過epoll(io多路復用)實現並發的服務端
import socket # 導入模塊
import selectors # 導入模塊
epoll_selector = selectors.EpollSelector() # 實例化epoll(liunx)
server = socket.socket() # 創建服務端
server.bind((0.0.0.0,8888)) # 綁定ip與埠
server.listen(1000) # 開始監聽
def recv(conn): # 向客戶端接收或發送數據
recv_data = conn.recv(1024)
if recv_data:
print("客戶端發來的數據",recv_data.decode())
conn.send(recv_data)
else:
epoll_selector.unregister(conn) # 取消事件
conn.close() # 關閉對等套接字
def accept(server):
conn,addr = server.accept() # 獲取服務端對等套接字
# 回調函數處理第二個recv阻塞,註冊事件:操作系統監控已獲取conn、數據等
epoll_selector.register(conn,selectors.EVENT_READ,recv)
# 回調函數處理第一個accept阻塞,註冊事件:操作系統監控已獲取server、數據等
epoll_selector.register(server,selectors.EVENT_READ,accept)
while True: # 事件循環
events = epoll_selector.select() # 查詢準備好的事件,返回列表
# 用print(events)查詢事件內容
for key ,mask in events: # 拆包,獲取socket、
callback = key.data # 獲取accept
sock = key.fileobj # 獲取準備好的客戶端套接字
callback(sock) # 調用accept函數

(四)多進程實現並發的服務端

1、多進程實現並發的服務端

# 多進程實現並發的服務端
import socket # 導入模塊
from multiprocessing import Process # 導入多進程模塊
server = socket.socket() # 創建服務端
server.bind((0.0.0.0,9091)) # 綁定ip與埠
server.listen(1000) # 開始監聽
def fun(conn): # 向客戶端接收或發送數據
while True:
recv_data = conn.recv(1024)
if recv_data:
print("客戶端發來的數據",recv_data.decode())
conn.send(recv_data)
else:
conn.close()
break
while True: # 主進程
conn,addr = server.accept() # 獲取服務端對等套接字
p = Process(target=fun,args=(conn,)) # 實例化子進程(連接客戶端)
p.start() # 開啟子進程

(五)多線程實現並發的服務端

1、多線程實現並發的服務端:

# 多線程實現並發的客戶端
import socket # 導入模塊
import threading # 導入多線程模塊
server = socket.socket() # 創建服務端
server.bind((0.0.0.0,6668)) # 綁定ip與埠
server.listen(1000) # 開始監聽
def fun(conn): # 向客戶端接收或發送數據
while True:
recv_data = conn.recv(1024)
if recv_data:
print("客戶端發來的數據",recv_data.decode())
conn.send(recv_data)
else:
conn.close()
while True: # 主線程
conn,address = server.accept() # 獲取服務端對等套接字
t = threading.Thread(target=fun,args=(conn,)) # 實例化子線程(連接客戶端)

(六)使用進程池(進程池)實現並發的服務端

1、使用進程池(進程池)實現並發的服務端

# 使用進程池(進程池)實現並發的服務端
import socket # 導入模塊
from multiprocessing import Pool,cpu_count # 導入進程池
from multiprocessing.pool import ThreadPool # 導入線程池
server = socket.socket() # 創建服務端
server.bind((0.0.0.0,5678)) # 綁定ip與埠
server.listen(1000) # 開始監聽
def work_thread(conn): # 向客戶端接收或發送數據
while True:
recv_data = conn.recv(1000)
if recv_data:
print("客戶端發來的數據",recv_data.decode())
conn.send(recv_data)
else:
conn.close()
break
def work_process(server):
t_pool = ThreadPool(2*cpu_count()) # 準備線程
while True:
conn,address = server.accept() # 獲取服務端對等連接套接字
t_pool.apply_async(work_thread,args=(conn,))# 進程池(客戶端收發數據)
n = cpu_count() # CPU的核數
p = Pool(n) # 實例化進程池
for i in range(n):
p.apply_async(work_process,args=(server,)) #進程池(accept阻塞)
p.close()
p.join()

(七)使用進程池(多線程)實現並發的服務端

1、使用進程池(多線程)實現並發的服務端

# 使用進程池(多線程)實現並發的服務端
import socket # 導入模塊
from multiprocessing import Pool,cpu_count
from threading import Thread # 導入多線程
server = socket.socket() # 創建服務端
server.bind((0.0.0.0,7777)) # 綁定ip與埠
server.listen(1000) # 開始監聽
def work_thread(conn): # 向客戶端接收發送數據
while True:
recv_data = conn.recv(1000)
if recv_data:
print("客戶端發來的數據",recv_data.decode())
conn.send(recv_data)
else:
conn.close()
break
def work_process(server):
while True:
conn,addr = server.accept() # 獲取服務端對等套接字
t = Thread(target=work_thread,args=(conn,)) # 多線程
t.start()
n = cpu_count()
p = Pool(n)
for i in range(n):
p.apply_async(work_process,args=(server,)) # 進程池(accept阻塞)
p.close()
p.join()

(八)協程實現並發的服務端

1、使用協程實現並發的服務端

# 使用協程實現並發的服務端
from gevent import monkey;monkey.patch_socket() # 替換python自帶的socket
import gevent # 導入協程
import socket # 導入模塊
server = socket.socket() # 創建服務端
server.bind((0.0.0.0,8989)) # 綁定ip與埠
server.listen(1000) # 開始監聽
def worker(conn): # 向客戶端接收發送數據
while True:
recv_data = conn.recv(1000)
if recv_data:
print("客戶端發來的數據",recv_data.decode())
conn.send(recv_data)
else:
conn.close()
break
while True:
conn,address = server.accept() # 獲取服務端對等套接字
gevent.spawn(worker,conn) # 實例化協程,並將conn作為參數傳入

三、實現並發的具體操作:

1、先啟動服務端(以協程並發為例),埠自己設置。

2、再啟動兩個客戶端,輸入數據,與服務端實現數據傳輸。


推薦閱讀:
相關文章