Suggestions for a distributed job queue


Tony Arcieri

[Note: parts of this message were removed to make it a legal post.]

I'm looking at replacing our homebrew job queue with something better, and
I'm wondering what's out there. If there's something that meets our needs
I'd like to use it; otherwise I may end up building our next gen job queue
by assembling various components that exist already. I've seen a couple
dozen solutions for this in the Ruby world, but haven't scrutinized many of
them in depth and am unsure if there are any that fully meet our needs.

We have many different kinds of jobs; however, the main ones I'm worried
about are rather CPU intensive, take a long time to execute, and operate on
large amounts of input and output data.

- Distributed: we certainly have too many jobs to run on a single computer.
We need to distribute them across a number of workers. Beyond that we'd
like to automatically provision more workers when the backlog of jobs gets
too large, and shut down workers if too many of them are idle.

- Fault Tolerant: as with any distributed system, fault tolerance is
important. An evil monkey should be able to futz with any part of the
system, crashing computers willy nilly, and it should continue to Just
Work. If a job breaks down and is somehow lost anywhere along the way, the
system should eventually detect this and retry the job. From a design
perspective, I would like as much of the system as possible to be
stateless. The agents executing the jobs and the message queues should be
stateless. Ideally the entire state of the system is kept in the
database, and that's the only part of the system whose state we need to
recover after a crash. If a worker or message queue goes down
we shouldn't need to worry about recovering jobs in flight; they should
simply be retried.

- Idempotent: going along with a stateless approach to fault tolerance, if
the system does misdetect a failed job and ends up executing it twice, the
system should detect this and discard the redundant results. Having the
same job accidentally complete multiple times should not screw up the state
of the system.

- Support for Temporary Failures: sometimes a job fails in such a way that
it should be retried (e.g. external resources needed to complete a job are
temporarily unavailable). The system should support retrying these jobs in
a sensible way, and it'd be nice to specify on a job-by-job basis how many
attempts should be made before the system should give up and consider it a
permanent failure.

- Support for Live Upgrades: we'd like to be able to add new types of jobs
to the system. Along with this, agents should know what types of jobs
they support, and not request unsupported jobs from the message queue.
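
As a rough illustration of the retry and idempotency requirements in the list above, here is a minimal in-memory sketch. The `JobTable` class, its method names, and the `max_attempts` field are hypothetical stand-ins for rows in the database; none of this comes from an existing library.

```ruby
# Hypothetical in-memory stand-in for the jobs table in the database.
class JobTable
  Job = Struct.new(:id, :payload, :max_attempts, :attempts, :state, :result)

  def initialize
    @jobs = {}
  end

  def add(id, payload, max_attempts: 3)
    @jobs[id] = Job.new(id, payload, max_attempts, 0, :pending, nil)
  end

  def [](id)
    @jobs.fetch(id)
  end

  # Called when a worker (or a timeout detector) reports a temporary failure.
  # The job returns to :pending until its per-job attempt budget is used up,
  # at which point it becomes a permanent failure. Returns the new state.
  def temporary_failure(id)
    job = self[id]
    return job.state if job.state == :done || job.state == :failed
    job.attempts += 1
    job.state = job.attempts >= job.max_attempts ? :failed : :pending
  end

  # Idempotent completion: only the first result for a job is kept.
  # Returns true if the result was recorded, false if it was a duplicate.
  def complete(id, result)
    job = self[id]
    return false if job.state == :done
    job.result = result
    job.state = :done
    true
  end
end
```

In a real deployment the duplicate check in `complete` would have to be a single atomic database operation rather than a read followed by a write.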

I suppose some of my comments are presupposing the architecture: a list of
jobs stored in the database, command and control processes which load jobs
into and read results from the message queues, and agents which pull jobs
from and report jobs to the message queues, and the message queues
themselves. I'd be open to other designs, so long as they have the above
properties.
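
To make the "agents only request jobs they support" part of that architecture concrete, here is a small sketch. `TypedBacklog` and `Agent` are hypothetical names, and the backlog is an in-process array standing in for a message queue.

```ruby
# Hypothetical backlog: one FIFO list of pending jobs, each tagged with a type.
class TypedBacklog
  def initialize
    @pending = []
    @lock = Mutex.new
  end

  def push(type, payload)
    @lock.synchronize { @pending << { type: type, payload: payload } }
  end

  # Hand out the oldest job whose type the agent says it supports,
  # or nil if nothing suitable is waiting.
  def claim(supported_types)
    @lock.synchronize do
      index = @pending.index { |job| supported_types.include?(job[:type]) }
      index && @pending.delete_at(index)
    end
  end

  def size
    @lock.synchronize { @pending.size }
  end
end

# Hypothetical agent: a table of handlers keyed by job type.
class Agent
  def initialize(handlers)
    @handlers = handlers
  end

  def supported_types
    @handlers.keys
  end

  # Claim and run one job; returns the handler's result, or nil if idle.
  def work_once(backlog)
    job = backlog.claim(supported_types)
    job && @handlers.fetch(job[:type]).call(job[:payload])
  end
end
```

With this shape, a new job type can be queued before every agent has been upgraded: older agents simply never claim it.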

Given that, is there an existing job queue that meets my needs that I should
be checking out?
 

Walton Hoops

I don't know off the top of my head if it does _all_ of that, but
I'd look at Ruby Queue as a distributed work queue.
http://raa.ruby-lang.org/project/rq/
 

Tony Arcieri


> I don't know off the top of my head if it does _all_ of that, but
> I'd look at Ruby Queue as a distributed work queue.
> http://raa.ruby-lang.org/project/rq/

I've looked at RubyQueue in the past and it is rather interesting; however,
it has a number of issues which would prevent us from using it.

For starters, it uses NFS as the distribution protocol, and using NFS isn't
really practical in our environment.
 

David Masover

> For starters, it uses NFS as the distribution protocol, and using NFS isn't
> really practical in our environment.

Not that I doubt it, but I'm curious what the limitation is. Is it
scalability?
 

Kaspar Schiess

Hi,

I am using an AMQP-compliant queue for this. With its persistent queues
and routing mechanisms it can be made to meet many if not all of your
requirements, I believe.

As job items I am using Thrift RPC method calls, which is very
convenient on both sides (server, client). The library that allows you
to do this is here: http://github.com/kschiess/thrift_amqp_transport
(currently being redesigned).

There are many solutions in this space. Most recently, people have been
using Resque (on GitHub, I believe) and AMQP queues. Other solutions
built on the database or the filesystem exist too, as you already know.

my 2 cents
kaspar
 

Tony Arcieri


> Not that I doubt it, but I'm curious what the limitation is. Is it
> scalability?

More like security. This is running partially in our datacenter and
partially on EC2. While I'm sure it "can be done", it really doesn't seem
like the ideal solution.
 

David Masover

> More like security. This is running partially in our datacenter and
> partially on EC2. While I'm sure it "can be done", it really doesn't seem
> like the ideal solution.

Worth mentioning: EC2's internal IPs can be pretty much completely firewalled
off, and VPNs are easy enough to set up. Probably easily doable.

But you're right, probably not the ideal solution.
 

Brian Candler

A couple of other options:

(1) AMQP, e.g. RabbitMQ. I believe it comes with Ruby bindings. Can be
made as fault-tolerant as you like :)

(2) Depending on your needs, you could consider rolling your own with
DRb. This means at least you know the system inside-out and can easily
customise it - although avoiding the queue server itself being a SPOF is
awkward.

Here are a couple of working proofs-of-concept.

In-RAM queue
============
---- server ----
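require 'drb'
require 'thread'
q = Queue.new # or SizedQueue.new(1000)
# Expose the queue as the front object. 127.0.0.1 only accepts local
# clients; bind to a reachable address for workers on other hosts.
DRb.start_service("druby://127.0.0.1:9911", q)
DRb.thread.join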

---- client ----
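require 'drb'
# A local DRb service lets the server call back into this process; it is
# not strictly needed for plain marshalled values such as strings.
DRb.start_service
q = DRbObject.new(nil, "druby://localhost:9911")
q.push "abc"
puts q.pop # blocks until an item is available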
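
A worker built on the in-RAM queue above might look like the following sketch. The `run_worker` helper and the `:stop` sentinel are assumptions made for illustration; only `DRbObject.new_with_uri` and `Queue#pop` come from the standard library.

```ruby
require 'drb'

# Hypothetical worker loop: pop jobs from a remote Queue until a :stop
# sentinel arrives. Queue#pop blocks while the queue is empty, so an idle
# worker simply waits. The block passed to run_worker does the actual work.
def run_worker(queue_uri)
  queue = DRbObject.new_with_uri(queue_uri)
  processed = []
  loop do
    job = queue.pop
    break if job == :stop
    processed << yield(job)
  end
  processed
end
```

A worker process would call something like `run_worker("druby://localhost:9911") { |job| job.upcase }`. Note that a job popped by a worker that then crashes is simply gone, so this alone does not give the fault tolerance asked for in the original post.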

On-disk queue using Madeleine
=============================
---- server ----
require 'rubygems'
require 'madeleine'

# Queue whose state (a plain Array) is persisted by Madeleine.
class MadQueue
  def initialize(madeleine)
    @madeleine = madeleine
  end

  # Read operations don't need to go via command objects (if you don't
  # care about synchronization)
  def length
    @madeleine.system.length
  end

  # Command objects: every state change goes through execute_command
  # so that Madeleine can log it.
  class Pusher
    def initialize(data)
      @data = data
    end

    def execute(system)
      system.push(@data)
    end
  end

  class Popper
    def execute(system)
      system.shift # returns nil (does not block) when the queue is empty
    end
  end

  def push(data)
    @madeleine.execute_command(Pusher.new(data))
    nil # avoid marshalling the whole array back to the DRb client
  end

  def pop
    @madeleine.execute_command(Popper.new)
  end
end

require 'drb'
# The block supplies the initial system (an empty Array) on first run.
madeleine = SnapshotMadeleine.new("madqueue.dir") { [] }

Thread.new(madeleine) { |m|
  puts "Taking snapshot every 30 seconds."
  while true
    sleep(30)
    m.take_snapshot
  end
}

DRb.start_service("druby://127.0.0.1:9911", MadQueue.new(madeleine))
DRb.thread.join

---- client ----
Same as above
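
For readers unfamiliar with the command-object approach used above, the following sketch shows the general idea with the standard library only: each command is appended to a log before it is applied, and the in-memory state is rebuilt by replaying the log. `JournaledQueue` is a hypothetical name, and this is not Madeleine's actual implementation (it has no snapshots, for instance).

```ruby
# Hypothetical journaled queue: append each command to a log file before
# applying it, and rebuild the in-memory array by replaying that log.
class JournaledQueue
  def initialize(path)
    @path = path
    @items = []
    replay if File.exist?(@path)
    @log = File.open(@path, 'ab')
  end

  def length
    @items.length
  end

  def push(data)
    record([:push, data])
    @items.push(data)
    nil
  end

  def pop
    record([:pop])
    @items.shift
  end

  def close
    @log.close
  end

  private

  # Write one marshalled command. flush hands the bytes to the OS; a real
  # system would also need fsync to survive a machine crash.
  def record(command)
    Marshal.dump(command, @log)
    @log.flush
  end

  # Re-apply every logged command in order to reconstruct the queue.
  def replay
    File.open(@path, 'rb') do |f|
      until f.eof?
        op, data = Marshal.load(f)
        op == :push ? @items.push(data) : @items.shift
      end
    end
  end
end
```

Without periodic snapshots the log grows without bound, which is the problem the snapshot thread in the Madeleine version addresses.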
 

Tony Arcieri


> Worth mentioning: EC2's internal IPs can be pretty much completely firewalled
> off, and VPNs are easy enough to set up. Probably easily doable.

Then what happens when the VPN goes down and the leaky abstraction that is
NFS's synchronous API grinds your message queue to a halt?
 

Phillip Gawlowski

> Then what happens when the VPN goes down and the leaky abstraction that is
> NFS's synchronous API grinds your message queue to a halt?

Same thing as when you have an EC2 outage: You save state, and resume
processing ASAP.
 

Tony Arcieri


> Same thing as when you have an EC2 outage: You save state, and resume
> processing ASAP.

Oof. Well for one, due to the nature of the synchronous filesystem API,
it's hard for processes in userspace to detect when things are amiss in the
underlying NFS layers.

Also, if you read my OP, saving state (aside from the state of "what jobs
have not been run yet") and recovering jobs in flight is something I want to
avoid. If the system fails I'd rather it simply fail and restore it to a
clean state. That way, you can have only one stateful part of the system,
and that's the only part you need to worry about recovering state from after
a failure.
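
A minimal sketch of what "restore it to a clean state" could mean in practice, assuming the job rows carry a `:state` field (the field name, the state values, and `reset_in_flight` are all hypothetical):

```ruby
# Hypothetical recovery step run once at startup after a failure.
# `jobs` stands in for rows loaded from the database. Anything that was
# in flight is put back to :pending; nothing else is recovered.
def reset_in_flight(jobs)
  jobs.map do |job|
    job[:state] == :running ? job.merge(state: :pending) : job
  end
end
```

Because workers and message queues hold no state worth saving, the queues can then be refilled from whatever is `:pending` in the database.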
 

Phillip Gawlowski

> Oof. Well for one, due to the nature of the synchronous filesystem API,
> it's hard for processes in userspace to detect when things are amiss in the
> underlying NFS layers.

And implementing some sort of keep-alive/heartbeat system is too much
work, as well.

> Also, if you read my OP, saving state (aside from the state of "what jobs
> have not been run yet") and recovering jobs in flight is something I want to
> avoid.

Well, I didn't *mean* that you implement a whole synchronization
framework (what it comes down to). Alas, I implied it.

> If the system fails I'd rather it simply fail and restore it to a
> clean state. That way, you can have only one stateful part of the system,
> and that's the only part you need to worry about recovering state from after
> a failure.

And this would probably be best done on "your" end of the network, too.
That way you could ignore the EC2 nodes for the time being, in case of
some form of network outage (assuming I understood you correctly, in
that only part of your nodes are in Amazon's cloud).

Not having had a look at RQueue's implementation details, maybe you could,
without too much effort, port it to the Devil From Redmond's SMB system,
via Samba.

Or roll your own, as has been suggested, with DRb. :)
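
For reference, the keep-alive/heartbeat idea mentioned earlier in this message can be sketched in a few lines. `HeartbeatMonitor` and its methods are hypothetical names; the `now:` parameter exists only so the clock can be supplied explicitly.

```ruby
# Hypothetical heartbeat registry kept by the coordinating process.
class HeartbeatMonitor
  def initialize(timeout)
    @timeout = timeout # seconds of silence before a worker is presumed dead
    @last_seen = {}    # worker id => time of last heartbeat
    @assigned = {}     # worker id => job id currently being run
  end

  def assign(worker_id, job_id, now: Time.now)
    @assigned[worker_id] = job_id
    beat(worker_id, now: now)
  end

  def beat(worker_id, now: Time.now)
    @last_seen[worker_id] = now
  end

  # Forget workers that have been silent for longer than the timeout and
  # return the job ids they were running, so the caller can retry them.
  def reap(now: Time.now)
    dead = @last_seen.select { |_, seen| now - seen > @timeout }.keys
    dead.map do |worker_id|
      @last_seen.delete(worker_id)
      @assigned.delete(worker_id)
    end.compact
  end
end
```

A slow but healthy worker can be reaped by mistake, which is why the original post asks for duplicate results to be discarded.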
 
