How to make only one thread sleep?

Ralph Moritz

Hi,

I've written a little multi-threaded file monitoring program. When it
starts it creates a detached "factory" thread, which loops infinitely,
creating detached worker threads. In the main thread we enter a
"monitor" loop which scans a directory for new files at intervals and
inserts them into a queue for the factory thread to extract. The
factory thread will block if the maximum number of allowed worker
threads is already running or there is no data in the queue.

This design was chosen so that the monitor loop can continue running
even if the factory thread blocks. Some problems have appeared though:

* When I call sleep() in the main thread (monitor loop), it affects
the entire process, rather than just the main thread. How do I put
just one, specific thread to sleep?

* There appears to be a memory leak somewhere. When I run the program
with perl -w I get warnings about leaked scalars. How do I track
this down?

My program is included below in its entirety. Any advice on the
design and the code will be much appreciated.

TIA,
Ralph
 
Ralph Moritz

I said:
My program is included below in its entirety.

Sorry, I forgot to include the program :-] Here it is:

use strict;
use threads;
use threads::shared;
use Thread::Semaphore;
use Thread::Queue;
use File::Copy;
use Digest::MD5 qw(md5);

# ------ Fallback config values ------------------

# Path to application root directory
our $root_dir = '/home/ralph/Sources/QMSv2';

# Directory containing inbound files
our $data_dir = "$root_dir/inbound";

# Directory to put processed files
our $result_dir = "$root_dir/processed";

# Directory to put files containing errors
our $error_dir = "$root_dir/errors";

# Path to the log file
our $logfile = "$root_dir/logs/monit.log";

# Frequency to scan inbound directory (in seconds)
our $scan_interval = 60;

# File extensions to match against
our $file_ext = qr/.txt$/;

# Maximum number of worker threads to create
our $max_workers = 10;

# -----------------------------------------------

# HACK: hard-code path to config file
my $config_file = '/home/ralph/Sources/QMSv2/config.pl';
read_config($config_file);

my $io_sem = Thread::Semaphore->new; # I/O semaphore
my $th_sem = Thread::Semaphore->new($max_workers); # Thread semaphore
my $queue = Thread::Queue->new; # Data queue
my %files :shared;

monitor_loop($data_dir);

sub monitor_loop {
    log_msg('Entering monitor loop');
    my $factory = threads->create(\&factory_loop);
    $factory->detach;

    while (1) {
        log_msg('Woke up. Scanning for files...');

        opendir(my $indir, $data_dir) or
            log_msg("Error: failed to open directory $data_dir: $!")
            && exit 1;
        my @flist = grep { $_ = "$data_dir/$_" if /$file_ext/ }
            readdir($indir);
        closedir($indir) or
            log_msg("Error: failed to close directory $data_dir: $!")
            && exit 1;

        foreach my $fn (@flist) {
            open(my $in, $fn) or
                log_msg("Error: failed to open file $fn: $!")
                && exit 1;
            my $digest = Digest::MD5->new->addfile($in)->hexdigest;
            close($in);

            # Ignore files which are still being processed.
            lock(%files);
            unless (exists $files{$digest}) {
                log_msg("Queueing file $fn...");
                $files{$digest} = $fn;
                $queue->enqueue($digest);
            }
        }

        log_msg("Sleeping...");

        # BUG: this puts the whole process to sleep, instead
        # of just the current thread.
        sleep($scan_interval);
    }
}

sub factory_loop {
    log_msg('Entering factory loop');
    while (1) {
        $th_sem->down;
        my $digest = $queue->dequeue;
        log_msg('Dispatching worker thread...');
        my $worker = threads->create(\&process_file, $digest);
        $worker->detach;
    }
}

sub process_file {
    my $digest = shift;
    my $file = $files{$digest};
    open(my $in, $file) or
        log_msg("Error: failed to open file $file: $!")
        && exit 1;
    log_msg("Processing file $file...");

    # ---- DUMMY PROCESSING CODE -----
    my $count = 0;
    $count++ while (my $line = <$in>);

    close $in;
    if ($count) {
        log_msg("OK:\t$file");
        move($file, $result_dir);
    } else {
        log_msg("ERR:\t$file");
        move($file, $error_dir);
    }
    # -------------------------------

    lock(%files);
    delete $files{$digest};
    $th_sem->up;
}

sub log_msg {
    $io_sem->down;
    my $msg = shift;
    open(my $logfile, ">> $logfile");
    print $logfile localtime() . " - $msg\n";
    close($logfile);
    $io_sem->up;
}

sub read_config {
    my $file = shift;
    open(my $cfg, $file) or
        warn "Warning: failed to open config file $file: $!. Using default values instead."
        && return;
    my $config = join('', <$cfg>);
    eval $config;
    die "Error: in config file $file: $@" if $@;
}
 
Ralph Moritz

A. Sinan Unur said:
Your program has too many external dependencies for anyone else to try
to debug it. You need to post the shortest possible program that still
exhibits the problem.

There are no external dependencies. I posted the entire program.

A. Sinan Unur said:
There are oddities such as defining a subroutine warn with the name of a
builtin. Again, I don't know if this has anything to do with the problem
or even if it is a problem but why create complications?

I don't know what you mean. There is no definition for a subroutine
called warn() in the code I posted. ??

A. Sinan Unur said:
As far as I can see, your factory_loop does not limit the number of
threads it is creating.

Not explicitly, but if you look closely you'll see that it's calling
$th_sem->down, which decrements the semaphore's count by one.
If the count would drop below zero, the call blocks until
the count is raised again. This effectively makes factory_loop block
if the maximum number of worker threads is already running.
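
The blocking behaviour described above can be checked in isolation. What follows is only a rough sketch, not the program from this thread: the worker body, the job count of 6, the limit of 2 and the $running/$peak counters are made up for illustration. It assumes a perl built with thread support.

```perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Semaphore;
use Time::HiRes ();

my $max_workers = 2;                                # illustrative limit
my $th_sem  = Thread::Semaphore->new($max_workers); # one slot per worker
my $running :shared = 0;                            # workers active right now
my $peak    :shared = 0;                            # highest value $running reached

sub work {
    {
        lock($running);                 # $peak is only touched under this lock
        $running++;
        $peak = $running if $running > $peak;
    }
    Time::HiRes::sleep(0.2);            # stand-in for real work
    {
        lock($running);
        $running--;
    }
    $th_sem->up;                        # give the slot back
}

my @workers;
for my $job (1 .. 6) {
    $th_sem->down;                      # blocks while all slots are taken
    push @workers, threads->create(\&work);
}
$_->join for @workers;

print "peak concurrent workers: $peak\n";
```

The threads are joined here rather than detached only so that the script can wait for them and report the peak; the semaphore logic is the same either way.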
 
A. Sinan Unur

Ralph Moritz said:
There are no external dependencies.

Yes, there are.

Ralph Moritz said:
I posted the entire program.

But one would need to create a config file, directories and files to be
able to run it. Reduce the program to something which still exhibits your
problem, and which others can run just by copying and pasting.

Ralph Moritz said:
I don't know what you mean. There is no definition for a subroutine
called warn() in the code I posted. ?

I confused myself in an attempt to reduce your program to something that
can be run.

Ralph Moritz said:
Not explicitly, but if you look closely you'll see that it's calling
$th_sem->down, which decrements the semaphore's count by one.
If the count would drop below zero, the call blocks until
the count is raised again. This effectively makes factory_loop block
if the maximum number of worker threads is already running.

It looks like my efforts have been wasted. Did you actually run the
program I posted? Does it work on your system as it does on mine? If so,
we can rule out any problems with sleep.

Now, it is your turn to do some work: Extend that program to what you
need in a step-by-step approach: Add one thing, test, add another, test.
You will find the solution to your problem. If not, you will at least
have come up with a program which others can use to help you.
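
For anyone reading along who wants a copy-and-paste starting point for that kind of test: the sketch below is not the program referred to above (that one is not reproduced in this thread), just a minimal stand-in. It assumes a perl built with thread support; the tick interval, the one-second sleep and all variable names are arbitrary. The worker records timestamps while the main thread calls sleep(), and the script then counts how many of them fall inside the sleep.

```perl
use strict;
use warnings;
use threads;
use threads::shared;
use Time::HiRes qw(time);

my @ticks :shared;          # timestamps recorded by the worker thread
my $done  :shared = 0;      # set by the main thread to stop the worker

my $worker = threads->create(sub {
    until ($done) {
        {
            lock(@ticks);
            push @ticks, time();
        }
        Time::HiRes::sleep(0.1);    # pause this thread only
    }
});

my $start = time();
sleep 1;                    # the call under test, made in the main thread
my $end = time();

{
    lock($done);
    $done = 1;
}
$worker->join;

# Count the worker's ticks that happened while the main thread slept.
my $during = grep { $_ >= $start && $_ <= $end } @ticks;
print "worker ticked $during times while the main thread slept\n";
```

If the count is well above zero, sleep() in the main thread is not stopping the other thread on that system, and the problem has to be looked for elsewhere.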

Sinan


PS: Use the correct format for the signature separator. It is
"dash-dash-space-newline".
 
John W. Krahn

Ralph said:
I said:
My program is included below in its entirety.

Sorry, I forgot to include the program :-] Here it is:

use warnings;
use strict;
use threads;
use threads::shared;
use Thread::Semaphore;
use Thread::Queue;
use File::Copy;
use Digest::MD5 qw(md5);

# ------ Fallback config values ------------------

# Path to application root directory
our $root_dir = '/home/ralph/Sources/QMSv2';

# Directory containing inbound files
our $data_dir = "$root_dir/inbound";

# Directory to put processed files
our $result_dir = "$root_dir/processed";

# Directory to put files containing errors
our $error_dir = "$root_dir/errors";

# Path to the log file
our $logfile = "$root_dir/logs/monit.log";

# Frequency to scan inbound directory (in seconds)
our $scan_interval = 60;

# File extensions to match against
our $file_ext = qr/.txt$/;

That should be:

our $file_ext = qr/\.txt$/;

An unescaped . matches any character, so you need to escape it to match a
literal dot.
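
To see the difference between the two patterns, here is a small sketch; the file names are made up for illustration.

```perl
use strict;
use warnings;

my $loose  = qr/.txt$/;     # . matches any single character
my $strict = qr/\.txt$/;    # \. matches only a literal dot

my @names = ('report.txt', 'notatxt', 'data.txt.bak');

# Collect the names each pattern accepts.
my @loose_hits  = grep { /$loose/ }  @names;
my @strict_hits = grep { /$strict/ } @names;

print "loose:  @loose_hits\n";
print "strict: @strict_hits\n";
```

The unescaped pattern also accepts 'notatxt', because the . is satisfied by the 'a' in front of 'txt'.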

# Maximum number of worker threads to create
our $max_workers = 10;

# -----------------------------------------------

# HACK: hard-code path to config file
my $config_file = '/home/ralph/Sources/QMSv2/config.pl';
read_config($config_file);

my $io_sem = Thread::Semaphore->new; # I/O semaphore
my $th_sem = Thread::Semaphore->new($max_workers); # Thread semaphore
my $queue = Thread::Queue->new; # Data queue
my %files :shared;

monitor_loop($data_dir);

sub monitor_loop {
    log_msg('Entering monitor loop');
    my $factory = threads->create(\&factory_loop);
    $factory->detach;

    while (1) {
        log_msg('Woke up. Scanning for files...');

        opendir(my $indir, $data_dir) or
            log_msg("Error: failed to open directory $data_dir: $!")
            && exit 1;
        my @flist = grep { $_ = "$data_dir/$_" if /$file_ext/ }
            readdir($indir);

You probably want either:

my @flist = map /$file_ext/ ? "$data_dir/$_" : (), readdir($indir);

Or:

my @flist = map "$data_dir/$_", grep /$file_ext/, readdir($indir);


$_ is an alias to the current list element, just as it is in for/foreach, so
modifying $_ inside grep also changes the original list/array.
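
A short sketch of the aliasing effect, using an ordinary array in place of readdir() so that the change is visible. The directory path and file names are placeholders.

```perl
use strict;
use warnings;

my $data_dir = '/tmp/inbound';      # placeholder path
my $file_ext = qr/\.txt$/;
my @names    = ('a.txt', 'b.log', 'c.txt');

# grep with an assignment to $_ : the matching elements of @copy are
# rewritten in place, because $_ aliases each element of the input list.
my @copy     = @names;
my @via_grep = grep { $_ = "$data_dir/$_" if /$file_ext/ } @copy;

# grep to filter, then map to build new strings: @names is left untouched.
my @via_map  = map { "$data_dir/$_" } grep { /$file_ext/ } @names;

print "copy after grep: @copy\n";
print "names after map: @names\n";
print "result:          @via_map\n";
```

Both forms produce the same result list; only the grep-with-assignment version modifies its input along the way.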




John
 
