develooper Front page | perl.ithreads | Postings from May 2010

Re: Help with concurrent threads

Thread Previous | Thread Next
From:
alfonso caponi
Date:
May 6, 2010 03:02
Subject:
Re: Help with concurrent threads
Message ID:
t2l1ab6b3a11005060302if236fd21pe4b808abbb9af542@mail.gmail.com
#!/usr/bin/perl

$|=1;

use strict;
use warnings;

use Net::DNS;
use threads;
use threads::shared;
use Thread::Queue;
use Uniq;

my $ip_list  = $ARGV[0];

die "$0 <ip list file>\n" unless $ARGV[0];

########################
### Global Variables ###

# FQDN list
my @fqdn = ("www.nba.com","www.google.com","www.kernel.org","www.linux.org");

# Maximum working threads
my $MAX_THREADS = 10;

# Maximum thread working time
my $TIMEOUT = 10;

# Flag to inform all threads that application is terminating
my $TERM :shared = 0;

# Prevents double detach attempts
my $DETACHING :shared;

#######################
### Signal Handling ###

# Gracefully terminate application on ^C or command line 'kill'
$SIG{'INT'} = $SIG{'TERM'} = sub {
	print(">>> Terminating <<<\n");
	$TERM = 1;
};

# This signal handler is called inside threads
# that get cancelled by the timer thread
$SIG{'KILL'} = sub {
	# Tell user we've been terminated
	printf("%3d <- Killed\n", threads->tid());
	# Detach and terminate
	lock($DETACHING);
	threads->detach() if ! threads->is_detached();
	threads->exit();
};

#####

my @hosts;
open(IPLIST, "<$ip_list") || die "Error : cannot open the ip address list file: $!\n";
	chomp (@hosts = <IPLIST>);
close (IPLIST);

@hosts = uniq sort @hosts;

### fill the queue
our $q :shared;
$q = new Thread::Queue;
foreach my $host (@hosts) {
	foreach my $fqdn (@fqdn) {
		$q->enqueue("$host;$fqdn");
	}
}

###############################
### Main Processing Section ###

MAIN: {

	# Start timer thread
	print "[+] Start timer thread\n";
	my $queue = Thread::Queue->new();
	threads->create('Timer',$queue)->detach();

	# Manage the thread pool until signalled to terminate
	while (! $TERM) {

		# Keep max threads running
		for (my $needed = $MAX_THREADS - threads->list(); $needed && ! $TERM; $needed--) {

			my $element = $q->dequeue_nb;

			# New thread
			threads->create('RequestDNS',$queue,$element,$TIMEOUT);
		}

		# Wait for any threads to finish
		sleep(1);
	}

	# Wait for max timeout for threads to finish
	while ((threads->list() > 0) && $TIMEOUT--) {
		sleep(1);
	}

	# Detach and kill any remaining threads
	foreach my $thr (threads->list()) {
		lock($DETACHING);
		$thr->detach() if ! $thr->is_detached();
		$thr->kill('KILL');
	}
}

exit(0);

sub RequestDNS {

	my ($queue,$element,$timeout) = @_;

	my ($dns_server,$fqdn) = split(/;/,$element);

	# My thread ID
	my $tid = threads->tid();
	printf("$dns_server -> %3d\n", $tid);

	# Register with timer thread
	$queue->enqueue($tid,$timeout);

	# Do some work while monitoring $TERM
	#while (! $TERM) {

		print "try $dns_server;$fqdn\n";

		my $res = Net::DNS::Resolver->new;
		$res->nameservers($dns_server);

		my $query = $res->query($fqdn,"A");

		if ($query) {
			foreach my $rr (grep { $_->type eq 'A' } $query->answer) {
				my $reply = $rr->address;
				print "$dns_server;$fqdn;$reply\n";
			}
		}# else {
		#	warn "query failed: $dns_server ($fqdn)", $res->errorstring, "\n";
		#}
	#}

	# Remove signal handler
	$SIG{'KILL'} = sub {};

	# Unregister with timer thread
	$queue->enqueue($tid, undef);

	# Tell user we're done
	printf("%3d <- Finished\n", $tid);

	# Detach and terminate
	lock($DETACHING);
	threads->detach() if ! threads->is_detached();
	threads->exit();
}

# The timer thread that monitors other threads for timeout
sub Timer {

	my $queue = shift;   # The registration queue
	my %timers;          # Contains threads and timeouts

	# Loop until told to quit
	while (! $TERM) {

		# Check queue
		while (my $tid = $queue->dequeue_nb()) {
			if (! ($timers{$tid}{'timeout'} = $queue->dequeue()) || ! ($timers{$tid}{'thread'}  = threads->object($tid))) {
				# No timeout - unregister thread
				delete($timers{$tid});
			}
		}

		# Cancel timed out threads
		foreach my $tid (keys(%timers)) {
			if (--$timers{$tid}{'timeout'} < 0) {
				$timers{$tid}{'thread'}->kill('KILL');
				delete($timers{$tid});
			}
		}

		# Tick tock
		sleep(1);
	}
}

Thread Previous | Thread Next


nntp.perl.org: Perl Programming lists via nntp and http.
Comments to Ask Bjørn Hansen at ask@perl.org | Group listing | About