为什么 python 的进程池,无法监听同一个端口?
import socket
import sys
import time
import threading
from loguru import logger
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures._base import Future
import multiprocessing
# Text encoding used both to decode requests and to encode responses.
default_encoding: str = 'utf-8'

# Worker thread pool: at most 20 threads, each named with the given prefix.
pool = ThreadPoolExecutor(max_workers=20, thread_name_prefix='simple-work-thread-pool')
def init_serversocket(port: int = 6001) -> socket.socket:
    """Create a TCP server socket listening on every IPv4 interface.

    Args:
        port: TCP port to bind. Defaults to 6001, the value that used to be
            hard-coded here.

    Returns:
        A socket bound to ``('0.0.0.0', port)`` and listening with a backlog
        of 5.

    Raises:
        OSError: If binding or listening fails (for example
            ``Address already in use``). The socket is closed before the
            error propagates, so no descriptor is leaked.
    """
    serversocket = socket.socket(
        family=socket.AF_INET,
        type=socket.SOCK_STREAM,
    )
    try:
        # Let the server rebind the port right after a restart instead of
        # waiting for old connections to leave TIME_WAIT.
        serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # The local host name is only logged; the bind uses the wildcard address.
        host = socket.gethostname()
        logger.debug(f'host {host}')
        # Bind the port.
        serversocket.bind(('0.0.0.0', port))
        # Backlog: up to 5 pending connections queue up before accept().
        serversocket.listen(5)
    except Exception:
        serversocket.close()
        raise
    return serversocket
def send_response(clientsocket: socket.socket, addr: tuple, response_body: bytes) -> int:
    """Send the complete response to the client, then close the connection.

    Args:
        clientsocket: Connected client socket. It is always closed before
            this function returns or raises.
        addr: Peer address of the client. Not used by this function.
        response_body: Encoded payload to send.

    Returns:
        The number of bytes sent, which is ``len(response_body)``.

    Raises:
        OSError: If the payload cannot be sent.
    """
    try:
        # sendall() keeps writing until every byte is sent; a plain send()
        # may return after transmitting only part of the payload.
        clientsocket.sendall(response_body)
    finally:
        clientsocket.close()
    return len(response_body)
def start_request(clientsocket: socket.socket, addr: tuple) -> int:
    """Handle one client connection: read a message and echo it back.

    Reads up to 2048 bytes, decodes them with ``default_encoding``, and
    replies with ``'server get message: <text>'``.

    Args:
        clientsocket: Connected client socket. It is closed before this
            function returns, whether or not handling succeeded.
        addr: Peer address of the client; used for logging and passed on to
            ``send_response``.

    Returns:
        The number of bytes sent, or 0 when handling failed. Failures are
        logged, not raised.
    """
    try:
        logger.debug(f'get message from {addr}')
        request_body: bytes = clientsocket.recv(2048)
        request_text: str = request_body.decode(encoding=default_encoding)
        response_text: str = f'server get message: {request_text}'
        response_body: bytes = response_text.encode(default_encoding)
        # Fixed one-second delay before replying.
        time.sleep(1)
        return send_response(clientsocket=clientsocket, addr=addr, response_body=response_body)
    except Exception as error:
        logger.exception(error)
        # Keep the declared int return type instead of falling through to None.
        return 0
    finally:
        # Closing twice is harmless; this covers failures that happen before
        # send_response gets a chance to close the socket.
        clientsocket.close()
def start_request_callback(future: Future) -> None:
    """Log the byte count produced by a finished request future.

    Args:
        future: Completed future whose result is the number of bytes sent.
    """
    sent = future.result()
    worker_name = threading.current_thread().name
    logger.debug(f'{worker_name}, send payload len is {sent}')
# NOTE(review): this bind runs at import time. Under the 'spawn' start method
# every pool worker re-imports this module and would execute the bind again,
# which is the likely source of the reported 'Address already in use'.
# Confirm which start method is in use.
serversocket = init_serversocket()

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=2)
    while True:
        conn, peer = serversocket.accept()
        # Thread-pool variant: submit start_request to a ThreadPoolExecutor
        # and attach start_request_callback with add_done_callback.
        pool.apply_async(start_request, args=(conn, peer))
    # Only reached if the accept loop above is ever broken out of.
    pool.close()
    pool.join()
Nginx、gunicorn 都是 master-worker(一个主进程 + 多个工作进程)架构的,n 个 worker 进程监听同一个端口
所以,我想自己实现一下,就写了上面的代码。(我参考了:https://blog.csdn.net/L13763338360/article/details/106519027,里面说到,只要 fork 子进程之前,bind 就行)
但是,运行上面的代码,报错 OSError: [Errno 48] Address already in use
回答:
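serversocket = init_serversocket()
这行代码要写到 `if __name__ == "__main__":` 里面。
原因:Errno 48 说明代码运行在 macOS 上,而 macOS 上(Python 3.8 起)multiprocessing 默认用 spawn 方式启动子进程,子进程会重新导入主模块;写在模块顶层的 bind 会在每个子进程里再执行一次,于是报 Address already in use。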
回答:
方法一:你可以用多线程而不是多进程来处理并发请求:
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures._base import Future
# ...
if __name__ == "__main__":
    # Leaving the with-block shuts the executor down if the loop ever exits.
    with ThreadPoolExecutor(max_workers=2) as pool:
        while True:
            conn, peer = serversocket.accept()
            # Handle the connection on a worker thread and log the result
            # once the request has finished.
            pending = pool.submit(start_request, conn, peer)
            pending.add_done_callback(start_request_callback)
方法二:
import socket
import socketserver
import sys
import time
import threading
from loguru import logger
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures._base import Future
import multiprocessing
# Text encoding used both to decode requests and to encode responses.
default_encoding: str = 'utf-8'

# Worker thread pool: at most 20 threads, each named with the given prefix.
pool = ThreadPoolExecutor(max_workers=20, thread_name_prefix='simple-work-thread-pool')

# The local host name is looked up only so that it can be logged.
host = socket.gethostname()
logger.debug(f'host {host}')

# TCP port the server listens on.
port = 6001
class MyTCPHandler(socketserver.BaseRequestHandler):
    """Request handler that echoes one message back to the client."""

    def handle(self):
        """Read up to 2048 bytes and reply with 'server get message: <text>'.

        Errors are logged rather than raised, and the connection is closed
        on every path.
        """
        try:
            logger.debug(f'get message from {self.client_address}')
            request_body: bytes = self.request.recv(2048)
            request_text: str = request_body.decode(encoding=default_encoding)
            response_text: str = f'server get message: {request_text}'
            response_body: bytes = response_text.encode(default_encoding)
            # Fixed one-second delay before replying.
            time.sleep(1)
            # sendall() keeps writing until every byte is sent; a plain
            # send() may return after transmitting only part of the payload.
            self.request.sendall(response_body)
            send_len: int = len(response_body)
            logger.debug(f'{threading.current_thread().name}, send payload len is {send_len}')
        except Exception as error:
            logger.exception(error)
        finally:
            # Close the connection even when reading or sending failed.
            self.request.close()
def serve_forever(serversocket):
    """Serve connections from an already-listening socket until stopped.

    Args:
        serversocket: A TCP socket that is already bound and listening. The
            server accepts connections from it instead of binding its own.
    """
    server = socketserver.ThreadingTCPServer(('0.0.0.0', port), MyTCPHandler, bind_and_activate=False)
    # The constructor creates its own socket even with
    # bind_and_activate=False; close it before swapping in the shared
    # listening socket so the unused descriptor is not leaked.
    server.socket.close()
    server.socket = serversocket
    try:
        server.serve_forever()
    finally:
        # Release the listening socket when the serve loop ends.
        server.server_close()
if __name__ == "__main__":
    # Create and bind the listening socket inside the main guard. Under the
    # 'spawn' start method each worker re-imports this module, and a
    # module-level bind would run again in every worker and fail with
    # 'Address already in use'.
    serversocket = socket.socket(
        family=socket.AF_INET,
        type=socket.SOCK_STREAM,
    )
    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    serversocket.bind(('0.0.0.0', port))
    serversocket.listen(5)

    pool = multiprocessing.Pool(processes=2)
    # One long-running serve loop per worker, all sharing the same socket.
    results = [pool.apply_async(serve_forever, (serversocket,)) for _ in range(2)]
    pool.close()
    for result in results:
        # get() blocks while the worker serves and re-raises any exception
        # from it, so a failing worker is no longer silent.
        result.get()
    pool.join()
以上是 为什么 python 的进程池,无法监听同一个端口? 的全部内容, 来源链接: utcz.com/p/938867.html