|
From: <ha...@us...> - 2008-09-09 02:36:03
|
Revision: 2153
http://cogkit.svn.sourceforge.net/cogkit/?rev=2153&view=rev
Author: hategan
Date: 2008-09-09 02:36:01 +0000 (Tue, 09 Sep 2008)
Log Message:
-----------
fixed redirection
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/resources/worker.pl
Modified: trunk/current/src/cog/modules/provider-coaster/resources/worker.pl
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/resources/worker.pl 2008-09-08 23:35:13 UTC (rev 2152)
+++ trunk/current/src/cog/modules/provider-coaster/resources/worker.pl 2008-09-09 02:36:01 UTC (rev 2153)
@@ -1,5 +1,6 @@
#!/usr/bin/perl
use IO::Socket;
+use Cwd;
use strict;
use warnings;
@@ -383,23 +384,28 @@
my $stdout = $JOB{"stdout"};
my $stderr = $JOB{"stderr"};
+ my $cwd = getcwd();
+ wlog "CWD: $cwd\n";
wlog "Running $executable\n";
+ wlog "Directory: $JOB{directory}\n";
my $ename;
foreach $ename (keys %JOBENV) {
$ENV{$ename} = $JOBENV{$ename};
}
unshift @JOBARGS, $executable;
+ if (defined $JOB{directory}) {
+ chdir $JOB{directory};
+ }
if (defined $stdout) {
+ wlog "STDOUT: $stdout\n";
close STDOUT;
- open STDOUT, $stdout;
+ open STDOUT, ">$stdout" or die "Cannot redirect STDOUT";
}
if (defined $stderr) {
+ wlog "STDERR: $stderr\n";
close STDERR;
- open STDERR, $stderr;
+ open STDERR, ">$stderr" or die "Cannot redirect STDERR";
}
- if (defined $JOB{directory}) {
- chdir $JOB{directory};
- }
wlog "Command: @JOBARGS\n";
exec { $executable } @JOBARGS or queueCmd(\&nullCB, "JOBSTATUS", $JOBID, "$FAILED", "513", "Could not execute $executable: $!");
die "Could not execute $executable: $!";
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-09-16 10:33:16
|
Revision: 2156
http://cogkit.svn.sourceforge.net/cogkit/?rev=2156&view=rev
Author: hategan
Date: 2008-09-16 17:33:14 +0000 (Tue, 16 Sep 2008)
Log Message:
-----------
reconnect; heartbeats; proper exit code processing; fixed idle detection
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/resources/worker.pl
Modified: trunk/current/src/cog/modules/provider-coaster/resources/worker.pl
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/resources/worker.pl 2008-09-16 17:29:04 UTC (rev 2155)
+++ trunk/current/src/cog/modules/provider-coaster/resources/worker.pl 2008-09-16 17:33:14 UTC (rev 2156)
@@ -1,6 +1,7 @@
#!/usr/bin/perl
use IO::Socket;
use Cwd;
+use POSIX ":sys_wait_h";
use strict;
use warnings;
@@ -17,22 +18,30 @@
my $RETRIES = 3;
my $REPLYTIMEOUT = 60;
my $MAXFRAGS = 16;
+my $MAX_RECONNECT_ATTEMPTS = 3;
-my $IDLETIMEOUT = 300; #Seconds
+my $IDLETIMEOUT = 10 * 60; #Seconds
my $LASTRECV = 0;
+my $JOB_RUNNING = 0;
my $BUFSZ = 2048;
+# 60 seconds by default. Note that since there is no configuration handshake
+# this would have to match the default interval in the service in order to avoid
+# "lost heartbeats".
+my $HEARTBEAT_INTERVAL = 5 * 60;
+
my %REQUESTS = ();
my %REPLIES = ();
-my $LOG = "$ENV{HOME}/worker$ARGV[1].log";
+my $LOG = "$ENV{HOME}/.globus/coasters/worker$ARGV[1].log";
my %HANDLERS = (
"SHUTDOWN" => \&shutdown,
"SUBMITJOB" => \&submitjob,
+ "REGISTER" => \&register,
+ "HEARTBEAT" => \&heartbeat,
);
my @CMDQ = ();
@@ -51,6 +60,7 @@
my %JOB;
my %JOBENV;
my @JOBARGS;
+my $LAST_HEARTBEAT = 0;
sub wlog {
my $msg;
@@ -60,37 +70,51 @@
}
}
-sub init() {
+sub reconnect() {
my $fail = 0;
-
- open(LOG, ">$LOG") or die "Failed to open log file: $!";
- my $b = select(LOG);
- $| = 1;
- select($b);
- print LOG time(), " Logging started\n";
-
- wlog "uri=$URI, scheme=$SCHEME, host=$HOSTNAME, port=$PORT, id=$ID\n";
- for ($_ = 0; $_ < 10; $_++) {
- $SOCK = IO::Socket::INET->new(Proto=>'tcp', PeerAddr=>$HOSTNAME, PeerPort=>$PORT) || ($fail = 1);
+ my $i;
+ for ($i = 0; $i < $MAX_RECONNECT_ATTEMPTS; $i++) {
+ wlog "Connecting ($i)...\n";
+ $SOCK = IO::Socket::INET->new(Proto=>'tcp', PeerAddr=>$HOSTNAME, PeerPort=>$PORT, Blocking=>1) || ($fail = 1);
if (!$fail) {
$SOCK->setsockopt(SOL_SOCKET, SO_RCVBUF, 16384);
$SOCK->setsockopt(SOL_SOCKET, SO_SNDBUF, 32768);
+ wlog "Connected\n";
+ $SOCK->blocking(0);
+ queueCmd(\&registerCB, "REGISTER", $ID, "wid://$ID");
last;
- }
+ }
+ else {
+ wlog "Connection failed\n";
+ select(undef, undef, undef, 2 ** $i);
+ }
}
if ($fail) {
- die "Failed to create sockets: $!";
+ die "Failed to connect: $!";
}
}
+sub init() {
+ open(LOG, ">$LOG") or die "Failed to open log file: $!";
+ my $b = select(LOG);
+ $| = 1;
+ select($b);
+ print LOG time(), " Logging started\n";
+
+ wlog "uri=$URI, scheme=$SCHEME, host=$HOSTNAME, port=$PORT, id=$ID\n";
+ reconnect();
+}
+
sub sendm {
my ($tag, $flags, $msg) = @_;
my $len = length($msg);
my $buf = pack("VVV", $tag, $flags, $len);
$buf = $buf.$msg;
wlog("> len=$len, tag=$tag, flags=$flags, $msg\n");
- $SOCK->send($buf) == length($buf) or die "cannot send to $SOCK: $!";
+
+ #($SOCK->send($buf) == length($buf)) || reconnect();
+ eval {defined($SOCK->send($buf))} or wlog("Send failed: $!\n");
}
sub sendFrags {
@@ -106,7 +130,7 @@
my $cont = shift(@cmd);
my $ctag = $TAG++;
- registerCmd($ctag, $cont);
+ registerCmd($ctag, $cont);
sendFrags($ctag, 0, @cmd);
}
@@ -176,6 +200,7 @@
}
}
else {
+ $LASTRECV = time();
if (!exists($REQUESTS{$tag})) {
$REQUESTS{$tag} = [\&processRequest, time(), []];
wlog "New request ($tag)\n";
@@ -227,8 +252,8 @@
checkTimeouts2(\%REPLIES);
if ($LASTRECV != 0) {
my $dif = time() - $LASTRECV;
- if ($dif >= $IDLETIMEOUT) {
- die "Idle time exceeded";
+ if ($dif >= $IDLETIMEOUT && $JOB_RUNNING == 0) {
+ die "Idle time exceeded";
}
}
}
@@ -239,7 +264,6 @@
if (length($data) > 0) {
wlog "Received $data\n";
eval { process(unpackData($data)); } || wlog "$@\n";
- $LASTRECV = time();
}
else {
#sleep 250ms
@@ -256,16 +280,24 @@
sub mainloop {
- my $cmd;
while(1) {
- foreach $cmd (@CMDQ) {
- sendCmd(@$cmd);
- }
- @CMDQ = ();
- recvOne();
+ loopOne();
}
}
+sub loopOne {
+ my $cmd;
+ if (time() - $LAST_HEARTBEAT > $HEARTBEAT_INTERVAL) {
+ queueCmd(\&heartbeatCB, "HEARTBEAT");
+ $LAST_HEARTBEAT = time();
+ }
+ foreach $cmd (@CMDQ) {
+ sendCmd(@$cmd);
+ }
+ @CMDQ = ();
+ recvOne();
+}
+
sub queueCmd {
push @CMDQ, [@_];
}
@@ -295,6 +327,24 @@
}
}
+sub heartbeatCB {
+ my ($tag, $timeout, $err, $reply) = @_;
+
+ if ($timeout) {
+ if (time() - $LAST_HEARTBEAT > 2 * $HEARTBEAT_INTERVAL) {
+ wlog "No heartbeat replies in a while. Dying.\n";
+ die "No response to heartbeat\n";
+ }
+ }
+ elsif ($err) {
+ die "Heartbeat failed: $reply\n";
+ }
+ else {
+ wlog "Heartbeat acknowledged\n";
+ }
+}
+
+
sub register {
my ($tag, $timeout, $reply) = @_;
sendReply($tag, ("OK"));
@@ -309,6 +359,11 @@
exit 0;
}
+sub heartbeat {
+ my ($tag, $timeout, $msgs) = @_;
+ sendReply($tag, ("OK"));
+}
+
sub submitjob {
my ($tag, $timeout, $msgs) = @_;
my $desc = $$msgs[0];
@@ -361,6 +416,7 @@
sub forkjob {
my ($pid, $status);
+ $JOB_RUNNING = 1;
$pid = fork();
if (defined($pid)) {
if ($pid == 0) {
@@ -368,8 +424,20 @@
}
else {
wlog "Forked process $pid. Waiting for its completion\n";
- waitpid($pid, 0);
- $status = $? & 0xff;
+ #waitpid($pid, 0);
+ my $tid;
+ do {
+ $tid = waitpid($pid, &WNOHANG);
+ if ($tid != $pid) {
+ loopOne();
+ }
+ else {
+ # exit code is in MSB and signal in LSB, so
+ # switch them such that status & 0xff is the
+ # exit code
+ $status = $? >> 8 + (($? & 0xff) << 8);
+ }
+ } while $tid != $pid;
wlog "Child process $pid terminated. Status is $status. $!\n";
queueCmd(\&nullCB, "JOBSTATUS", $JOBID, "$COMPLETED", "$status", "");
}
@@ -377,6 +445,7 @@
else {
queueCmd(\&nullCB, "JOBSTATUS", $JOBID, "$FAILED", "512", "Could not fork child process");
}
+ $JOB_RUNNING = 0;
}
sub runjob {
@@ -421,22 +490,21 @@
$ID =$ARGV[$i];
my $MSG="0";
- my $myhost=`hostname -i`;
+ my $myhost=`hostname`;
$myhost =~ s/\s+$//;
init();
- print LOG time(), " Initialized coaster worker $i\n";
- queueCmd(\&registerCB, "REGISTER", $ID, "wid://$ID");
+ wlog("Initialized coaster worker $i\n");
+ wlog("Running on node $myhost\n");
mainloop();
exit(0);
- } else {
+ }
+ else {
$wp[$i]=$waitpid;
}
}
-for($i=1; $i<=$#ARGV ; $i++) {
+for ($i=1; $i<=$#ARGV ; $i++) {
waitpid($wp[$i],0);
}
-
-
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-09-23 04:42:44
|
Revision: 2186
http://cogkit.svn.sourceforge.net/cogkit/?rev=2186&view=rev
Author: hategan
Date: 2008-09-23 04:42:38 +0000 (Tue, 23 Sep 2008)
Log Message:
-----------
log node before trying to connect (D'oh!); some other minor changes
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/resources/worker.pl
Modified: trunk/current/src/cog/modules/provider-coaster/resources/worker.pl
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/resources/worker.pl 2008-09-23 04:39:59 UTC (rev 2185)
+++ trunk/current/src/cog/modules/provider-coaster/resources/worker.pl 2008-09-23 04:42:38 UTC (rev 2186)
@@ -38,7 +38,7 @@
my %HANDLERS = (
- "SHUTDOWN" => \&shutdown,
+ "SHUTDOWN" => \&shutdownw,
"SUBMITJOB" => \&submitjob,
"REGISTER" => \&register,
"HEARTBEAT" => \&heartbeat,
@@ -85,7 +85,7 @@
last;
}
else {
- wlog "Connection failed\n";
+ wlog "Connection failed: $!\n";
select(undef, undef, undef, 2 ** $i);
}
}
@@ -263,7 +263,7 @@
$SOCK->recv($data, 12);
if (length($data) > 0) {
wlog "Received $data\n";
- eval { process(unpackData($data)); } || wlog "$@\n";
+ eval { process(unpackData($data)); } || wlog "Failed to process data: $@\n";
}
else {
#sleep 250ms
@@ -351,11 +351,11 @@
}
-sub shutdown {
+sub shutdownw {
my ($tag, $timeout, $msgs) = @_;
-
+ wlog "Shutdown command received\n";
sendReply($tag, ("OK"));
- wlog "Shutdown command received. Exiting\n";
+ wlog "Acknowledged shutdown. Exiting\n";
exit 0;
}
@@ -401,11 +401,11 @@
my $tag = shift;
my $executable = $JOB{"executable"};
if (!(defined $JOBID)) {
- sendReply($tag, ("Missing job identity"));
+ sendError($tag, ("Missing job identity"));
return 0;
}
elsif (!(defined $executable)) {
- sendReply($tag, ("Missing executable"));
+ sendError($tag, ("Missing executable"));
return 0;
}
else {
@@ -492,10 +492,11 @@
my $myhost=`hostname`;
$myhost =~ s/\s+$//;
- init();
-
+
wlog("Initialized coaster worker $i\n");
wlog("Running on node $myhost\n");
+
+ init();
mainloop();
exit(0);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-10-04 16:28:43
|
Revision: 2218
http://cogkit.svn.sourceforge.net/cogkit/?rev=2218&view=rev
Author: hategan
Date: 2008-10-04 16:28:32 +0000 (Sat, 04 Oct 2008)
Log Message:
-----------
initialize log before logging stuff
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/resources/worker.pl
Modified: trunk/current/src/cog/modules/provider-coaster/resources/worker.pl
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/resources/worker.pl 2008-10-03 21:23:22 UTC (rev 2217)
+++ trunk/current/src/cog/modules/provider-coaster/resources/worker.pl 2008-10-04 16:28:32 UTC (rev 2218)
@@ -94,14 +94,16 @@
}
}
-
-sub init() {
+sub initlog() {
open(LOG, ">$LOG") or die "Failed to open log file: $!";
my $b = select(LOG);
$| = 1;
select($b);
print LOG time(), " Logging started\n";
+}
+
+sub init() {
wlog "uri=$URI, scheme=$SCHEME, host=$HOSTNAME, port=$PORT, id=$ID\n";
reconnect();
}
@@ -493,6 +495,8 @@
my $myhost=`hostname`;
$myhost =~ s/\s+$//;
+ initlog();
+
wlog("Initialized coaster worker $i\n");
wlog("Running on node $myhost\n");
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-10-10 23:38:52
|
Revision: 2232
http://cogkit.svn.sourceforge.net/cogkit/?rev=2232&view=rev
Author: hategan
Date: 2008-10-10 23:38:47 +0000 (Fri, 10 Oct 2008)
Log Message:
-----------
fixed logging race
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/resources/worker.pl
Modified: trunk/current/src/cog/modules/provider-coaster/resources/worker.pl
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/resources/worker.pl 2008-10-10 23:22:31 UTC (rev 2231)
+++ trunk/current/src/cog/modules/provider-coaster/resources/worker.pl 2008-10-10 23:38:47 UTC (rev 2232)
@@ -65,7 +65,7 @@
sub wlog {
my $msg;
foreach $msg (@_) {
- print LOG time(), " ", $msg;
+ print LOG time(), " $ID $msg";
#print $msg;
}
}
@@ -95,11 +95,11 @@
}
sub initlog() {
- open(LOG, ">$LOG") or die "Failed to open log file: $!";
+ open(LOG, ">>$LOG") or die "Failed to open log file: $!";
my $b = select(LOG);
$| = 1;
select($b);
- print LOG time(), " Logging started\n";
+ print LOG time(), " $ID Logging started\n";
}
@@ -113,7 +113,7 @@
my $len = length($msg);
my $buf = pack("VVV", $tag, $flags, $len);
$buf = $buf.$msg;
- wlog("> len=$len, tag=$tag, flags=$flags, $msg\n");
+ #wlog("> len=$len, tag=$tag, flags=$flags, $msg\n");
#($SOCK->send($buf) == length($buf)) || reconnect();
eval {defined($SOCK->send($buf))} or wlog("Send failed: $!\n");
@@ -144,7 +144,6 @@
sub sendError {
my ($tag, @msgs) = @_;
-
sendFrags($tag, $REPLY_FLAG | $ERROR_FLAG, @msgs);
}
@@ -161,7 +160,7 @@
my $msg;
$SOCK->recv($msg, $len);
- wlog("< len=$len, tag=$tag, flags=$flg, $data\n");
+ #wlog("< len=$len, tag=$tag, flags=$flg, $data\n");
return ($tag, $flg, $msg);
}
@@ -225,6 +224,8 @@
}
$cont->($tag, 0, $err, $frags);
}
+
+ return 1;
}
sub checkTimeouts2 {
@@ -265,7 +266,7 @@
$SOCK->recv($data, 12);
if (length($data) > 0) {
wlog "Received $data\n";
- eval { process(unpackData($data)); } || wlog "Failed to process data: $@\n";
+ eval { process(unpackData($data)); } || (die "Failed to process data: $@" && wlog "Failed to process data: $@\n");
}
else {
#sleep 250ms
@@ -403,6 +404,11 @@
my $tag = shift;
my $executable = $JOB{"executable"};
if (!(defined $JOBID)) {
+ wlog "Job details\n";
+ my $name;
+ foreach $name (keys %JOB) {
+ wlog "key: $name, value: $JOB{$name}\n";
+ }
sendError($tag, ("Missing job identity"));
return 0;
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|
|
From: <ha...@us...> - 2008-10-20 21:44:01
|
Revision: 2236
http://cogkit.svn.sourceforge.net/cogkit/?rev=2236&view=rev
Author: hategan
Date: 2008-10-20 21:43:53 +0000 (Mon, 20 Oct 2008)
Log Message:
-----------
fixed a few issues
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/resources/worker.pl
Modified: trunk/current/src/cog/modules/provider-coaster/resources/worker.pl
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/resources/worker.pl 2008-10-13 20:37:21 UTC (rev 2235)
+++ trunk/current/src/cog/modules/provider-coaster/resources/worker.pl 2008-10-20 21:43:53 UTC (rev 2236)
@@ -70,6 +70,26 @@
}
}
+sub hts {
+ my ($H) = @_;
+
+ my $k;
+ my $s = "{";
+ my $first = 1;
+
+ for $k (keys %$H) {
+ if (!$first) {
+ $s = $s.", ";
+ }
+ else {
+ $first = 0;
+ }
+ $s = $s."$k = $$H{$k}";
+ }
+
+ return $s."}";
+}
+
sub reconnect() {
my $fail = 0;
my $i;
@@ -113,8 +133,9 @@
my $len = length($msg);
my $buf = pack("VVV", $tag, $flags, $len);
$buf = $buf.$msg;
- #wlog("> len=$len, tag=$tag, flags=$flags, $msg\n");
+ wlog("> len=$len, tag=$tag, flags=$flags, $msg\n");
+
#($SOCK->send($buf) == length($buf)) || reconnect();
eval {defined($SOCK->send($buf))} or wlog("Send failed: $!\n");
}
@@ -158,9 +179,19 @@
my $flg = unpack("V", substr($data, 4, 4));
my $len = unpack("V", substr($data, 8, 4));
my $msg;
- $SOCK->recv($msg, $len);
+ my $frag;
+ my $alen = 0;
+ while ($alen < $len) {
+ $SOCK->recv($frag, $len - $alen);
+ $alen = $alen + length($frag);
+ $msg = $msg.$frag;
+ }
- #wlog("< len=$len, tag=$tag, flags=$flg, $data\n");
+ my $actuallen = length($msg);
+ wlog("< len=$len, actuallen=$actuallen, tag=$tag, flags=$flg, $msg\n");
+ if ($len != $actuallen) {
+ wlog("Warning: len != actuallen");
+ }
return ($tag, $flg, $msg);
}
@@ -278,6 +309,8 @@
sub registerCmd {
my ($tag, $cont) = @_;
+ wlog "Replies: ".hts(\%REPLIES)."\n";
+
$REPLIES{$tag} = [$cont, time(), ()];
}
@@ -404,11 +437,10 @@
my $tag = shift;
my $executable = $JOB{"executable"};
if (!(defined $JOBID)) {
- wlog "Job details\n";
- my $name;
- foreach $name (keys %JOB) {
- wlog "key: $name, value: $JOB{$name}\n";
- }
+ my $ds = hts(\%JOB);
+
+ wlog "Job details $ds\n";
+
sendError($tag, ("Missing job identity"));
return 0;
}
@@ -425,13 +457,17 @@
sub forkjob {
my ($pid, $status);
$JOB_RUNNING = 1;
+ pipe(PARENT_R, CHILD_W);
$pid = fork();
if (defined($pid)) {
if ($pid == 0) {
- runjob();
+ close PARENT_R;
+ runjob(\*CHILD_W);
+ close CHILD_W;
}
else {
wlog "Forked process $pid. Waiting for its completion\n";
+ close CHILD_W;
#waitpid($pid, 0);
my $tid;
do {
@@ -447,7 +483,14 @@
}
} while $tid != $pid;
wlog "Child process $pid terminated. Status is $status. $!\n";
- queueCmd(\&nullCB, "JOBSTATUS", $JOBID, "$COMPLETED", "$status", "");
+ my $s = <PARENT_R>;
+ close PARENT_R;
+ if ($s eq "") {
+ queueCmd(\&nullCB, "JOBSTATUS", $JOBID, "$COMPLETED", "$status", $s);
+ }
+ else {
+ queueCmd(\&nullCB, "JOBSTATUS", $JOBID, "$FAILED", "$status", $s);
+ }
}
}
else {
@@ -457,6 +500,7 @@
}
sub runjob {
+ my ($WR) = @_;
my $executable = $JOB{"executable"};
my $stdout = $JOB{"stdout"};
my $stderr = $JOB{"stderr"};
@@ -484,7 +528,8 @@
open STDERR, ">$stderr" or die "Cannot redirect STDERR";
}
wlog "Command: @JOBARGS\n";
- exec { $executable } @JOBARGS or queueCmd(\&nullCB, "JOBSTATUS", $JOBID, "$FAILED", "513", "Could not execute $executable: $!");
+ exec { $executable } @JOBARGS;
+ print $WR "Could not execute $executable: $!\n";
die "Could not execute $executable: $!";
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|