Recursive parallel readdir() with drb and slave libraries

John Stoffel

Hi all,

I'm trying to write a Ruby script to do a recursive descent, adding up
the size of each directory and its contents. Obviously, you just
recurse down to the bottom and work your way back up.
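
For reference, the plain serial descent can be sketched as below. This is only a minimal illustration, not the script from this post: the name dir_size is hypothetical, symlinks are skipped rather than followed, and unreadable entries are reported and ignored.

```ruby
# Hypothetical helper: total size in bytes of all regular files under dir.
# Symlinks are not followed; errors are reported and the entry is skipped.
def dir_size(dir)
  total = 0
  begin
    Dir.foreach(dir) do |name|
      next if name == "." || name == ".."
      path = File.join(dir, name)
      stat = File.lstat(path)        # lstat so symlinks are not followed
      if stat.file?
        total += stat.size           # leaf: add the file's own size
      elsif stat.directory?
        total += dir_size(path)      # recurse, then add the subtree total
      end
    end
  rescue SystemCallError => e
    warn e.message                   # e.g. permission denied; keep partial total
  end
  total
end
```

Each call returns the total for its own subtree, so the sizes accumulate on the way back up.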

But I need to break this into parallel processes because of the sheer
size of my directories, on the order of 1-5 TB, with millions of
files. I managed to write a version using the Ruby thread library, but
since Ruby 1.8.6 only has green threads (no native OS threads), it
didn't buy me anything.

So I've tried to use the 'drb' and 'slave' libraries to make my life
simpler. Basically, I want to have N slaves going at any one time.
When I'm processing a directory and find a sub-directory to descend
into, I call back over DRb to see whether I can fork off another slave
to do the work, or whether I should just recurse into it myself.
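
The "may I fork, or do I do it myself?" decision boils down to a non-blocking claim on a fixed number of slots. A minimal in-process sketch, assuming a hypothetical SlotPool class (in the script above this role is played by the Master object behind DRb):

```ruby
require 'thread'

# Hypothetical helper: a fixed number of worker slots guarded by a Mutex.
class SlotPool
  def initialize(max)
    @max  = max
    @used = 0
    @lock = Mutex.new
  end

  # Claim a slot without blocking; true on success, false when all are taken.
  def try_acquire
    @lock.synchronize do
      if @used < @max
        @used += 1
        true
      else
        false
      end
    end
  end

  # Give a slot back; the count never drops below zero.
  def release
    @lock.synchronize { @used -= 1 if @used > 0 }
    nil
  end

  # Number of slots currently claimed.
  def used
    @lock.synchronize { @used }
  end
end
```

A caller would hand the sub-directory to a helper when try_acquire returns true (and call release once the helper finishes), and recurse inline otherwise.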

Once I'm done in a directory, I wait on any slaves still working below
me, before I return my results to the next higher level.
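
The "wait on everything below me, then return my total" step can be sketched with plain fork and IO.pipe. Note that this swaps in a different technique from the 'slave' library used in the script, and sum_in_children is a hypothetical name; it also assumes a platform where fork is available.

```ruby
# Hypothetical helper: run each job (a callable returning an Integer) in its
# own forked child and return the sum of the results.
def sum_in_children(jobs)
  children = jobs.map do |job|
    reader, writer = IO.pipe
    pid = fork do
      reader.close
      writer.write(job.call.to_s)   # send the result back as text
      writer.close
      exit!(0)                      # leave the child without running at_exit hooks
    end
    writer.close                    # the parent keeps only the read end
    [pid, reader]
  end

  total = 0
  children.each do |pid, reader|
    total += reader.read.to_i       # blocks until the child closes its end
    reader.close
    Process.wait(pid)               # reap the child
  end
  total
end
```

The parent does not return until every child has reported and been reaped, which is the property needed before passing a directory total up to the next level.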

I'm obviously doing something completely wrong here, but I can't see
what. Can anyone look at this code and tell me what I'm doing wrong,
or what I'm missing? When you try to run it, it just barfs (sorta)
all over the place.

#!/usr/bin/ruby

require 'getoptlong'
require 'thread' # Mutex and ConditionVariable live here on Ruby 1.8
require 'slave'
require 'drb'

$VERSION = "v1.0"
$max_slaves = 1
$count = 50

URI = "drbunix:///tmp/readdir+" + Process.pid.to_s

slave_cnt_mutex = Mutex.new

opts = GetoptLong.new(
[ "--help", "-h", GetoptLong::NO_ARGUMENT],
[ "--kids", "-k", GetoptLong::REQUIRED_ARGUMENT]
)

#---------------------------------------------------------------------
class Master
def initialize(max_slaves=1)
@slaves = []
@max_slaves = max_slaves
@slave_count = 1
@slave_mutex = Mutex.new
# Probably not needed...
@slave_cv = ConditionVariable.new
end

def count
@slave_count
end

# Increment the count of slaves, returning 1 if incremented, nil if not.
def addslave
ok = nil
@slave_mutex.synchronize do
if (@slave_count < @max_slaves) then
@slave_count += 1
ok = 1
end
end
ok
end

# Decrement the count of slaves
def remslave
@slave_mutex.synchronize do
if (@slave_count > 1) then
@slave_count -= 1
end
end
@slave_count
end

end

#---------------------------------------------------------------------
# Read a directory and add to the database; this function is recursive
# for sub-directories

def readdir(master,dir)

#puts "readdir(#{dir})"

size_file = {}
size_dir = {}
size_total = 0

# Slave pool for this level of readdir() recursion.
pool = []

# Traverse the directory and collect the size of all files and
# directories

begin
Dir.foreach(dir) do |f|
#print " #{f},"
if(f != "." && f != "..") then
f_full = addpath(dir, f)
stat = File.lstat(f_full)

if(!stat.symlink?) then

if(stat.file?) then
#puts " File: #{f}"
size = File.size(f_full)
size_file[f] = size
size_total += size
end

if(stat.directory?) then
#puts " DIR: #{f}, thread_count = #{@thread_count}, max_slaves = #{@max_slaves}."
if ($max_slaves <= 1) then
#puts " no threads."
size = readdir(master,f_full)
if (size > 0) then
size_dir[f] = size
size_total += size
end
else
ok = master.addslave
if (ok)
puts " Threaded!"
pool << Slave.object(:async => true) {
size = readdir(master,f_full)
puts "size = #{size}"
}
else
puts " no free threads, do anyway"
size = readdir(master,f_full)
if(size > 0) then
size_dir[f] = size
size_total += size
end
end
end
end
end
end
end
rescue SystemCallError => errmsg
puts errmsg
end

pool.each { |slave|
size_total += slave.value
#master.remslave
slave.shutdown
}

#puts "Dir: #{dir} = #{size_total}"
return size_total
end

#---------------------------------------------------------------------
def usage
puts
puts "usage: readdir-drb [--kids NUM] <dir>"
puts " defaults to #{$max_slaves} children"
puts
puts " version: #{$VERSION}"
puts
end

#---------------------------------------------------------------------
def addpath(a, b)
return a + b if(a =~ /\/$/)
return a + "/" + b
end



#---------------------------------------------------------------------
# Main
#---------------------------------------------------------------------
$DEBUG = true

opts.each do |opt,arg|
case opt
when "--kids"
$max_slaves = arg.to_i
else
usage
exit
end
end

if ARGV.length != 1
puts "Missing dir argument (try --help)"
exit 1
end

dir = ARGV.shift

# Start the server which does the counting and coordination between
# slaves.
DRb.start_service URI, Master.new($max_slaves)

# Get a DRb proxy to the Master so readdir() can ask it for slave slots
master = DRbObject.new_with_uri URI

# Now let's try to do a recursive readdir() algorithm with slaves.

size = readdir(master,dir)

sizekb = size / 1024;
sizemb = sizekb / 1024;
sizegb = sizemb / 1024;

puts ""
puts "Total size: #{size} B"
puts "Total size: #{sizekb} KB"
puts "Total size: #{sizemb} MB"
puts "Total size: #{sizegb} GB"
 
