Job queue using xmlrpc and threading

Discussion in 'Python' started by psaffrey@googlemail.com, Sep 22, 2008.

  1. Guest

    I'm trying to implement an application that will listen for requests,
    run them when they are present but also be able to add new requests
    even while it's running. I've tried to do this using the thread and
    xmlrpc modules - the idea is that an XML-RPC exposed object tell the
    queue thread object to add a job. If there are no jobs running, it
    creates a file, adds the new job to the end and then another
    consumption thread starts working through the jobs in the file. New
    jobs coming in are just added to the end of the file by the queue
    thread.

    Unfortunately, I can't get it to work. The problem is that the
    consumption thread seems to read the job queue before it gets written,
    even though I've used a lock. I've also had the application get to the
    stage where it ignores ctrl-c, which is a little worrying - I fear it
    doesn't bode well for future stability... I don't have a lot of
    experience with multi-threaded applications, so I may well have chosen
    a poor approach.

    I've posted the code below. It's in three parts, the job queue, the
    manager that listens for new requests and an application to add jobs
    to the queue. Sorry for the long listings...

    Any guidance gratefully received,

    Peter

    ===
    testqueue.py:

    import thread
    import time
    import shutil
    import os

    class JobQueue:

    def __init__(self, filename):
    self.queuefile = filename
    self.jobthread = 0
    # lock for the jobfile queue file
    self.jfqlock = thread.allocate_lock()

    def addJob(self, jobfileuri, email):
    self.jfqlock.acquire()
    if not self.jobthread:
    print "starting jobfile consumption thread"
    if os.access(self.queuefile, os.R_OK):
    print "cleaning stale jobfile queue file"
    try:
    os.remove(self.queuefile)
    except:
    print "problem removing jobfile queue file"
    raise
    self.jobthread = thread.start_new_thread(self.main, ())
    else:
    print "using existing jobfile consumption thread in file",
    self.queuefile
    fh = open(self.queuefile, "a")
    # choose "::::" as a delimiter
    print >> fh, jobfileuri + "::::" + email
    self.jfqlock.release()
    return 1

    def main(self):
    while 1:
    self.jfqlock.acquire()
    rfh = open(self.queuefile, "r")
    # breakpoint()
    finput = rfh.readline()
    print "found:", finput
    if not finput:
    print "finished jobfiles. Closing thread"
    rfh.close()
    self.jobthread = 0
    self.jfqlock.release()
    return
    else:
    print "found jobfile in queue: attempting to run"
    # most of this is to shift up the lines in the file
    tmpname = self.queuefile + ".tmp"
    wfh = open(tmpname, "w")
    line = rfh.readline()
    while line:
    wfh.write(line)
    line = rfh.readline()
    wfh.close()
    rfh.close()
    shutil.move(tmpname, self.queuefile)
    self.jfqlock.release()
    # lop off the trailing line break
    print
    print "***run Starting***"
    try:
    self.runJob(finput[:-1])
    print "***run finished***"
    except:
    print "***run failed***"
    print

    def runJob(self, job):
    time.sleep(2.0)
    print "running job", job
    if not report:
    print "some problem with run. Cannot mail out report"
    return


    ===
    queuemanager.py

    from testqueue import JobQueue
    from SimpleXMLRPCServer import *


    class QM:
    def __init__(self, filename):
    self.jq = JobQueue("queue.txt")

    def addJob(self, jobname):
    self.jq.addJob(jobname, "tester@testdomain")

    if __name__=="__main__":
    qm = QM("jobqueue.txt")
    rpcserver = SimpleXMLRPCServer(("localhost", 8000))
    rpcserver.register_instance(qm)
    rpcserver.serve_forever()

    ===
    addjob.py:

    import xmlrpclib
    import sys

    server = "localhost"
    port = 8000

    serveradd = "http://%s:%s" % (server, port)
    manager = xmlrpclib.ServerProxy(serveradd)

    jobname = sys.argv[1]

    manager.addJob(jobname)
    , Sep 22, 2008
    #1
    1. Advertising

  2. Jeff Guest

    Jeff, Sep 22, 2008
    #2
    1. Advertising

Want to reply to this thread or ask your own question?

It takes just 2 minutes to sign up (and it's free!). Just click the sign up button to choose a username and then you can ask your own questions on the forum.
Similar Threads
  1. Marco Aschwanden
    Replies:
    2
    Views:
    793
    Shalabh Chaturvedi
    Dec 29, 2003
  2. Etienne Posthumus
    Replies:
    1
    Views:
    1,099
    Roger Binns
    Apr 1, 2004
  3. David Hirschfield

    Question about xmlrpc and threading

    David Hirschfield, Feb 15, 2006, in forum: Python
    Replies:
    3
    Views:
    367
    Martin P. Hellwig
    Feb 16, 2006
  4. Russell Warren

    Is Queue.Queue.queue.clear() thread-safe?

    Russell Warren, Jun 22, 2006, in forum: Python
    Replies:
    4
    Views:
    672
    Russell Warren
    Jun 27, 2006
  5. Kris
    Replies:
    0
    Views:
    467
Loading...

Share This Page