Generic Queued Parallel Process Runner

#!/usr/bin/env python

# pyGenericParallelQueuer.py by CryptWizard 
# A generic program that launches X number of a certain process at a time, passing different data to the child's stdin
# Example: pyGenericParallelQueuer.py 20 "sslftp user:pass@host:21" < cmds.txt
#
# Inspired by #darkhold@Rizon
# [20/03 11:13:55] <@TheFluff> the box I'm doing this on is in japan and has a funky connection that maxes out at like 500kb/s per connection to the US or europe

import sys, os
import shlex
from Queue import Queue
from subprocess import Popen, PIPE
from threading import Thread

# Jobs (one stdin line each) waiting to be picked up by a RunnerThread.
# Deliberately unbounded: all of stdin is already held in memory once it is
# read, and a maxsize here only risks put() blocking forever when jobs are
# queued before any worker thread is running to drain them.
stdinQueue = Queue()
# Placeholders for the command line and starting working directory
# (empty strings until something assigns them).
cmdLine = ''
workingDir = ''

def actualWork(stdin, cmd):
	"""Run one child process, feed it *stdin*, and print its standard output.

	stdin -- data written to the child's standard input.
	cmd   -- command line to launch. Without shell=True, Popen on POSIX
	         treats a plain string as the name of a single executable, so a
	         string is split into arguments with shlex.split() there; on
	         Windows the string is passed through unchanged. An argument
	         list is always passed through unchanged.

	Returns None. Any failure to launch or talk to the child is reported
	on stderr and swallowed, so one bad job does not kill the worker thread
	that called this function. A non-zero child exit status is also
	reported on stderr.
	"""
	try:
		args = cmd
		if isinstance(cmd, str) and os.name != 'nt':
			args = shlex.split(cmd)
		p = Popen(args, stdout=PIPE, stderr=PIPE, stdin=PIPE)
		stdoutdata, _stderrdata = p.communicate(stdin)
	except Exception as e:
		# Per-job boundary: log the problem and let the worker move on.
		sys.stderr.write('Failed to run %r: %s\n' % (cmd, e))
		return
	print(stdoutdata)
	if p.returncode != 0:
		sys.stderr.write('Command %r exited with status %d\n' % (cmd, p.returncode))

class RunnerThread(Thread):
	"""Daemon worker that runs actualWork() for every item taken from stdinQueue.

	cmd -- the command line passed to actualWork() for each job.
	"""
	def __init__(self, cmd):
		Thread.__init__(self)
		self.cmd = cmd
		# Daemon threads do not keep the interpreter alive; completion is
		# tracked through stdinQueue.task_done()/join() instead.
		self.daemon = True

	def run(self):
		while True:
			job = stdinQueue.get()
			try:
				actualWork(job, self.cmd)
			finally:
				# Always mark the job done, even if actualWork() raises;
				# otherwise stdinQueue.join() would wait forever.
				stdinQueue.task_done()

def main():
	"""Parse arguments, queue one job per stdin line, and run them in parallel.

	Usage: <script> <simultaneous processes> "<command line>"

	Every line read from stdin (trailing newline included) becomes the stdin
	of one child running the command line; at most <simultaneous processes>
	children run at once. Returns the exit status: 0 on success, 1 on a
	usage error.
	"""
	global cmdLine, workingDir

	if len(sys.argv) < 3:
		print('Usage: %s <simultaneous processes> "<command line>"' % sys.argv[0])
		print('stdin: new line separated list of stdin values to pass to each child')
		print('Stops reading at EOF (either redirect from file, or Ctrl + Z on Windows/Ctrl + D on *NIX)')
		return 1

	# Validate before touching stdin: 0 or a negative count would leave
	# queued jobs with no worker and make stdinQueue.join() hang forever.
	try:
		maxThreads = int(sys.argv[1])
	except ValueError:
		maxThreads = 0
	if maxThreads < 1:
		print('Simultaneous processes must be a positive integer, got: ' + sys.argv[1])
		return 1

	print('Simultaneous Processes: ' + sys.argv[1])
	cmdLine = sys.argv[2]
	print('Command Line: ' + cmdLine)
	workingDir = os.getcwd()
	print('Working Directory: ' + workingDir)

	jobs = sys.stdin.readlines()
	print('Jobs Queued: %d' % len(jobs))

	tpsize = min(maxThreads, len(jobs))
	print('Spawning %d threads...' % tpsize)
	for _ in range(tpsize):
		RunnerThread(cmdLine).start()

	# Workers are already running, so put() can never deadlock even if
	# stdinQueue has a maxsize smaller than the number of jobs.
	for job in jobs:
		stdinQueue.put(job)

	print('Waiting for threads to complete...')
	stdinQueue.join()
	print('Done!')
	return 0

if __name__ == '__main__':
	# Hand main()'s return value to the interpreter as the exit status.
	raise SystemExit(main())

Leave a Reply