Job queue using xmlrpc and threading

psaffrey

I'm trying to implement an application that listens for requests and
runs them as they arrive, while still accepting new requests when a
job is already running. I've tried to do this using the thread and
XML-RPC modules (SimpleXMLRPCServer and xmlrpclib). The idea is that
an object exposed over XML-RPC tells the job queue object to add a
job. If no jobs are running, the queue object creates a file, appends
the new job to it, and a separate consumption thread starts working
through the jobs in the file. Jobs that come in later are simply
appended to the end of the file.
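For comparison, here is a minimal sketch of the same producer/consumer idea using an in-memory queue instead of a file. It is not the design described above: it assumes the jobs do not need to survive a restart, it keeps one long-lived worker rather than starting a thread on demand, and it uses Python 3 module names (`queue`, `threading`). The names `run_job`, `worker` and the job names are hypothetical.

```python
import queue
import threading


def run_job(job):
    # Hypothetical stand-in for the real work done per job.
    return "ran " + job


def worker(jobs, results):
    """Consume jobs until the None sentinel arrives."""
    while True:
        job = jobs.get()              # blocks until a job is available
        if job is None:               # sentinel: no more work
            jobs.task_done()
            break
        try:
            results.append(run_job(job))
        finally:
            jobs.task_done()


jobs = queue.Queue()
results = []
consumer = threading.Thread(target=worker, args=(jobs, results), daemon=True)
consumer.start()

# Producers (for example an XML-RPC handler) may call put() from any thread.
for name in ("job-a", "job-b", "job-c"):
    jobs.put(name)

jobs.put(None)      # ask the worker to stop once the queue is drained
jobs.join()         # wait until every queued item has been processed
consumer.join()
```

Because `Queue.put()` and `Queue.get()` do their own locking, no explicit lock is needed around the queue in this variant.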

Unfortunately, I can't get it to work. The problem is that the
consumption thread seems to read the job queue file before the new
job has been written to it, even though I've used a lock. I've also
had the application reach a state where it ignores Ctrl-C, which is a
little worrying: I fear it doesn't bode well for future stability. I
don't have a lot of experience with multi-threaded applications, so I
may well have chosen a poor approach.
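One possible explanation for the symptom, offered as an assumption rather than a confirmed diagnosis, is output buffering: data written through a file handle may not reach the file until the handle is flushed or closed, so a reader that opens the file in the meantime sees nothing, lock or no lock. The sketch below shows the effect in isolation using Python 3; the temporary path and variable names are hypothetical.

```python
import os
import tempfile

path = os.path.join(tempfile.mkdtemp(), "queue.txt")

writer = open(path, "a")
writer.write("job1::::tester@testdomain\n")   # held in the writer's buffer

with open(path) as reader:
    before_flush = reader.readline()          # the file is still empty

writer.flush()                                # hand the buffered data to the OS

with open(path) as reader:
    after_flush = reader.readline()           # now the line is visible

writer.close()
```

If this is what is happening, closing (or flushing) the write handle before releasing the lock should make the new job visible to the consumption thread.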

I've posted the code below. It's in three parts: the job queue, the
manager that listens for new requests, and an application that adds
jobs to the queue. Sorry for the long listings...

Any guidance gratefully received,

Peter

===
testqueue.py:

import thread
import time
import shutil
import os

class JobQueue:

    def __init__(self, filename):
        self.queuefile = filename
        self.jobthread = 0
        # lock for the jobfile queue file
        self.jfqlock = thread.allocate_lock()

    def addJob(self, jobfileuri, email):
        self.jfqlock.acquire()
        if not self.jobthread:
            print "starting jobfile consumption thread"
            if os.access(self.queuefile, os.R_OK):
                print "cleaning stale jobfile queue file"
                try:
                    os.remove(self.queuefile)
                except:
                    print "problem removing jobfile queue file"
                    raise
            self.jobthread = thread.start_new_thread(self.main, ())
        else:
            print "using existing jobfile consumption thread in file", self.queuefile
        fh = open(self.queuefile, "a")
        # choose "::::" as a delimiter
        print >> fh, jobfileuri + "::::" + email
        self.jfqlock.release()
        return 1

    def main(self):
        while 1:
            self.jfqlock.acquire()
            rfh = open(self.queuefile, "r")
            # breakpoint()
            finput = rfh.readline()
            print "found:", finput
            if not finput:
                print "finished jobfiles. Closing thread"
                rfh.close()
                self.jobthread = 0
                self.jfqlock.release()
                return
            else:
                print "found jobfile in queue: attempting to run"
                # most of this is to shift up the lines in the file
                tmpname = self.queuefile + ".tmp"
                wfh = open(tmpname, "w")
                line = rfh.readline()
                while line:
                    wfh.write(line)
                    line = rfh.readline()
                wfh.close()
                rfh.close()
                shutil.move(tmpname, self.queuefile)
                self.jfqlock.release()
                # lop off the trailing line break
                print
                print "***run Starting***"
                try:
                    self.runJob(finput[:-1])
                    print "***run finished***"
                except:
                    print "***run failed***"
                print

    def runJob(self, job):
        time.sleep(2.0)
        print "running job", job
        # placeholder (assumption): the real code would build a report here
        report = job
        if not report:
            print "some problem with run. Cannot mail out report"
        return
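The "shift up the lines" step in main() can be isolated into a small helper, which makes it easier to test on its own. The following is a sketch in Python 3, not a drop-in replacement for the listing above; `pop_first_line` and the demo file contents are hypothetical, and the caller is assumed to hold whatever lock protects the queue file.

```python
import os
import tempfile


def pop_first_line(path):
    """Remove and return the first line of `path`, or None if it is empty."""
    tmp_path = path + ".tmp"
    with open(path) as src:
        first = src.readline()
        if not first:
            return None
        # Copy every remaining line to a temporary file.
        with open(tmp_path, "w") as dst:
            for line in src:
                dst.write(line)
    # Both handles are closed here, so the swap sees complete data.
    os.replace(tmp_path, path)
    return first.rstrip("\n")


demo = os.path.join(tempfile.mkdtemp(), "queue.txt")
with open(demo, "w") as fh:
    fh.write("a::::tester@testdomain\nb::::tester@testdomain\n")

first = pop_first_line(demo)
second = pop_first_line(demo)
third = pop_first_line(demo)     # queue file is empty by now
```

Using `with` blocks guarantees that each handle is closed, and therefore flushed, before the temporary file replaces the original.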


===
queuemanager.py:

from testqueue import JobQueue
from SimpleXMLRPCServer import SimpleXMLRPCServer


class QM:
    def __init__(self, filename):
        # use the filename passed in rather than a hard-coded one
        self.jq = JobQueue(filename)

    def addJob(self, jobname):
        # return a value: XML-RPC cannot marshal None by default
        return self.jq.addJob(jobname, "tester@testdomain")

if __name__ == "__main__":
    qm = QM("jobqueue.txt")
    rpcserver = SimpleXMLRPCServer(("localhost", 8000))
    rpcserver.register_instance(qm)
    rpcserver.serve_forever()

===
addjob.py:

import xmlrpclib
import sys

server = "localhost"
port = 8000

serveradd = "http://%s:%s" % (server, port)
manager = xmlrpclib.ServerProxy(serveradd)

jobname = sys.argv[1]

manager.addJob(jobname)
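To check the XML-RPC plumbing separately from the queue logic, the server and client can be exercised in a single process. This is a sketch in Python 3 (`xmlrpc.server` and `xmlrpc.client` in place of the Python 2 modules used above); `QueueManager` is a hypothetical stand-in for QM that only records job names, and port 0 is used so the operating system picks a free port.

```python
import threading
import xmlrpc.client
from xmlrpc.server import SimpleXMLRPCServer


class QueueManager:
    """Hypothetical stand-in for QM: records job names in a list."""

    def __init__(self):
        self.jobs = []

    def addJob(self, jobname):
        self.jobs.append(jobname)
        # Return a real value: None cannot be marshalled unless
        # the server is created with allow_none=True.
        return True


qm = QueueManager()
server = SimpleXMLRPCServer(("127.0.0.1", 0), logRequests=False)
server.register_instance(qm)
port = server.server_address[1]          # the port the OS actually assigned

# Serve requests in a background thread so the client can run here.
server_thread = threading.Thread(target=server.serve_forever, daemon=True)
server_thread.start()

manager = xmlrpc.client.ServerProxy("http://127.0.0.1:%d" % port)
reply = manager.addJob("job1")

server.shutdown()                        # stop serve_forever()
server.server_close()
```

Running the server in a daemon thread and calling `shutdown()` explicitly also gives the main thread a clean way to stop the process.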
 
