|
From: <ha...@us...> - 2008-09-16 10:33:16
|
Revision: 2156
http://cogkit.svn.sourceforge.net/cogkit/?rev=2156&view=rev
Author: hategan
Date: 2008-09-16 17:33:14 +0000 (Tue, 16 Sep 2008)
Log Message:
-----------
reconnect; heartbeats; proper exit code processing; fixed idle detection
Modified Paths:
--------------
trunk/current/src/cog/modules/provider-coaster/resources/worker.pl
Modified: trunk/current/src/cog/modules/provider-coaster/resources/worker.pl
===================================================================
--- trunk/current/src/cog/modules/provider-coaster/resources/worker.pl 2008-09-16 17:29:04 UTC (rev 2155)
+++ trunk/current/src/cog/modules/provider-coaster/resources/worker.pl 2008-09-16 17:33:14 UTC (rev 2156)
@@ -1,6 +1,7 @@
#!/usr/bin/perl
use IO::Socket;
use Cwd;
+use POSIX ":sys_wait_h";
use strict;
use warnings;
@@ -17,22 +18,30 @@
my $RETRIES = 3;
my $REPLYTIMEOUT = 60;
my $MAXFRAGS = 16;
+my $MAX_RECONNECT_ATTEMPTS = 3;
-my $IDLETIMEOUT = 300; #Seconds
+my $IDLETIMEOUT = 10 * 60; #Seconds
my $LASTRECV = 0;
+my $JOB_RUNNING = 0;
my $BUFSZ = 2048;
+# 60 seconds by default. Note that since there is no configuration handshake
+# this would have to match the default interval in the service in order to avoid
+# "lost heartbeats".
+my $HEARTBEAT_INTERVAL = 5 * 60;
+
my %REQUESTS = ();
my %REPLIES = ();
-my $LOG = "$ENV{HOME}/worker$ARGV[1].log";
+my $LOG = "$ENV{HOME}/.globus/coasters/worker$ARGV[1].log";
my %HANDLERS = (
"SHUTDOWN" => \&shutdown,
"SUBMITJOB" => \&submitjob,
"REGISTER" => \&register,
+ "HEARTBEAT" => \&heartbeat,
);
my @CMDQ = ();
@@ -51,6 +60,7 @@
my %JOB;
my %JOBENV;
my @JOBARGS;
+my $LAST_HEARTBEAT = 0;
sub wlog {
my $msg;
@@ -60,37 +70,51 @@
}
}
-sub init() {
+sub reconnect() {
my $fail = 0;
-
- open(LOG, ">$LOG") or die "Failed to open log file: $!";
- my $b = select(LOG);
- $| = 1;
- select($b);
- print LOG time(), " Logging started\n";
-
- wlog "uri=$URI, scheme=$SCHEME, host=$HOSTNAME, port=$PORT, id=$ID\n";
- for ($_ = 0; $_ < 10; $_++) {
- $SOCK = IO::Socket::INET->new(Proto=>'tcp', PeerAddr=>$HOSTNAME, PeerPort=>$PORT) || ($fail = 1);
+ my $i;
+ for ($i = 0; $i < $MAX_RECONNECT_ATTEMPTS; $i++) {
+ wlog "Connecting ($i)...\n";
+ $SOCK = IO::Socket::INET->new(Proto=>'tcp', PeerAddr=>$HOSTNAME, PeerPort=>$PORT, Blocking=>1) || ($fail = 1);
if (!$fail) {
$SOCK->setsockopt(SOL_SOCKET, SO_RCVBUF, 16384);
$SOCK->setsockopt(SOL_SOCKET, SO_SNDBUF, 32768);
+ wlog "Connected\n";
+ $SOCK->blocking(0);
+ queueCmd(\&registerCB, "REGISTER", $ID, "wid://$ID");
last;
- }
+ }
+ else {
+ wlog "Connection failed\n";
+ select(undef, undef, undef, 2 ** $i);
+ }
}
if ($fail) {
- die "Failed to create sockets: $!";
+ die "Failed to connect: $!";
}
}
+sub init() {
+ open(LOG, ">$LOG") or die "Failed to open log file: $!";
+ my $b = select(LOG);
+ $| = 1;
+ select($b);
+ print LOG time(), " Logging started\n";
+
+ wlog "uri=$URI, scheme=$SCHEME, host=$HOSTNAME, port=$PORT, id=$ID\n";
+ reconnect();
+}
+
sub sendm {
my ($tag, $flags, $msg) = @_;
my $len = length($msg);
my $buf = pack("VVV", $tag, $flags, $len);
$buf = $buf.$msg;
wlog("> len=$len, tag=$tag, flags=$flags, $msg\n");
- $SOCK->send($buf) == length($buf) or die "cannot send to $SOCK: $!";
+
+ #($SOCK->send($buf) == length($buf)) || reconnect();
+ eval {defined($SOCK->send($buf))} or wlog("Send failed: $!\n");
}
sub sendFrags {
@@ -106,7 +130,7 @@
my $cont = shift(@cmd);
my $ctag = $TAG++;
- registerCmd($ctag, $cont);
+ registerCmd($ctag, $cont);
sendFrags($ctag, 0, @cmd);
}
@@ -176,6 +200,7 @@
}
}
else {
+ $LASTRECV = time();
if (!exists($REQUESTS{$tag})) {
$REQUESTS{$tag} = [\&processRequest, time(), []];
wlog "New request ($tag)\n";
@@ -227,8 +252,8 @@
checkTimeouts2(\%REPLIES);
if ($LASTRECV != 0) {
my $dif = time() - $LASTRECV;
- if ($dif >= $IDLETIMEOUT) {
- die "Idle time exceeded";
+ if ($dif >= $IDLETIMEOUT && $JOB_RUNNING == 0) {
+ die "Idle time exceeded";
}
}
}
@@ -239,7 +264,6 @@
if (length($data) > 0) {
wlog "Received $data\n";
eval { process(unpackData($data)); } || wlog "$@\n";
- $LASTRECV = time();
}
else {
#sleep 250ms
@@ -256,16 +280,24 @@
sub mainloop {
- my $cmd;
while(1) {
- foreach $cmd (@CMDQ) {
- sendCmd(@$cmd);
- }
- @CMDQ = ();
- recvOne();
+ loopOne();
}
}
+sub loopOne {
+ my $cmd;
+ if (time() - $LAST_HEARTBEAT > $HEARTBEAT_INTERVAL) {
+ queueCmd(\&heartbeatCB, "HEARTBEAT");
+ $LAST_HEARTBEAT = time();
+ }
+ foreach $cmd (@CMDQ) {
+ sendCmd(@$cmd);
+ }
+ @CMDQ = ();
+ recvOne();
+}
+
sub queueCmd {
push @CMDQ, [@_];
}
@@ -295,6 +327,24 @@
}
}
+sub heartbeatCB {
+ my ($tag, $timeout, $err, $reply) = @_;
+
+ if ($timeout) {
+ if (time() - $LAST_HEARTBEAT > 2 * $HEARTBEAT_INTERVAL) {
+ wlog "No heartbeat replies in a while. Dying.\n";
+ die "No response to heartbeat\n";
+ }
+ }
+ elsif ($err) {
+ die "Heartbeat failed: $reply\n";
+ }
+ else {
+ wlog "Heartbeat acknowledged\n";
+ }
+}
+
+
sub register {
my ($tag, $timeout, $reply) = @_;
sendReply($tag, ("OK"));
@@ -309,6 +359,11 @@
exit 0;
}
+sub heartbeat {
+ my ($tag, $timeout, $msgs) = @_;
+ sendReply($tag, ("OK"));
+}
+
sub submitjob {
my ($tag, $timeout, $msgs) = @_;
my $desc = $$msgs[0];
@@ -361,6 +416,7 @@
sub forkjob {
my ($pid, $status);
+ $JOB_RUNNING = 1;
$pid = fork();
if (defined($pid)) {
if ($pid == 0) {
@@ -368,8 +424,20 @@
}
else {
wlog "Forked process $pid. Waiting for its completion\n";
- waitpid($pid, 0);
- $status = $? & 0xff;
+ #waitpid($pid, 0);
+ my $tid;
+ do {
+ $tid = waitpid($pid, &WNOHANG);
+ if ($tid != $pid) {
+ loopOne();
+ }
+ else {
+ # exit code is in MSB and signal in LSB, so
+ # switch them such that status & 0xff is the
+ # exit code
+ $status = $? >> 8 + (($? & 0xff) << 8);
+ }
+ } while $tid != $pid;
wlog "Child process $pid terminated. Status is $status. $!\n";
queueCmd(\&nullCB, "JOBSTATUS", $JOBID, "$COMPLETED", "$status", "");
}
@@ -377,6 +445,7 @@
else {
queueCmd(\&nullCB, "JOBSTATUS", $JOBID, "$FAILED", "512", "Could not fork child process");
}
+ $JOB_RUNNING = 0;
}
sub runjob {
@@ -421,22 +490,21 @@
$ID =$ARGV[$i];
my $MSG="0";
- my $myhost=`hostname -i`;
+ my $myhost=`hostname`;
$myhost =~ s/\s+$//;
init();
- print LOG time(), " Initialized coaster worker $i\n";
- queueCmd(\&registerCB, "REGISTER", $ID, "wid://$ID");
+ wlog("Initialized coaster worker $i\n");
+ wlog("Running on node $myhost\n");
mainloop();
exit(0);
- } else {
+ }
+ else {
$wp[$i]=$waitpid;
}
}
-for($i=1; $i<=$#ARGV ; $i++) {
+for ($i=1; $i<=$#ARGV ; $i++) {
waitpid($wp[$i],0);
}
-
-
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|