为什么使用 python 的进程池处理并发的 TCP 请求,会导致客户端并发卡住?
服务端代码:
import osimport socket
import sys
import time
import threading
from loguru import logger
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures._base import Future
import multiprocessing
default_encoding: str = 'utf-8'
pool = ThreadPoolExecutor(
max_workers=20,
thread_name_prefix='simple-work-thread-pool'
)
def init_serversocket() -> socket.socket:
serversocket = socket.socket(
family=socket.AF_INET,
type=socket.SOCK_STREAM
)
# 获取本地主机名
host = socket.gethostname()
logger.debug(f'host {host}')
port = 6001
# 绑定端口号
serversocket.bind(('0.0.0.0', port))
# 设置最大连接数,超过后排队
serversocket.listen(2048)
return serversocket
def send_response(clientsocket: socket.socket, addr: tuple, response_body: bytes) -> int:
send_len: int = clientsocket.send(response_body)
clientsocket.close()
return send_len
def start_request(clientsocket: socket.socket, addr: tuple) -> int:
try:
pid = os.getpid()
logger.debug(f'pid: {pid}, get message from {addr}')
request_body: bytes = clientsocket.recv(2048)
request_text: str = request_body.decode(encoding=default_encoding)
response_text: str = f'server get message: {request_text}'
response_body: bytes = response_text.encode(default_encoding)
# time.sleep(1)
send_len = send_response(
clientsocket=clientsocket, addr=addr, response_body=response_body)
logger.debug(f'发送了响应')
return send_len
except Exception as error:
logger.exception(error)
def start_request_callback(future: Future) -> None:
send_len: int = future.result()
logger.debug(
f'{threading.current_thread().name}, send payload len is {send_len}')
if __name__ == "__main__":
serversocket = init_serversocket()
pool = multiprocessing.Pool(processes=16)
while True:
clientsocket, addr = serversocket.accept()
clientsocket: socket.socket
addr: tuple
# future: Future = pool.submit(start_request, clientsocket, addr)
# future.add_done_callback(start_request_callback)
pool.apply_async(start_request, (clientsocket, addr))
pool.close()
pool.join()
服务端使用进程池并发处理来自客户端的 TCP 请求
客户端代码:
python">from base64 import encodeimport socket # 客户端 发送一个数据,再接收一个数据
import json
from loguru import logger
from concurrent.futures import ThreadPoolExecutor
failture_requests = []
def send_request(index:int):
try:
# 声明socket类型,同时生成链接对象
clientsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
clientsocket.connect(('127.0.0.1', 6001)) # 建立一个链接,连接到本地的6969端口
payload = b'ponponon'
clientsocket.send(payload)
data = clientsocket.recv(1024)
payload = data.decode()
logger.debug(index)
clientsocket.close()
logger.debug('请求完成')
except Exception as error:
failture_requests.append(index)
logger.exception(error)
pool = ThreadPoolExecutor(max_workers=2)
for index in range(100):
pool.submit(send_request,index)
pool.shutdown(wait=True)
logger.debug(failture_requests)
logger.debug(len(failture_requests))
因为我的客户端使用线程池并发,只要 max_workers 大于 1 就会一直卡死
测试平台是 macos 13.3.1 (22E261) + python3.10.10,会卡死
但是我在 ubuntu20.04 + python3.11.3 上就是一切正常,不会出现卡死的问题
但是 max_workers 是 1 就没有问题,我怀疑问题出在服务端,但是具体原因我分析不出来
服务端如果用线程池,就一切正常,服务端和客户端都正常。
但是我只想用多进程
回答:
import osimport socket
import sys
import time
import threading
from loguru import logger
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures._base import Future
import multiprocessing
default_encoding: str = 'utf-8'
def init_serversocket() -> socket.socket:
serversocket = socket.socket(
family=socket.AF_INET,
type=socket.SOCK_STREAM
)
# 获取本地主机名
host = socket.gethostname()
logger.debug(f'host {host}')
port = 6001
# 绑定端口号
serversocket.bind(('0.0.0.0', port))
# 设置最大连接数,超过后排队
serversocket.listen(2048)
return serversocket
def send_response(clientsocket: socket.socket, addr: tuple, response_body: bytes) -> int:
send_len: int = clientsocket.send(response_body)
clientsocket.close()
return send_len
def start_request(clientsocket_fd: int, addr: tuple) -> int:
clientsocket = socket.fromfd(clientsocket_fd, socket.AF_INET, socket.SOCK_STREAM)
os.close(clientsocket_fd)
try:
pid = os.getpid()
logger.debug(f'pid: {pid}, get message from {addr}')
request_body: bytes = clientsocket.recv(2048)
request_text: str = request_body.decode(encoding=default_encoding)
response_text: str = f'server get message: {request_text}'
response_body: bytes = response_text.encode(default_encoding)
# time.sleep(1)
send_len = send_response(
clientsocket=clientsocket, addr=addr, response_body=response_body)
logger.debug(f'发送了响应')
return send_len
except Exception as error:
logger.exception(error)
finally:
clientsocket.close()
def worker_process(clientsocket_fd, addr):
start_request(clientsocket_fd, addr)
if __name__ == "__main__":
serversocket = init_serversocket()
pool = multiprocessing.Pool(processes=16)
while True:
try:
clientsocket, addr = serversocket.accept()
clientsocket_fd = clientsocket.fileno()
pool.apply_async(worker_process, (clientsocket_fd, addr))
except Exception as error:
logger.exception(error)
pool.close()
pool.join()
以上是 为什么使用 python 的进程池处理并发的 TCP 请求,会导致客户端并发卡住? 的全部内容, 来源链接: utcz.com/p/938869.html