|
From: Jason B. <br...@us...> - 2004-11-16 20:53:15
|
Update of /cvsroot/openxcat/openxcat/bin In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv31179 Modified Files: psh Log Message: Add forking option for psh... speed increases dramatically on multiprocessor machines Index: psh =================================================================== RCS file: /cvsroot/openxcat/openxcat/bin/psh,v retrieving revision 1.8 retrieving revision 1.9 diff -u -d -r1.8 -r1.9 --- psh 16 Nov 2004 19:22:15 -0000 1.8 +++ psh 16 Nov 2004 20:53:03 -0000 1.9 @@ -21,10 +21,12 @@ my $node; my $maxjobs; +my $forkjobs = 500; my $serial = 0; my $interface; my $flags; my $p = 1; +my $fork = 0; $SIG{INT} = \&killed; $SIG{HUP} = \&killed; @@ -102,25 +104,59 @@ } } } else { - my $jobs = 0; - foreach ( @nodes ) { $nodequeue->enqueue( $_ ); } - if ( scalar(@nodes) > $maxjobs ) { - foreach ( 1..$maxjobs) { - $nodequeue->enqueue(undef); - push( @comthreads, threads->new(\&commandthread) ); + foreach ( @nodes ) { + $nodequeue->enqueue( undef ); + } + my $jobs = 0; + if ( $fork ) { + my @children; + my $pid; + $maxjobs = $forkjobs; + my $jobs = 0; + if ( scalar(@nodes) < $maxjobs ) { + $maxjobs = scalar(@nodes); + } + + #print "Forking up to $maxjobs times for " . scalar(@nodes) . " nodes\n"; + + my $nc = scalar(@nodes); + while ( my $tnode = pop(@nodes) ) { + if ( $jobs >= $maxjobs ) { + my $donepid = wait(); + @children = grep { $_ != $donepid } @children; + $jobs--; + } + if ( $pid = fork ) { + $jobs++; + push(@children, $pid); + } else { + croak "Can't fork: $!" unless defined $pid; + commandfork($tnode); + exit; + } } + foreach my $child (@children) { + waitpid($child, 0); + } + exit; } else { - my $thr = int(scalar(@nodes) / 4) || 1; - foreach ( 1 .. $thr ) { - push( @comthreads, threads->new(\&commandthread) ); - $nodequeue->enqueue(undef); + if ( scalar(@nodes) > $maxjobs ) { + foreach ( 1..$maxjobs) { + push( @comthreads, threads->new(\&commandthread) ); + } + } else { + my $thr = int(scalar(@nodes) / 4) || 1; + foreach ( 1 .. $thr ) { + push( @comthreads, threads->new(\&commandthread) ); + $nodequeue->enqueue(undef); + } + } + while ( my $thread = pop(@comthreads) ) { + $thread->join; } - } - while ( my $thread = pop(@comthreads) ) { - $thread->join; } } } @@ -154,6 +190,21 @@ } } +sub commandfork { + my $node = shift; + if ( $p && ! ping($node) ) { + print "$node$interface: noping\n"; + exit; + } + my @results = split("\n", `$RSHC $flags $node$interface "$command" 2>&1 | sed "s/^/$node: /"`); + @results = grep { $_ !~ /.Xauthority$/ } @results; + if ( scalar(@results) ) { + print join("\n", @results); + print "\n"; + } + exit; +} + sub interactive { my @nodes; if ( $p ) { @@ -224,9 +275,10 @@ print "$0 - Parallel Shell\n"; print version; print "Usage: $0 (-s) (-i interface) noderange command\n"; - print "--help Display this helpful information\n"; - print "--serial Run command serially in order\n"; - print "--interface <if> Use interface\n"; + print "--help Display this helpful information\n"; + print "--serial Run command serially in order\n"; + print "--interface <if> Use interface\n"; + print "--fork Use forking instead of threads\n"; print "\n"; print "noderange may be 'me' to specify all the nodes running your pbs\n"; print "job(s). You may also specify a PBS job ID to run on the nodes assigned\n"; @@ -239,11 +291,12 @@ Getopt::Long::Configure ("permute", "auto_abbrev", "pass_through"); GetOptions( - 'serial!' => \$serial, - 'interface=s' => sub { $interface = "-$_"; }, - 'ping!' => \$p, - 'help|?' => sub { usage(); }, - 'version' => sub { version; }, + 'serial!' => \$serial, + 'interface=s' => sub { $interface = "-$_"; }, + 'ping!' => \$p, + 'fork!' => \$fork, + 'help|?' => sub { usage(); }, + 'version' => sub { version; }, ); no Getopt::Long; } @@ -259,8 +312,8 @@ Options: --help Display this helpful information --serial Run command serially in order - --interface <if> Use interface\n"; - + --interface <if> Use interface + --fork Use forking instead of threads noderange may be 'me' to specify all the nodes running your pbs job(s). You may also specify a PBS job ID to run on the nodes assigned to that job ID. |