From: <sla...@us...> - 2011-08-10 21:16:04
|
Revision: 5480 http://sashimi.svn.sourceforge.net/sashimi/?rev=5480&view=rev Author: slagelwa Date: 2011-08-10 21:15:57 +0000 (Wed, 10 Aug 2011) Log Message: ----------- Significant changes across the board that haven't been checked in. * Include EC2 name in working directory on instance to distinguish it * Fixed omssa path/files issues * Changed how polling of SQS message queues are tallied * Forking of upload/downloads * Fixed paths issue in pep.xml files * Cache directory changes Modified Paths: -------------- trunk/trans_proteomic_pipeline/extern/hpctools/aws/bin/amztpp trunk/trans_proteomic_pipeline/extern/hpctools/aws/bin/amztppserver trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS/Client.pm trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS/EC2Manager.pm trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS/OMSSAService.pm trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS/Report.pm trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS/SQSManager.pm trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS/SearchService.pm trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS.pm Modified: trunk/trans_proteomic_pipeline/extern/hpctools/aws/bin/amztpp =================================================================== --- trunk/trans_proteomic_pipeline/extern/hpctools/aws/bin/amztpp 2011-08-08 19:35:03 UTC (rev 5479) +++ trunk/trans_proteomic_pipeline/extern/hpctools/aws/bin/amztpp 2011-08-10 21:15:57 UTC (rev 5480) @@ -126,6 +126,7 @@ #$client->start(); # Make sure bg process is running foreach ( @ARGV ) { + $log->logdie( "'$_': file not found" ) if ( ! -f $_ ); $sqsm->queueUpload( &$factory( $_ ) ); } Modified: trunk/trans_proteomic_pipeline/extern/hpctools/aws/bin/amztppserver =================================================================== --- trunk/trans_proteomic_pipeline/extern/hpctools/aws/bin/amztppserver 2011-08-08 19:35:03 UTC (rev 5479) +++ trunk/trans_proteomic_pipeline/extern/hpctools/aws/bin/amztppserver 2011-08-10 21:15:57 UTC (rev 5480) @@ -36,6 +36,7 @@ use IO::File; use Pod::Usage; use POSIX; +use Sys::Hostname; use TPP::AWS; use TPP::AWS::Credentials qw( credentials ); @@ -54,9 +55,9 @@ my $signal = 0; # Received signal my $shutdown = sub {}; # On END do something -use constant CACHE_DIR => "/tmp/amztppserver$$"; # tmp directory to work in + #-- @MAIN -------------------------------------------------------------------# $| = 1; # Flush STDOUT automatically @@ -96,7 +97,7 @@ $sqsm->createQueues(); # Set working directory to cache sandbox - $opts{cache} ||= "/tmp/tppaws.$$/"; + $opts{cache} ||= ("/tmp/amztppserver-$$-" . time()); if ( !-d $opts{cache} ) { mkdir $opts{cache} or $log->logdie( "Error: can't create cache dir: $!\n" ); @@ -124,8 +125,8 @@ } # Loop processing service requests... - my $poll = TPP::AWS::Poll->new( AFTER => (60 * 15) ); - while ( !$signal && ($sqsm->hasActiveServices() || !$poll->quit()) ) + my $poll = TPP::AWS::Poll->new( AFTER => (60 * 30) ); + while ( !$signal && !$poll->quit() ) { $log->debug( "checking for work to do" ); if ( my $srv = $sqsm->dequeueService() ) @@ -274,4 +275,4 @@ =back -=cut \ No newline at end of file +=cut Modified: trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS/Client.pm =================================================================== --- trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS/Client.pm 2011-08-08 19:35:03 UTC (rev 5479) +++ trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS/Client.pm 2011-08-10 21:15:57 UTC (rev 5480) @@ -106,7 +106,6 @@ POSIX::setsid() or die "Cannot start a new session: $!\n"; } $SIG{INT} = $SIG{TERM} = $SIG{HUP} = \&signaled; -# $SIG{CHLD} = 'IGNORE'; $self->pidfile()->touch(); $0 = $name; # Give us a sensible name @@ -141,9 +140,10 @@ $::sqsm->createQueues(); # Loop over messages - my $poll = TPP::AWS::Poll->new( AFTER => (60 * 5), MAX => 60 ); + my $poll = TPP::AWS::Poll->new( AFTER => (60 * 10), MAX => 60 ); my $pmgr = Parallel::ForkManager->new( $self->maxprocs() ); my $report = TPP::AWS::Report->new(); + my $active = 0; while ( !$signal && !$poll->quit() ) { # Don't quit if there are active services (kludge resets quit timeout) @@ -154,6 +154,7 @@ if ( $upsrv = $::sqsm->dequeueUpload() ) { $poll->reset(); + $active++; if ( $pmgr->start() == 0 ) # run in forked process { local $SIG{INT} = $SIG{TERM} = $SIG{HUP} = 'DEFAULT'; @@ -161,14 +162,25 @@ $upsrv->uploadInput( $::s3m ); $::sqsm->queueService( $upsrv ); $::sqsm->deleteMessage( $::sqsm->upQueue(), $upsrv ); - ::ec2Launch( 1 ) if ( $::ec2m->running() < $::opts{max} ); $pmgr->finish; } + + # Launch another ec2 instance? + my $cnt = $::ec2m->running(); + if ( $cnt < $::opts{max} && $cnt < $active ) + { + ::ec2Launch( 1 ); + } + else + { + $log->debug( "skipping launch: $cnt/$active (max $::opts{max}) running " ); + } } if ( $downsrv = $::sqsm->dequeueDownload() ) { $poll->reset(); + $active-- if ( $active > 0 ); $log->debug( "getting ready for fork"); if ( $downsrv->{REPORT} ) # resent msg for report? { @@ -200,7 +212,7 @@ if ( !$upsrv && !$downsrv && !$signal ) { - $pmgr->wait_children(); # repeat child processes + $pmgr->wait_children(); # reap child processes $log->debug( "sleeping " . $poll->interval() ); $poll->sleep(); } @@ -220,4 +232,4 @@ $signal = 1; } -1; \ No newline at end of file +1; Modified: trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS/EC2Manager.pm =================================================================== --- trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS/EC2Manager.pm 2011-08-08 19:35:03 UTC (rev 5479) +++ trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS/EC2Manager.pm 2011-08-10 21:15:57 UTC (rev 5480) @@ -130,7 +130,7 @@ EC2DNS=\${EC2DNS%%.*} source /etc/profile.d/tpp.sh -/opt/tpp/bin/amztppserver -v -l /var/log/amztppserver.\${EC2DNS}.log -c /mnt/amztppserver +/opt/tpp/bin/amztppserver -v -l /var/log/amztppserver-\${EC2DNS}.log -c /mnt/amztppserver-\${EC2DNS} BOOTSTRAP $image ||= ($self->getImages())[0]; Modified: trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS/OMSSAService.pm =================================================================== --- trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS/OMSSAService.pm 2011-08-08 19:35:03 UTC (rev 5479) +++ trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS/OMSSAService.pm 2011-08-10 21:15:57 UTC (rev 5480) @@ -60,10 +60,33 @@ my $this = $class->SUPER::new( @_); # Get database files from parameters file - my $flags = $this->_readFlags( $this->{PARAMS} ); - ( $flags =~ /-d\s+([^\s]+)/ ) - or $log->logdie( "no database found in omssa parameters file"); - $this->{DBS} = [ "$1", "$1.phr", "$1.pin", "$1.psd", "$1.psi", "$1.psq" ]; + my @flags = $this->_readFlags( $this->{PARAMS} ); + $this->{DBS} = []; + + # Add blast database file(s) + foreach ( @flags ) + { + if ( /-d\s+(.*)\s*/ ) + { + $log->debug( "adding database $1" ); + push @{ $this->{DBS} }, $1; + push @{ $this->{DBS} }, "$1.phr"; + push @{ $this->{DBS} }, "$1.pin"; + push @{ $this->{DBS} }, "$1.psd"; + push @{ $this->{DBS} }, "$1.psi"; + push @{ $this->{DBS} }, "$1.psq"; + } + } + ( @{$this->{DBS}} > 1 ) + or $log->logdie( "missing database files in omssa parameters file"); + + # Add modification files (if any) + foreach ( @flags ) + { + push @{ $this->{DBS} }, $1 if ( /-mx\s+(.*)\s*/ ); + push @{ $this->{DBS} }, $1 if ( /-mux\s+(.*)\s*/ ); + } + return $this; } @@ -104,10 +127,12 @@ while ( <PARAMS> ) { chomp; s/#.*$//; s/\r$//; # remove comments and endline - push @flags, $_ if ( !/^$/ ); # skip empty lines + next if ( /^$/ ); # skip empty lines + push @flags, $_; } close( PARAMS ); - return join( ' ', @flags ); + + return @flags; } # @@ -117,9 +142,20 @@ { my ( $this, $input, $params ) = @_; - my $flags = $this->_readFlags( $params ); + # Get flags and localize file paths + my @flags = $this->_readFlags( $params ); + + # Localize file paths + foreach ( @flags ) + { + $_ = "$1" . basename($2) if ( /^(\s*-d\s+)(.*)\s*/ ); + $_ = "$1" . basename($2) if ( /^(\s*-mux\s+)(.*)\s*/ ); + $_ = "$1" . basename($2) if ( /^(\s*-mx\s+)(.*)\s*/ ); + } + my @output = (); my $cmd; + my $flags = join( ' ', @flags ); my ( $root, $dir, $suffix ) = fileparse( $input, qr/\.[^.]*$/ ); Modified: trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS/Report.pm =================================================================== --- trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS/Report.pm 2011-08-08 19:35:03 UTC (rev 5479) +++ trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS/Report.pm 2011-08-10 21:15:57 UTC (rev 5480) @@ -34,6 +34,7 @@ use TPP::AWS::S3Manager; use TPP::AWS::Service; +use TPP::AWS::Logger qw( $log ); #-- @GLOBALS ----------------------------------------------------------------# @@ -81,6 +82,18 @@ } $errorCount{$id}++ if ( $srv->{ERROR} ); + if ( $log->is_debug() ) + { + my $out = sprintf "%-4d %-25.25s %18s %18s %18s %18s %4d %s\n", + $srv->{NUM}, $srv->{SERVER}, + strftime( "%F %T", localtime($srv->{CLIENT_SUBMIT}) ), + strftime( "%F %T", localtime($srv->{SERVER_RECEIVE}) ), + strftime( "%F %T", localtime($srv->{SERVER_SUBMIT}) ), + strftime( "%F %T", localtime($srv->{CLIENT_RECEIVE}) ), + $srv->{SERVER_WTIME}, $srv->{INPUT}; + $log->debug( "reporting $out" ); + } + push @{$services{$id}}, $srv; } @@ -164,16 +177,16 @@ printf $out " Avg (MB/s): %.2f\n", avgDownloadSpeed( $s ); print "\n"; - print $out "Num Host Req Submit Req Receive Res Submit Res Receive Wall time\n"; + print $out "Num Host Req Submit Req Receive Res Submit Res Receive Wall time Input File\n"; foreach my $s ( @{ $services{id $self} } ) { - printf $out "%-3d %-25.25s %18s %18s %18s %18s %d\n", + printf $out "%-4d %-25.25s %18s %18s %18s %18s %4d %s\n", $s->{NUM}, $s->{SERVER}, strftime( "%F %T", localtime($s->{CLIENT_SUBMIT}) ), strftime( "%F %T", localtime($s->{SERVER_RECEIVE}) ), strftime( "%F %T", localtime($s->{SERVER_SUBMIT}) ), strftime( "%F %T", localtime($s->{CLIENT_RECEIVE}) ), - $s->{SERVER_WTIME}; + $s->{SERVER_WTIME}, $s->{INPUT}; } printf $out "\n"; Modified: trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS/SQSManager.pm =================================================================== --- trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS/SQSManager.pm 2011-08-08 19:35:03 UTC (rev 5479) +++ trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS/SQSManager.pm 2011-08-10 21:15:57 UTC (rev 5480) @@ -48,13 +48,16 @@ #-- @ATTRIBUTES -------------------------------------------------------------# -readonly sqs => my %sqs; # Reference to Amazon::SQS -readonly upQueue => my %upQueue; # AWS queue for upload messages -readonly srvQueue => my %srvQueue; # AWS queue for service messages -readonly downQueue => my %downQueue; # AWS queue for download messages -readonly rcvdIds => my %rcvdIds; # Hash of message ids received -private active => my %active; # Active count -private prefix => my %prefix; # Prefix for name of queues +readonly sqs => my %sqs; # Reference to Amazon::SQS +readonly upQueue => my %upQueue; # AWS queue for upload messages +readonly upCounts => my %upCounts; # counts of visible/invisible msgs +readonly srvQueue => my %srvQueue; # AWS queue for service messages +readonly srvCounts => my %srvCounts; # counts of visible/invisible msgs +readonly downQueue => my %downQueue; # AWS queue for download messages +readonly downCounts => my %downCounts; # counts of visible/invisible msgs +readonly rcvdIds => my %rcvdIds; # Hash of message ids received +private active => my %active; # Active count +private prefix => my %prefix; # Prefix for name of queues #-- @PUBLIC -----------------------------------------------------------------# @@ -81,6 +84,10 @@ $downQueue{$id} = $q if ( $q->Endpoint() =~ /-download$/ ); } + $upCounts{$id} = [ undef, undef ], + $srvCounts{$id} = [ undef, undef ], + $downCounts{$id} = [ undef, undef ], + $log->debug( "SQSManager initialized" ); return $self; } @@ -245,28 +252,42 @@ my ($self) = shift; my $id = id $self; - my $up = $self->upQueue()->GetAttributes(); - my $ucnt = $up->{ApproximateNumberOfMessages} + $up->{ApproximateNumberOfMessagesNotVisible}; - _logmsgcnt( $up, "upload" ) if ( $log->is_debug ); - my $srv = $self->srvQueue()->GetAttributes(); - my $scnt = $srv->{ApproximateNumberOfMessages} + $srv->{ApproximateNumberOfMessagesNotVisible}; - _logmsgcnt( $srv, "service" ) if ( $log->is_debug ); - my $down = $self->downQueue()->GetAttributes(); - my $dcnt = $down->{ApproximateNumberOfMessages} + $down->{ApproximateNumberOfMessagesNotVisible}; - _logmsgcnt( $down, "download" ) if ( $log->is_debug ); + my $cnt = 0; + eval + { + my $up = $self->upQueue()->GetAttributes(); + $upCounts{$id} = [ $up->{ApproximateNumberOfMessages}, + $up->{ApproximateNumberOfMessagesNotVisible} ]; + map { $cnt += $_ } @{ $upCounts{$id} }; + + my $srv = $self->srvQueue()->GetAttributes(); + $srvCounts{$id} = [ $srv->{ApproximateNumberOfMessages}, + $srv->{ApproximateNumberOfMessagesNotVisible} ]; + map { $cnt += $_ } @{ $srvCounts{$id} }; + + my $down = $self->downQueue()->GetAttributes(); + $downCounts{$id} = [ $down->{ApproximateNumberOfMessages}, + $down->{ApproximateNumberOfMessagesNotVisible} ]; + map { $cnt += $_ } @{ $downCounts{$id} }; + + if ( $log->is_debug ) + { + $log->debug( sprintf("approximately %d/%-d messages in upload queue", + @{ $upCounts{$id} } ) ); + $log->debug( sprintf("approximately %d/%-d messages in srv queue", + @{ $srvCounts{$id} } ) ); + $log->debug( sprintf("approximately %d/%-d messages in download queue", + @{ $downCounts{$id} } ) ); + } + }; + if ( $@ ) + { + $log->warn( "Error reading queue: $@" ); + } - return( $ucnt + $scnt + $dcnt ); + return( $cnt ); } -sub _logmsgcnt - { - my ( $m, $q ) = @_; - $log->debug( sprintf("approximately %d/%-d messages in $q queue", - $m->{ApproximateNumberOfMessages}, - $m->{ApproximateNumberOfMessagesNotVisible} ) - ); - } - # # Delete a message from a queue # Modified: trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS/SearchService.pm =================================================================== --- trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS/SearchService.pm 2011-08-08 19:35:03 UTC (rev 5479) +++ trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS/SearchService.pm 2011-08-10 21:15:57 UTC (rev 5480) @@ -29,7 +29,7 @@ use strict; use warnings; -use Cwd qw( abs_path ); +use Cwd qw( abs_path cwd ); use File::Basename; use File::pushd; use File::Spec::Functions qw( rel2abs ); @@ -153,6 +153,17 @@ my $bytes = $s3m->uploadBytes(); my $secs = $s3m->uploadSecs(); + # For the time being manually fix the paths in the pep.xml file on the + # server as updateAllPaths.pl is broke + my $pep = "$this->{ROOT}.pep.xml"; + if ( -f $pep ) + { + my $cwd = cwd(); + my $db = basename($this->{DBS}->[0]); + `perl -pi -e 's#$cwd/$this->{ROOT}#$this->{DIR}$pep#g' "$pep"`; + `perl -pi -e 's#$cwd/$db#$this->{DBS}->[0]#g' "$pep"`; + } + foreach my $outFile ( @output ) { next unless ( -f $outFile ); @@ -179,8 +190,11 @@ my $dir = pushd( $this->{DIR} ); $s3m->get($_) foreach ( @{$this->{OUTPUT_KEYS}} ); - $this->_updatePaths( "$this->{ROOT}.pep.xml", $this->{DBS}->[0] ); +# JS: Below is not implemented if/until updateAllPaths is fixed correctly +# instead fix it on the server before the upload +# $this->_updatePaths( "$this->{ROOT}.pep.xml", $this->{DBS}->[0] ); + $this->{CLIENT_DOWNCOUNT} += ($s3m->downloadCount() - $cnt); $this->{CLIENT_DOWNBYTES} += ($s3m->downloadBytes() - $bytes); $this->{CLIENT_DOWNSECS} += ($s3m->downloadSecs() - $secs); @@ -201,9 +215,13 @@ # Check input/paths $database = abs_path( $database ); - $log->error( "missing $filename " ) if ( !-f $filename ); - $log->error( "missing $database file" ) if ( $database && !-f $database ); - return 0 if ( !-f $filename || ($database && !-f $database) ); + $log->warn( "missing $filename " ) if ( !-f $filename ); + $log->warn( "missing $database file" ) if ( $database && !-f $database ); + if ( !-f $filename || ($database && !-f $database) ) + { + $log->error( "unable to update paths in $filename" ); + return 0 + } # Build command my $UPDATEPATHS = which('updateAllPaths.pl') @@ -213,6 +231,7 @@ # Believe it or not we need to run it twice due to a bug in updateAllPaths # skipping the update of the database if the paths are similar/same + $log->debug( $cmd ); system( $cmd ) == 0 or $log->logdie( "error invoking updateAllPaths.pl, $!" ); system( $cmd ) == 0 Modified: trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS.pm =================================================================== --- trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS.pm 2011-08-08 19:35:03 UTC (rev 5479) +++ trunk/trans_proteomic_pipeline/extern/hpctools/aws/lib/TPP/AWS.pm 2011-08-10 21:15:57 UTC (rev 5480) @@ -32,7 +32,7 @@ #-- @GLOBALS -----------------------------------------------------------------# -our $VERSION = '2.0A'; +our $VERSION = '2.0'; our $REVISION = (q$Revision: $ =~ /(\d+)/g)[0] || '???'; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |