Python 实现WebSocket通信

python

本文转载自网络,是一个使用 Python 实现 WebSocket 通信的案例,放在这里仅作留档。

import socket,struct,hashlib,base64

def get_headers(data):
    """Parse the raw bytes of an HTTP upgrade request into a dict.

    Header lines ("Name: value") are stored under their name exactly as
    sent, with surrounding whitespace stripped from the value.  A line
    without a colon (the request line) is split at its first space into the
    "method" and "protocol" keys, so "protocol" holds everything after the
    method, e.g. "/ HTTP/1.1".

    Args:
        data: Request bytes as read from the socket; decoded as UTF-8.

    Returns:
        dict mapping header names (plus "method"/"protocol") to strings.

    Raises:
        UnicodeDecodeError: if ``data`` is not valid UTF-8.
    """
    text = str(data, encoding="utf-8")
    # partition() never fails, so a request whose blank-line terminator has
    # not arrived yet is still parsed instead of raising ValueError.
    head, _, _ = text.partition("\r\n\r\n")

    headers = {}
    for line in head.split("\r\n"):
        name, colon, value = line.partition(":")
        if colon:
            headers[name] = value.strip()
            continue
        parts = line.split(" ", 1)
        if len(parts) == 2:
            headers["method"] = parts[0]
            headers["protocol"] = parts[1]
    return headers

def parse_payload(payload):
    """Decode the text payload of the first WebSocket frame in ``payload``.

    The 7-bit length field in the second byte selects where the header ends:
    126 means a 2-byte extended length follows, 127 means an 8-byte one.
    Only the number of bytes declared by the frame is decoded, so anything
    that follows the first frame in the same buffer is ignored.  When the
    MASK bit is set the 4-byte masking key is read and XOR-ed over the
    payload; otherwise the payload is taken as-is.

    Args:
        payload: Raw frame bytes as received from the socket.

    Returns:
        The payload decoded as a UTF-8 string.

    Raises:
        IndexError: if ``payload`` is shorter than two bytes.
        UnicodeDecodeError: if the unmasked payload is not valid UTF-8.
    """
    second = payload[1]
    length = second & 127
    if length == 126:
        length = int.from_bytes(payload[2:4], "big")
        offset = 4
    elif length == 127:
        length = int.from_bytes(payload[2:10], "big")
        offset = 10
    else:
        offset = 2

    if second & 128:
        mask = payload[offset:offset + 4]
        offset += 4
        body = payload[offset:offset + length]
        body = bytes(b ^ mask[i % 4] for i, b in enumerate(body))
    else:
        body = bytes(payload[offset:offset + length])
    return str(body, encoding='utf-8')

def send_msg(conn, msg_bytes):
    """Send ``msg_bytes`` to the peer as one unmasked WebSocket text frame.

    The first header byte is 0x81.  The length is written in the shortest
    of the three forms: a single byte below 126, marker 126 plus a 2-byte
    big-endian length up to 0xFFFF, or marker 127 plus an 8-byte length.

    Args:
        conn: Object with a ``sendall(bytes)`` method (a connected socket).
        msg_bytes: Payload as bytes; a ``str`` is also accepted and is
            encoded as UTF-8 first.

    Returns:
        True once ``sendall`` has returned.
    """
    if isinstance(msg_bytes, str):
        msg_bytes = msg_bytes.encode("utf-8")

    length = len(msg_bytes)
    if length < 126:
        header = struct.pack("!BB", 0x81, length)
    elif length <= 0xFFFF:
        header = struct.pack("!BBH", 0x81, 126, length)
    else:
        header = struct.pack("!BBQ", 0x81, 127, length)

    conn.sendall(header + msg_bytes)
    return True

def recv_msg(conn):
    """Read up to 8096 bytes from ``conn`` and return them decoded by parse_payload."""
    return parse_payload(conn.recv(8096))

if __name__ == "__main__":
    # Single-client demo: accept one connection, answer the WebSocket
    # handshake, then reply "hello lyshark" to every frame received.

    # Fixed string appended to the client's Sec-WebSocket-Key before hashing.
    magic_string = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
    response_tpl = "HTTP/1.1 101 Switching Protocols\r\n" \
                   "Upgrade:websocket\r\n" \
                   "Connection: Upgrade\r\n" \
                   "Sec-WebSocket-Accept: %s\r\n" \
                   "WebSocket-Location: ws://%s\r\n\r\n"

    # The with-blocks close both sockets on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(("127.0.0.1", 10083))
        sock.listen(5)
        conn, addr = sock.accept()
        with conn:
            headers = get_headers(conn.recv(8096))
            key = headers.get('Sec-WebSocket-Key')
            if not key:
                # Without the key no accept token can be computed; refuse the
                # request instead of failing later on an undefined variable.
                conn.sendall(b"HTTP/1.1 400 Bad Request\r\n\r\n")
            else:
                # Accept token: base64(sha1(key + magic_string)).
                value = key + magic_string
                ac = base64.b64encode(hashlib.sha1(value.encode('utf-8')).digest())
                response_str = response_tpl % (ac.decode('utf-8'), headers.get("Host"))
                conn.sendall(bytes(response_str, encoding="utf-8"))
                while True:
                    try:
                        print(recv_msg(conn))
                        send_msg(conn, b"hello lyshark")
                    except (IndexError, UnicodeDecodeError, OSError):
                        # An empty read (peer gone) makes parse_payload raise
                        # IndexError; a non-text frame may fail UTF-8
                        # decoding; socket errors raise OSError.  All of them
                        # end the session.
                        break

多线程

import socket,struct,hashlib,base64

import threading

def get_headers(data):
    """Parse the raw bytes of an HTTP upgrade request into a dict.

    Header lines ("Name: value") are stored under their name exactly as
    sent, with surrounding whitespace stripped from the value.  A line
    without a colon (the request line) is split at its first space into the
    "method" and "protocol" keys, so "protocol" holds everything after the
    method, e.g. "/ HTTP/1.1".

    Args:
        data: Request bytes as read from the socket; decoded as UTF-8.

    Returns:
        dict mapping header names (plus "method"/"protocol") to strings.

    Raises:
        UnicodeDecodeError: if ``data`` is not valid UTF-8.
    """
    text = str(data, encoding="utf-8")
    # partition() never fails, so a request whose blank-line terminator has
    # not arrived yet is still parsed instead of raising ValueError.
    head, _, _ = text.partition("\r\n\r\n")

    headers = {}
    for line in head.split("\r\n"):
        name, colon, value = line.partition(":")
        if colon:
            headers[name] = value.strip()
            continue
        parts = line.split(" ", 1)
        if len(parts) == 2:
            headers["method"] = parts[0]
            headers["protocol"] = parts[1]
    return headers

def parse_payload(payload):
    """Decode the text payload of the first WebSocket frame in ``payload``.

    The 7-bit length field in the second byte selects where the header ends:
    126 means a 2-byte extended length follows, 127 means an 8-byte one.
    Only the number of bytes declared by the frame is decoded, so anything
    that follows the first frame in the same buffer is ignored.  When the
    MASK bit is set the 4-byte masking key is read and XOR-ed over the
    payload; otherwise the payload is taken as-is.

    Args:
        payload: Raw frame bytes as received from the socket.

    Returns:
        The payload decoded as a UTF-8 string.

    Raises:
        IndexError: if ``payload`` is shorter than two bytes.
        UnicodeDecodeError: if the unmasked payload is not valid UTF-8.
    """
    second = payload[1]
    length = second & 127
    if length == 126:
        length = int.from_bytes(payload[2:4], "big")
        offset = 4
    elif length == 127:
        length = int.from_bytes(payload[2:10], "big")
        offset = 10
    else:
        offset = 2

    if second & 128:
        mask = payload[offset:offset + 4]
        offset += 4
        body = payload[offset:offset + length]
        body = bytes(b ^ mask[i % 4] for i, b in enumerate(body))
    else:
        body = bytes(payload[offset:offset + length])
    return str(body, encoding='utf-8')

def send_msg(conn, msg_bytes):
    """Send ``msg_bytes`` to the peer as one unmasked WebSocket text frame.

    The first header byte is 0x81.  The length is written in the shortest
    of the three forms: a single byte below 126, marker 126 plus a 2-byte
    big-endian length up to 0xFFFF, or marker 127 plus an 8-byte length.

    Args:
        conn: Object with a ``sendall(bytes)`` method (a connected socket).
        msg_bytes: Payload as bytes; a ``str`` is also accepted and is
            encoded as UTF-8 first.

    Returns:
        True once ``sendall`` has returned.
    """
    if isinstance(msg_bytes, str):
        msg_bytes = msg_bytes.encode("utf-8")

    length = len(msg_bytes)
    if length < 126:
        first_bytes = struct.pack("!BB", 0x81, length)
    elif length <= 0xFFFF:
        first_bytes = struct.pack("!BBH", 0x81, 126, length)
    else:
        first_bytes = struct.pack("!BBQ", 0x81, 127, length)

    conn.sendall(first_bytes + msg_bytes)
    return True

def recv_msg(conn):
    """Read one chunk from ``conn`` and decode it when it starts with 0x81.

    Returns the string produced by parse_payload, or False when the first
    byte of the chunk is anything other than 0x81 (including an empty read).
    """
    frame = conn.recv(8096)
    if frame[:1] != b"\x81":
        return False
    return parse_payload(frame)

def handler_accept(sock):
    """Accept clients forever and hand each one to its own worker thread.

    For every accepted connection the HTTP upgrade request is read, the
    ``Sec-WebSocket-Accept`` token is computed from the client's
    ``Sec-WebSocket-Key`` and the 101 response is sent.  The connection is
    then passed to ``handler_msg`` on a new thread.  A client whose
    handshake fails (missing key, undecodable request, socket error) is
    closed and skipped so that it cannot take the accept loop down.

    Args:
        sock: A listening socket.
    """
    # Fixed string appended to the client's key before hashing.
    magic_string = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
    response_tpl = "HTTP/1.1 101 Switching Protocols\r\n" \
                   "Upgrade:websocket\r\n" \
                   "Connection: Upgrade\r\n" \
                   "Sec-WebSocket-Accept: %s\r\n" \
                   "WebSocket-Location: ws://%s\r\n\r\n"

    while True:
        conn, addr = sock.accept()
        try:
            headers = get_headers(conn.recv(8096))
            key = headers.get('Sec-WebSocket-Key')
            if not key:
                raise ValueError("missing Sec-WebSocket-Key header")
            value = key + magic_string
            ac = base64.b64encode(hashlib.sha1(value.encode('utf-8')).digest())
            response_str = response_tpl % (ac.decode('utf-8'), headers.get("Host"))
            conn.sendall(bytes(response_str, encoding="utf-8"))
        except (OSError, ValueError):
            # UnicodeDecodeError from get_headers is a ValueError subclass.
            conn.close()
            continue

        t = threading.Thread(target=handler_msg, args=(conn, ))
        t.start()

def handler_msg(conn):
    """Serve one WebSocket client: print each message and answer it.

    Runs until any exception is raised by receiving or sending, then
    returns; leaving the ``with`` block closes the connection.

    Args:
        conn: Connected socket whose handshake has already been completed.
    """
    with conn as c:
        while True:
            try:
                recv = recv_msg(c)
                print("接收数据: {}".format(recv))
                send_msg(c, bytes("hello lyshark", encoding="utf-8"))
            except Exception:
                # Returning ends this worker just as exit(0) did, without
                # depending on the interactive-only ``exit`` builtin.
                return

if __name__ == "__main__":
    # Listen on localhost:10083 and run the accept loop on its own thread.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("127.0.0.1", 10083))
    sock.listen(5)

    # Pass the function and its argument separately: writing
    # target=handler_accept(sock) would call it right here in the main
    # thread and never reach Thread() at all.
    t = threading.Thread(target=handler_accept, args=(sock, ))
    t.start()

前端

<!DOCTYPE html>
<html>
<head lang="en">
    <meta charset="UTF-8">
    <script src="https://cdn.bootcss.com/jquery/3.4.1/jquery.js"></script>
</head>
<body>
<!-- The ids below are the ones the script looks up with getElementById. -->
<ul id="content"></ul>
<form class="form">
    <input type="text" placeholder="请输入发送的消息" class="message" id="message" />
    <input type="button" value="发送" id="send" />
    <input type="button" value="连接" id="connect" />
</form>
<script>
    var oUl = document.getElementById('content');
    var oConnect = document.getElementById('connect');
    var oSend = document.getElementById('send');
    var oInput = document.getElementById('message');
    var ws = null;

    // Append one entry to the log as plain text, so that whatever arrives
    // over the socket is never interpreted as HTML.
    function appendLine(text) {
        var li = document.createElement('li');
        li.textContent = text;
        oUl.appendChild(li);
    }

    // "Connect" button: open the socket and log its life-cycle events.
    oConnect.onclick = function () {
        ws = new WebSocket('ws://127.0.0.1:10083');
        ws.onopen = function () {
            appendLine("客户端已连接");
        };
        ws.onmessage = function (evt) {
            appendLine(evt.data);
        };
        ws.onclose = function () {
            appendLine("客户端已断开连接");
        };
        // The error event carries no data field, so show a fixed message.
        ws.onerror = function () {
            appendLine("连接发生错误");
        };
    };

    // "Send" button: only send while the socket is actually open.
    oSend.onclick = function () {
        if (ws && ws.readyState === WebSocket.OPEN) {
            ws.send(oInput.value);
        }
    };
</script>
</body>
</html>


改进:服务端定时推送 CPU 负载数据,前端用图表动态展示其变化:

def handler_msg(conn):
    """Push a CPU-load sample to one WebSocket client every 60 seconds.

    Each message is a JSON text frame of the form
    ``{"response": [times, data]}`` where ``times`` is the local time
    formatted as "MM:SS" and ``data`` is whatever
    ``psutil.cpu_percent(interval=None, percpu=True)`` returns.  The loop
    ends on the first exception; leaving the ``with`` block closes the
    connection.

    Args:
        conn: Connected socket whose handshake has already been completed.
    """
    # NOTE(review): this snippet uses `time`, `json` and `psutil`, none of
    # which are imported in the visible excerpt - the enclosing module must
    # import them.
    with conn as c:
        while True:
            try:
                times = time.strftime("%M:%S", time.localtime())
                data = psutil.cpu_percent(interval=None, percpu=True)
                print("处理时间: {} --> 处理负载: {}".format(times, data))
                send_msg(c, bytes(json.dumps({"response": [times, data]}), encoding="utf-8"))
                time.sleep(60)
            except Exception:
                # Returning ends this worker just as exit(0) did, without
                # depending on the interactive-only ``exit`` builtin.
                return

if __name__ == "__main__":
    # Listen on localhost:10083 and run the accept loop on its own thread.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("127.0.0.1", 10083))
    sock.listen(5)

    # Pass the function and its argument separately: writing
    # target=handler_accept(sock) would call it right here in the main
    # thread and never reach Thread() at all.
    t = threading.Thread(target=handler_accept, args=(sock, ))
    t.start()

客户端

<script type="text/javascript" src="https://cdn.bootcss.com/echarts/4.6.0/echarts.min.js"></script>
<!-- display() looks this container up by the id "main".
     NOTE(review): the width/height are placeholders added so the chart has
     a non-zero area to draw in - adjust to the page layout. -->
<center><div id="main" style="width: 800px; height: 400px;"></div></center>
<script type="text/javascript" charset="UTF-8">
    // Draw the CPU line chart: `time` supplies the x-axis labels and `cpu`
    // the matching y values.
    var display = function (time, cpu) {
        var dom = document.getElementById("main");
        // Reuse the chart already bound to the container; only the first
        // call creates one.
        var main = echarts.getInstanceByDom(dom) || echarts.init(dom);
        var option = {
            xAxis: {
                type: 'category',
                data: time
            },
            yAxis: {
                type: 'value'
            },
            series: [{
                type: 'line',
                symbol: 'none',
                color: 'blue',
                // The original listed `smooth` twice (0.3, then true); the
                // later value won, so only that one is kept.
                smooth: true,
                areaStyle: {
                    color: '#0000CD',
                    origin: 'start',
                    opacity: 0.5
                },
                data: cpu
            }]
        };
        // Second argument true: replace the previous option instead of merging.
        main.setOption(option, true);
    };
</script>

<script type="text/javascript" charset="UTF-8">
    // Connect to the monitoring server and keep a sliding window of the
    // latest samples; both arrays start out with ten placeholder entries.
    var ws = new WebSocket('ws://127.0.0.1:10083');
    var time = ["", "", "", "", "", "", "", "", "", ""];
    var cpu = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0];

    ws.onmessage = function (evt) {
        // Each message is JSON whose `response` field holds [label, value].
        var sample = JSON.parse(evt.data).response;
        time.push(sample[0]);
        cpu.push(parseFloat(sample[1]));

        if (time.length < 10) {
            return;
        }
        // Drop the oldest sample, then redraw.
        time.shift();
        cpu.shift();
        console.log("时间:" + time + " --> CPU数据: " + cpu);
        display(time, cpu);
    };
</script>

实现远程命令执行

<html>
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1">
    <link rel="stylesheet" href="https://cdn.lyshark.com/xterm/xterm.css"/>
    <script src="https://cdn.lyshark.com/xterm/xterm.js"></script>
    <script src="https://cdn.lyshark.com/jquery/3.5.1/jquery.min.js" type="text/javascript"></script>
</head>
<body>
<!-- The ids are the ones the scripts on this page look up: #terminal,
     #address, #command and #send_message.
     NOTE(review): which of the two text inputs is the address and which is
     the command is assumed from the order the script reads them - confirm. -->
<div id="terminal"></div>
<input type="text" id="address" />
<input type="text" id="command" />
<input type="button" id="send_message" >

<!-- String formatting helper -->
<script type="text/javascript">
    // $.format(source, params...) replaces every "{i}" in `source` with the
    // i-th parameter. Parameters may be given as one array or as separate
    // arguments. Called with the template only, it returns a function that
    // formats that template with whatever arguments it is given later.
    $.format = function (source, params) {
        if (arguments.length == 1) {
            return function () {
                var args = $.makeArray(arguments);
                args.unshift(source);
                return $.format.apply(this, args);
            };
        }
        // Array.isArray also copes with null/undefined, where reading
        // params.constructor would throw.
        if (arguments.length > 2 && !Array.isArray(params)) {
            params = $.makeArray(arguments).slice(1);
        }
        if (!Array.isArray(params)) {
            params = [params];
        }
        $.each(params, function (i, n) {
            // A replacer function inserts the value literally; a plain
            // string would have "$&", "$1", ... expanded by replace().
            source = source.replace(new RegExp("\\{" + i + "\\}", "g"), function () {
                return n;
            });
        });
        return source;
    };
</script>

<!-- Open the terminal and start issuing commands -->
<script type="text/javascript">
    $(function () {
        // Size the terminal from the window, leaving a margin on each axis.
        var window_width = $(window).width() - 200;
        var window_height = $(window).height() - 300;
        var term = new Terminal({
            cols: Math.floor(window_width / 9),
            rows: Math.floor(window_height / 20),
            convertEol: true,
            cursorBlink: false
        });

        var sock = new WebSocket("ws://127.0.0.1:10083");
        // Exposed on window because the click handler below reads it from there.
        window.sock = sock;

        // The terminal is attached to the page once the socket is open.
        sock.onopen = function () {
            term.open(document.getElementById('terminal'));
            console.log('WebSocket Open');
        };

        // Every server message is JSON with `addr` and `status`; print it as
        // one coloured line.
        sock.onmessage = function (recv) {
            var data;
            try {
                data = JSON.parse(recv.data);
            } catch (e) {
                // A malformed message is logged and skipped.
                console.log('invalid message: ' + recv.data);
                return;
            }
            console.log(data['addr'] + ' -- ' + data['status']);
            var temp = "\x1B[1;3;35m 地址:[ {0} ] \x1B[0m --> \x1B[1;3;33m 状态:[ {1} ] \x1B[0m";
            var string = $.format(temp, data['addr'], data['status']);
            term.writeln(string);
        };

        // Send the address range and the command as one JSON object.
        $('#send_message').click(function () {
            // send() throws while the socket is still connecting, so only
            // send once it is open.
            if (window.sock.readyState !== WebSocket.OPEN) {
                console.log('WebSocket is not open');
                return;
            }
            var message = {"address": null, "command": null};
            message['address'] = $("#address").val();
            message['command'] = $("#command").val();
            var send_data = JSON.stringify(message);
            window.sock.send(send_data);
        });
    });
</script>
</body>
</html>

后端

def CalculationIP(Addr_Count):
    """Expand an address range such as "192.168.1.1-3" into a list of IPs.

    The text before the first "-" is split on "."; its first three parts
    form the prefix and its fourth part is the first value of the last
    octet.  The text after the first "-" is the last value (inclusive).

    Args:
        Addr_Count: Range string of the form "a.b.c.START-END".

    Returns:
        A list of address strings (empty when START > END), or 0 when the
        input cannot be parsed or either bound is outside 0..255.
    """
    try:
        pieces = Addr_Count.split("-")
        octets = pieces[0].split(".")
        start = int(octets[3])
        end = int(pieces[1])
    except (AttributeError, IndexError, TypeError, ValueError):
        return 0

    # The range comes from the client: without this bound it could yield
    # invalid addresses or an arbitrarily large list.
    if not (0 <= start <= 255 and 0 <= end <= 255):
        return 0

    prefix = ".".join(octets[:3]) + "."
    return [prefix + str(last) for last in range(start, end + 1)]

def handler_msg(conn):
    """Serve one client of the "remote command" demo.

    Each incoming message is expected to be a JSON object with the keys
    "address" (an address range understood by CalculationIP) and "command".
    For every address in the range a line is printed and the client is sent
    ``{"addr": ip, "status": "success"}``, one message per second.  The
    command itself is only printed; nothing is executed here.

    The loop ends on the first exception; leaving the ``with`` block closes
    the connection.

    Args:
        conn: Connected socket whose handshake has already been completed.
    """
    # NOTE(review): this snippet uses `json` and `time`, neither of which is
    # imported in the visible excerpt - the enclosing module must import them.
    with conn as c:
        while True:
            try:
                # SECURITY: the original ran eval() on the text received from
                # the socket, which lets any client execute arbitrary Python
                # in this process.  The page sends JSON.stringify output, so
                # it is parsed as JSON instead.
                ref_json = json.loads(recv_msg(c))
                address = ref_json.get("address")
                command = ref_json.get("command")

                # CalculationIP returns 0 for an unusable range; treat that
                # as "no addresses" so one bad request does not end the
                # session.
                address_list = CalculationIP(address) or ()
                for ip in address_list:
                    response = {'addr': ip, 'status': 'success'}
                    print("对主机: {} --> 执行: {}".format(ip, command))
                    send_msg(c, bytes(json.dumps(response), encoding="utf-8"))
                    time.sleep(1)
            except Exception:
                # Returning ends this worker just as exit(0) did, without
                # depending on the interactive-only ``exit`` builtin.
                return

以上是 Python 实现WebSocket通信 的全部内容, 来源链接: utcz.com/z/387074.html

回到顶部