python 多进程队列数据处理详解

我就废话不多说了,直接上代码吧!

# -*- coding:utf8 -*-

import paho.mqtt.client as mqtt

from multiprocessing import Process, Queue

import time, random, os

import camera_person_num

MQTTHOST = "172.19.4.4"

MQTTPORT = 1883

mqttClient = mqtt.Client()

q = Queue()

# 连接MQTT服务器

def on_mqtt_connect():

mqttClient.connect(MQTTHOST, MQTTPORT, 60)

mqttClient.loop_start()

# 消息处理函数

def on_message_come(lient, userdata, msg):

# print(msg.topic + ":" + str(msg.payload.decode("utf-8")))

q.put(msg.payload.decode("utf-8")) # 放入队列

print("产生消息", msg.payload.decode("utf-8"))

# 消息处理开启多进程

# p = Process(target=talk, args=("/camera/person/num/result", msg.payload.decode("utf-8")))

# p.start()

def consumer(q, pid):

print("开启消费序列进程", pid)

while True:

msg = q.get()

# p = Process(target=talk, args=("/camera/person/num/result", msg, pid))

# p.start()

talk("/camera/person/num/result", msg, pid)

# subscribe 消息订阅

def on_subscribe():

mqttClient.subscribe("test123", 1) # 主题为"test"

mqttClient.on_message = on_message_come # 消息到来处理函数

# publish 消息发布

def on_publish(topic, msg, qos):

mqttClient.publish(topic, msg, qos);

# 多进程中发布消息需要重新初始化mqttClient

def talk(topic, msg, pid):

cameraPsersonNum = camera_person_num.CameraPsersonNum(msg)

t_max, t_mean, t_min = cameraPsersonNum.personNum()

# time.sleep(20)

print("消费消息", pid, msg)

mqttClient2 = mqtt.Client()

mqttClient2.connect(MQTTHOST, MQTTPORT, 60)

mqttClient2.loop_start()

mqttClient2.publish(topic, '{"max":' + str(t_max) + ',"mean":' + str(t_mean) + ',"min:"' + t_min + '}', 1)

mqttClient2.disconnect()

def main():

on_mqtt_connect()

on_subscribe()

for i in range(1, 3):

c1 = Process(target=consumer, args=(q, i))

c1.start()

while True:

pass

if __name__ == '__main__':

main()

以上这篇python 多进程队列数据处理详解就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。

以上是 python 多进程队列数据处理详解 的全部内容, 来源链接: utcz.com/z/339247.html

回到顶部