[a38357]: Netdisco / lib / App / Netdisco / Daemon / Worker / Manager.pm Maximize Restore History

Download this file

Manager.pm    105 lines (80 with data), 2.8 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
package App::Netdisco::Daemon::Worker::Manager;
use Dancer qw/:moose :syntax :script/;
use Dancer::Plugin::DBIC 'schema';
use Net::Domain 'hostfqdn';
use Try::Tiny;
use Role::Tiny;
use namespace::clean;
# Host name used to tag queue rows claimed by this node (see the
# "queued-$fqdn" status strings below); 'localhost' is the fallback when
# Net::Domain returns nothing usable.
my $fqdn = hostfqdn() || 'localhost';

# Lookup table: job action name => value stored under the job's 'role' key.
my $role_map = {};
$role_map->{$_} = 'Poller'
    for qw/discoverall discover arpwalk arpnip nodenames macwalk macsuck/;
$role_map->{$_} = 'Interactive'
    for qw/location contact portcontrol portname vlan power/;
# Start-up step for the manager: pick up jobs that the 'Admin' table of the
# 'netdisco' schema already shows as booked to this node (status
# "queued-$fqdn") and hand them to the 'add_jobs' handler via $self->do.
#
# Each job is passed on as a plain hash of its column values plus a 'role'
# key looked up in $role_map by the job's action.  Nothing is handed over
# when no such rows exist.
sub worker_begin {
    my $self = shift;
    my $wid  = $self->wid;

    debug "entering Manager ($wid) worker_begin()";

    # requeue jobs locally
    debug "mgr ($wid): searching for jobs booked to this processing node";
    my $rs = schema('netdisco')->resultset('Admin')
        ->search({ status => "queued-$fqdn" });

    # The unary plus makes it unambiguous that the inner braces build an
    # anonymous hash rather than open a nested block.
    my @jobs = map { +{ $_->get_columns } } $rs->all;

    if (scalar @jobs) {
        info sprintf "mgr (%s): found %s jobs booked to this processing node", $wid, scalar @jobs;

        # Plain loop: this is done for its side effect only, so there is no
        # reason to build and discard a result list with map.
        for my $job (@jobs) {
            $job->{role} = $role_map->{ $job->{action} };
        }

        $self->do('add_jobs', \@jobs);
    }
}
# Main loop of the manager.  Returns straight away (with whatever the debug
# call returns) when the 'num_workers' handler reports a false value;
# otherwise it never returns: it repeatedly walks a result set of rows with
# status 'queued', books out those this node has capacity for, copies them
# to the local queue through the 'add_jobs' handler, and then sleeps.
sub worker_body {
    my $self = shift;
    my $wid  = $self->wid;

    my $num_slots = $self->do('num_workers');
    if (!$num_slots) {
        return debug "mgr ($wid): this node has no workers... quitting manager";
    }

    # get some pending jobs: at most one row per worker slot, random order
    my %wanted = (status => 'queued');
    my %attrs  = (order_by => 'random()', rows => $num_slots);
    my $rs = schema('netdisco')->resultset('Admin')->search(\%wanted, \%attrs);

    CYCLE:
    while (1) {
        debug "mgr ($wid): getting potential jobs for $num_slots workers";

        CANDIDATE:
        while (my $job = $rs->next) {
            my $jid    = $job->job;
            my $action = $job->action;

            # check for available local capacity
            $self->do('capacity_for', $action)
                or next CANDIDATE;
            my $capacity_msg = sprintf
                "mgr (%s): processing node has capacity for job %s (%s)",
                $wid, $jid, $action;
            debug $capacity_msg;

            # mark job as running
            $self->lock_job($job)
                or next CANDIDATE;
            my $booked_msg = sprintf
                "mgr (%s): job %s booked out for this processing node",
                $wid, $jid;
            info $booked_msg;

            # copy job to local queue, tagged with the role for its action
            my $local_job = {
                $job->get_columns,
                role => $role_map->{$action},
            };
            $self->do('add_jobs', [$local_job]);
        }

        # reset iterator so ->next() triggers another DB query
        $rs->reset;

        # TODO also check for stale jobs in Netdisco DB

        debug "mgr ($wid): sleeping now...";
        my $pause = setting('workers')->{sleep_time} || 2;
        sleep($pause);
    }
}
# Try to book a queued job out to this node.
#
# Inside a transaction, selects the 'Admin' row for $job->job that still has
# status 'queued' (with a FOR UPDATE lock) and rewrites its status to
# "queued-$fqdn".
#
# Parameters: $job - row object providing a ->job accessor for the job id.
# Returns:    1 when the row was found and updated, 0 otherwise (row no
#             longer queued, or any error raised while locking).  Never
#             throws for database errors; they are logged instead.
sub lock_job {
    my ($self, $job) = @_;
    my $wid   = $self->wid;
    my $happy = 0;
    my $jid;

    # lock db row and update to show job has been picked
    try {
        $jid = $job->job;

        schema('netdisco')->txn_do(sub {
            # search() rather than find(): find() is meant for lookups on a
            # unique constraint, and DBIx::Class documents search() as the
            # call to use when arbitrary conditions (here the extra status
            # check) must be part of the query.
            my $row = schema('netdisco')->resultset('Admin')
                ->search(
                    { job => $jid, status => 'queued' },
                    { for => 'update' },
                )->single;

            if ($row) {
                $row->update({ status => "queued-$fqdn" });
                $happy = 1;
            }
            else {
                # Not an error: the row is simply no longer in 'queued'.
                debug "mgr ($wid): job $jid is no longer queued, not booking it";
            }
        });
    }
    catch {
        # A failure anywhere above (including at commit) means the job is
        # not ours; report it instead of swallowing it silently.
        $happy = 0;
        debug sprintf "mgr (%s): failed to lock job %s: %s",
            $wid, (defined $jid ? $jid : '?'), $_;
    };

    return $happy;
}
1;