Python的多线程服务器可以在同一时间

我玩弄了解多线程处理一个客户端的消息,所以我写了下面的客户机/服务器应用程序,其中,服务器发送命令给客户端,客户端检查此命令,如果它等于到'a'它发送一个回复到服务器。Python的多线程服务器可以在同一时间

在我创建了两个插槽和一个线程的服务器代码;第一个套接字将命令发送(发布)到所有连接(订阅)的客户端。在线程第二插座等待来自客户的任何答复,但因为线程执行一些阻塞的操作(如存储客户端在数据库中发送的信息),它可以在同一时间,即使插座(REQ-REP处理一个客户端套接字)可以同时接收多条消息。

server.py

import zmq 

import logging

import threading

import time

logging.basicConfig(level=logging.DEBUG)

class Server(object):

def __init__(self):

self.context = zmq.Context()

self.pub_port = 7777

self.rep_port = 7778

self.pub_socket = None

self.rep_socket = None

self.interface = "*"

def bind_ports(self):

logging.debug("[bind_ports] binding the ports....")

self.pub_socket = self.context.socket(zmq.PUB)

pub_bind_str = "tcp://{}:{}".format(self.interface, self.pub_port)

self.pub_socket.bind(pub_bind_str)

self.rep_socket = self.context.socket(zmq.REP)

rep_bind_str = "tcp://{}:{}".format(self.interface, self.rep_port)

self.rep_socket.bind(rep_bind_str)

def received_info(self):

while True:

# logging.debug("[received_flow] ")

cl_data = self.rep_socket.recv_json()

logging.info("[received_data] data <{}>".format(flow))

self.rep_socket.send(b"\x00")

self.blocking_op(cl_data)

def blocking_op(self, data):

time.sleep(1) # simulating some blocking operations e.g. storing info in a database

def push_instruction(self, cmd):

logging.debug("[push_inst] Sending the instruction <%s> to the clients...",

# logging.debug("[push_inst] Sending the instruction <%s> to the agents ...",

cmd)

instruction = {"cmd": cmd}

self.pub_socket.send_json(instruction)

def create_thread(self):

thread = threading.Thread(target=self.received_info)

thread.daemon = True

thread.start()

logging.debug("[create_thread] Thread created <{}>".format(

thread.is_alive()))

def start_main_loop(self):

logging.debug("[start_main_loop] Loop started....")

self.bind_ports()

self.create_thread()

while True:

cmd = input("Enter your command: ")

self.push_instruction(cmd)

if __name__ == "__main__":

Server().start_main_loop()

client.py

import zmq 

import logging

import random

import time

logging.basicConfig(level=logging.DEBUG)

class Client(object):

def __init__(self):

self.context = zmq.Context()

self.sub_socket = None

self.req_socket = None

self.pub_port = 7777

self.req_port = 7778

self.server_ip = 'localhost'

self.client_id = ""

def connect_to_server(self):

logging.debug("[conn_to_serv] Connecting to the server ....")

self.sub_socket = self.context.socket(zmq.SUB)

self.sub_socket.setsockopt_string(zmq.SUBSCRIBE, "")

conn_str = "tcp://{}:{}".format(self.server_ip, self.pub_port)

self.sub_socket.connect(conn_str)

self.req_socket = self.context.socket(zmq.REQ)

req_conn_str = "tcp://{}:{}".format(self.server_ip, self.req_port)

self.req_socket.connect(req_conn_str)

def get_instruction(self):

inst = self.sub_socket.recv_json()

logging.debug("[get_inst] Server sent inst")

cmd = inst["cmd"]

return cmd

def send_flow(self, x, y):

flow = {

"client_id": self.client_id,

"x": x,

"y": y

}

self.req_socket.send_json(flow)

def start_main_loop(self):

logging.debug("starting the main loop ....")

self.client_id = input("What is your id: ")

self.connect_to_server()

while True:

inst = self.get_instruction()

logging.info("[Main_loop] inst<{}>".format(inst))

if inst == "a":

# time.sleep(random.uniform(.6, 1.5))

self.send_flow("xxx", "yyy")

self.req_socket.recv()

logging.debug("[main_loop] server received the flow")

if __name__ == "__main__":

Client().start_main_loop()

我将不胜感激,如果有人能帮助我提高了服务器,以便它可以在为多个客户的信息同时。

回答:

我是不是能够运行您的代码和测试,但如果你的问题是receive_info()拦截,你会绕过通过启动一个线程来处理实际的响应。像这样的东西(可能包含错别字,我不能与你的代码来测试 - 例如不知道什么flow是)

def handle_response(self, data): 

logging.info("[received_data] data <{}>".format(flow))

self.rep_socket.send(b"\x00")

self.blocking_op(data)

def received_info(self):

while True:

# logging.debug("[received_flow] ")

cl_data = self.rep_socket.recv_json()

_t = threading.Thread(target=self.handle_response, args=(cl_data,))

_t.start()

这有你的received_info()循环,因为它是,而是做着处理在那里,启动一个新线程来处理响应。它需要花费什么才能完成,然后线程死亡,但是您的received_info()将立即准备好等待新的响应。

以上是 Python的多线程服务器可以在同一时间 的全部内容, 来源链接: utcz.com/qa/262330.html

回到顶部