为什么使用 python 的进程池处理并发的 TCP 请求,会导致客户端并发卡住?

为什么使用 python 的进程池处理并发的 TCP 请求,会导致客户端并发卡住?

服务端代码:

import os

import socket

import sys

import time

import threading

from loguru import logger

from concurrent.futures import ThreadPoolExecutor

from concurrent.futures._base import Future

import multiprocessing

# Text encoding used to decode request bytes and to encode response bytes.
default_encoding: str = 'utf-8'

# Module-level thread pool with up to 20 worker threads. The __main__ section
# of this script rebinds the name `pool` to a multiprocessing.Pool, and the
# only submit() call that would use this executor is commented out there.
pool = ThreadPoolExecutor(

max_workers=20,

thread_name_prefix='simple-work-thread-pool'

)

def init_serversocket() -> socket.socket:
    """Create the TCP listening socket used by the server.

    The socket is bound to all IPv4 interfaces (``0.0.0.0``) on port 6001 and
    put into listening mode with a backlog of 2048 pending connections.

    Returns:
        The bound, listening socket. The caller owns it and must close it.

    Raises:
        OSError: If the socket cannot be configured, bound or switched to
            listening mode. The socket is closed before the error propagates.
    """
    serversocket = socket.socket(
        family=socket.AF_INET,
        type=socket.SOCK_STREAM
    )
    try:
        # Without SO_REUSEADDR a quick restart of the server can fail with
        # "address already in use" while old connections sit in TIME_WAIT.
        serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        # The local host name is only logged; it is not used for binding.
        host = socket.gethostname()
        logger.debug(f'host {host}')

        port = 6001
        serversocket.bind(('0.0.0.0', port))

        # Maximum number of queued, not-yet-accepted connections.
        serversocket.listen(2048)
    except Exception:
        # Do not leak the descriptor when setup fails half-way.
        serversocket.close()
        raise
    return serversocket

def send_response(clientsocket: socket.socket, addr: tuple, response_body: bytes) -> int:
    """Send the complete response to the client and close the connection.

    Args:
        clientsocket: Connected socket to write to. It is always closed
            before this function returns or raises.
        addr: Peer address of the client. Not used by this function.
        response_body: Raw bytes to transmit.

    Returns:
        The number of bytes sent, i.e. ``len(response_body)``.

    Raises:
        OSError: If the data cannot be transmitted.
    """
    try:
        # send() may transmit only a prefix of the buffer; sendall() keeps
        # writing until every byte is handed to the kernel or an error occurs.
        clientsocket.sendall(response_body)
    finally:
        clientsocket.close()
    return len(response_body)

def start_request(clientsocket: socket.socket, addr: tuple) -> int:
    """Read one request from the client, send the reply, close the connection.

    At most 2048 bytes are read in a single ``recv`` call, decoded with
    ``default_encoding`` and echoed back prefixed with
    ``'server get message: '``.

    Args:
        clientsocket: Connected client socket. It is closed on every path.
        addr: Peer address, used for logging and forwarded to
            ``send_response``.

    Returns:
        The number of bytes sent, or ``0`` when handling the request failed.
        Failures are logged and never propagate to the caller.
    """
    try:
        pid = os.getpid()
        logger.debug(f'pid: {pid}, get message from {addr}')

        request_body: bytes = clientsocket.recv(2048)
        request_text: str = request_body.decode(encoding=default_encoding)
        response_text: str = f'server get message: {request_text}'
        response_body: bytes = response_text.encode(default_encoding)

        send_len = send_response(
            clientsocket=clientsocket, addr=addr, response_body=response_body)
        logger.debug(f'发送了响应')
        return send_len
    except Exception as error:
        logger.exception(error)
        # Honour the declared ``int`` return type instead of returning None.
        return 0
    finally:
        # send_response() closes the socket on the success path, but a failure
        # in recv()/decode() would otherwise leave the connection open and the
        # client waiting. Closing an already closed socket is a no-op.
        clientsocket.close()

def start_request_callback(future: Future) -> None:
    """Log the current thread's name together with the result of ``future``.

    ``future.result()`` re-raises any exception the submitted callable raised.
    """
    payload_len: int = future.result()
    thread_name = threading.current_thread().name
    logger.debug(f'{thread_name}, send payload len is {payload_len}')

# Script entry point: the parent process accepts every connection and hands
# the accepted socket to a pool of 16 worker processes.
if __name__ == "__main__":
    serversocket = init_serversocket()
    pool = multiprocessing.Pool(processes=16)
    while True:
        # Blocks until a client connects.
        clientsocket, addr = serversocket.accept()
        clientsocket: socket.socket
        addr: tuple
        # future: Future = pool.submit(start_request, clientsocket, addr)
        # future.add_done_callback(start_request_callback)

        # NOTE(review): the socket object is a task argument, so it has to be
        # transferred from this process to a worker for every connection.
        # Presumably that per-connection transfer is where concurrent requests
        # get stuck on some platforms -- confirm.
        # NOTE(review): no callback/error_callback is passed and the returned
        # AsyncResult is discarded, so a task that fails is not reported here.
        pool.apply_async(start_request, (clientsocket, addr))
    # NOTE(review): the loop above has no break, so these two calls are never
    # reached.
    pool.close()
    pool.join()

服务端使用进程池并发处理来自客户端的 TCP 请求

客户端代码:

from base64 import encode

import socket # 客户端 发送一个数据,再接收一个数据

import json

from loguru import logger

from concurrent.futures import ThreadPoolExecutor

# Indexes of the requests that raised an exception.
failture_requests = []


def send_request(index: int):
    """Send one request to the local server and wait for its reply.

    Connects to ``127.0.0.1:6001``, sends the fixed payload ``b'ponponon'``,
    reads up to 1024 bytes of reply and decodes it. Any exception is logged
    and ``index`` is appended to ``failture_requests``.

    Args:
        index: Sequence number of this request, used for logging and for
            recording failures.
    """
    try:
        # The context manager closes the socket even when connect(), sendall()
        # or recv() raises, so failed requests do not leak descriptors.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as clientsocket:
            # Connect to the server on local port 6001.
            clientsocket.connect(('127.0.0.1', 6001))
            payload = b'ponponon'
            # sendall() guarantees the whole payload is written.
            clientsocket.sendall(payload)
            data = clientsocket.recv(1024)
            # The decoded text is not used, but a reply that cannot be decoded
            # still counts as a failed request.
            data.decode()
            logger.debug(index)
        logger.debug('请求完成')
    except Exception as error:
        failture_requests.append(index)
        logger.exception(error)

# Issue 100 requests through two worker threads. Leaving the ``with`` block
# waits for every submitted request to finish and shuts the executor down,
# also when submitting raises.
with ThreadPoolExecutor(max_workers=2) as pool:
    for index in range(100):
        pool.submit(send_request, index)

# Report which requests failed and how many.
logger.debug(failture_requests)
logger.debug(len(failture_requests))

因为我的客户端使用线程池并发,只要 max_workers 大于 1 就会一直卡死

测试平台是 macos 13.3.1 (22E261) + python3.10.10,会卡死
但是我在 ubuntu20.04 + python3.11.3 上就是一切正常,不会出现卡死的问题

但是 max_workers 是 1 就没有问题,我怀疑问题出在服务端,但是具体原因我分析不出来

服务端如果用线程池,就一切正常,服务端和客户端都正常。
但是我只想用多进程


回答:

import os

import socket

import sys

import time

import threading

from loguru import logger

from concurrent.futures import ThreadPoolExecutor

from concurrent.futures._base import Future

import multiprocessing

# Text encoding used to decode request bytes and to encode response bytes.
default_encoding: str = 'utf-8'

def init_serversocket() -> socket.socket:
    """Create the TCP listening socket used by the server.

    The socket is bound to all IPv4 interfaces (``0.0.0.0``) on port 6001 and
    put into listening mode with a backlog of 2048 pending connections.

    Returns:
        The bound, listening socket. The caller owns it and must close it.

    Raises:
        OSError: If the socket cannot be configured, bound or switched to
            listening mode. The socket is closed before the error propagates.
    """
    serversocket = socket.socket(
        family=socket.AF_INET,
        type=socket.SOCK_STREAM
    )
    try:
        # Without SO_REUSEADDR a quick restart of the server can fail with
        # "address already in use" while old connections sit in TIME_WAIT.
        serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        # The local host name is only logged; it is not used for binding.
        host = socket.gethostname()
        logger.debug(f'host {host}')

        port = 6001
        serversocket.bind(('0.0.0.0', port))

        # Maximum number of queued, not-yet-accepted connections.
        serversocket.listen(2048)
    except Exception:
        # Do not leak the descriptor when setup fails half-way.
        serversocket.close()
        raise
    return serversocket

def send_response(clientsocket: socket.socket, addr: tuple, response_body: bytes) -> int:
    """Send the complete response to the client and close the connection.

    Args:
        clientsocket: Connected socket to write to. It is always closed
            before this function returns or raises.
        addr: Peer address of the client. Not used by this function.
        response_body: Raw bytes to transmit.

    Returns:
        The number of bytes sent, i.e. ``len(response_body)``.

    Raises:
        OSError: If the data cannot be transmitted.
    """
    try:
        # send() may transmit only a prefix of the buffer; sendall() keeps
        # writing until every byte is handed to the kernel or an error occurs.
        clientsocket.sendall(response_body)
    finally:
        clientsocket.close()
    return len(response_body)

def start_request(clientsocket_fd: int, addr: tuple) -> int:
    """Handle one request on a connection identified by a file descriptor.

    Args:
        clientsocket_fd: Descriptor of a connected TCP socket. It must be
            open in the *calling* process: descriptor numbers are per-process,
            so a number obtained in another process is meaningless here.
            Ownership is transferred to this function, which closes it.
        addr: Peer address, used for logging and forwarded to
            ``send_response``.

    Returns:
        The number of bytes sent, or ``0`` when handling the request failed.
        Failures while serving are logged and do not propagate.

    Raises:
        OSError: If ``clientsocket_fd`` is not a valid socket descriptor.
    """
    # Adopt the descriptor directly instead of duplicating it with
    # socket.fromfd() and closing the original number by hand.
    clientsocket = socket.socket(
        socket.AF_INET, socket.SOCK_STREAM, fileno=clientsocket_fd)
    try:
        pid = os.getpid()
        logger.debug(f'pid: {pid}, get message from {addr}')

        request_body: bytes = clientsocket.recv(2048)
        request_text: str = request_body.decode(encoding=default_encoding)
        response_text: str = f'server get message: {request_text}'
        response_body: bytes = response_text.encode(default_encoding)

        send_len = send_response(
            clientsocket=clientsocket, addr=addr, response_body=response_body)
        logger.debug(f'发送了响应')
        return send_len
    except Exception as error:
        logger.exception(error)
        # Honour the declared ``int`` return type instead of returning None.
        return 0
    finally:
        # Closing an already closed socket is a no-op.
        clientsocket.close()


def worker_process(clientsocket_fd, addr):
    """Serve one connection; thin wrapper around ``start_request``.

    ``clientsocket_fd`` must be a descriptor that is valid in the current
    process (see ``start_request``).
    """
    start_request(clientsocket_fd, addr)


def serve_forever(serversocket: socket.socket) -> None:
    """Accept and serve connections in the current process until interrupted.

    This is the main loop of every worker process. Each worker calls
    ``accept()`` on the shared listening socket itself, so an accepted
    connection never has to be moved to another process.

    Args:
        serversocket: The listening socket shared by all workers.
    """
    try:
        while True:
            try:
                clientsocket, addr = serversocket.accept()
                # detach() hands the raw descriptor over without closing it;
                # start_request() takes ownership from here.
                worker_process(clientsocket.detach(), addr)
            except Exception as error:
                logger.exception(error)
    except KeyboardInterrupt:
        # Ctrl-C reaches the workers as well; leave the loop quietly.
        pass


if __name__ == "__main__":
    serversocket = init_serversocket()

    # A fixed set of 16 worker processes. The previous version accepted in the
    # parent and sent the descriptor *number* to pool workers that were
    # already running; that number does not refer to the accepted connection
    # inside a worker. Here the listening socket is handed to each worker
    # once, when the process is started.
    workers = [
        multiprocessing.Process(
            target=serve_forever, args=(serversocket,), daemon=True)
        for _ in range(16)
    ]
    for worker in workers:
        worker.start()

    try:
        for worker in workers:
            worker.join()
    except KeyboardInterrupt:
        pass
    finally:
        # Cleanup that is actually reachable: stop the workers and release
        # the listening socket.
        for worker in workers:
            if worker.is_alive():
                worker.terminate()
            worker.join()
        serversocket.close()

以上是 为什么使用 python 的进程池处理并发的 TCP 请求,会导致客户端并发卡住? 的全部内容, 来源链接: utcz.com/p/938869.html

回到顶部