How to make only one thread sleep?

Discussion in 'Perl Misc' started by Ralph Moritz, Sep 13, 2006.

  1. Ralph Moritz

    Ralph Moritz Guest

    Hi,

    I've written a little multi-threaded file monitoring program. When it
    starts it creates a detached "factory" thread, which loops infinitely,
    creating detached worker threads. In the main thread we enter a
    "monitor" loop which scans a directory for new files at intervals and
    inserts them into a queue for the factory thread to extract. The
    factory thread will block if the maximum number of allowed worker
    threads is already running or there is no data in the queue.
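
    The blocking behaviour of the queue described above can be sketched in
    isolation. This is a minimal illustration, not the posted program: the
    file names are made up, the consumer is joined rather than detached so
    the script exits cleanly, and the undef "stop" sentinel is an assumption
    of the sketch.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Stand-in for the data queue shared by the monitor and the factory.
my $queue = Thread::Queue->new;

# Consumer: dequeue() blocks while the queue is empty.
# An undef item serves as a "no more work" sentinel in this sketch.
my $consumer = threads->create(sub {
    my @seen;
    while (defined(my $item = $queue->dequeue)) {
        push @seen, $item;
    }
    return join ',', @seen;    # return one scalar to keep the context simple
});

# Producer: the main thread stays free to enqueue work at its own pace.
$queue->enqueue($_) for qw(a.txt b.txt c.txt);
$queue->enqueue(undef);        # sentinel: tell the consumer to stop

my $handled = $consumer->join;
print "consumer handled: $handled\n";
```

    The point of the design is visible here: the thread that fills the
    queue never waits on the thread that drains it.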

    This design was chosen so that the monitor loop can continue running
    even if the factory thread blocks. Some problems have appeared though:

    * When I call sleep() in the main thread (monitor loop), it affects
    the entire process, rather than just the main thread. How do I put
    just one, specific thread to sleep?

    * There appears to be a memory leak somewhere. When I run the program
    with perl -w I get warnings about leaked scalars. How do I track
    this down?

    My program is included below in its entirety. Any advice on the
    design and the code will be much appreciated.

    TIA,
    Ralph
     
    Ralph Moritz, Sep 13, 2006
    #1

  2. Ralph Moritz

    Ralph Moritz Guest

    I wrote:

    > My program is included below in it's entirety.


    Sorry, I forgot to include the program :-] Here it is:

    use strict;
    use threads;
    use threads::shared;
    use Thread::Semaphore;
    use Thread::Queue;
    use File::Copy;
    use Digest::MD5 qw(md5);

    # ------ Fallback config values ------------------

    # Path to application root directory
    our $root_dir = '/home/ralph/Sources/QMSv2';

    # Directory containing inbound files
    our $data_dir = "$root_dir/inbound";

    # Directory to put processed files
    our $result_dir = "$root_dir/processed";

    # Directory to put files containing errors
    our $error_dir = "$root_dir/errors";

    # Path to the log file
    our $logfile = "$root_dir/logs/monit.log";

    # Frequency to scan inbound directory (in seconds)
    our $scan_interval = 60;

    # File extensions to match against
    our $file_ext = qr/.txt$/;

    # Maximum number of worker threads to create
    our $max_workers = 10;

    # -----------------------------------------------

    # HACK: hard-code path to config file
    my $config_file = '/home/ralph/Sources/QMSv2/config.pl';
    read_config($config_file);

    my $io_sem = Thread::Semaphore->new; # I/O semaphore
    my $th_sem = Thread::Semaphore->new($max_workers); # Thread semaphore
    my $queue = Thread::Queue->new; # Data queue
    my %files :shared;

    monitor_loop($data_dir);

    sub monitor_loop {
        log_msg('Entering monitor loop');
        my $factory = threads->create(\&factory_loop);
        $factory->detach;

        while (1) {
            log_msg('Woke up. Scanning for files...');

            opendir(my $indir, $data_dir) or
                log_msg("Error: failed to open directory $data_dir: $!")
                && exit 1;
            my @flist = grep { $_ = "$data_dir/$_" if /$file_ext/ }
                readdir($indir);
            closedir($indir) or
                log_msg("Error: failed to close directory $data_dir: $!")
                && exit 1;

            foreach my $fn (@flist) {
                open(my $in, $fn) or
                    log_msg("Error: failed to open file $fn: $!")
                    && exit 1;
                my $digest = Digest::MD5->new->addfile($in)->hexdigest;
                close($in);

                # Ignore files which are still being processed.
                lock(%files);
                unless (exists $files{$digest}) {
                    log_msg("Queueing file $fn...");
                    $files{$digest} = $fn;
                    $queue->enqueue($digest);
                }
            }

            log_msg("Sleeping...");

            # BUG: this puts the whole process to sleep, instead
            # of just the current thread.
            sleep($scan_interval);
        }
    }

    sub factory_loop {
        log_msg('Entering factory loop');
        while (1) {
            $th_sem->down;
            my $digest = $queue->dequeue;
            log_msg('Dispatching worker thread...');
            my $worker = threads->create(\&process_file, $digest);
            $worker->detach;
        }
    }

    sub process_file {
        my $digest = shift;
        my $file = $files{$digest};
        open(my $in, $file) or
            log_msg("Error: failed to open file $file: $!")
            && exit 1;
        log_msg("Processing file $file...");

        # ---- DUMMY PROCESSING CODE -----
        my $count = 0;
        $count++ while (my $line = <$in>);

        close $in;
        if ($count) {
            log_msg("OK:\t$file");
            move($file, $result_dir);
        } else {
            log_msg("ERR:\t$file");
            move($file, $error_dir);
        }
        # -------------------------------

        lock(%files);
        delete $files{$digest};
        $th_sem->up;
    }

    sub log_msg {
        $io_sem->down;
        my $msg = shift;
        open(my $logfile, ">> $logfile");
        print $logfile localtime() . " - $msg\n";
        close($logfile);
        $io_sem->up;
    }

    sub read_config {
        my $file = shift;
        open(my $cfg, $file) or
            warn "Warning: failed to open config file $file: $!. Using default values instead."
            && return;
        my $config = join('', <$cfg>);
        eval $config;
        die "Error: in config file $file: $@" if $@;
    }
     
    Ralph Moritz, Sep 13, 2006
    #2

  3. Ralph Moritz

    Ralph Moritz Guest

    A. Sinan Unur wrote:
    > "Ralph Moritz" wrote:
    >
    > >> My program is included below in it's entirety.

    >
    > Your program has too many external dependencies for anyone else to try
    > to debug it. You need to post the shortest possible program that still
    > exhibits the problem.


    There are no external dependencies. I posted the entire program.

    > There are oddities such as defining a subroutine warn with the name of a
    > builtin. Again, I don't know if this has anything to do with the problem
    > or even if it is a problem but why create complications?


    I don't know what you mean. There is no definition for a subroutine
    called warn() in the code I posted. ??

    > As far as I can see, your factory_loop does not limit the number of
    > threads it is creating.


    Not explicitly, but if you look closely you'll see that it's calling
    th_sem->down which decrements the semaphore's count by one.
    If the count were to drop below zero, the semaphore blocks until
    the count is raised again. This effectively makes factory_loop block
    if the maximum number of worker threads are already running.
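
    The semaphore-as-limit idea described above can be checked with a small
    stand-alone sketch. The limit of 2, the six jobs and the 0.1 s of fake
    work are assumptions for illustration, and the workers are joined here
    (the posted program detaches them) so the peak can be read at the end.

```perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Semaphore;

my $max_workers = 2;                                   # small limit for the demo
my $th_sem      = Thread::Semaphore->new($max_workers);

my $running :shared = 0;    # workers currently alive
my $peak    :shared = 0;    # highest value $running ever reached

sub worker {
    { lock($running); $running++; $peak = $running if $running > $peak; }
    select(undef, undef, undef, 0.1);    # pretend to work for 0.1 s
    { lock($running); $running--; }
    $th_sem->up;                         # free a slot for the dispatcher
}

my @workers;
for my $job (1 .. 6) {
    $th_sem->down;                       # blocks while all slots are taken
    push @workers, threads->create(\&worker);
}
$_->join for @workers;

print "peak concurrent workers: $peak (limit $max_workers)\n";
```

    Because down() is called before each thread is created and up() only
    after the worker has finished, the peak should never exceed the limit.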

    --
    Ralph Moritz
     
    Ralph Moritz, Sep 13, 2006
    #3
  4. A. Sinan Unur

    "Ralph Moritz" wrote:

    > A. Sinan Unur wrote:
    >> "Ralph Moritz" wrote:
    >>
    >> >> My program is included below in it's entirety.

    >>
    >> Your program has too many external dependencies for anyone else to try
    >> to debug it. You need to post the shortest possible program that still
    >> exhibits the problem.

    >
    > There are no external dependencies.


    Yes, there are.

    > I posted the entire program.


    But one would need to create a config file, directories and files to be
    able to run it. Reduce the program to something which still exhibits your
    problem, and which others can run just by copying and pasting.


    >> There are oddities such as defining a subroutine warn with the name of
    >> a builtin. Again, I don't know if this has anything to do with the
    >> problem or even if it is a problem but why create complications?

    >
    > I don't know what you mean. There is no definition for a subroutine
    > called warn() in the code I posted. ?


    I confused myself in an attempt to reduce your program to something that
    can be run.

    >> As far as I can see, your factory_loop does not limit the number of
    >> threads it is creating.

    >
    > Not explicitly, but if you look closely you'll see that it's calling
    > th_sem->down which decrements the semaphore's count by one.
    > If the count were to drop below zero, the semaphore blocks until
    > the count is raised again. This effectively makes factory_loop block
    > if the maximum number of worker threads are already running.


    It looks like my efforts have been wasted. Did you actually run the
    program I posted? Does it work on your system as it does on mine? If so,
    we can rule out any problems with sleep.
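
    For readers without that program at hand, here is a minimal sketch of
    the kind of test meant. It is not the program referred to above; the
    variable names and the 50 ms tick are assumptions. A background thread
    counts while the main thread calls sleep(), and the count is sampled
    the moment the main thread wakes up.

```perl
use strict;
use warnings;
use threads;
use threads::shared;

my $ticks :shared = 0;
my $done  :shared = 0;

# Background thread: counts in small steps until told to stop.
my $ticker = threads->create(sub {
    until ($done) {
        { lock($ticks); $ticks++; }
        select(undef, undef, undef, 0.05);    # 50 ms pause
    }
});

sleep 1;                     # only the main thread is expected to pause here
my $during = $ticks;         # sample the counter right after waking up

{ lock($done); $done = 1; }
$ticker->join;

print "ticks counted while main slept: $during\n";
```

    If sleep() really stopped the whole process, the sampled count would
    stay at or near zero; a count well above that suggests the other
    thread kept running.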

    Now, it is your turn to do some work: Extend that program to what you
    need in a step-by-step approach: Add one thing, test, add another, test.
    You will find the solution to your problem. If not, you will at least
    have come up with a program which others can use to help you.

    Sinan


    > --
    > Ralph Moritz


    PS: Use the correct format for the signature separator. It is
    "dash-dash-space-newline".
     
    A. Sinan Unur, Sep 13, 2006
    #4
  5. John W. Krahn

    Ralph Moritz wrote:
    > I wrote:
    >
    >>My program is included below in it's entirety.

    >
    > Sorry, I forgot to include the program :-] Here it is:


    use warnings;

    > use strict;
    > use threads;
    > use threads::shared;
    > use Thread::Semaphore;
    > use Thread::Queue;
    > use File::Copy;
    > use Digest::MD5 qw(md5);
    >
    > # ------ Fallback config values ------------------
    >
    > # Path to application root directory
    > our $root_dir = '/home/ralph/Sources/QMSv2';
    >
    > # Directory containing inbound files
    > our $data_dir = "$root_dir/inbound";
    >
    > # Directory to put processed files
    > our $result_dir = "$root_dir/processed";
    >
    > # Directory to put files containing errors
    > our $error_dir = "$root_dir/errors";
    >
    > # Path to the log file
    > our $logfile = "$root_dir/logs/monit.log";
    >
    > # Frequency to scan inbound directory (in seconds)
    > our $scan_interval = 60;
    >
    > # File extensions to match against
    > our $file_ext = qr/.txt$/;


    our $file_ext = qr/\.txt$/;

    The . will match any character, you need to escape it.
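
    A quick way to see the difference, using made-up file names:

```perl
use strict;
use warnings;

my $loose   = qr/.txt$/;     # "." matches any single character
my $escaped = qr/\.txt$/;    # "\." matches a literal dot only

# Hypothetical directory entries for illustration.
my @names = qw(report.txt notes_txt readme.text atxt);

my @loose_hits   = grep { /$loose/ }   @names;
my @escaped_hits = grep { /$escaped/ } @names;

print "loose:   @loose_hits\n";      # loose:   report.txt notes_txt atxt
print "escaped: @escaped_hits\n";    # escaped: report.txt
```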


    > # Maximum number of worker threads to create
    > our $max_workers = 10;
    >
    > # -----------------------------------------------
    >
    > # HACK: hard-code path to config file
    > my $config_file = '/home/ralph/Sources/QMSv2/config.pl';
    > read_config($config_file);
    >
    > my $io_sem = Thread::Semaphore->new; # I/O semaphore
    > my $th_sem = Thread::Semaphore->new($max_workers); # Thread semaphore
    > my $queue = Thread::Queue->new; # Data queue
    > my %files :shared;
    >
    > monitor_loop($data_dir);
    >
    > sub monitor_loop {
    >     log_msg('Entering monitor loop');
    >     my $factory = threads->create(\&factory_loop);
    >     $factory->detach;
    >
    >     while (1) {
    >         log_msg('Woke up. Scanning for files...');
    >
    >         opendir(my $indir, $data_dir) or
    >             log_msg("Error: failed to open directory $data_dir: $!")
    >             && exit 1;
    >         my @flist = grep { $_ = "$data_dir/$_" if /$file_ext/ }
    >             readdir($indir);


    You probably want either:

    my @flist = map /$file_ext/ ? "$data_dir/$_" : (), readdir($indir);

    Or:

    my @flist = map "$data_dir/$_", grep /$file_ext/, readdir($indir);


    $_ is an alias to the current list element just like it is in for/foreach so
    modifying $_ also propagates the changes back to the original list/array.
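
    The aliasing effect is easy to reproduce on a plain array. In this
    sketch the directory name and the file names are hypothetical, and the
    array stands in for the list returned by readdir().

```perl
use strict;
use warnings;

my $data_dir = '/tmp/inbound';           # hypothetical directory name
my @names    = qw(a.txt b.log c.txt);    # stand-in for readdir() output

# Assigning to $_ inside grep rewrites the elements of the input array.
my @copy     = @names;
my @via_grep = grep { $_ = "$data_dir/$_" if /\.txt$/ } @copy;

# Filter first, then build new strings; the input list is left untouched.
my @via_map = map { "$data_dir/$_" } grep { /\.txt$/ } @names;

print "grep result:      @via_grep\n";   # /tmp/inbound/a.txt /tmp/inbound/c.txt
print "input after grep: @copy\n";       # /tmp/inbound/a.txt b.log /tmp/inbound/c.txt
print "map result:       @via_map\n";    # /tmp/inbound/a.txt /tmp/inbound/c.txt
print "input after map:  @names\n";      # a.txt b.log c.txt
```

    Both forms return the same list; only the grep-with-assignment form
    changes its input along the way.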




    John
    --
    use Perl;
    program
    fulfillment
     
    John W. Krahn, Sep 13, 2006
    #5
