runaway memory leak with LWP and Fork()ing on Windows

Discussion in 'Perl Misc' started by bulk88@hotmail.com, Nov 2, 2007.

  1. Guest

    I am trying to fetch and work with websites in a non-blocking,
    asynchronous way from a standalone program (not a server script).

    My idea is to use Perl, LWP, and the simple Perl-provided forking on
    Win32 (Windows XP in my case). The child threads do not need to report
    anything back to the parent. The parent fires off a bunch of children,
    wait()s for them to complete, then fires off more children in a loop.
    My code has a severe memory leak. I have isolated it in the code
    below.

    If this program is modified not to fork (see the comments: change
    'fork()' to '0' and 'exit(0);' to '#exit(0);'), it will only use 8 MB
    of RAM and 7 MB of VM over its whole lifetime, which is 500 iterations
    in this example. If forking is on, it balloons to 290 MB of RAM and
    470 MB of VM by the time it ends at 500 iterations. Making the loop
    infinite causes all memory to be used up and eventually a crash. I
    would like to go with an infinite loop, or a user-specified count in
    the 1000s of iterations. I believe I wait() an adequate number of
    times. In my research, if I don't wait(), the program will silently
    end with no errors after 64 un-wait()ed child threads, due to
    limitations of the fork() implementation in ActivePerl/Win32 Perl, but
    I am fine with the 64 limit.
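
    To make the reaping explicit, here is the pattern I am relying on,
    pulled out of the larger script below (a minimal sketch; I am assuming
    waitpid() accepts the pseudo-process IDs that fork() returns on Win32,
    which is how I read perlfork):

    # spawn a batch of children, remember their pseudo-PIDs,
    # then reap every one of them before starting the next batch
    my @kids;
    for my $n (1 .. 60) {
        my $pid = fork();
        die "fork failed: $!" unless defined $pid;
        if ($pid == 0) {        # child
            doGet();
            exit(0);
        }
        push @kids, $pid;       # parent remembers the child
    }
    waitpid($_, 0) for @kids;   # block until every child is reaped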

    My forking code seems to work fine by itself. To see this, replace the
    doGet sub with the mySleep sub. mySleep will simulate the latency of
    the web page fetches. With mySleep the process uses between 13 and
    38 MB of RAM, never exceeding 40 MB. It approaches 40 MB right before
    the wait() cleanup that happens every 60 iterations, then falls back to
    13 MB and slowly climbs again. With the LWP code, the memory NEVER goes
    down after a 60-iteration wait() cleanup. undef()ing $ua and $response
    in the child does not help the memory leak at all. I think there might
    be a garbage collection problem, but I don't know.

    Keywords: win32, LWP, UserAgent, fork, windows, threads, ithreads

    I assume there is a problem with LWP somehow. I would rather not use a
    real threading library. It seems unnecessary and not KISS. Can someone
    help?

    #!/usr/bin/perl -w
    use strict;
    use warnings;
    use LWP::UserAgent;
    use Time::HiRes qw( sleep );
    use String::Escape qw( printable );
    sub doGet();

    for (my $i = 0; $i < 500; $i++)
    {
        my $pid;
        $pid = fork(); # $pid = 0; # for non-fork design
        if ($pid == 0)
        {
            doGet();
            #mySleep(); # replace doGet with mySleep to see the forking code working right
            print "child $i\n";
            exit(0); # comment out to switch to non-fork design
        }
        else {
            print "parent $i $pid \n";
            sleep(.1);
            # clean up thread IDs so we don't run into the 64 limit of fork on Windows
            if ($i % 60 == 0)
            { foreach (1..60) { wait(); } }
        }
    }
    # end of script/pause to check memory usage; hopefully this also pauses on non-Win32
    $^O eq 'MSWin32' ? system('pause') : system('read -n 1 -p "Press Any Key to Continue..."');

    # LWP function
    sub doGet()
    {
        my $ua = LWP::UserAgent->new;
        # url can be anything, google was used just as an example
        my $response = $ua->get('http://www.google.com/images/firefox/op_icon.png');

        if ($response->is_success) {
            print substr(printable( $response->content ), 0, 30)."\n";
        }
        else {
            die $response->status_line;
        }
        #undef($ua);
        #undef($response);
    }

    sub mySleep()
    {
        sleep(rand(5));
    }

    __END__


    Here is my perl -V:

    Summary of my perl5 (revision 5 version 8 subversion 8) configuration:
      Platform:
        osname=MSWin32, osvers=5.00, archname=MSWin32-x86-multi-thread
        uname=''
        config_args='undef'
        hint=recommended, useposix=true, d_sigaction=undef
        usethreads=define use5005threads=undef useithreads=define usemultiplicity=define
        useperlio=define d_sfio=undef uselargefiles=define usesocks=undef
        use64bitint=undef use64bitall=undef uselongdouble=undef
        usemymalloc=n, bincompat5005=undef
      Compiler:
        cc='cl', ccflags ='-nologo -GF -W3 -MD -Zi -DNDEBUG -O1 -DWIN32 -D_CONSOLE -DNO_STRICT
          -DHAVE_DES_FCRYPT -DNO_HASH_SEED -DUSE_SITECUSTOMIZE -DPRIVLIB_LAST_IN_INC
          -DPERL_IMPLICIT_CONTEXT -DPERL_IMPLICIT_SYS -DUSE_PERLIO -DPERL_MSVCRT_READFIX',
        optimize='-MD -Zi -DNDEBUG -O1',
        cppflags='-DWIN32'
        ccversion='12.00.8804', gccversion='', gccosandvers=''
        intsize=4, longsize=4, ptrsize=4, doublesize=8, byteorder=1234
        d_longlong=undef, longlongsize=8, d_longdbl=define, longdblsize=8
        ivtype='long', ivsize=4, nvtype='double', nvsize=8, Off_t='__int64', lseeksize=8
        alignbytes=8, prototype=define
      Linker and Libraries:
        ld='link', ldflags ='-nologo -nodefaultlib -debug -opt:ref,icf -libpath:"C:\Perl\lib\CORE" -machine:x86'
        libpth=\lib
        libs= oldnames.lib kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib
          advapi32.lib shell32.lib ole32.lib oleaut32.lib netapi32.lib uuid.lib ws2_32.lib
          mpr.lib winmm.lib version.lib odbc32.lib odbccp32.lib msvcrt.lib
        perllibs= oldnames.lib kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib
          advapi32.lib shell32.lib ole32.lib oleaut32.lib netapi32.lib uuid.lib ws2_32.lib
          mpr.lib winmm.lib version.lib odbc32.lib odbccp32.lib msvcrt.lib
        libc=msvcrt.lib, so=dll, useshrplib=true, libperl=perl58.lib
        gnulibc_version=''
      Dynamic Linking:
        dlsrc=dl_win32.xs, dlext=dll, d_dlsymun=undef, ccdlflags=' '
        cccdlflags=' ', lddlflags='-dll -nologo -nodefaultlib -debug -opt:ref,icf -libpath:"C:\Perl\lib\CORE" -machine:x86'


    Characteristics of this binary (from libperl):
      Compile-time options: MULTIPLICITY PERL_IMPLICIT_CONTEXT PERL_IMPLICIT_SYS
                            PERL_MALLOC_WRAP PL_OP_SLAB_ALLOC USE_ITHREADS
                            USE_LARGE_FILES USE_PERLIO USE_SITECUSTOMIZE
      Locally applied patches:
        ActivePerl Build 822 [280952]
        Iin_load_module moved for compatibility with build 806
        PerlEx support in CGI::Carp
        Less verbose ExtUtils::Install and Pod::Find
        Patch for CAN-2005-0448 from Debian with modifications
        Rearrange @INC so that 'site' is searched before 'perl'
        Partly reverted 24733 to preserve binary compatibility
        MAINT31223 plus additional changes
        31490 Problem bootstraping Win32CORE
        31324 Fix DynaLoader::dl_findfile() to locate .so files again
        31214 Win32::GetLastError fails when first called
        31211 Restore Windows NT support
        31188 Problem killing a pseudo-forked child on Win32
        29732 ANSIfy the PATH environment variable on Windows
        27527,29868 win32_async_check() can loop indefinitely
        26970 Make Passive mode the default for Net::FTP
        26379 Fix alarm() for Windows 2003
        24699 ICMP_UNREACHABLE handling in Net::ping
      Built under MSWin32
      Compiled at Jul 31 2007 19:34:48
      @INC:
        C:/Perl/site/lib
        C:/Perl/lib
     
    , Nov 2, 2007
    #1

  2. Guest

    Without delving too deeply into your problem, I think you could give my
    attached module PreforkAgent.pm a try.
    It was made for very much the same task - stress testing a website, in
    fact.

    You could probably use the module in your script without any
    adaptations. Just follow the pod instructions on how to set up your
    callback routines.
    In contrast to your current concept, the specified number of children
    is created only once, before the actual work starts, and each returning
    child is immediately given the next task as long as there are any
    more.

    Under Linux the module works like a charm with up to 500 children.
    Because the overhead for the forks is all done beforehand, it's
    possible to create a heavy load on a target web (or any other) server
    with only moderate local means.
    If you want to put a cap on the load, you will need to do so in your
    wrapper script.
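
    Untested off the top of my head, but wiring your LWP fetch into the
    callbacks could look roughly like this (the URL list, the child count
    and the little 0.1 s throttle are only placeholders, of course):

    use strict;
    use warnings;
    use LWP::UserAgent;
    use PreforkAgent;

    # placeholder work queue - put your real URLs here
    my @urls = ('http://www.google.com/images/firefox/op_icon.png') x 500;

    my $pfa = PreforkAgent->new or die;
    $pfa->register(
        fetch_next_task  => sub {               # parent context
            select undef, undef, undef, 0.1;    # crude cap on the load
            return shift @urls;                 # undef ends the run
        },
        process_job      => sub {               # child context
            my ($url, $child_id) = @_;
            my $res = LWP::UserAgent->new->get($url);
            return $res->status_line;           # serialized response
        },
        process_response => sub {               # parent context
            my ($response, $child_id) = @_;
            print "child $child_id: $response\n";
        },
    );
    $pfa->spawn(10) or die;
    $pfa->assign();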

    Because I have never tried it on Windows, I'd be delighted to hear how
    you fare.

    Cheers, Steffen

    #
    # PreforkAgent
    #
    # Allows execution of many jobs in parallel.
    # All parent / child communication is implemented with pipes.
    # Only the signal INT is caught by the parent for cleanup purposes.
    #
    # Steffen Heinrich - Jun 2007
    #

    package PreforkAgent;

    use strict;

    my $VERSION = '0.03';

    #################################################
    # libs and class vars

    use IO::Select;

    my $EOF_MSG_SEQ = "\x1F"; # ASCII control character US (Unit Separator)

    ############################
    # constructor

    sub new {
    my $class = shift;

    my $me = bless {
    debug_out => 0,
    parent => $$,
    listener => IO::Select->new(),
    kids_to_spawn => 0,
    kids => 0,
    living => {},
    pids => {},
    jobs => {},
    child_prepare => sub {1}
    }, $class;

    $me
    }


    ############################
    # methods

    sub register {
    # registers one or more callback routines with the agent
    my $self = shift() or return;
    my %subs = @_ or return;

    my @errors;
    while (my ($s, $c) = each %subs) {

    if ($s !~ /^(child_prepare|fetch_next_task|process_job|process_response)$/) {
    push @errors, "'$s' is not a known sub";

    } elsif (defined($c) && ref($c) && ref($c) =~ /CODE/) {
    $self->{$s} = $c;

    } else {
    push @errors, "'$s' does not reference a sub";
    }
    }

    die join("\n", @errors)."\n" if @errors;
    }


    sub spawn {
    # creates a given number of children
    # and opens bidirectional pipes for each
    my $self = shift() or return;
    my $kids_to_spawn = shift() or return;

    $self->{kids_to_spawn} = $kids_to_spawn;

    my $process_job = $self->{process_job}
    or die "process_job() must have been registered with PreforkAgent
    before a call to spawn()!\n";

    my $sel = $self->{listener};

    # prevent zombies since we won't wait()
    $SIG{CHLD} = 'IGNORE';

    # fork loop
    for my $child (1..$kids_to_spawn) {
    my $whdl = 'W'.$child;
    my $rhdl = 'R'.$child;

    { no strict 'refs';
    # open bidirect comm
    pipe $rhdl, WH or die "pipe1: $!"; # parent <- child
    pipe RH, $whdl or die "pipe2: $!"; # child <- parent

    # register the read handle with the ones to listen to
    $sel->add(\*$rhdl);
    }
    # save write handle connected with readhandle
    $self->{living}{$child} = $whdl;

    select((select(WH), $| = 1)[0]); # autoflush
    select((select($whdl), $| = 1)[0]); # autoflush

    my $pid;
    unless ($pid = fork()) {
    # Child process
    # closes unnecessary handles
    close $rhdl;
    close $whdl;

    # execute individual initialization
    my $init = $self->{child_prepare};
    defined(&$init($child)) or die;

    # creates a new listener
    $sel = IO::Select->new;
    # registers the one handle to listen to
    $sel->add(\*RH);

    # signals readiness
    _write_into_pipe(\*WH, 'READY');

    while ($sel->can_read) {
    my $job = _read_from_pipe(\*RH);

    if ($job eq 'QUIT') {
    last;

    } else {
    # do something
    my $answer = &$process_job($job, $child);
    _write_into_pipe(\*WH, $answer);
    }
    }
    # child is done, unload and quit
    $sel->remove(\*RH);
    close WH;
    close RH;

    exit 0;
    }
    # Parent process closes unnecessary handles
    close WH;
    close RH;

    # and registers child
    $self->{pids}{$child} = $pid;

    # parent catches SIGINT
    $SIG{INT} = sub {$self->cleanup()}
    unless $self->{kids}++;

    } # loop to start others

    ($self->{kids} == $self->{kids_to_spawn})
    or die "Could only spawn $self->{kids} kids of $self-
    >{kids_to_spawn}: $!";


    $self->{kids}
    } # end of spawn()


    sub assign {
    # Sends out jobs to any child which is ready to listen
    # and collects any responses, which are then reported to the registered callback.
    # As long as there are more jobs to do, they are immediately assigned to returning children.
    my $self = shift() or return;

    my $fetch_next_task = $self->{fetch_next_task};
    my $process_response = $self->{process_response};

    ($fetch_next_task && $process_response)
    or die "fetch_next_task() and process_response() must have been
    registered with PreforkAgent before a call to assign()!\n";

    $self->{kids} > 0
    or die "You need to call spawn(kids) before assigning jobs!\n";

    my $sel = $self->{listener};

    # work loop
    while ($self->{kids}) {

    my $not_finished = 1;

    while (my @ready = $sel->can_read) {

    foreach my $rhdl (@ready) {

    my $child = '';
    { no strict 'refs';
    *{$rhdl} =~ /^(.+::)?R(\d+)$/
    and $child = $2;
    }
    my $whdl = $self->{living}{$child};

    my $response = _read_from_pipe($rhdl);

    unless ($response eq 'READY') {
    &$process_response($response, $child);
    }

    # assign next task
    my $task = undef;
    $not_finished = $not_finished && defined($task = &$fetch_next_task($child));

    if ($child && $whdl && $not_finished) {

    _write_into_pipe($whdl, $task);

    $self->{jobs}{$child}++ if $self->{debug_out};

    } else {
    # tell child to exit

    _write_into_pipe($whdl, 'QUIT');

    # unregister child
    $sel->remove($rhdl);
    delete $self->{living}{$child};

    # close handles to child
    close $rhdl;
    close $whdl;

    $self->{kids}--;
    }
    }
    }
    }
    # since all children exited and we set SIGCHLD = IGNORE
    # we don't have to wait()

    if ($self->{debug_out}) {
    my $job_cnt = $self->{jobs};
    foreach my $child (sort {$job_cnt->{$b} <=> $job_cnt->{$a}} keys %$job_cnt) {
    printf "%4s: %5d\n", $child, $job_cnt->{$child};
    }
    }

    $self->cleanup();
    } # end of assign()


    ############################
    # subroutines

    sub _read_from_pipe {
    my ($fh) = @_;

    my $blksize = (stat $fh)[11] || 16384;
    my $offset = 0;
    my $buf = '';

    # read until the end-of-message marker (or EOF) is seen
    while (1) {
    my $len = sysread($fh, $buf, $blksize, $offset);
    if (!defined $len) {
    next if $! =~ /^Interrupted/; # retry on EINTR
    die "System Read Error: $!\n";
    }
    last unless $len; # EOF
    $offset += $len;
    last if $buf =~ s/$EOF_MSG_SEQ$//o;
    }
    $buf
    }


    sub _write_into_pipe {
    my ($fh, $msg) = @_;

    $msg .= $EOF_MSG_SEQ;
    my $length = length($msg);

    my $blksize = (stat $fh)[11] || 16384;
    my $offset = 0;

    while ($length) {
    my $len = syswrite($fh, $msg, $blksize, $offset);
    die "System Write Error: $!\n"
    unless defined $len;
    $length -= $len;
    $offset += $len;
    }
    $offset
    }


    sub cleanup {
    my $self = shift() or return;
    # only for parent
    return unless $self->{parent} == $$;

    # print "\$kids = $self->{kids}\n";
    # print "\%living count = ", scalar(keys %{$self->{living}}), "\n";
    # print "select bitmap = '", defined $self->{listener}->bits()?
    (unpack 'b*', $self->{listener}->bits()):'', "'\n";


    while (my ($kid, $pid) = each %{$self->{pids}}) {
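    # NOTE: the `ps` and `kill` shell-outs below are Unix-only; a Win32 port of
    # this cleanup would need Perl's built-in kill() (or Win32::Process) instead.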
    my $ps = `ps $pid`;
    if ($ps =~ /$0\b/so) {
    print STDERR "killing $pid\n";
    `kill $pid`;
    }
    delete $self->{pids}{$kid};
    }
    }

    1

    __END__


    =pod

    =head1 NAME

    PreforkAgent - A dispatch wrapper for simultaneous tasks.

    =head1 PURPOSE

    Any big number of similar tasks that have to be run in parallel with
    the utmost throughput.
    First, a given number of children is spawned. Then each of them is
    handed the next task from a common queue in succession as they return
    with a response.

    =head1 SYNOPSIS

    use PreforkAgent;

    my $pfa = PreforkAgent->new or die;

    $pfa->register(
    child_prepare => \&individual_init, # this sub is optional
    fetch_next_task => \&next_job,
    process_job => \&dispatch,
    process_response => \&collect_response
    );

    my $GLOBAL_VAR = "fancy value";

    my $kid_count = $pfa->spawn(5) or die;

    $pfa->assign();

    exit;

    sub individual_init {
    # child will die if false is returned
    # this sub is optional
    my $child_id = shift() or return;
    # Child context:
    # can read $GLOBAL_VAR at time of spawn(), but not change

    # any initialization to be done by each child goes here
    ...
    return $success;
    }

    sub next_job {
    # must return a string, which will subsequently be passed to dispatch()
    # and MUST return undef when finished.
    # Enables the main program to tie a certain job to a specific child's
    # response returned in collect_response().
    my $child_id = shift;
    # Parent context:
    # can read AND write $GLOBAL_VAR

    ...
    return $str_job;
    }


    sub dispatch {
    # defines the parallel task for the children
    # processes the job and returns a serialized response
    my $str_job = shift() or return;
    my $child_id = shift() or return;
    # Child context:
    # can read $GLOBAL_VAR at time of spawn(), but not change

    ...
    return $str_response;
    }


    sub collect_response {
    # allows the main program to evaluate any of the child's responses
    my $response = shift() or return;
    my $child_id = shift() or return;
    # Parent context:
    # can read AND write $GLOBAL_VAR

    ...
    }

    =head1 SEE ALSO

    ParallelUserAgent by Marc Langheinrich

    =head1 VERSION

    This document describes version 0.03.

    =head1 LICENSE

    Copyright (C) 2007, Steffen Heinrich. All rights reserved.

    This module is free software;
    you can redistribute it and/or modify it under the same terms as Perl
    itself.

    =cut
     
    , Nov 2, 2007
    #2

  3. J. Gleixner Guest

    wrote:
    > I am trying to fetch and work with websites in a non-blocking,
    > asynchronous way from a standalone program (not a server script).
    >
    > My idea is to use Perl, LWP, and the simple Perl-provided forking on
    > Win32 (Windows XP in my case). The child threads do not need to report
    > anything back to the parent. The parent fires off a bunch of children,
    > wait()s for them to complete, then fires off more children in a loop.
    > My code has a severe memory leak. I have isolated it in the code
    > below.

    [...]

    > Keywords: ...


    No need for you to put 'Keywords' in your post.

    >
    > I assume there is a problem with LWP somehow. I would rather not use a
    > real threading library. It seems unnecessary and not KISS. Can someone
    > help?


    You could look at LWP::Parallel::UserAgent

    And a helpful article using it, by Randal:

    http://www.stonehenge.com/merlyn/WebTechniques/col27.html
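
    From memory (so double-check it against the module's own docs), the
    register/wait pattern it uses looks roughly like this:

    use LWP::Parallel::UserAgent;
    use HTTP::Request;

    # placeholder URL list
    my @urls = ('http://www.google.com/images/firefox/op_icon.png') x 10;

    my $pua = LWP::Parallel::UserAgent->new;
    $pua->max_req(5);    # how many requests to have in flight at once

    # register every request up front; they are all fetched in parallel by wait()
    for my $url (@urls) {
        if (my $err = $pua->register(HTTP::Request->new(GET => $url))) {
            warn $err->error_as_HTML;    # register() returns an error response on failure
        }
    }

    my $entries = $pua->wait();    # blocks until every registered request is finished
    for my $key (keys %$entries) {
        my $res = $entries->{$key}->response;
        print $res->request->url, " -> ", $res->code, "\n";
    }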
     
    J. Gleixner, Nov 2, 2007
    #3
  4. Mark Clements Guest

    wrote:
    > Without delving too deeply into your problem, I think you could give my
    > attached module PreforkAgent.pm a try.
    > It was made for very much the same task - stress testing a website, in
    > fact.
    >

    <snip>

    > Because I have never tried it on Windows, I'd be delighted to hear how
    > you fare.

    <snip>

    You might like to consider packaging it up and releasing it on CPAN.
    This would make it much more likely that you will receive feedback :)

    Mark
     
    Mark Clements, Nov 4, 2007
    #4
  5. Guest

    >
    > <snip>
    >
    > You might like to consider packaging it up and releasing it on CPAN.
    > This would make it much more likely that you will receive feedback :)
    >
    > Mark


    You are absolutely right.
    But then I would have to bother with CPAN packaging and submission,
    which is something I have never felt I had the time for. ;-)

    Cheers, Steffen
     
    , Nov 5, 2007
    #5
