
Batch Processing with Python

Updated on July 16, 2012

Introduction

So, your work is technically demanding and you've got a brand new, multi-core machine. Are you actually using all those cores to get your work done faster? If the answer is no (and assuming your jobs can be run in parallel chunks), then you might consider trying the code below. While absolutely not the only way to run multiple jobs (MPI, for example, is excellent for running jobs across not only multiple cores, but multiple machines), this little script has come in handy more times than I can remember. Written in Python, it accepts a list of commands (strings) and batches them out to the operating system for execution N jobs at a time, where N is the number of CPUs (threads) your machine has available. As written, you can import this into a bigger program and handle reduction (compiling all the results) on an ad hoc basis. Alternatively, you can wrap the module in a script for command-line execution (which is what I generally do).

batchjobs.py

import threading, os, sys
from multiprocessing import cpu_count

NUM_CPUS = cpu_count()

def batch_process(command_list, batch_size=NUM_CPUS):
    iteratorlock = threading.Lock()
    exceptions = []
    cmd = command_list.__iter__()

    def runall():
        # Each worker thread pulls commands off the shared iterator until
        # it is exhausted, or until another worker has recorded an error.
        while True:
            iteratorlock.acquire()
            try:
                try:
                    if exceptions: return
                    next_job = cmd.next()
                finally: iteratorlock.release()
            except StopIteration: return

            # Hand the command string to the OS. The thread blocks here
            # while the command runs, so up to batch_size commands
            # execute concurrently.
            try: os.system(next_job)
            except Exception:
                failure = sys.exc_info()
                iteratorlock.acquire()
                try: exceptions.append(failure)
                finally: iteratorlock.release()

    # Start one worker thread per batch slot and wait for all of them.
    jobs = [threading.Thread(target=runall) for j in xrange(batch_size)]
    for job in jobs: job.start()
    for job in jobs: job.join()

    # Re-raise the first recorded failure with its original traceback
    # (Python 2 three-argument raise).
    if exceptions:
        exc_type, exc_value, exc_tb = exceptions[0]
        raise exc_type, exc_value, exc_tb
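
If you go the import route, a call might look something like the sketch below. The simulate.py commands, the out_*.txt files, and the glob-based reduction step are just placeholders for whatever your own jobs actually produce.

import glob
import batchjobs

# Hypothetical parameter sweep: each command writes its results to its own file.
commands = ['python simulate.py %d > out_%d.txt' % (i, i) for i in range(8)]

# Run the commands NUM_CPUS at a time (the default batch size).
batchjobs.batch_process(commands)

# Ad hoc reduction: gather whatever the jobs wrote out.
results = [open(fname).read() for fname in sorted(glob.glob('out_*.txt'))]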

Usage Example

As I mentioned, I wrap batchjobs.py in a script for command-line usage. This is great if, say, you're running simulations with a bunch of different parameter sets and the results are printed to the screen or uploaded to a database. In the case of screen output, you can simply redirect to a file.
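
For example, a jobs file for a parameter sweep could be generated with a few lines like these; simulate.py, the --alpha parameter, and the result file names are purely illustrative.

# Write one command per line; each command redirects its screen output
# to its own results file.
params = [0.1, 0.2, 0.5, 1.0]
f = open('jobs.txt', 'w')
for p in params:
    f.write('python simulate.py --alpha %s > results_%s.txt\n' % (p, p))
f.close()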

Shown below is my script; note that I pass in the number of cores to use. This lets you keep a core un-pegged for monitoring, email, etc., which is especially nice when you're running a multi-day simulation on your personal machine (as was the case with my MS thesis).

run_batch script

#!/usr/bin/python

import sys
sys.path.append('/home/tbone/Projects/pylib')
import batchjobs

if __name__ == '__main__':

    # First argument: a file containing one command per line.
    f = open(sys.argv[1])
    jobs = [j.strip() for j in f.readlines()]
    f.close()

    # Second argument: number of cores (worker threads) to use.
    cores = int(sys.argv[2])

    batchjobs.batch_process(jobs, cores)
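
Invocation then looks something like this; the jobs file name and core count are just an example, with each line of jobs.txt being one complete command.

./run_batch jobs.txt 3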

Conclusion

This is not the only way to tackle quick parallel processing, but it works for me. For more sophistication, consider looking into MPI (the Message Passing Interface).
