From: <ha...@us...> - 2008-10-20 21:44:01
|
Revision: 2236
          http://cogkit.svn.sourceforge.net/cogkit/?rev=2236&view=rev
Author:   hategan
Date:     2008-10-20 21:43:53 +0000 (Mon, 20 Oct 2008)

Log Message:
-----------
fixed a few issues

Modified Paths:
--------------
    trunk/current/src/cog/modules/provider-coaster/resources/worker.pl

Modified: trunk/current/src/cog/modules/provider-coaster/resources/worker.pl
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/resources/worker.pl 2008-10-13 20:37:21 UTC (rev 2235)
+++ trunk/current/src/cog/modules/provider-coaster/resources/worker.pl 2008-10-20 21:43:53 UTC (rev 2236)
@@ -70,6 +70,26 @@
     }
 }
 
+sub hts {
+    my ($H) = @_;
+
+    my $k;
+    my $s = "{";
+    my $first = 1;
+
+    for $k (keys %$H) {
+        if (!$first) {
+            $s = $s.", ";
+        }
+        else {
+            $first = 0;
+        }
+        $s = $s."$k = $$H{$k}";
+    }
+
+    return $s."}";
+}
+
 sub reconnect() {
     my $fail = 0;
     my $i;
@@ -113,8 +133,9 @@
     my $len = length($msg);
     my $buf = pack("VVV", $tag, $flags, $len);
     $buf = $buf.$msg;
-    #wlog("> len=$len, tag=$tag, flags=$flags, $msg\n");
+    wlog("> len=$len, tag=$tag, flags=$flags, $msg\n");
 
+    #($SOCK->send($buf) == length($buf)) || reconnect();
     eval {defined($SOCK->send($buf))} or wlog("Send failed: $!\n");
 }
 
@@ -158,9 +179,19 @@
     my $flg = unpack("V", substr($data, 4, 4));
     my $len = unpack("V", substr($data, 8, 4));
     my $msg;
-    $SOCK->recv($msg, $len);
+    my $frag;
+    my $alen = 0;
+    while ($alen < $len) {
+        $SOCK->recv($frag, $len - $alen);
+        $alen = $alen + length($frag);
+        $msg = $msg.$frag;
+    }
 
-    #wlog("< len=$len, tag=$tag, flags=$flg, $data\n");
+    my $actuallen = length($msg);
+    wlog("< len=$len, actuallen=$actuallen, tag=$tag, flags=$flg, $msg\n");
+    if ($len != $actuallen) {
+        wlog("Warning: len != actuallen");
+    }
 
     return ($tag, $flg, $msg);
 }
@@ -278,6 +309,8 @@
 sub registerCmd {
     my ($tag, $cont) = @_;
 
+    wlog "Replies: ".hts(\%REPLIES)."\n";
+
     $REPLIES{$tag} = [$cont, time(), ()];
 }
 
@@ -404,11 +437,10 @@
     my $tag = shift;
     my $executable = $JOB{"executable"};
     if (!(defined $JOBID)) {
-        wlog "Job details\n";
-        my $name;
-        foreach $name (keys %JOB) {
-            wlog "key: $name, value: $JOB{$name}\n";
-        }
+        my $ds = hts(\%JOB);
+
+        wlog "Job details $ds\n";
+
         sendError($tag, ("Missing job identity"));
         return 0;
     }
@@ -425,13 +457,17 @@
 sub forkjob {
     my ($pid, $status);
     $JOB_RUNNING = 1;
+    pipe(PARENT_R, CHILD_W);
     $pid = fork();
     if (defined($pid)) {
         if ($pid == 0) {
-            runjob();
+            close PARENT_R;
+            runjob(\*CHILD_W);
+            close CHILD_W;
         }
         else {
             wlog "Forked process $pid. Waiting for its completion\n";
+            close CHILD_W;
             #waitpid($pid, 0);
             my $tid;
             do {
@@ -447,7 +483,14 @@
                 }
             } while $tid != $pid;
             wlog "Child process $pid terminated. Status is $status. $!\n";
-            queueCmd(\&nullCB, "JOBSTATUS", $JOBID, "$COMPLETED", "$status", "");
+            my $s = <PARENT_R>;
+            close PARENT_R;
+            if ($s eq "") {
+                queueCmd(\&nullCB, "JOBSTATUS", $JOBID, "$COMPLETED", "$status", $s);
+            }
+            else {
+                queueCmd(\&nullCB, "JOBSTATUS", $JOBID, "$FAILED", "$status", $s);
+            }
         }
     }
     else {
@@ -457,6 +500,7 @@
 }
 
 sub runjob {
+    my ($WR) = @_;
     my $executable = $JOB{"executable"};
     my $stdout = $JOB{"stdout"};
     my $stderr = $JOB{"stderr"};
@@ -484,7 +528,8 @@
         open STDERR, ">$stderr" or die "Cannot redirect STDERR";
     }
     wlog "Command: @JOBARGS\n";
-    exec { $executable } @JOBARGS or queueCmd(\&nullCB, "JOBSTATUS", $JOBID, "$FAILED", "513", "Could not execute $executable: $!");
+    exec { $executable } @JOBARGS;
+    print $WR "Could not execute $executable: $!\n";
     die "Could not execute $executable: $!";
 }
 

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|