Front page | perl.ithreads |
Postings from May 2010
Re: Help with concurrent threads
Thread Previous
|
Thread Next
From:
alfonso caponi
Date:
May 6, 2010 03:02
Subject:
Re: Help with concurrent threads
Message ID:
t2l1ab6b3a11005060302if236fd21pe4b808abbb9af542@mail.gmail.com
#!/usr/bin/perl
$|=1;
use strict;
use warnings;
use Net::DNS;
use threads;
use threads::shared;
use Thread::Queue;
use Uniq;
my $ip_list = $ARGV[0];
die "$0 <ip list file>\n" unless $ARGV[0];
########################
### Global Variables ###
# FQDN list
my @fqdn = ("www.nba.com","www.google.com","www.kernel.org","www.linux.org");
# Maximum working threads
my $MAX_THREADS = 10;
# Maximum thread working time
my $TIMEOUT = 10;
# Flag to inform all threads that application is terminating
my $TERM :shared = 0;
# Prevents double detach attempts
my $DETACHING :shared;
#######################
### Signal Handling ###
# Gracefully terminate application on ^C or command line 'kill'
$SIG{'INT'} = $SIG{'TERM'} = sub {
print(">>> Terminating <<<\n");
$TERM = 1;
};
# This signal handler is called inside threads
# that get cancelled by the timer thread
$SIG{'KILL'} = sub {
# Tell user we've been terminated
printf("%3d <- Killed\n", threads->tid());
# Detach and terminate
lock($DETACHING);
threads->detach() if ! threads->is_detached();
threads->exit();
};
#####
my @hosts;
open(IPLIST, "<$ip_list") || die "Error : cannot open the ip address list file: $!\n";
chomp (@hosts = <IPLIST>);
close (IPLIST);
@hosts = uniq sort @hosts;
### fill the queue
our $q :shared;
$q = new Thread::Queue;
foreach my $host (@hosts) {
foreach my $fqdn (@fqdn) {
$q->enqueue("$host;$fqdn");
}
}
###############################
### Main Processing Section ###
MAIN: {
# Start timer thread
print "[+] Start timer thread\n";
my $queue = Thread::Queue->new();
threads->create('Timer',$queue)->detach();
# Manage the thread pool until signalled to terminate
while (! $TERM) {
# Keep max threads running
for (my $needed = $MAX_THREADS - threads->list(); $needed && ! $TERM; $needed--) {
my $element = $q->dequeue_nb;
# New thread
threads->create('RequestDNS',$queue,$element,$TIMEOUT);
}
# Wait for any threads to finish
sleep(1);
}
# Wait for max timeout for threads to finish
while ((threads->list() > 0) && $TIMEOUT--) {
sleep(1);
}
# Detach and kill any remaining threads
foreach my $thr (threads->list()) {
lock($DETACHING);
$thr->detach() if ! $thr->is_detached();
$thr->kill('KILL');
}
}
exit(0);
sub RequestDNS {
my ($queue,$element,$timeout) = @_;
my ($dns_server,$fqdn) = split(/;/,$element);
# My thread ID
my $tid = threads->tid();
printf("$dns_server -> %3d\n", $tid);
# Register with timer thread
$queue->enqueue($tid,$timeout);
# Do some work while monitoring $TERM
#while (! $TERM) {
print "try $dns_server;$fqdn\n";
my $res = Net::DNS::Resolver->new;
$res->nameservers($dns_server);
my $query = $res->query($fqdn,"A");
if ($query) {
foreach my $rr (grep { $_->type eq 'A' } $query->answer) {
my $reply = $rr->address;
print "$dns_server;$fqdn;$reply\n";
}
}# else {
# warn "query failed: $dns_server ($fqdn)", $res->errorstring, "\n";
#}
#}
# Remove signal handler
$SIG{'KILL'} = sub {};
# Unregister with timer thread
$queue->enqueue($tid, undef);
# Tell user we're done
printf("%3d <- Finished\n", $tid);
# Detach and terminate
lock($DETACHING);
threads->detach() if ! threads->is_detached();
threads->exit();
}
# The timer thread that monitors other threads for timeout
sub Timer {
my $queue = shift; # The registration queue
my %timers; # Contains threads and timeouts
# Loop until told to quit
while (! $TERM) {
# Check queue
while (my $tid = $queue->dequeue_nb()) {
if (! ($timers{$tid}{'timeout'} = $queue->dequeue()) || ! ($timers{$tid}{'thread'} = threads->object($tid))) {
# No timeout - unregister thread
delete($timers{$tid});
}
}
# Cancel timed out threads
foreach my $tid (keys(%timers)) {
if (--$timers{$tid}{'timeout'} < 0) {
$timers{$tid}{'thread'}->kill('KILL');
delete($timers{$tid});
}
}
# Tick tock
sleep(1);
}
}
Thread Previous
|
Thread Next