Request for comments - concurrent ssh client

M

mk

Hello everyone,

Since I'm not happy with shmux or pssh, I wrote my own "concurrent ssh"
program for parallel execution of SSH commands on multiple hosts. Before
I release the program into the wild, I would like to hear (constructive)
comments on what may be wrong with it and/or how to fix it.
(Note: the program requires the paramiko SSH client module.)

#!/usr/local/bin/python -W ignore::DeprecationWarning

import operator
import optparse
import os
import socket
import subprocess
import sys
import threading
import time

import paramiko

# Command-line interface definition.  Numeric options declare an explicit
# type so that optparse hands back numbers instead of strings; defaults are
# deliberately left as None so later code can tell "not given" apart from an
# explicit value.
usage = ("Usage: cssh [options] IP1 hostname2 IP3 hostname4 ....\n\n"
         "(IPs/hostnames on the commandline are actually optional, they can "
         "be specified in the file, see below.)")
op = optparse.OptionParser(usage=usage)

op.add_option('-c', '--cmd', dest='cmd',
              help="Command to run. Mutually exclusive with -s.")
op.add_option('-s', '--script', dest='script',
              help="Script file to run. Mutually exclusive with -c. Script "
                   "can have its own arguments, specify them in doublequotes, "
                   "like \"script -arg arg\".")
op.add_option('-i', '--script-dir', dest='scriptdir',
              help="The directory where script will be copied and executed. "
                   "Defaults to /tmp.")
op.add_option('-l', '--cleanup', dest='cleanup', action='store_true',
              help="Delete the script on remote hosts after executing it.")
op.add_option('-f', '--file', dest='file',
              help="File with hosts to use, one host per line. Concatenated "
                   "with list of hosts/IP addresses specified at the end of "
                   "the commandline. Optionally, in a line of the file you "
                   "can specify sequence: \"Address/Hostname Username "
                   "Password SSH_Port\" separated by spaces (additional "
                   "parameters can be specified on a subset of lines; where "
                   "not specified, relevant parameters take default values).")
op.add_option('-d', '--dir', dest='dir',
              help="Directory for storing standard output and standard error "
                   "of command. If specified, directory will be created, with "
                   "subdirs named IPs/hostnames and relevant files stored in "
                   "those subdirs.")
op.add_option('-u', '--username', dest='username',
              help="Username to specify for SSH. Defaults to 'root'.")
op.add_option('-p', '--password', dest='password',
              help="Password. Password is used first; if connection fails "
                   "using password, cssh uses SSH key (default or specified).")
op.add_option('-o', '--port', dest='port', type='int',
              help="Default SSH port.")
op.add_option('-k', '--key', dest='key',
              help="SSH Key file. Defaults to '/root/.ssh/id_dsa'.")
op.add_option('-n', '--nokey', dest='nokey', action="store_true",
              help="Turns off using SSH key.")
op.add_option('-t', '--timeout', dest='timeout', type='float',
              help="SSH connection timeout. Defaults to 20 seconds.")
op.add_option('-m', '--monochromatic', dest='mono', action='store_true',
              help="Do not use colors while printing output.")
op.add_option('-r', '--maxthreads', dest='maxthreads', type='int',
              help="Maximum number of threads working concurrently. Default "
                   "is 100. Exceeding 200 is generally not recommended due to "
                   "potential exhaustion of address space (each thread can "
                   "use 10 MB of address space and 32-bit systems have a "
                   "maximum of 4GB of address space).")
op.add_option('-q', '--quiet', dest='quiet', action='store_true',
              help="Quiet. Do not print out summaries like IPs for which "
                   "communication succeeded or failed, etc.")

# add resource file?

# Parse the command line, report every usage problem at once, then fill in
# the documented defaults for options the user did not give.
(opts, args) = op.parse_args()

failit = False

if opts.cmd is None and opts.script is None:
    print("You have to specify one of the following: command to run, "
          "using -c command or --cmd command, or script to run, using "
          "-s scriptfile or --script scriptfile.")
    print("")
    failit = True

if opts.cmd is not None and opts.script is not None:
    print("Options command (-c) and script (-s) are mutually exclusive. "
          "Specify either one.")
    print("")
    failit = True

if opts.cmd is None and opts.script is not None:
    # The script option may carry arguments; only its first word is a path.
    try:
        scriptpath = opts.script.split()[0]
        with open(scriptpath, 'r'):
            pass
    except (IOError, IndexError):
        # IndexError: the option was given as an empty/blank string.
        print("Could not open script file %s." % opts.script)
        print("")
        failit = True

if opts.file is None and args == []:
    print("You have to specify at least one of the following:")
    print(" - list of IPs/hostnames at the end of the command line "
          "(after all options)")
    print(" - list of IPs/hostnames stored in file specified after -f "
          "or --file option (like: -f hostnames.txt)")
    print(" You can also specify both sources. In that case IP/hostname "
          "lists will be concatenated.")
    print("")
    failit = True

if opts.password is None and opts.nokey:
    print("Since using key has been turned off using -n option, you "
          "have to specify password using -p password or --password password.")
    print("")
    failit = True

if opts.key is not None and opts.nokey:
    print("Options -n and -k keyfile are mutually exclusive. Specify "
          "either one.")
    print("")
    failit = True

if failit:
    # Usage errors are failures: exit non-zero like the other error paths.
    sys.exit(1)


def _numeric_opt(value, default, convert, flag):
    """Return value converted with convert, or default when value is None.

    Aborts through op.error() when the value is not a valid number, so the
    result is a number even if the parser delivered a string.
    """
    if value is None:
        return default
    try:
        return convert(value)
    except (TypeError, ValueError):
        op.error("option %s: invalid numeric value: %r" % (flag, value))


if opts.scriptdir is None:
    opts.scriptdir = '/tmp'

if opts.username is None:
    # The help text promises this default.
    opts.username = 'root'

if opts.nokey:
    opts.key = None
elif opts.key is None:
    opts.key = '/root/.ssh/id_dsa'

opts.port = _numeric_opt(opts.port, 22, int, '-o')
opts.timeout = _numeric_opt(opts.timeout, 20, float, '-t')
opts.maxthreads = _numeric_opt(opts.maxthreads, 100, int, '-r')
if opts.maxthreads < 1:
    op.error("option -r: maximum number of threads must be at least 1")

# store_true options are None when absent; normalise them to real booleans.
opts.cleanup = bool(opts.cleanup)
opts.mono = bool(opts.mono)
opts.quiet = bool(opts.quiet)


# Escape sequences wrapped around console messages.  In monochromatic mode
# every name is an empty string, so the same concatenations emit plain text.
if opts.mono:
    HEADER = BLACK = RED = GREEN = YELLOW = OKBLUE = BLUE = MAGENTA = \
        CYAN = WHITE = ENDC = ''
else:
    HEADER = '\033[95m'
    BLACK = '\033[30m'
    RED = '\033[31m'
    GREEN = '\033[32m'
    YELLOW = '\033[33m'
    BLUE = '\033[34m'
    OKBLUE = '\033[94m'
    MAGENTA = '\033[35m'
    CYAN = '\033[36m'
    WHITE = '\033[37m'
    ENDC = '\033[0m'

# Build the host list: command-line hosts first, then the lines of the
# optional hosts file.  Each entry ends up as a list of whitespace-separated
# fields: address [username [password [port]]].
hosts = args[:]
fhosts = []
fname = opts.file

if fname is not None:
    try:
        with open(fname) as hostfile:
            fhosts = hostfile.readlines()
    except IOError as e:
        print("Error: " + str(e))
        sys.exit(1)

hosts.extend(fhosts)
# Split first and filter afterwards, so that blank or whitespace-only lines
# are dropped instead of becoming empty field lists.
hosts = [fields
         for fields in (s.split() for s in hosts if s is not None)
         if fields]

if hosts == []:
    print("Error: list of hosts is empty. Quitting")
    sys.exit(1)


class SSHThread(threading.Thread):
    """Worker thread that runs one command, or one uploaded script, on a host.

    run() connects, optionally uploads the script, executes the command and
    prints the outcome while holding the shared lock.  When ``opts.dir`` is
    set, the output of a successful run is also written to per-host files.

    The module-level ``opts`` (timeout, cleanup, dir) and the colour constants
    are read at run time.

    Attributes set for the caller:
        confailed: True unless the command was executed over a working
            connection.
        finished: True once run() has ended, normally or through an exception.
        conerror: Text of the last connection error, '' if none occurred.
    """

    def __init__(self, lock, cmd, ip, username, sshprivkey=None,
                 passw=None, port=22, script=None, scriptdir=None):
        """Store the connection parameters and derive the script paths.

        Args:
            lock: Lock shared by all workers; held while printing.
            cmd: Command line to execute, or None to run ``script`` instead.
            ip: Address or hostname to connect to.
            username: SSH login name.
            sshprivkey: Path of the private key file, or None to skip keys.
            passw: Password, or None to skip password authentication.
            port: SSH port.
            script: Local script path optionally followed by its arguments.
            scriptdir: Remote directory for the script; '/tmp' when omitted.
        """
        threading.Thread.__init__(self)

        self.lock = lock
        self.cmd = cmd
        self.ip = ip
        self.username = username
        self.sshprivkey = sshprivkey
        self.passw = passw
        self.port = port
        self.conobj = None
        self.conerror = ''
        self.confailed = True
        self.finished = False

        # Script attributes always exist, so command-only workers can be
        # inspected without raising AttributeError.
        self.scriptpath = None
        self.scriptname = None
        self.scriptargs = ''
        self.scriptdir = None
        self.rspath = None
        if script is not None:
            spec = script.strip()
            self.scriptpath = spec.split()[0]
            self.scriptname = self.scriptpath.split('/')[-1]
            # Everything after the path (leading blank included) is passed to
            # the remote script verbatim.
            self.scriptargs = spec[len(self.scriptpath):]
            self.scriptdir = (scriptdir or '/tmp').strip().rstrip('/')
            self.rspath = self.scriptdir + '/' + self.scriptname

    def ping(self, lock, ip):
        """Send one ICMP echo with /bin/ping; return its (stdout, stderr).

        The lock argument is accepted but not used.
        """
        subp = subprocess.Popen(['/bin/ping', '-c', '1', ip],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        return subp.communicate()

    def ssh_connect(self):
        """Connect to the host, trying the password first and then the key.

        On success self.conobj is the connected client.  When every attempt
        fails, self.conobj is None and self.conerror holds the last error.
        """
        self.conobj = paramiko.SSHClient()
        self.conobj.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        attempts = []
        if self.passw is not None:
            attempts.append(dict(password=self.passw, allow_agent=False,
                                 look_for_keys=False))
        if self.sshprivkey is not None:
            attempts.append(dict(key_filename=self.sshprivkey))

        for extra in attempts:
            try:
                self.conobj.connect(self.ip, username=self.username,
                                    port=self.port,
                                    timeout=float(opts.timeout), **extra)
                return
            except Exception as e:
                # Any failure simply means "try the next method"; keep the
                # reason so it is not lost.
                self.conerror = str(e)
        self.conobj = None

    def execcmds(self):
        """Execute self.cmd remotely and return (stdout, stderr) as strings.

        CRLF line ends are converted to LF.  If execution fails, stdout is ''
        and stderr carries the error text.
        """
        try:
            stdin, stdout, stderr = self.conobj.exec_command(self.cmd)
            out = ''.join(stdout.readlines()).replace('\r\n', '\n')
            err = ''.join(stderr.readlines()).replace('\r\n', '\n')
        except Exception as e:
            return ('', 'cssh: executing command failed: %s\n' % e)
        return (out, err)

    def _wait_for_reply(self, channel):
        """Wait until the channel has data to read.

        Returns False when the remote process exits or opts.timeout seconds
        pass before any data arrives.
        """
        deadline = time.time() + float(opts.timeout)
        while not channel.recv_ready():
            if channel.exit_status_ready() or time.time() > deadline:
                return False
            time.sleep(0.1)
        return True

    def sendscript(self):
        """Upload the local script to self.rspath through a remote ``scp -t``.

        Returns '' on success, otherwise a text describing the failure.
        """
        with open(self.scriptpath, 'rb') as fo:
            content = fo.read()
        channel = self.conobj.get_transport().open_session()
        try:
            channel.exec_command('scp -t -v %s\n' % self.rspath)
            channel.sendall('C0755 %d 1\n' % len(content))
            if not self._wait_for_reply(channel):
                return 'no response from remote scp'
            # sendall() keeps sending until the whole script is transmitted.
            channel.sendall(content)
        except (paramiko.SSHException, socket.error) as e:
            return str(e)
        finally:
            channel.close()
        return ''

    def setcmdtoscript(self):
        """Make the uploaded script, with its arguments, the command to run."""
        self.cmd = self.rspath + self.scriptargs

    def execcmdonscript(self, cmd):
        """Run a housekeeping command on the remote script.

        The command is expected to be silent; anything it writes to stdout or
        stderr is printed as an error.
        """
        stdin, stdout, stderr = self.conobj.exec_command(cmd)
        out = stdout.readlines()
        err = stderr.readlines()
        if out or err:
            with self.lock:
                print(RED + "Host %s, Error while executing %s on script:"
                      % (self.ip, cmd) + " " + "".join(err) + " "
                      + "".join(out) + ENDC)

    def chmodscript(self):
        """Set the execute bits on the uploaded script."""
        # just in case, as sometimes and on some operating systems the
        # execution flags on the script are not always set
        self.execcmdonscript('chmod 0755 %s' % self.rspath)

    def delscript(self):
        """Delete the uploaded script from the remote host."""
        self.execcmdonscript('rm -f %s' % self.rspath)

    def _execute(self):
        """Connect and run the job; return (send_error, stdout, stderr)."""
        self.ssh_connect()
        res, so, se = '', '', ''
        if self.conobj is None:
            return (res, so, se)

        script_mode = self.cmd is None
        if script_mode:
            res = self.sendscript()
            if res == '':
                self.setcmdtoscript()
                self.chmodscript()
        if res == '':
            so, se = self.execcmds()
        # Only a script that was uploaded can be cleaned up.
        if script_mode and opts.cleanup:
            time.sleep(0.5)
            self.delscript()
        return (res, so, se)

    def _store_output(self, so, se):
        """Write stdout and stderr to <opts.dir>/<ip>/stdout and .../stderr."""
        path = os.path.join(opts.dir, self.ip)
        if not os.path.isdir(path):
            os.makedirs(path)
        for name, text in (('stdout', so), ('stderr', se)):
            with open(os.path.join(path, name), 'w') as of:
                of.write(text)

    def _report(self, res, so, se):
        """Print the outcome for this host while holding the shared lock."""
        with self.lock:
            prefix = OKBLUE + "%-20s" % self.ip + ENDC + " : "
            if self.conobj is None:
                print(prefix + RED + "SSH connection failed" + ENDC)
            elif res != '':
                print(prefix + RED + "Sending script failed: %s" % res + ENDC)
            else:
                self.confailed = False
                print(prefix + OKBLUE + "SSH connection successful" + ENDC)
                print(so)
                if se != '':
                    print(MAGENTA + "command standard error output:" + ENDC)
                    print(se)
                if opts.dir is not None:
                    self._store_output(so, se)

    def run(self):
        """Thread body: execute the job, report it, and always mark the end."""
        try:
            res, so, se = self._execute()
            self._report(res, so, se)
        finally:
            # Set even on an unexpected exception, so that code polling
            # this flag does not wait forever.
            self.finished = True

    def sshclose(self):
        """Close the SSH connection if one was opened."""
        if self.conobj is not None:
            self.conobj.close()


# Shared state of the scheduler: one lock handed to every worker thread, and
# two lists of (ip, thread) pairs - workers in flight and workers collected.
lock = threading.Lock()
queue, thfinished = [], []

def getparams(h):
    """Return (ip, username, password, port) for one host entry.

    Args:
        h: List of fields "address [username [password [port]]]".  Missing
            fields fall back to opts.username, opts.password and opts.port
            (22 when no default port is configured).

    Returns:
        The 4-tuple; port is None when it cannot be converted to an integer
        (an error line is printed in that case).
    """
    ip = h[0]
    username = h[1] if len(h) > 1 else opts.username
    passw = h[2] if len(h) > 2 else opts.password

    # A per-host port wins; otherwise honour the -o/--port default.
    rawport = h[3] if len(h) > 3 else opts.port
    if rawport is None:
        port = 22
    else:
        try:
            port = int(rawport)
        except ValueError as e:
            print(RED + "%-20s" % ip + " : error converting port: "
                  + str(e) + ENDC)
            port = None
    return (ip, username, passw, port)

def _reap_finished(running, done):
    """Move every finished worker from running to done.

    Finished workers have their connection closed and are joined.  The
    running list is rebuilt rather than edited during iteration, so no
    entry is skipped.  A worker whose thread has died counts as finished
    even if its flag was never set.
    """
    active = []
    for ip, th in running:
        if th.finished or not th.is_alive():
            th.sshclose()
            th.join()
            done.append((ip, th))
        else:
            active.append((ip, th))
    running[:] = active


# The limit may still be a string, depending on how the option was declared;
# never allow fewer than one worker or the loop below could not progress.
max_workers = max(1, int(opts.maxthreads))

# Start one worker per host, keeping at most max_workers in flight.
while hosts:
    _reap_finished(queue, thfinished)
    if len(queue) >= max_workers:
        time.sleep(1)
        continue
    h = hosts.pop()
    (ip, username, passw, port) = getparams(h)
    if port is None:
        # Unusable parameters: record the host as failed without a worker.
        thfinished.append((ip, None))
        continue
    th = SSHThread(lock, opts.cmd, ip, username=username,
                   sshprivkey=opts.key, passw=passw, port=port,
                   script=opts.script, scriptdir=opts.scriptdir)
    th.daemon = True
    th.start()
    queue.append((ip, th))

# Wait for the workers that are still running.
while queue:
    _reap_finished(queue, thfinished)
    if queue:
        time.sleep(1)

# Final summary, skipped in quiet mode: hosts whose worker completed without
# a connection failure, then hosts with no worker or a failed one.
if not opts.quiet:
    succeeded = [ip for ip, th in thfinished
                 if th is not None and not th.confailed]
    failed = [ip for ip, th in thfinished
              if th is None or th.confailed]

    print("")
    print(OKBLUE + 'Communication SUCCEEDED for following IP addresses '
          '(SSH could open connection):' + ENDC)
    for ip in succeeded:
        print(ip)

    print("")
    print(OKBLUE + 'Communication FAILED for following IP addresses '
          '(SSH could not open connection / error in parameters):' + ENDC)
    for ip in failed:
        print(ip)
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only takes a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

No members online now.

Forum statistics

Threads
473,755
Messages
2,569,536
Members
45,009
Latest member
GidgetGamb

Latest Threads

Top