基于MQTT(EMQ)的消息发布订阅-python实现

python

python代码实现 

安装:pip install paho-mqtt

实现Publish-发送消息:

 1 #!/usr/bin/env python  

2 # encoding: utf-8

3 """

4 @version: v1.0

5 @author: W_H_J

6 @license: Apache Licence

7 @contact: 415900617@qq.com

8 @software: PyCharm

9 @file: clicentMqttTest.py

10 @time: 2019/2/22 14:19

11 @describe: mqtt客户端

12 """

13 import json

14 import sys

15 import os

16 import paho.mqtt.client as mqtt

17 import time

18

19 sys.path.append(os.path.abspath(os.path.dirname(__file__) + '/' + '..'))

20 sys.path.append("..")

21

22 TASK_TOPIC = 'test' # 客户端发布消息主题

23

24 client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))

25 """

26 client_id是连接到代理。如果client_id的长度为零或为零,则行为为由使用的协议版本定义。如果使用MQTT v3.1.1,

27 那么一个零长度的客户机id将被发送到代理,代理将被发送为客户端生成一个随机变量。如果使用MQTT v3.1,那么id将是

28 随机生成的。在这两种情况下,clean_session都必须为True。如果这在这种情况下不会产生ValueError。

29 注意:一般情况下如果客户端服务端启用两个监听那么客户端client_id 不能与服务器相同,如这里用时间"20190222142358"作为它的id,

30 如果与服务器id相同,则无法接收到消息

31 """

32 client = mqtt.Client(client_id, transport='tcp')

33

34 client.connect("127.0.0.1", 1883, 60) # 此处端口默认为1883,通信端口期keepalive默认60

35 client.loop_start()

36

37

38 def clicent_main(message: str):

39 """

40 客户端发布消息

41 :param message: 消息主体

42 :return:

43 """

44 time_now = time.strftime('%Y-%m-%d %H-%M-%S', time.localtime(time.time()))

45 payload = {"msg": "%s" % message, "data": "%s" % time_now}

46 # publish(主题:Topic; 消息内容)

47 client.publish(TASK_TOPIC, json.dumps(payload, ensure_ascii=False))

48 print("Successful send message!")

49 return True

50

51

52 if __name__ == '__main__':

53 msg = "我是一条测试数据!"

54 clicent_main(msg)

View Code

实现Subscribe-订阅

 1 #!/usr/bin/env python  

2 # encoding: utf-8

3 """

4 @version: v1.0

5 @author: W_H_J

6 @license: Apache Licence

7 @contact: 415900617@qq.com

8 @software: PyCharm

9 @file: serverMqttTest.py

10 @time: 2019/2/22 14:35

11 @describe: mqtt 服务端

12 """

13 import json

14 import sys

15 import os

16 import time

17 import paho.mqtt.client as mqtt

18 sys.path.append(os.path.abspath(os.path.dirname(__file__) + '/' + '..'))

19 sys.path.append("..")

20

21 REPORT_TOPIC = 'test' # 主题

22

23

24 def on_connect(client, userdata, flags, rc):

25 print('connected to mqtt with resurt code ', rc)

26 client.subscribe(REPORT_TOPIC) # 订阅主题

27

28

29 def on_message(client, userdata, msg):

30 """

31 接收客户端发送的消息

32 :param client: 连接信息

33 :param userdata:

34 :param msg: 客户端返回的消息

35 :return:

36 """

37 print("Start server!")

38 payload = json.loads(msg.payload.decode('utf-8'))

39 print(payload)

40

41

42 def server_conenet(client):

43 client.on_connect = on_connect # 启用订阅模式

44 client.on_message = on_message # 接收消息

45 client.connect("127.0.0.1", 1883, 60) # 链接

46 # client.loop_start() # 以start方式运行,需要启动一个守护线程,让服务端运行,否则会随主线程死亡

47 client.loop_forever() # 以forever方式阻塞运行。

48

49

50 def server_stop(client):

51 client.loop_stop() # 停止服务端

52 sys.exit(0)

53

54

55 def server_main():

56 client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))

57 client = mqtt.Client(client_id, transport='tcp')

58 server_conenet(client)

59

60

61 if __name__ == '__main__':

62 # 启动监听

63 server_main()

View Code

--------------------- 原作者写得非常好,记下来,怕自己以后找不到

作者:凉城的夜
来源:CSDN
原文:https://blog.csdn.net/sinat_32651363/article/details/87876978
版权声明:本文为博主原创文章,转载请附上博文链接!

以上是 基于MQTT(EMQ)的消息发布订阅-python实现 的全部内容, 来源链接: utcz.com/z/387152.html

回到顶部