pythonParamiko的SSH用法
说明
1、将所有设备信息写入文本文档。
简单地使用txt,将登录信息构建成字典。
2、初始化SSH连接和执行命令。
3、分析此需求指定的命令和输出结果。
将结果存储在文件中。
4、增加多线程执行。
提高效率。
5、添加Linux的crontab。
每小时收集一次信息(服务器配置)
实例
#!/usr/bin/env python# -*- coding:utf-8 -*-
import re
import time
from concurrent.futures import ThreadPoolExecutor
import paramiko
def get_device_list(filename):
"""从文本文件读取设备列表,返回由字典组成的列表。
文本内容格式为:ip,用户名,密码,别名,例如:
1.1.1.1 admin admin sw1
1.1.1.2 admin admin sw2
......
Args:
filename ([str]): 文件名称
"""
with open(filename, 'r') as f:
device_list = []
for line in f.readlines():
ip, username, password, name = line.strip().split()
device_list.append(
{
"ip": ip,
"username": username,
"password": password,
"name": name,
}
)
return device_list
class NetworkDevice(object):
def __init__(self, ip="", username="", password="'", name="", port=22,):
self.conn = None
if ip:
self.ip = ip.strip()
elif name:
self.name = name.strip()
else:
raise ValueError("需要设备连接地址(ip 或 别名)")
self.port = int(port)
self.username = username
self.password = password
self._open_ssh()
def _open_ssh(self):
"""初始化 SSH 连接,调起一个模拟终端,会话结束前可以一直执行命令。
Raises:
e: 抛出 paramiko 连接失败的任何异常
"""
ssh_connect_params = {
"hostname": self.ip,
"port": self.port,
"username": self.username,
"password": self.password,
"look_for_keys": False,
"allow_agent": False,
"timeout": 5, # TCP 连接超时时间
}
conn = paramiko.SSHClient()
conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
conn.connect(**ssh_connect_params)
except Exception as e:
raise e
self.conn = conn.invoke_shell(term="vt100", width=500, height=1000)
return ""
def exec_cmd(self, cmd, recv_time=3):
"""登录设备,执行命令
Args:
cmd ([type]): 命令字符串
recv_time (int, optional): 读取回显信息的超时时间. Defaults to 3.
Raises:
EOFError: 没有任何信息输出,说明连接失败。
Returns:
output:
"""
cmd = cmd.strip() + "\n"
self.conn.sendall("screen disable\n")
self.conn.sendall(cmd)
time.sleep(int(recv_time))
output = self.conn.recv(1024*1024)
if len(output) == 0:
raise EOFError("连接可能被关闭,没有任何信息输出")
return output.decode('utf-8', 'ignore')
dev = {
"ip":"192.168.56.21",
"username":"netdevops",
"password":"Admin@h3c.com",
"name": "sw1"
}
# sw1 = NetworkDevice(**dev)
# ret = sw1.exec_cmd("dis version")
# print(ret)
def parse_interface_drop(output):
"""把设备的输出队列丢包信息解析成累加值
命令及输出示例如下:
# [H3C]dis qos queue-statistics interface outbound | in "^ Drop"
# Dropped: 0 packets, 0 bytes
"""
ptn = re.compile(r"\s(\S+):\s+(\d+)\s+(\S+),\s+(\d+)\s+(\S+)")
count = 0
for i in ptn.findall(output):
count += int(i[1])
return count
def run(cmd, **conn_parms):
"""登录单台设备,执行指定命令,解析丢包统计
"""
sw = NetworkDevice(**conn_parms)
output = sw.exec_cmd(cmd)
drop_count = parse_interface_drop(output)
return "%s %s %s"%(
conn_parms.get("name"),
conn_parms.get("ip"),
drop_count)
# cmd = r'dis qos queue-statistics interface outbound | in "^ Drop"'
# ret = run(cmd,**dev)
# print(ret)
if __name__== "__main__":
"""获取设备列表,使用多线程登录设备获取信息并返回
"""
with ThreadPoolExecutor(10) as pool:
futures = []
cmd = r'dis qos queue-statistics interface outbound | in "^ Drop"'
dev_info = get_device_list("./iplist.txt")
for d in dev_info:
future = pool.submit(run, cmd, **d)
futures.append(future)
# for f in futures:
# print(f.result())
# 根据执行时间把结果写入文件,精确到小时
with open("./drops/%s.log"%time.strftime("%Y%m%d_%H"),'w') as f:
for line in futures:
f.write(line.result() + "\n")
以上就是python Paramiko的SSH用法,希望对大家有所帮助。更多Python学习指路:python基础教程
本文教程操作环境:windows7系统、Python 3.9.1,DELL G3电脑。
以上是 pythonParamiko的SSH用法 的全部内容, 来源链接: utcz.com/z/545828.html