Alternative to Parallel::ForkManager

nolo contendere

Scenario:
I am expecting 3 files in a drop directory. They won't
necessarily all arrive at the same time. I want to begin processing
each file as soon as it arrives (or as close to arrival time as is
reasonable). Would the best way to go about this be to simply have a
script that takes a filename as a parameter and marks the file as
'currently processing' when it begins to process the file (or could
move the file to a different directory)?

I could kick off 3 daemon processes looking in the drop directory, and
sleep every 5 secs, for instance.

That seems to me to be a straightforward, if clumsy, approach. I was
wondering if there was a module that could accomplish this task more
elegantly--Parallel::ForkManager, at least in my experience, doesn't
seem entirely suited to this particular task.

Or I could code my own fork, exec, wait/waitpid.

I know TMTOWTDI, but I was seeking to benefit from others' experience,
and looking for a 'best practice'.

Sorry there's no tangible code; this is more of a conceptual question
I guess.
 
Peter Makholm

nolo contendere said:
I know TMTOWTDI, but I was seeking to benefit from others' experience,
and looking for a 'best practice'.

If portability isn't an issue, your platform might support some kind of
monitoring of parts of the filesystem. Then you can get events when
files are created in your spool directory or moved there.

Linux::Inotify2 is a Linux-only solution I'm using for a couple of
scripts. Another usable module could be SGI::FAM, which should be
supported on a broader range of unices.
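
For the drop-directory case, a minimal Linux::Inotify2 sketch might look
like this (the directory path and the process() routine are placeholders):

use Linux::Inotify2;

my $inotify = Linux::Inotify2->new
    or die "unable to create inotify object: $!";

# IN_CLOSE_WRITE fires when a writer closes the file; IN_MOVED_TO fires
# when a file is renamed into the directory. Either way, the file is
# complete by the time the callback sees it.
$inotify->watch( '/path/to/drop', IN_CLOSE_WRITE | IN_MOVED_TO, sub {
    my $event = shift;
    process( $event->fullname );
} );

1 while $inotify->poll;    # block, dispatching callbacks as events arrive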

I have been looking for something like Net::Server for spool dirs a
couple of times without finding anything really useful.

//Makholm
 
xhoster

nolo contendere said:
Scenario:
I am expecting 3 files in a drop directory. They won't
necessarily all arrive at the same time. I want to begin processing
each file as soon as it arrives (or as close to arrival time as is
reasonable).

What is the relationship between the 3 files? Presumably, this whole
thing will happen more than once, right, otherwise you wouldn't need
to automate it? So what is the difference between "3 files show up,
and that happens 30 times" and just "90 files show up"?
Would the best way to go about this be to simply have a
script that takes a filename as a parameter and marks the file as
'currently processing' when it begins to process the file (or could
move the file to a different directory)?

I could kick off 3 daemon processes looking in the drop directory, and
sleep every 5 secs, for instance.

Do the file's contents show up atomically with the file's name? If not,
the process could see the file is there and start processing it, even
though it is not completely written yet.
That seems to me to be a straightforward, if clumsy, approach. I was
wondering if there was a module that could accomplish this task more
elegantly--Parallel::ForkManager, at least in my experience, doesn't
seem entirely suited to this particular task.

Why don't you think it is suited? It seems well suited, unless there
are details that I am missing (or maybe you are on Windows or something,
where forking isn't as robust).

my $pm = Parallel::ForkManager->new(3);
foreach my $file (@ARGV) {
    $pm->start() and next;
    process($file);
    $pm->finish();
}
$pm->wait_all_children();

Where the process() subroutine first waits for the named $file to exist,
then processes it.

Xho

 
Ted Zlatanov

nc> I am expecting 3 files in a drop directory. They won't
nc> necessarily all arrive at the same time. I want to begin processing
nc> each file as soon as it arrives (or as close to arrival time as is
nc> reasonable). Would the best way to go about this be to simply have a
nc> script that takes a filename as a parameter and marks the file as
nc> 'currently processing' when it begins to process the file (or could
nc> move the file to a different directory)?

nc> I could kick off 3 daemon processes looking in the drop directory, and
nc> sleep every 5 secs, for instance.

nc> That seems to me to be a straightforward, if clumsy, approach. I was
nc> wondering if there was a module that could accomplish this task more
nc> elegantly--Parallel::ForkManager, at least in my experience, doesn't
nc> seem entirely suited to this particular task.

nc> Or I could code my own fork, exec, wait/waitpid.

Get Tie::ShareLite from CPAN.

In each process, lock a shared hash and insert an entry for the new file
when it's noticed in the idle loop. If the file already exists in the
hash, do nothing. The first process to notice the file wins.

Now, unlock the hash and work with the file. When done, move the file
out, lock the hash again, and remove the entry you inserted.

The advantage is that you can store much more in the hash than just the
filename, so this is handy for complex processing. Also, no file
renaming is needed.
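
A minimal sketch of that claim logic, with the shared-memory key, $file,
and process() standing in as placeholders:

use Tie::ShareLite qw( :lock );

my %claims;
my $ipc = tie( %claims, 'Tie::ShareLite',
               -key => 1971, -mode => 0666, -create => 'yes' )
    or die "could not tie shared hash: $!";

# claim the file under an exclusive lock; the first process in wins
$ipc->lock( LOCK_EX );
my $winner = !exists $claims{$file};
$claims{$file} = { pid => $$, started => time } if $winner;
$ipc->unlock();

if ($winner) {
    process($file);
    $ipc->lock( LOCK_EX );
    delete $claims{$file};    # done: remove the entry we inserted
    $ipc->unlock();
}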

A simpler version is just to rename the file to "$file.$$" where $$ is
your PID. If, after the rename, the renamed file is there, your process
won against the others and you can work with the file. Note there could
be name collisions with an existing file, but since PIDs are unique on
the machine, you can just remove that bogus file. Just be aware this is
the quick and dirty solution.

Another approach is to use a Maildir structure, which can handle
multiple readers and writers atomically, even over NFS. You just need
to map your incoming queue into a Maildir structure; there's no need to
actually have mail in the files. This is good if you expect complications
to your original model: lots of volume, network access, and so on.
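
The Maildir delivery discipline is easy to imitate without any actual
mail; a sketch, with the queue path and tmp/new/cur layout assumed:

use File::Basename qw(basename);

my $queue = '/var/spool/drop';    # contains tmp/, new/, cur/

# writer side: build the file under tmp/, then rename() it into new/.
# rename() is atomic within a filesystem, so readers never see a
# half-written file.
sub deliver {
    my ($name, $data) = @_;
    open my $fh, '>', "$queue/tmp/$name" or die "open: $!";
    print {$fh} $data;
    close $fh or die "close: $!";
    rename "$queue/tmp/$name", "$queue/new/$name" or die "rename: $!";
}

# reader side: claiming a file is a rename() from new/ into cur/.
# Only one reader's rename() can succeed, so no locking is needed.
sub claim {
    for my $f ( glob "$queue/new/*" ) {
        my $mine = "$queue/cur/" . basename($f) . ".$$";
        return $mine if rename $f, $mine;
    }
    return;    # nothing available right now
}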

Ted
 
Ben Morrow

Quoth Peter Makholm:
If portability isn't an issue, your platform might support some kind of
monitoring of parts of the filesystem. Then you can get events when
files are created in your spool directory or moved there.

Linux::Inotify2 is a Linux-only solution I'm using for a couple of
scripts. Another usable module could be SGI::FAM, which should be
supported on a broader range of unices.

SGI::FAM only works under Irix. I've been meaning to port it to other
systems that support fam (and gamin, the GNU rewrite) but haven't got
round to it yet. There is Sys::Gamin, but it doesn't have any tests and
doesn't appear to be maintained.

Other OS-specific alternatives include IO::KQueue for BSDish systems,
and Win32::ChangeNotify for Win32. This seems like a perfect opportunity
for someone to write an OS-independent wrapper module, but AFAIK no-one
has yet.

Ben
 
Peter Makholm

Ben Morrow said:
SGI::FAM only works under Irix. I've been meaning to port it to other
systems that support fam (and gamin, the GNU rewrite) but haven't got
round to it yet.

Never used the module myself (should have made that clear) and I have
to admit that my only reason to assume that it is usable on other
platforms is that File::Tail::FAM talks about Linux.

//Makholm
 
nolo contendere

xhoster said:
What is the relationship between the 3 files? Presumably, this whole
thing will happen more than once, right, otherwise you wouldn't need
to automate it?  So what is the difference between "3 files show up,
and that happens 30 times" and just "90 files show up"?

The timing. But you do point out that I can move the glob logic into
each thread. Currently I have it outside the init of my $pm object.
Do the file's contents show up atomically with the file's name?  If not,
the process could see the file is there and start processing it, even
though it is not completely written yet.

Yes. This is handled by a separate, asynchronous process.
Why don't you think it is suited? It seems well suited, unless there
are details that I am missing (or maybe you are on Windows or something,
where forking isn't as robust).

my $pm = Parallel::ForkManager->new(3);
foreach my $file (@ARGV) {
    $pm->start() and next;
    process($file);
    $pm->finish();
}
$pm->wait_all_children();

Where the process() subroutine first waits for the named $file to exist,
then processes it.

This is what I have, and again, I think I just needed to move the glob
function ( get_files() below ) into each thread. I won't know the
exact filename beforehand, so I can't pass that to the child process and
have it wait for it.

my $done = 0;
while ( is_before($stop_checking_time) && !$done ) {
    get_files( $loadcount, \$filecount, \@files, \$num_threads );

    print "About to process $class files...\n";

    if ( $filecount > $loadcount ) {
        die "ERROR: Found too many files: expecting $loadcount files, " .
            "but found $filecount files. " .
            "Maybe you want to increase the 'loadcount' parameter in " .
            "'$conf_file'?";
    }
    else {
        my $pm = Parallel::ForkManager->new( $num_threads );
        init_pm( $pm );

        my $itr;
        while ( @files ) {
            my $file = shift @files;
            ++$itr;

            my ( $err_log, $txn_log ) = init_logs( $file );
            my $id = "file=$file\:\:err_log=$err_log";

            my @parms;
            if ( $class eq 'PRICE' ) {
                @parms = ( $file, $err_log, $txn_log );
            }
            else {
                @parms = ( $file );
            }

            $pm->start( $id ) and next;

            $process{$class}->( @parms );
            archive_file( $file );

            $pm->finish;
        }
        $pm->wait_all_children;
        if ( $filecount == $loadcount ) {
            $done = 1;
        }
    }
}

sub get_files {
    my ( $loadcount, $filecount_ref, $filesref, $numthreads_ref ) = @_;

    if ( $$filecount_ref == $loadcount ) {
        ++$$filecount_ref;
        return;
    }

    @$filesref = glob("$dropdir/$class\_*");

    my $diff = $loadcount - $$filecount_ref;

    if ( @$filesref == 0 ) {
        print localtime() . " Waiting on $diff out of $loadcount file(s). " .
              "About to sleep $check_interval seconds before checking " .
              "again...\n";
        sleep $check_interval;
    }
    else {
        $$numthreads_ref = @$filesref;
        $$filecount_ref += @$filesref;
        show_files( $filesref );
    }
}
 
nolo contendere

Ted Zlatanov said:
nc>      I am expecting 3 files in a drop directory. They won't
nc> necessarily all arrive at the same time. I want to begin processing
nc> each file as soon as it arrives (or as close to arrival time as is
nc> reasonable). Would the best way to go about this be to simply have a
nc> script that takes a filename as a parameter and marks the file as
nc> 'currently processing' when it begins to process the file (or could
nc> move the file to a different directory)?

nc> I could kick off 3 daemon processes looking in the drop directory, and
nc> sleep every 5 secs, for instance.

nc> That seems to me to be a straightforward, if clumsy, approach. I was
nc> wondering if there was a module that could accomplish this task more
nc> elegantly--Parallel::ForkManager, at least in my experience, doesn't
nc> seem entirely suited to this particular task.

nc> Or I could code my own fork, exec, wait/waitpid.

Get Tie::ShareLite from CPAN.

In each process, lock a shared hash and insert an entry for the new file
when it's noticed in the idle loop.  If the file already exists in the
hash, do nothing.  The first process to notice the file wins.

Now, unlock the hash and work with the file.  When done, move the file
out, lock the hash again, and remove the entry you inserted.

The advantage is that you can store much more in the hash than just the
filename, so this is handy for complex processing.  Also, no file
renaming is needed.

This is similar in concept to what I was doing with
Parallel::ForkManager, only with a "global" array.
A simpler version is just to rename the file to "$file.$$" where $$ is
your PID.  If, after the rename, the renamed file is there, your process
won against the others and you can work with the file.  Note there could
be name collisions with an existing file, but since PIDs are unique on
the machine, you can just remove that bogus file.  Just be aware this is
the quick and dirty solution.

Yeah, PIDs can be reused, but a filename/timestamp/pid combo would be
effectively unique. This is an example of my "mark the file as
currently processing" tactic. Another solution would be to move it to
a tmp or work dir.
Another approach is to use a Maildir structure, which can handle
multiple readers and writers atomically, even over NFS.  You just need
to map your incoming queue into a Maildir structure; there's no need to
actually have mail in the files. This is good if you expect complications
to your original model: lots of volume, network access, and so on.

This is interesting! I'll do some research into Maildir.

Ted, thanks for the ideas! I appreciate the different perspectives.
 
nolo contendere

Peter Makholm said:
Never used the module myself (should have made that clear) and I have
to admit that my only reason to assume that it is usable on other
platforms is that File::Tail::FAM talks about Linux.

//Makholm

I appreciate the effort, Peter; however, I'm currently stuck on Solaris.
 
Ted Zlatanov

nc> This is similar in concept to what I was doing with
nc> Parallel::ForkManager, only with a "global" array.

Yes, but notice you can suddenly access the global hash from any Perl
program, not just the managed ones. The hash becomes your API.

nc> Yeah, PIDs can be reused, but a filename/timestamp/pid combo would be
nc> effectively unique. This is an example of my "mark the file as
nc> currently processing" tactic. another solution would be to move it to
nc> a tmp or work dir.

By the way, if you need a unique name, use the File::Temp module. I
should have mentioned that.
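
For instance, something like this, where $workdir, $class, $file, and
process() stand in for whatever the real setup uses:

use File::Temp qw(tempfile);

# tempfile() creates the placeholder with O_EXCL, so the name is
# guaranteed fresh -- no PID-reuse worries. The atomic claim is still
# the rename: only one process's rename of $file can succeed.
my ($fh, $target) = tempfile( "${class}_claim_XXXXXX", DIR => $workdir );
close $fh;
if ( rename $file, $target ) {
    process($target);    # we won
}
else {
    unlink $target;      # someone beat us to it; discard the placeholder
}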

Ted
 
xhoster

nolo contendere said:
The timing.

I still don't understand. How is the timing significant? If you want
each file to start being processed as soon as it shows up, then what
difference does it make whether they tend to show up in clumps of three?
As soon as they show up is as soon as they show up, regardless of when that
is. Is there something significant that the clumps of three have in common
*other* than merely their timing?

This is what I have, and again, I think I just needed to move the glob
function ( get_files() below ) into each thread.

If you do that, the "threads" will be fighting over the files. You will
have to code that very, very carefully. But depending on your answer to my
first question, it might be moot.

I won't know the
exact filename beforehand, so can't pass that to the child process and
have it wait for it.

Is there a pattern, like one file will end in _1, one end in _2, and one
end in _3? If so, give each child a different (and mutually exclusive)
pattern to glob on. That way they won't fight over the files.
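
A sketch of that, borrowing $dropdir and $class from your code and
assuming the files really do end in _1 through _3:

use Parallel::ForkManager;

my $pm = Parallel::ForkManager->new(3);
foreach my $n ( 1 .. 3 ) {
    $pm->start and next;

    # each child polls only for its own suffix, so no two children
    # can ever grab the same file
    my $file;
    until ( ($file) = glob("$dropdir/${class}_*_$n") ) {
        sleep 5;
    }
    process($file);

    $pm->finish;
}
$pm->wait_all_children;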

my $done = 0;
while ( is_before($stop_checking_time) && !$done ) {
    get_files( $loadcount, \$filecount, \@files, \$num_threads );
    ....
    my $pm = Parallel::ForkManager->new( $num_threads );

$num_threads determines the *maximum* number of processes that will be live
at any one time. This should be determined based on the number of CPUs or
the amount of main memory or the IO bandwidth that your server has. It
should not be determined by the count of the number of tasks to be done, as
you seem to be doing here.

Xho

 
nolo contendere

xhoster said:
I still don't understand. How is the timing significant? If you want
each file to start being processed as soon as it shows up, then what
difference does it make whether they tend to show up in clumps of three?
As soon as they show up is as soon as they show up, regardless of when that
is.  Is there something significant that the clumps of three have in common
*other* than merely their timing?

The difference lies in my implementation of the solution, not
necessarily the problem. Historically I've used Parallel::ForkManager
in cases where there were many more jobs to do than there were CPUs,
and those jobs were all ready to be done. In that scenario, I would
initiate <num CPU> processes, loop through the jobs and assign a job
to each process until all the jobs were done. In the case mentioned in
this thread, not all the jobs are ready to be done at process-initiation
time. If I were to use my old implementation, then it's
possible that 2 out of 3 files showed up at time t0. So 2 processes
were kicked off. Shortly after, the 3rd file showed up. But my script
doesn't know that until the last of the 2 processes is finished, and
so I must wait to process the 3rd file.

By including the sleep/check code in the logic for each process, I can
handle this case more efficiently.

So to answer your earlier questions of the difference it makes, and
the significance: it changed my thought process (hopefully for the
better) around how to handle this incarnation of staggered-yet-
concurrent job processing.
If you do that, the "threads" will be fighting over the files.  You will
have to code that very, very carefully. But depending on your answer to my
first question, it might be moot.

Yes, the "very, very carefully" is why I posted to begin with, hoping
for an elegant and efficient solution.
Is there a pattern, like one file will end in _1, one end in _2, and one
end in _3?  If so, give each child a different (and mutually exclusive)
pattern to glob on.  That way they won't fight over the files.

The pattern is: <CLASS>_YYYYMMDDhhmmss_nnn

The glob is on <CLASS>_

There may be skips in the 'nnn' sequence, which is why, rather than
attempting to be more specific with the glob pattern, I had hoped to
mark the files as 'being-processed' by either an atomic rename or an
atomic mv to a work/tmp dir.
$num_threads determines the *maximum* number of processes that will be live
at any one time.  This should be determined based on the number of CPUs or
the amount of main memory or the IO bandwidth that your server has.  It
should not be determined by the count of the number of tasks to be done, as
you seem to be doing here.

Yeah, I know, it's dangerous. There *shouldn't* be more than 40 files
at a time (I know, I know, stupid to believe this will actually be
true), and each process is calling a Tcl script which will load the file
into a Sybase table (I don't have control over this). I think this
is less bound by CPU and more by IO, so I don't think $num_procs >
$num_cpus should be that much of an issue. Of course, I could be
wrong. This would require testing.
 
xhoster

nolo contendere said:
The difference lies in my implementation of the solution, not
necessarily the problem. ....
So to answer your earlier questions of the difference it makes, and
the significance: it changed my thought process (hopefully for the
better) around how to handle this incarnation of staggered-yet-
concurrent job processing.

OK, so let me try to change your thought process yet again, then :)

The master process does all the waiting. That way, it fights with no
one but itself. First it waits for a file to exist (if necessary), then
it waits for ForkManager to let it start a new process (if necessary).
It does a rename in between. There is no one else trying to do the rename,
so no worry about race conditions (unless you unwisely start two master
processes!).


## Set a reasonable upper limit of 10. May never be reached!
my $pm = Parallel::ForkManager->new(10);

while ( is_before($stop_checking_time) && !$done ) {
    my @files = glob "${class}_*";
    sleep 1 unless @files;
    foreach my $file (@files) {
        my $new_name = "foo_$file";
        ## it is important that the renamed file won't match the glob on
        ## the next time through the loop!
        rename $file, $new_name or die $!;
        $pm->start() and next;
        process($new_name);
        $pm->finish();
    }
}
$pm->wait_all_children();    ## reap any stragglers once the deadline passes

If the main process remembers what files were already started, then
it could remember to skip those ones the next time through and wouldn't
need to bother with the renaming.
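
That no-rename variant might look something like this, reusing the names
from the snippet above:

my %seen;    # files already handed off to a child
my $pm = Parallel::ForkManager->new(10);

while ( is_before($stop_checking_time) ) {
    my @new = grep { !$seen{$_}++ } glob "${class}_*";
    sleep 1 unless @new;
    foreach my $file (@new) {
        $pm->start and next;
        process($file);
        $pm->finish;
    }
}
$pm->wait_all_children;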

Of course, you could always change the sleep loop into some kind of FS
change notifier, as was discussed elsewhere in the thread. But doing a
glob once a second is probably not going to be a problem. At least, I
wouldn't worry about it until it proves itself to be a problem.
Yes, the "very, very carefully" is why I posted to begin with, hoping
for an elegant and efficient solution.

It's best to avoid needing to do it at all, like above. But if I needed
to do this, this is how I would try. If you are using threads and they
have the same PID, then you will have to use something other than $$.

my $file = "some_file_we_are_fighting_over";
my $newname = "$$.$file";
if (-e $newname) {
    die "Should never happen. A previous job must have had the same PID, " .
        "accomplished the rename, then failed to clean up after itself";
}
if (rename $file, $newname) {
    ## we won. Do whatever needs doing;
    ## but since we are paranoid...
    -e $newname or die "$newname: how could this be?";
    process($newname);
} elsif ($!{ENOENT}) {
    ## the file didn't exist; most likely someone else beat us to it.
    ## Do nothing, fall through to the next iteration.
} else {
    ## something *else* went wrong. What could it be?
    die "Rename $file, $newname failed in an unexpected way: $!";
}
Yeah, I know, it's dangerous. There *shouldn't* be more than 40 files
at a time (I know, I know, stupid to believe this will actually be
true),

But there is no reason to take this risk. Hard code 40 as the max number
of processes (I'd probably go lower myself, but if you think 40 is the
number below which you don't need to worry...). If there are ever more
than forty, then some will have to wait in line rather than crashing
your machine. If there are never more than forty, then hardcoding the value of
40 instead of passing around $num_threads doesn't change the behavior at
all (and makes the code cleaner to boot).


Xho

 
Martijn Lievaart

nolo contendere said:
I appreciate the effort, Peter; however, I'm currently stuck on Solaris.

I thought FAM works on Solaris, so you may not be completely out of luck.
Haven't used any of the FAM modules myself though.

M4
 
nolo contendere

xhoster said:
OK, so let me try to change your thought process yet again, then :)

Xho, I don't know how much they're paying you right now, but I'm
certain it's not enough :). Thanks for your help!
 
