[ANN] ruby queue : rq-2.3.1

Discussion in 'Ruby' started by Ara.T.Howard, Dec 11, 2005.

    - added 'stage' option to submit mode, which allows submission in a 'holding'
    state. thanks to for this fix!


    rq v2.3.1

    rq (queue | export RQ_Q=q) mode [mode_args]* [options]*

    ruby queue (rq) is a tool used to create instant linux clusters by managing
    sqlite databases as nfs mounted priority work queues. multiple instances of
    rq running from multiple hosts can work from these queues to distribute
    processing load to n nodes - bringing many dozens of otherwise powerful cpus
    to their knees with a single blow. clearly this software should be kept out
    of the hands of free radicals, seti enthusiasts, and mr. jeff safran.

    the central concept of rq is that n nodes work in isolation to pull jobs from
    a central nfs mounted priority work queue in a synchronized fashion. the
    nodes have absolutely no knowledge of each other and all communication is done
    via the queue meaning that, so long as the queue is available via nfs and a
    single node is running jobs from it, the system will continue to process jobs.
    there is no centralized process whatsoever - all nodes work to take jobs from
    the queue and run them as fast as possible. this creates a system which load
    balances automatically and is robust in the face of node failures.

    the first argument to any rq command is the name of the queue. this name may
    be omitted if, and only if, the environment variable RQ_Q has been set to
    contain the absolute path of the target queue.

    rq operates in one of the modes create, submit, resubmit, list, status,
    delete, update, query, execute, configure, snapshot, lock, backup, rotate,
    feed, or help. depending on the mode of operation and the options used the
    meaning of 'mode_args' may change.


    the following mode abbreviations exist

    c => create
    s => submit
    r => resubmit
    l => list
    ls => list
    t => status
    d => delete
    rm => delete
    u => update
    q => query
    e => execute
    C => configure
    S => snapshot
    L => lock
    b => backup
    R => rotate
    f => feed
    h => help

    not all modes have abbreviations
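
    a minimal sketch of how the queue argument and the abbreviations above
    might be resolved. the abbreviation table is copied from this message;
    the helper name parse_command_line and the rule 'if RQ_Q is set then the
    queue is not in argv' are assumptions made for illustration, not rq's
    actual code.

```ruby
# mode abbreviations, copied from the table above
MODE_ABBREVIATIONS = {
  'c' => 'create', 's' => 'submit', 'r' => 'resubmit',
  'l' => 'list', 'ls' => 'list', 't' => 'status',
  'd' => 'delete', 'rm' => 'delete', 'u' => 'update',
  'q' => 'query', 'e' => 'execute', 'C' => 'configure',
  'S' => 'snapshot', 'L' => 'lock', 'b' => 'backup',
  'R' => 'rotate', 'f' => 'feed', 'h' => 'help'
}.freeze

# hypothetical helper: split argv into [queue, mode, mode_args].
# assumption: when RQ_Q is set the queue name is omitted from argv.
def parse_command_line(argv, env = ENV)
  args  = argv.dup
  queue = env['RQ_Q'] || args.shift
  mode  = args.shift
  # modes without an abbreviation (start, stop, ...) pass through unchanged
  mode  = MODE_ABBREVIATIONS.fetch(mode, mode)
  [queue, mode, args]
end

p parse_command_line(%w[q s ls], {})
p parse_command_line(%w[l all], { 'RQ_Q' => '/full/path/to/my/q' })
```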

    create, c :

    create a queue. the queue must be located on an nfs mounted file system
    visible from all nodes intended to run jobs from it. nfs locking must be
    functional on this file system.

    examples :

    0) to create a queue
    ~ > rq /path/to/nfs/mounted/q create
    or simply
    ~ > rq /path/to/nfs/mounted/q c

    submit, s :

    submit jobs to a queue to be processed by a feeding node. any 'mode_args'
    are taken as the command to run. note that 'mode_args' are subject to shell
    expansion - if you don't understand what this means do not use this feature
    and pass jobs on stdin.

    when running in submit mode a file may be specified as a list of commands to
    run using the '--infile, -i' option. this file is taken to be a newline
    separated list of commands to submit, blank lines and comments (#) are
    allowed. if submitting a large number of jobs the input file method is
    MUCH more efficient. if no commands are specified on the command line rq
    automatically reads them from STDIN. yaml formatted files are also allowed
    as input (http://www.yaml.org/) - note that the output of nearly all rq
    commands is valid yaml and may, therefore, be piped as input into the submit
    command. the leading '---' of a yaml file may not be omitted.
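
    to make the plain-text input format concrete, here is a rough sketch of
    reading a newline separated command list. parse_command_list is a
    hypothetical name, and treating only whole lines that begin with '#' as
    comments is an assumption - the text above does not say how rq treats a
    '#' that appears in the middle of a command.

```ruby
# hypothetical helper: turn the text of a command file into a list of commands.
# assumption: a comment is a line whose first non-blank character is '#'.
def parse_command_list(text)
  text.each_line
      .map(&:strip)
      .reject { |line| line.empty? || line.start_with?('#') }
end

cmdfile = <<~CMDS
  # nightly jobs
  /full/path/to/job_a --input /data/a

  /full/path/to/job_b --input /data/b
CMDS

p parse_command_list(cmdfile)
```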

    when submitting, the '--priority, -p' option can be used to determine
    the priority of jobs. priorities may be any whole number - zero is the
    default. note that submission of a high priority job will NOT supplant
    currently running low priority jobs, but higher priority jobs WILL always
    migrate above lower priority jobs in the queue in order that they be run as
    soon as possible. constant submission of high priority jobs may create a
    starvation situation whereby low priority jobs are never allowed to run.
    avoiding this situation is the responsibility of the user. the only
    guarantee rq makes regarding job execution is that jobs are executed in an
    'oldest highest priority' order and that running jobs are never supplanted.
    jobs submitted with the '--stage' option will not be run by any node and
    will remain in a 'holding' state until updated (see update mode) into the
    'pending' state. this option allows jobs to be entered, or staged, in the queue
    and made candidates for running at a later date.
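
    the 'oldest highest priority' rule can be sketched as a sort over the
    pending jobs. the QueuedJob struct, its field names, and run_order are
    made up for illustration; rq itself keeps jobs in a sqlite database and
    this is not its implementation.

```ruby
# hypothetical job record; the field names are assumptions for illustration
QueuedJob = Struct.new(:jid, :priority, :submitted, :state)

# pending jobs in the order a feeder would take them:
# highest priority first, oldest submission first within a priority.
def run_order(jobs)
  jobs.select { |job| job.state == 'pending' }
      .sort_by { |job| [-job.priority, job.submitted] }
end

jobs = [
  QueuedJob.new(1, 0, Time.at(100), 'pending'),
  QueuedJob.new(2, 9, Time.at(300), 'pending'),
  QueuedJob.new(3, 9, Time.at(200), 'pending'),
  QueuedJob.new(4, 9, Time.at(50),  'holding'), # staged: not a candidate
  QueuedJob.new(5, 0, Time.at(10),  'running')  # running: never supplanted
]

p run_order(jobs).map(&:jid)
```

    job 1 is the oldest pending job but runs last because both priority 9
    jobs migrate above it, which is also how starvation can arise.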

    examples :

    0) submit the job ls to run on some feeding host

    ~ > rq q s ls

    1) submit the job ls to run on some feeding host, at priority 9

    ~ > rq -p9 q s ls

    2) submit 42000 jobs (quietly) from a command file, marking them as
    restartable should the node they are running on reboot.

    ~ > wc -l cmdfile
    ~ > rq q s --quiet --restartable < cmdfile

    3) submit 42 priority 9 jobs from a command file.

    ~ > wc -l cmdfile
    ~ > rq -p9 q s < cmdfile

    4) submit 42 priority 9 jobs from a command file, marking them as
    'important' using the '--tag, -t' option.

    ~ > wc -l cmdfile
    ~ > rq -p9 -timportant q s < cmdfile

    5) re-submit all the 'important' jobs (see 'query' section below)

    ~ > rq q query tag=important | rq q s

    6) re-submit all jobs which are already finished (see 'list' section below)

    ~ > rq q l f | rq q s

    7) stage the job wont_run_yet to the queue in a 'holding' state. no
    feeder will run this job until its state is upgraded to 'pending'

    ~ > rq q s --stage wont_run_yet

    resubmit, r :

    resubmit jobs back to a queue to be processed by a feeding node. resubmit
    is essentially equivalent to submitting a job that is already in the queue
    as a new job and then deleting the original job except that using resubmit
    is atomic and, therefore, safer and more efficient. read docs for delete
    and submit for more info.

    examples :

    0) resubmit job 42 to the queue. afterwards

    list, l, ls :

    list mode lists jobs of a certain state or job id. state may be one of
    pending, holding, running, finished, dead, or all. any 'mode_args' that are
    numbers are taken to be job id's to list.

    states may be abbreviated to uniqueness, therefore the following shortcuts
    apply :

    p => pending
    h => holding
    r => running
    f => finished
    d => dead
    a => all
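
    'abbreviated to uniqueness' means any prefix that matches exactly one
    state name is accepted. a small sketch, with expand_state as a
    hypothetical helper name:

```ruby
STATES = %w[pending holding running finished dead all].freeze

# hypothetical helper: expand an abbreviated state to its full name.
# returns nil when the abbreviation matches no state or more than one.
def expand_state(abbrev)
  return nil if abbrev.empty?

  matches = STATES.select { |state| state.start_with?(abbrev) }
  matches.size == 1 ? matches.first : nil
end

p expand_state('p')
p expand_state('fin')
p expand_state('x')
```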

    examples :

    0) show everything in q
    ~ > rq q list all
    ~ > rq q l all
    ~ > export RQ_Q=q
    ~ > rq l

    1) show q's pending jobs
    ~ > rq q list pending

    2) show q's running jobs
    ~ > rq q list running

    3) show q's finished jobs
    ~ > rq q list finished

    4) show job id 42
    ~ > rq q l 42

    5) show q's holding jobs
    ~ > rq q list holding

    status, t :

    status mode shows the global state of the queue. there are no 'mode_args'.
    the meaning of each state is as follows:

    pending  => no feeder has yet taken this job
    holding  => a hold has been placed on this job, thus no feeder will start it
    running  => a feeder has taken this job
    finished => a feeder has finished this job
    dead     => rq died while running a job, has restarted, and moved
                this job to the dead state

    note that rq cannot move jobs into the dead state unless it has been
    restarted. this is because no node has any knowledge of other nodes and
    cannot possibly know if a job was started on a node that died, or is simply
    taking a very long time. only the node that dies, upon restart, can
    determine that it has jobs that 'were started before it started' and move
    these jobs into the dead state. normally only a machine crash would cause a
    job to be placed into the dead state. dead jobs are never automatically
    restarted, this is the responsibility of an operator.
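
    the restart check described above could look roughly like this. the
    RunRecord struct and mark_dead_jobs are hypothetical; the 'runner' and
    'started' fields are borrowed from the query examples in this message,
    and the real check runs against the sqlite queue rather than an array.

```ruby
# hypothetical job record; field names follow the query examples in this message
RunRecord = Struct.new(:jid, :state, :runner, :started)

# hypothetical helper, run by a feeder as it starts up on 'host'.
# any job marked running on this host that started before the feeder
# itself did must have been orphaned by a crash, so it is marked dead.
# jobs running on other hosts are left alone: nothing can be known about them.
def mark_dead_jobs(jobs, host, feeder_started)
  jobs.each do |job|
    next unless job.state == 'running' && job.runner == host

    job.state = 'dead' if job.started < feeder_started
  end
end

jobs = [
  RunRecord.new(1, 'running', 'node_1', Time.at(100)),
  RunRecord.new(2, 'running', 'node_2', Time.at(100)),
  RunRecord.new(3, 'pending', nil, nil)
]

mark_dead_jobs(jobs, 'node_1', Time.at(500))
p jobs.map { |job| [job.jid, job.state] }
```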

    examples :

    0) show q's status

    ~ > rq q t

    delete, d :

    delete combinations of pending, holding, finished, dead, or jobs specified
    by jid. the delete mode is capable of parsing the output of list and query
    modes, making it possible to create custom filters to delete jobs meeting
    very specific conditions.

    'mode_args' are the same as for list.

    note that it is NOT possible to delete a running job. rq has a
    decentralized architecture which means that compute nodes are completely
    independent of one another; an extension is that there is no way to
    communicate the deletion of a running job from the queue to the node
    actually running that job. it is not an error to force a job to die
    prematurely using a facility such as an ssh command spawned on the remote
    host to kill it. once a job has been noted to have finished, whatever the
    exit status, it can be deleted from the queue.

    examples :

    0) delete all pending, finished, and dead jobs from a queue

    ~ > rq q d all

    1) delete all pending jobs from a queue

    ~ > rq q d p

    2) delete all finished jobs from a queue

    ~ > rq q d f

    3) delete jobs via hand crafted filter program

    ~ > rq q list | yaml_filter_prog | rq q d

    an example ruby filter program (you have to love this)

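    require 'yaml'
    # read the yaml job list produced by 'rq q list' from stdin
    joblist = YAML::load STDIN
    # dump, as yaml, only those jobs whose command matches
    puts joblist.select{|job| job['command'] =~ /bombing_program/}.to_yaml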

    this program reads the list of jobs (yaml) from stdin and then dumps
    only those jobs whose command matches 'bombing_program', which is
    subsequently piped to the delete command.

    update, u :

    update assumes all leading arguments are jids to update with subsequent
    key=value pairs. currently only the 'command', 'priority', and 'tag' fields
    of pending jobs can be generically updated and the 'state' field may be
    toggled between pending and holding.


    0) update the priority of job 42

    ~ > rq q update 42 priority=7

    1) update the priority of all pending jobs

    ~ > rq q update pending priority=7

    2) query jobs with a command matching 'foobar' and update their command
    to be 'barfoo'

    ~ > rq q q "command like '%foobar%'" |\
    rq q u command=barfoo

    3) place a hold on jid 2

    ~ > rq q u 2 state=holding

    4) place a hold on all jobs with tag=disk_filler

    ~ > rq q q tag=disk_filler | rq q u state=holding

    5) remove the hold on jid 2

    ~ > rq q u 2 state=pending

    query, q :

    query exposes the database more directly to the user, evaluating the where
    clause specified on the command line (or read from STDIN). this feature can
    be used to make a fine grained selection of jobs for reporting or as input
    into the delete command. you must have a basic understanding of SQL syntax
    to use this feature, but it is fairly intuitive in this limited capacity.


    0) show all jobs started within a specific 10 second range

    ~ > rq q query "started >= '2004-06-29 22:51:00' and started < '2004-06-29 22:51:10'"

    1) shell quoting can be tricky here so input on STDIN is also allowed to
    avoid shell expansion

    ~ > cat constraints.txt
    started >= '2004-06-29 22:51:00' and
    started < '2004-06-29 22:51:10'

    ~ > rq q query < constraints.txt
    or (same thing)

    ~ > cat constraints.txt | rq q query

    2) this query output might then be used to delete those jobs

    ~ > cat constraints.txt | rq q q | rq q d

    3) show all jobs which are either finished or dead

    ~ > rq q q "state='finished' or state='dead'"

    4) show all jobs which have non-zero exit status

    ~ > rq q query exit_status!=0

    5) if you plan to query groups of jobs with some common feature consider
    using the '--tag, -t' feature of the submit mode which allows a user to
    tag a job with a user defined string which can then be used to easily
    query that job group

    ~ > rq q submit --tag=my_jobs < joblist
    ~ > rq q query tag=my_jobs

    6) in general all but numbers will need to be surrounded by single quotes
    unless the query is a 'simple' one. a simple query is a query with no
    boolean operators, no quotes, and where every part of it looks like

    key op value

    with ** NO SPACES ** between key, op, and value. if, and only if, the
    query is 'simple' rq will construct the where clause appropriately. the
    operators accepted, and their meanings, are

    = : equivalence : sql =
    =~ : matches : sql like
    !~ : not matches : sql not like

    match, in this context, is ** NOT ** a regular expression but a sql style
    string match. about all you need to know about sql matches is that the
    '%' char matches anything. multiple simple queries will be joined with
    boolean 'and'

    this sounds confusing - it isn't. here are some examples of simple queries

    query :
    rq q query tag=important

    where_clause :
    "( tag = 'important' )"

    query :
    rq q q priority=6 restartable=true

    where_clause :
    "( priority = 6 ) and ( restartable = 'true' )"

    query :
    rq q q command=~%bombing_job% runner=~%node_1%

    where_clause :
    "( command like '%bombing_job%' ) and ( runner like '%node_1%' )"

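    the translation from 'simple' query terms to a where clause might be
    sketched as follows. simple_where_clause and SQL_OPS are hypothetical
    names and only the three operators listed above are handled; this is a
    guess at the behaviour described, not rq's own parser.

```ruby
# operators from the table above, mapped to their sql spelling
SQL_OPS = { '=' => '=', '=~' => 'like', '!~' => 'not like' }.freeze

# hypothetical helper: build a where clause from 'simple' query terms.
# raises ArgumentError for anything that is not key/op/value without
# spaces or quotes.
def simple_where_clause(terms)
  clauses = terms.map do |term|
    match = /\A(\w+)(=~|!~|=)([^\s'"]+)\z/.match(term)
    raise ArgumentError, "not a simple query: #{term}" unless match

    key, op, value = match.captures
    # all but numbers are surrounded by single quotes
    value = "'#{value}'" unless value =~ /\A-?\d+(\.\d+)?\z/
    "( #{key} #{SQL_OPS.fetch(op)} #{value} )"
  end
  # multiple simple queries are joined with boolean 'and'
  clauses.join(' and ')
end

puts simple_where_clause(%w[tag=important])
puts simple_where_clause(%w[priority=6 restartable=true])
puts simple_where_clause(%w[command=~%bombing_job% runner=~%node_1%])
```

    anything containing spaces, quotes, or boolean operators is rejected by
    the sketch and would have to be passed as a full where clause instead.
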
    execute, e :

    execute mode is to be used by expert users with a knowledge of sql syntax
    only. it follows the locking protocol used by rq and then allows the user
    to execute arbitrary sql on the queue. unlike query mode a write lock on
    the queue is obtained allowing a user to definitively shoot themselves in
    the foot. for details on a queue's schema the file 'db.schema' in the queue
    directory should be examined.

    examples :

    0) list all jobs

    ~ > rq q execute 'select * from jobs'

    configure, C :

    this mode is not supported yet.

    snapshot, S :

    snapshot provides a means of taking a snapshot of the q. use this feature
    when many queries are going to be run; for example when attempting to figure
    out a complex pipeline command your test queries will not compete with the
    feeders for the queue's lock. you should use this option whenever possible
    to avoid lock competition.


    0) take a snapshot using default snapshot naming, which is made via the
    basename of the q plus '.snapshot'

    ~ > rq /path/to/nfs/q snapshot

    1) use this snapshot to check status

    ~ > rq ./q.snapshot status

    2) use the snapshot to see what's running on which host

    ~ > rq ./q.snapshot list running | grep `hostname`

    note that there is also a snapshot option - this option is not the same as
    the snapshot command. the option can be applied to ANY command. if in
    effect then that command will be run on a snapshot of the database and the
    snapshot then immediately deleted. this is really only useful if one were
    to need to run a command against a very heavily loaded queue and did not
    wish to wait to obtain the lock. eg.

    0) get the status of a heavily loaded queue

    ~ > rq q t --snapshot

    1) same as above

    ~ > rq q t -s

    ** IMPORTANT **

    a really great way to hang all processing in your queue is to do this

    rq q list | less

    and then leave for the night. you hold a read lock you won't release
    until less dies. this is what snapshot is made for! use it like

    rq q list -s | less

    now you've taken a snapshot of the queue to list so your locks affect no

    lock, L :

    lock the queue and then execute an arbitrary shell command. lock mode uses
    the queue's locking protocol to safely obtain a lock of the specified type
    and execute a command on the user's behalf. lock type must be one of

    (r)ead | (sh)ared | (w)rite | (ex)clusive

    examples :

    0) get a read lock on the queue and make a backup

    ~ > rq q L read -- cp -r q q.bak

    (the '--' is needed to tell rq to stop parsing command line
    options which allows the '-r' to be passed to the 'cp' command)

    ** IMPORTANT **

    this is another fantastic way to freeze your queue - use with care!

    backup, b :

    backup mode is exactly the same as getting a read lock on the queue and
    making a copy of it. this mode is provided as a convenience.

    0) make a backup of the queue using default naming ( qname + timestamp + .bak )

    ~ > rq q b

    1) make a backup of the queue as 'q.bak'

    ~ > rq q b q.bak

    rotate, R :

    rotate mode is conceptually similar to log rolling. normally the list of
    finished jobs will grow without bound in a queue unless they are manually
    deleted. rotation is a method of trimming finished jobs from a queue
    without deleting them. the method used is that the queue is copied to a
    'rotation'; all jobs that are dead or finished are deleted from the original
    queue and all pending and running jobs are deleted from the rotation. in
    this way the rotation becomes a record of the queue's finished and dead jobs
    at the time the rotation was made.
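
    the split performed by a rotation can be sketched on a plain list of
    jobs. RotJob and rotate are hypothetical names, and the sketch follows
    the wording above literally: the text does not say what happens to
    holding jobs, so none appear in the example.

```ruby
# hypothetical job record for illustration only
RotJob = Struct.new(:jid, :state)

# hypothetical helper: split a job list the way the text describes.
# returns [queue_after_rotation, rotation].
def rotate(jobs)
  # dead and finished jobs are deleted from the original queue
  queue    = jobs.reject { |job| %w[dead finished].include?(job.state) }
  # pending and running jobs are deleted from the rotation (the copy)
  rotation = jobs.reject { |job| %w[pending running].include?(job.state) }
  [queue, rotation]
end

jobs = [
  RotJob.new(1, 'pending'), RotJob.new(2, 'running'),
  RotJob.new(3, 'finished'), RotJob.new(4, 'dead')
]
queue, rotation = rotate(jobs)
p queue.map(&:jid)
p rotation.map(&:jid)
```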

    0) rotate a queue using default rotation name

    ~ > rq q rotate

    1) rotate a queue naming the rotation

    ~ > rq q rotate q.rotation

    2) a crontab entry like this could be used to rotate a queue daily

    59 23 * * * rq q rotate `date +q.%Y%m%d`

    feed, f :

    take jobs from the queue and run them on behalf of the submitter as quickly
    as possible. jobs are taken from the queue in an 'oldest highest priority'
    first order.

    feeders can be run from any number of nodes allowing you to harness the CPU
    power of many nodes simultaneously in order to more effectively clobber
    your network, annoy your sysads, and set output raids on fire.

    the most useful method of feeding from a queue is to do so in daemon mode so
    that if the process loses its controlling terminal it will not exit when you
    exit your terminal session. use the '--daemon, -d' option to accomplish
    this. by default only one feeding process per host per queue is allowed to
    run at any given moment. because of this it is acceptable to start a feeder
    at some regular interval from a cron entry since, if a feeder is already
    running, the process will simply exit and otherwise a new feeder will be
    started. in this way you may keep feeder processes running even across
    machine reboots without requiring sysad intervention to add an entry to the
    machine's startup tasks.
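
    one common way to get 'only one feeder per host per queue' is a lock
    file taken with a non-blocking exclusive flock. the sketch below shows
    that technique only; how rq really enforces the rule is not described
    here, and acquire_feeder_lock and the lock file name are assumptions.

```ruby
require 'tmpdir'

# hypothetical helper: try to become the only feeder for a queue on this host.
# returns the open lock file on success, or nil if another feeder holds it.
# assumption: a lock file on local disk guarded by flock; rq may do otherwise.
def acquire_feeder_lock(lock_path)
  file = File.open(lock_path, File::RDWR | File::CREAT, 0o644)
  if file.flock(File::LOCK_EX | File::LOCK_NB)
    # record our pid so an operator can see who is feeding
    file.truncate(0)
    file.write(Process.pid.to_s)
    file.flush
    file
  else
    file.close
    nil
  end
end

lock_path = File.join(Dir.tmpdir, "rq_feeder_sketch_#{Process.pid}.lock")
first  = acquire_feeder_lock(lock_path)
second = acquire_feeder_lock(lock_path)
puts first  ? 'first attempt: feeding'  : 'first attempt: exiting'
puts second ? 'second attempt: feeding' : 'second attempt: exiting'
first.close if first
File.delete(lock_path) if File.exist?(lock_path)
```

    a cron-started process that fails to take the lock simply exits, which
    is the behaviour the paragraph above relies on.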

    examples :

    0) feed from a queue verbosely for debugging purposes, using a minimum and
    maximum polling time of 2 and 4 respectively. you would NEVER specify
    polling times this brief except for debugging purposes!!!

    ~ > rq q feed -v4 -m2 -M4

    1) same as above, but viewing the executed sql as it is sent to the

    ~ > RQ_SQL_DEBUG=1 rq q f -v4 -m2 -M4

    2) feed from a queue in daemon mode - logging to /home/ahoward/rq.log

    ~ > rq q f -d -l/home/ahoward/rq.log

    log rolling in daemon mode is automatic so your logs should never need
    to be deleted to prevent disk overflow.

    3) use something like this sample crontab entry to keep a feeder running
    forever - it attempts to (re)start every fifteen minutes but exits if
    another process is already feeding.

    # your crontab file - sample only

    */15 * * * * /full/path/to/bin/rq /full/path/to/nfs/mounted/q f -d -l/home/username/cfq.log -q

    the '--quiet, -q' here tells rq to exit quietly (no STDERR)
    when another process is found to already be feeding so that no cron
    message would be sent under these conditions.

    start :

    the start mode is equivalent to running the feed mode except the --daemon is
    implied so the process instantly goes into the background. also, if no log
    (--log) is specified in start mode a default one is used. the default is

    ENV['HOME'] + '/' + File::basename(queue) + '.log'

    the crontab line above could just as well be

    */15 * * * * /full/path/to/bin/rq /full/path/to/nfs/mounted/q start -q

    with the resulting log ending up in ~/q.log

    examples :

    0) start a daemon process feeding from q

    ~ > rq q start

    shutdown :

    tell a running feeder to finish any pending jobs and then to exit. this is
    equivalent to sending signal 'SIGTERM' to the process - this is what using
    'kill pid' does by default.

    examples :

    0) stop a feeding process, if any, that is feeding from q. allow all jobs
    to be finished first.

    ~ > rq q shutdown


    if you are keeping your feeder alive with a crontab entry you'll need to
    comment it out before doing this or else it will simply re-start!!!

    stop :

    tell any running feeder to stop NOW. this sends signal 'SIGKILL' (-9) to
    the feeder process. the same warning as for shutdown applies!!!

    examples :

    0) stop a feeding process, if any, that is feeding from q. allow NO jobs
    to be finished first - exit instantly.

    ~ > rq q stop

    feeder :

    show the pid, if any, of the feeder

    ~ > rq q feeder

    feeder <15366>

    help, h :

    this message

    examples :

    0) get this message

    ~> rq q help
    ~> rq help

    - realize that your job is going to be running on a remote host and this has
    implications. paths, for example, should be absolute, not relative.
    specifically the submitted job script must be visible from all hosts
    currently feeding from a queue as must be the input and output

    - jobs are currently run under the bash shell using the --login option.
    therefore any settings in your .bashrc will apply - specifically your PATH
    setting. you should not, however, rely on jobs running with any given

    - you need to consider __CAREFULLY__ what the ramifications of having multiple
    instances of your program all potentially running at the same time will be.
    for instance, it is beyond the scope of rq to ensure multiple instances of a
    given program will not overwrite each other's output files. coordination
    of programs is left entirely to the user.

    - the list of finished jobs will grow without bound unless you sometimes
    delete some (all) of them. the reason for this is that rq cannot know when
    the user has collected the exit_status of a given job, and so keeps this
    information in the queue forever until instructed to delete it. if you have
    collected the exit_status of your job(s) it is not an error to then delete
    that job from the finished list - the information is kept for your
    informational purposes only. in a production system it would be normal to
    periodically save, and then delete, all finished jobs.

    - know that it is a VERY bad idea to spawn several dozen processes all
    reading/writing huge output files to a single NFS server. use this paradigm

    copy data locally from input space
    work on data
    move data to output space

    the vsftp daemon is an excellent utility to have running on hosts in your
    cluster so anonymous ftp can be used to get/put data.
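
    the copy / work / move paradigm can be wrapped in a small helper so a
    job only touches the shared file system at its start and end. this is a
    sketch using ruby's standard fileutils and tmpdir libraries;
    with_local_copy and the file names are hypothetical.

```ruby
require 'fileutils'
require 'tmpdir'

# hypothetical helper: run a job against local scratch space.
# the block receives the local input path and the local output path.
def with_local_copy(input_path, output_dir)
  Dir.mktmpdir('rq_job') do |scratch|
    local_in  = File.join(scratch, File.basename(input_path))
    local_out = File.join(scratch, 'output')
    FileUtils.cp(input_path, local_in)    # copy data locally from input space
    yield local_in, local_out             # work on data
    FileUtils.mkdir_p(output_dir)
    FileUtils.mv(local_out, output_dir)   # move data to output space
  end
end

# usage with stand-in directories; on a cluster these would be nfs paths
Dir.mktmpdir('rq_demo') do |nfs|
  input = File.join(nfs, 'input.txt')
  File.write(input, "some data\n")
  with_local_copy(input, File.join(nfs, 'results')) do |local_in, local_out|
    File.write(local_out, File.read(local_in).upcase)
  end
  puts File.read(File.join(nfs, 'results', 'output'))
end
```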

    - know that nfs locking is very, very easy to break with firewalls put in
    place by overzealous system administrators. be positive not only that nfs
    locking works, but that lock recovery after server/client crash or reboot works as
    well. http://nfs.sourceforge.net/ is the place to learn about NFS. my
    experience thus far is that there are ZERO properly configured NFS
    installations in the world. please test yours. contact me for a simple
    script which can assist you. donations of beer may be required.

    RQ_Q: set to the full path of nfs mounted queue

    the queue argument to all commands may be omitted if, and only if, the
    environment variable 'RQ_Q' contains the full path to the q. eg.

    ~ > export RQ_Q=/full/path/to/my/q

    this feature can save a considerable amount of typing for those weak of

    success : $? == 0
    failure : $? != 0

    - kim baugh : patient tester and design input
    - jeff safran : the guy can break anything
    - chris elvidge : made it possible
    - trond myklebust : tons of help with nfs
    - jamis buck : for writing the sqlite bindings for ruby
    - _why : for writing yaml for ruby
    - matz : for writing ruby


    0 < bugno && bugno <= 42

    reports to

    --priority=priority, -p
    modes <submit> : set the job(s) priority - lowest(0) .. highest(n) -
    (default 0)
    --tag=tag, -t
    modes <submit> : set the job(s) user data tag
    modes <submit> : set the job(s) required runner(s)
    modes <submit> : set the job(s) to be restartable on node reboot
    modes <submit> : set the job(s) initial state to be holding (default
    modes <submit> : infile
    --quiet, -q
    modes <submit, feed> : do not echo submitted jobs, fail silently if
    another process is already feeding
    --daemon, -D
    modes <feed> : spawn a daemon
    modes <feed> : the maximum number of concurrent jobs run
    modes <feed> : specify transaction retries
    modes <feed> : specify min sleep
    modes <feed> : specify max sleep
    --snapshot, -s
    operate on snapshot of queue
    --verbosity=verbosity, -v
    0|fatal < 1|error < 2|warn < 3|info < 4|debug - (default info)
    --log=path, -l
    set log file - (default stderr)
    daily | weekly | monthly - what age will cause log rolling (default
    size in bytes - what size will cause log rolling (default nil)
    --help, -h
    this message
    show version number


    | ara [dot] t [dot] howard [at] noaa [dot] gov
    | all happiness comes from the desire for others to be happy. all misery
    | comes from the desire for oneself to be happy.
    | -- bodhicaryavatara