为什么 python 的进程池,无法监听同一个端口?

为什么 python 的进程池,无法监听同一个端口?

import socket

import sys

import time

import threading

from loguru import logger

from concurrent.futures import ThreadPoolExecutor

from concurrent.futures._base import Future

import multiprocessing

default_encoding: str = 'utf-8'

pool = ThreadPoolExecutor(

max_workers=20,

thread_name_prefix='simple-work-thread-pool'

)

def init_serversocket() -> socket.socket:

serversocket = socket.socket(

family=socket.AF_INET,

type=socket.SOCK_STREAM

)

# 获取本地主机名

host = socket.gethostname()

logger.debug(f'host {host}')

port = 6001

# 绑定端口号

serversocket.bind(('0.0.0.0', port))

# 设置最大连接数,超过后排队

serversocket.listen(5)

return serversocket

def send_response(clientsocket: socket.socket, addr: tuple, response_body: bytes) -> int:

send_len: int = clientsocket.send(response_body)

clientsocket.close()

return send_len

def start_request(clientsocket: socket.socket, addr: tuple) -> int:

try:

logger.debug(f'get message from {addr}')

request_body: bytes = clientsocket.recv(2048)

request_text: str = request_body.decode(encoding=default_encoding)

response_text: str = f'server get message: {request_text}'

response_body: bytes = response_text.encode(default_encoding)

time.sleep(1)

return send_response(clientsocket=clientsocket, addr=addr, response_body=response_body)

except Exception as error:

logger.exception(error)

def start_request_callback(future: Future) -> None:

send_len: int = future.result()

logger.debug(

f'{threading.current_thread().name}, send payload len is {send_len}')

serversocket = init_serversocket()

if __name__ == "__main__":

pool = multiprocessing.Pool(processes=2)

while True:

clientsocket, addr = serversocket.accept()

clientsocket: socket.socket

addr: tuple

# future: Future = pool.submit(start_request, clientsocket, addr)

# future.add_done_callback(start_request_callback)

pool.apply_async(start_request, (clientsocket, addr))

pool.close()

pool.join()

Nginx、gunicorn 都是 master-slave 架构的,n 个进程监听同一个端口

所以,我想自己实现一下,就写了上面的代码。(我参考了:https://blog.csdn.net/L13763338360/article/details/106519027,里面说到,只要 fork 子进程之前,bind 就行)

但是,运行上面的代码,报错 OSError: [Errno 48] Address already in use


回答:

serversocket = init_serversocket() 这行代码要写到 main 里面。


回答:

你可以用多线程而不是用多进程来处理并发请求:

from concurrent.futures import ThreadPoolExecutor

from concurrent.futures._base import Future

# ...

if __name__ == "__main__":

with ThreadPoolExecutor(max_workers=2) as pool:

while True:

clientsocket, addr = serversocket.accept()

clientsocket: socket.socket

addr: tuple

future: Future = pool.submit(start_request, clientsocket, addr)

future.add_done_callback(start_request_callback)

方法二:

import socket

import socketserver

import sys

import time

import threading

from loguru import logger

from concurrent.futures import ThreadPoolExecutor

from concurrent.futures._base import Future

import multiprocessing

default_encoding: str = 'utf-8'

pool = ThreadPoolExecutor(

max_workers=20,

thread_name_prefix='simple-work-thread-pool'

)

host = socket.gethostname()

logger.debug(f'host {host}')

port = 6001

class MyTCPHandler(socketserver.BaseRequestHandler):

def handle(self):

try:

logger.debug(f'get message from {self.client_address}')

request_body: bytes = self.request.recv(2048)

request_text: str = request_body.decode(encoding=default_encoding)

response_text: str = f'server get message: {request_text}'

response_body: bytes = response_text.encode(default_encoding)

time.sleep(1)

send_len: int = self.request.send(response_body)

self.request.close()

logger.debug(f'{threading.current_thread().name}, send payload len is {send_len}')

except Exception as error:

logger.exception(error)

def serve_forever(serversocket):

server = socketserver.ThreadingTCPServer(('0.0.0.0', port), MyTCPHandler, bind_and_activate=False)

server.socket = serversocket

server.serve_forever()

serversocket = socket.socket(

family=socket.AF_INET,

type=socket.SOCK_STREAM

)

serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

serversocket.bind(('0.0.0.0', port))

serversocket.listen(5)

if __name__ == "__main__":

pool = multiprocessing.Pool(processes=2)

for _ in range(2):

pool.apply_async(serve_forever, (serversocket,))

pool.close()

pool.join()

以上是 为什么 python 的进程池,无法监听同一个端口? 的全部内容, 来源链接: utcz.com/p/938867.html

回到顶部