1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
# -*- coding: utf-8 -*- import paramiko import threading def run(host_ip, username, password, command): ssh = paramiko.SSHClient() try : ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(host_ip, 22 , username, password) print ( '===================exec on [%s]=====================' % host_ip) stdin, stdout, stderr = ssh.exec_command(command, timeout = 300 ) out = stdout.readlines() for o in out: print (o.strip( '\n' )) except Exception as ex: print ( 'error, host is [%s], msg is [%s]' % (host_ip, ex.message)) finally : ssh.close() if __name__ = = '__main__' : # 将需要批量执行命令的host ip地址填到这里 # eg: host_ip_list = ['IP1', 'IP2'] host_ip_list = [ '147.116.20.19' ] for _host_ip in host_ip_list: # 用户名,密码,执行的命令填到这里 run(_host_ip, 'tzgame' , 'tzgame@1234' , 'df -h' ) run(_host_ip, 'tzgame' , 'tzgame@1234' , 'ping -c 5 220.181.38.148' ) |
pycrypto,由于 paramiko 模块内部依赖pycrypto,所以先下载安装pycrypto
1
2
|
pip3 install pycrypto pip3 install paramiko |
(1)基于用户名和密码的连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
import paramiko # 创建SSH对象 ssh = paramiko.SSHClient() # 允许连接不在know_hosts文件中的主机 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 连接服务器 ssh.connect(hostname = 'c1.salt.com' , port = 22 , username = 'GSuser' , password = '123' ) # 执行命令 stdin, stdout, stderr = ssh.exec_command( 'ls' ) # 获取命令结果 result = stdout.read() # 关闭连接 ssh.close() |
(2)基于公钥秘钥连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import paramiko private_key = paramiko.RSAKey.from_private_key_file( '/home/auto/.ssh/id_rsa' ) # 创建SSH对象 ssh = paramiko.SSHClient() # 允许连接不在know_hosts文件中的主机 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 连接服务器 ssh.connect(hostname = 'c1.salt.com' , port = 22 , username = 'wupeiqi' , key = private_key) # 执行命令 stdin, stdout, stderr = ssh.exec_command( 'df' ) # 获取命令结果 result = stdout.read() # 关闭连接 ssh.close() |
SFTPClient:
用于连接远程服务器并进行上传下载功能。
(1)基于用户名密码上传下载
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
import paramiko transport = paramiko.Transport(( 'hostname' , 22 )) transport.connect(username = 'GSuser' ,password = '123' ) sftp = paramiko.SFTPClient.from_transport(transport) # 将location.py 上传至服务器 /tmp/test.py sftp.put( '/tmp/location.py' , '/tmp/test.py' ) # 将remove_path 下载到本地 local_path sftp.get( 'remove_path' , 'local_path' ) transport.close() |
(2)基于公钥秘钥上传下载
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import paramiko private_key = paramiko.RSAKey.from_private_key_file( '/home/auto/.ssh/id_rsa' ) transport = paramiko.Transport(( 'hostname' , 22 )) transport.connect(username = 'GSuser' , pkey = private_key ) sftp = paramiko.SFTPClient.from_transport(transport) # 将location.py 上传至服务器 /tmp/test.py sftp.put( '/tmp/location.py' , '/tmp/test.py' ) # 将remove_path 下载到本地 local_path sftp.get( 'remove_path' , 'local_path' ) transport.close() |
下面是多线程执行版本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
#!/usr/bin/python #coding:utf-8 import threading import subprocess import os import sys sshport = 13131 log_path = 'update_log' output = {} def execute(s, ip, cmd, log_path_today): with s: cmd = '''ssh -p%s root@%s -n "%s" ''' % (sshport, ip, cmd) ret = subprocess.Popen(cmd, shell = True , stdout = subprocess.PIPE, stderr = subprocess.STDOUT) output[ip] = ret.stdout.readlines() if __name__ = = "__main__" : if len (sys.argv) ! = 3 : print "Usage: %s config.ini cmd" % sys.argv[ 0 ] sys.exit( 1 ) if not os.path.isfile(sys.argv[ 1 ]): print "Usage: %s is not file!" % sys.argv[ 1 ] sys.exit( 1 ) cmd = sys.argv[ 2 ] f = open (sys.argv[ 1 ], 'r' ) list = f.readlines() f.close() today = datetime.date.today() log_path_today = '%s/%s' % (log_path,today) if not os.path.isdir(log_path_today): os.makedirs(log_path_today) threading_num = 100 if threading_num > len ( list ): threading_num = len ( list ) s = threading.Semaphore(threading_num) for line in list : ip = line.strip() t = threading.Thread(target = execute,args = (s, ip,cmd,log_path_today)) t.setDaemon( True ) t.start() main_thread = threading.currentThread() for t in threading. enumerate (): if t is main_thread: continue t.join() for ip,result in output.items(): print "%s: " % ip for line in result: print " %s" % line.strip() print "Done!" |
以上脚本读取两个参数,第一个为存放IP的文本,第二个为shell命令
执行效果如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
|
# -*- coding:utf-8 -*- import requests from requests.exceptions import RequestException import os, time import re from lxml import etree import threading lock = threading.Lock() def get_html(url): response = requests.get(url, timeout = 10 ) # print(response.status_code) try : if response.status_code = = 200 : # print(response.text) return response.text else : return None except RequestException: print ( "请求失败" ) # return None def parse_html(html_text): html = etree.HTML(html_text) if len (html) > 0 : img_src = html.xpath( "//img[@class='photothumb lazy']/@data-original" ) # 元素提取方法 # print(img_src) return img_src else : print ( "解析页面元素失败" ) def get_image_pages(url): html_text = get_html(url) # 获取搜索url响应内容 # print(html_text) if html_text is not None : html = etree.HTML(html_text) # 生成XPath解析对象 last_page = html.xpath( "//div[@class='pages']//a[last()]/@href" ) # 提取最后一页所在href链接 print (last_page) if last_page: max_page = re. compile (r '(\d+)' , re.S).search(last_page[ 0 ]).group() # 使用正则表达式提取链接中的页码数字 print (max_page) print ( type (max_page)) return int (max_page) # 将字符串页码转为整数并返回 else : print ( "暂无数据" ) return None else : print ( "查询结果失败" ) def get_all_image_url(page_number): base_url = 'https://imgbin.com/free-png/naruto/' image_urls = [] x = 1 # 定义一个标识,用于给每个图片url编号,从1递增 for i in range ( 1 , page_number): url = base_url + str (i) # 根据页码遍历请求url try : html = get_html(url) # 解析每个页面的内容 if html: data = parse_html(html) # 提取页面中的图片url # print(data) # time.sleep(3) if data: for j in data: image_urls.append({ 'name' : x, 'value' : j }) x + = 1 # 每提取一个图片url,标识x增加1 except RequestException as f: print ( "遇到错误:" , f) continue # print(image_urls) return image_urls def get_image_content(url): try : r = requests.get(url, timeout = 15 ) if r.status_code = = 200 : return r.content return None except RequestException: return None def main(url, image_name): semaphore.acquire() # 加锁,限制线程数 print ( '当前子线程: {}' . format (threading.current_thread().name)) save_path = os.path.dirname(os.path.abspath( '.' )) + '/pics/' try : file_path = '{0}/{1}.jpg' . format (save_path, image_name) if not os.path.exists(file_path): # 判断是否存在文件,不存在则爬取 with open (file_path, 'wb' ) as f: f.write(get_image_content(url)) f.close() print ( '第{}个文件保存成功' . format (image_name)) else : print ( "第{}个文件已存在" . format (image_name)) semaphore.release() # 解锁imgbin-多线程-重写run方法.py except FileNotFoundError as f: print ( "第{}个文件下载时遇到错误,url为:{}:" . format (image_name, url)) print ( "报错:" , f) raise except TypeError as e: print ( "第{}个文件下载时遇到错误,url为:{}:" . format (image_name, url)) print ( "报错:" , e) class MyThread(threading.Thread): """继承Thread类重写run方法创建新进程""" def __init__( self , func, args): """ :param func: run方法中要调用的函数名 :param args: func函数所需的参数 """ threading.Thread.__init__( self ) self .func = func self .args = args def run( self ): print ( '当前子线程: {}' . format (threading.current_thread().name)) self .func( self .args[ 0 ], self .args[ 1 ]) # 调用func函数 # 因为这里的func函数其实是上述的main()函数,它需要2个参数;args传入的是个参数元组,拆解开来传入 if __name__ = = '__main__' : start = time.time() print ( '这是主线程:{}' . format (threading.current_thread().name)) urls = get_all_image_url( 5 ) # 获取所有图片url列表 thread_list = [] # 定义一个列表,向里面追加线程 semaphore = threading.BoundedSemaphore( 5 ) # 或使用Semaphore方法 for t in urls: # print(i) m = MyThread(main, (t[ "value" ], t[ "name" ])) # 调用MyThread类,得到一个实例 thread_list.append(m) for m in thread_list: m.start() # 调用start()方法,开始执行 for m in thread_list: m.join() # 子线程调用join()方法,使主线程等待子线程运行完毕之后才退出 end = time.time() print (end - start) # get_image_pages(<a href="https://imgbin.com/free-png/Naruto" rel="external nofollow">https://imgbin.com/free-png/Naruto</a>) |
以上就是python ssh 执行shell命令的示例的详细内容,更多关于python ssh 执行shell命令的资料请关注服务器之家其它相关文章!
原文链接:https://www.cnblogs.com/chengxuyonghu/archive/2004/01/13/13638870.html