|
From: Jason B. <br...@us...> - 2004-11-30 21:41:41
|
Update of /cvsroot/openxcat/openxcat/bin In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv10667 Modified Files: psh Log Message: Fix some forking stuff, add quiet option, make forking default (not threading) Index: psh =================================================================== RCS file: /cvsroot/openxcat/openxcat/bin/psh,v retrieving revision 1.10 retrieving revision 1.11 diff -u -d -r1.10 -r1.11 --- psh 16 Nov 2004 21:29:00 -0000 1.10 +++ psh 30 Nov 2004 21:41:25 -0000 1.11 @@ -20,18 +20,20 @@ my $killed : shared = 0; my $node; +#select STDOUT; +#$| = 1; + my $maxjobs; my $forkjobs = 500; my $serial = 0; my $interface; my $flags; my $p = 1; -my $fork = 0; - -$SIG{INT} = \&killed; -$SIG{HUP} = \&killed; -$SIG{QUIT}= \&killed; -$SIG{TERM}= \&killed; +my $q = 0; +my $fork = 1; +my $parent; +my @children; +my @nodes; if ( $ENV{'XCAT_PSHMAXJOBS'} ) { $maxjobs = $ENV{'XCAT_PSHMAXJOBS'}; @@ -55,11 +57,31 @@ getconfig(); +if ( $q ) { + $q = '>/dev/null'; +} else { + $q = ''; +} + +if ( ! $fork ) { + $SIG{INT} = \&killed; + $SIG{HUP} = \&killed; + $SIG{QUIT}= \&killed; + $SIG{TERM}= \&killed; +} else { + $parent = $$; + $SIG{INT} = \&killedfork; + $SIG{HUP} = \&killedfork; + $SIG{QUIT}= \&killedfork; + $SIG{TERM}= \&killedfork; +} + if ( scalar(@ARGV) < 1 ) { usage(); } -my ($noderange, $command) = @ARGV; +my $noderange = shift; +my $command = join(' ', @ARGV); my $qstat = `which qstat 2>/dev/null`; @@ -86,16 +108,18 @@ exit; } else { docommand(); +# select STDOUT; +# $| = 0; +# $| = 1; } sub docommand { my $nr = shift || $noderange; - my @nodes = noderange($nr); + @nodes = noderange($nr); if ( $serial ) { foreach my $node (sort(@nodes)) { if ( ! $p || ping($node) ) { - #print "Running '$RSHC $flags $node$interface '$command' 2>&1 | sed 's/^/$node: /'\n"; - my @results = split("\n", `$RSHC $flags $node$interface '$command' 2>&1 | sed 's/^/$node: /'`); + my @results = split("\n", `$RSHC $flags $node$interface '$command' $q 2>&1 | sed 's/^/$node: /'`); @results = grep { $_ !~ /.Xauthority$/ } @results; print join("\n", @results); print "\n"; @@ -112,36 +136,29 @@ } my $jobs = 0; if ( $fork ) { - my @children; - my $pid; - $maxjobs = $forkjobs; - my $jobs = 0; - if ( scalar(@nodes) < $maxjobs ) { - $maxjobs = scalar(@nodes); - } - - #print "Forking up to $maxjobs times for " . scalar(@nodes) . " nodes\n"; - - my $nc = scalar(@nodes); - while ( my $tnode = pop(@nodes) ) { - if ( $jobs >= $maxjobs ) { - my $donepid = wait(); - @children = grep { $_ != $donepid } @children; - $jobs--; - } - if ( $pid = fork ) { - $jobs++; - push(@children, $pid); + use IO::Handle; + my %pids; + my %output; + foreach my $n (@nodes) { + $output{$n} = new IO::Handle; + if ( $pids{$n} = open($output{$n}, "-|") ) { + + } elsif ( defined( $pids{$n} ) ) { + exec("$RSHC $flags $n$interface '$command' $q 2>&1 | sed 's/^/$n: /'") + or die "Couldn't connect to $n: $!\n"; } else { - croak "Can't fork: $!" unless defined $pid; - commandfork($tnode); - exit; + carp "Can't fork to connect to $n"; } } - foreach my $child (@children) { - waitpid($child, 0); + + foreach my $node (@nodes) { + my $OUTPUT = $output{$node}; + $OUTPUT->autoflush(1); + while ( <$OUTPUT> ) { + print $_; + } + close $OUTPUT; } - exit; } else { if ( scalar(@nodes) > $maxjobs ) { foreach ( 1..$maxjobs) { @@ -177,38 +194,51 @@ die "Got killed with SIG$signame\n"; } +sub killedfork { + my $signame = shift; + if ( $$ == $parent ) { + if ( scalar(@children) ) { + my $donepid = wait(); + @children = grep { $_ != $donepid } @children; + } + print "Have not processed " . scalar(@nodes) . " nodes: " . join(', ', sort(@nodes)) . "\n"; + die "Got killed with SIG$signame\n"; + } else { + die "Child dying\n"; + } +} + sub commandthread { +# select STDOUT; +# $| = 1; while( $node = $nodequeue->dequeue() ) { if ( $p && ! ping($node) ) { print "$node$interface: noping\n"; next; } - my @results = split("\n", `$RSHC $flags $node$interface "$command" 2>&1 | sed "s/^/$node: /"`); + my @results = split("\n", `$RSHC $flags $node$interface "$command" $q 2>&1 | sed "s/^/$node: /"`); @results = grep { $_ !~ /.Xauthority$/ } @results; print join("\n", @results); - print "\n"; + if ( scalar(@results) ) { + print "\n"; + } } } sub commandfork { +# select STDOUT; +# $| = 1; my $node = shift; if ( $p && ! ping($node) ) { print "$node$interface: noping\n"; exit; } my @results; - exec("$RSHC $flags $node$interface '$command' 2>&1 | sed 's/^/$node: /'"); - #my @results = split("\n", `$RSHC $flags $node$interface "$command" 2>&1 | sed "s/^/$node: /"`); - #@results = grep { $_ !~ /.Xauthority$/ } @results; - #if ( scalar(@results) ) { - # print join("\n", @results); - # print "\n"; - #} + exec("$RSHC $flags $node$interface '$command' $q 2>&1 | sed 's/^/$node: /'"); exit; } sub interactive { - my @nodes; if ( $p ) { @nodes = ping(noderange($noderange)); } else { @@ -266,8 +296,14 @@ if ($cwd) { $command = "cd $cwd; $command"; docommand(join(',', @nodes)); + select STDOUT; + $| = 0; + $| = 1; } else { docommand(join(',', @nodes)); + select STDOUT; + $| = 0; + $| = 1; } } } @@ -297,6 +333,7 @@ 'interface=s' => sub { $interface = "-$_"; }, 'ping!' => \$p, 'fork!' => \$fork, + 'quiet!' => \$q, 'help|?' => sub { usage(); }, 'version' => sub { version; }, ); |