develooper Front page | perl.cvs.qpsmtpd | Postings from February 2007

[svn:qpsmtpd] r707 - in branches/0.3x/lib: Danga Qpsmtpd

From:
msergeant
Date:
February 2, 2007 14:47
Subject:
[svn:qpsmtpd] r707 - in branches/0.3x/lib: Danga Qpsmtpd
Message ID:
20070202234727.D4214CBA1B@x12.develooper.com
Author: msergeant
Date: Fri Feb  2 15:47:26 2007
New Revision: 707

Added:
   branches/0.3x/lib/Danga/
   branches/0.3x/lib/Danga/Client.pm
   branches/0.3x/lib/Danga/TimeoutSocket.pm
   branches/0.3x/lib/Qpsmtpd/ConfigServer.pm

Log:
More qpsmtpd-async merges


Added: branches/0.3x/lib/Danga/Client.pm
==============================================================================
--- (empty file)
+++ branches/0.3x/lib/Danga/Client.pm	Fri Feb  2 15:47:26 2007
@@ -0,0 +1,173 @@
+# $Id: Client.pm,v 1.8 2005/02/14 22:06:38 msergeant Exp $
+
+package Danga::Client;
+use base 'Danga::TimeoutSocket';
+use fields qw(line pause_count read_bytes data_bytes callback get_chunks);
+use Time::HiRes ();
+
+use bytes;
+
+# 30 seconds max timeout!
+sub max_idle_time       { 30 }
+sub max_connect_time    { 1200 }
+
+sub new {
+    my Danga::Client $self = shift;
+    $self = fields::new($self) unless ref $self;
+    $self->SUPER::new( @_ );
+
+    $self->reset_for_next_message;
+    return $self;
+}
+
+sub reset_for_next_message {
+    my Danga::Client $self = shift;
+    $self->{line} = '';
+    $self->{pause_count} = 0;
+    $self->{read_bytes} = 0;
+    $self->{callback} = undef;
+    $self->{data_bytes} = '';
+    $self->{get_chunks} = 0;
+    return $self;
+}
+
+sub get_bytes {
+    my Danga::Client $self = shift;
+    my ($bytes, $callback) = @_;
+    if ($self->{callback}) {
+        die "get_bytes/get_chunks currently in progress!";
+    }
+    $self->{read_bytes} = $bytes;
+    $self->{data_bytes} = $self->{line};
+    $self->{read_bytes} -= length($self->{data_bytes});
+    $self->{line} = '';
+    if ($self->{read_bytes} <= 0) {
+        if ($self->{read_bytes} < 0) {
+            $self->{line} = substr($self->{data_bytes},
+                                   $self->{read_bytes}, # negative offset
+                                   0 - $self->{read_bytes}, # to end of str
+                                   ""); # truncate that substr
+        }
+        $callback->($self->{data_bytes});
+        return;
+    }
+    $self->{callback} = $callback;
+}
+
+sub get_chunks {
+    my Danga::Client $self = shift;
+    my ($bytes, $callback) = @_;
+    if ($self->{callback}) {
+        die "get_bytes/get_chunks currently in progress!";
+    }
+    $self->{read_bytes} = $bytes;
+    $callback->($self->{line}) if length($self->{line});
+    $self->{line} = '';
+    $self->{callback} = $callback;
+    $self->{get_chunks} = 1;
+}
+
+sub end_get_chunks {
+    my Danga::Client $self = shift;
+    my $remaining = shift;
+    $self->{callback} = undef;
+    $self->{get_chunks} = 0;
+    if (defined($remaining)) {
+        $self->process_read_buf(\$remaining);
+    }
+}
+
+sub event_read {
+    my Danga::Client $self = shift;
+    if ($self->{callback}) {
+        $self->{alive_time} = time;
+        if ($self->{get_chunks}) {
+            my $bref = $self->read($self->{read_bytes});
+            return $self->close($!) unless defined $bref;
+            $self->{callback}->($$bref) if length($$bref);
+            return;
+        }
+        if ($self->{read_bytes} > 0) {
+            my $bref = $self->read($self->{read_bytes});
+            return $self->close($!) unless defined $bref;
+            $self->{read_bytes} -= length($$bref);
+            $self->{data_bytes} .= $$bref;
+        }
+        if ($self->{read_bytes} <= 0) {
+            # print "Erk, read too much!\n" if $self->{read_bytes} < 0;
+            my $cb = $self->{callback};
+            $self->{callback} = undef;
+            $cb->($self->{data_bytes});
+        }
+    }
+    else {
+        my $bref = $self->read(8192);
+        return $self->close($!) unless defined $bref;
+        $self->process_read_buf($bref);
+    }
+}
+
+sub process_read_buf {
+    my Danga::Client $self = shift;
+    my $bref = shift;
+    $self->{line} .= $$bref;
+    return if $self->{pause_count} || $self->{closed};
+    
+    while ($self->{line} =~ s/^(.*?\n)//) {
+        my $line = $1;
+        $self->{alive_time} = time;
+        my $resp = $self->process_line($line);
+        if ($::DEBUG > 1 and $resp) { print "$$:".($self+0)."S: $_\n" for split(/\n/, $resp) }
+        $self->write($resp) if $resp;
+        # $self->watch_read(0) if $self->{pause_count};
+        return if $self->{pause_count} || $self->{closed};
+    }
+}
+
+sub has_data {
+    my Danga::Client $self = shift;
+    return length($self->{line}) ? 1 : 0;
+}
+
+sub clear_data {
+    my Danga::Client $self = shift;
+    $self->{line} = '';
+}
+
+sub paused {
+    my Danga::Client $self = shift;
+    return 1 if $self->{pause_count};
+    return 1 if $self->{closed};
+    return 0;
+}
+
+sub pause_read {
+    my Danga::Client $self = shift;
+    $self->{pause_count}++;
+    # $self->watch_read(0);
+}
+
+sub continue_read {
+    my Danga::Client $self = shift;
+    $self->{pause_count}--;
+    if ($self->{pause_count} <= 0) {
+        $self->{pause_count} = 0;
+        # $self->watch_read(1);
+    }
+}
+
+sub process_line {
+    my Danga::Client $self = shift;
+    return '';
+}
+
+sub close {
+    my Danga::Client $self = shift;
+    print "closing @_\n" if $::DEBUG;
+    $self->SUPER::close(@_);
+}
+
+sub event_err { my Danga::Client $self = shift; $self->close("Error") }
+sub event_hup { my Danga::Client $self = shift; $self->close("Disconnect (HUP)") }
+
+1;

Added: branches/0.3x/lib/Danga/TimeoutSocket.pm
==============================================================================
--- (empty file)
+++ branches/0.3x/lib/Danga/TimeoutSocket.pm	Fri Feb  2 15:47:26 2007
@@ -0,0 +1,62 @@
+# $Id: TimeoutSocket.pm,v 1.2 2005/02/02 20:44:35 msergeant Exp $
+
+package Danga::TimeoutSocket;
+
+use base 'Danga::Socket';
+use fields qw(alive_time create_time);
+
+our $last_cleanup = 0;
+
+Danga::Socket->AddTimer(15, \&_do_cleanup);
+
+sub new {
+    my Danga::TimeoutSocket $self = shift;
+    my $sock = shift;
+    $self = fields::new($self) unless ref($self);
+    $self->SUPER::new($sock);
+
+    my $now = time;
+    $self->{alive_time} = $self->{create_time} = $now;
+
+    return $self;
+}
+
+# overload these in a subclass
+sub max_idle_time       { 0 }
+sub max_connect_time    { 0 }
+
+sub _do_cleanup {
+    my $now = time;
+    
+    Danga::Socket->AddTimer(15, \&_do_cleanup);
+    
+    my $sf = __PACKAGE__->get_sock_ref;
+
+    my %max_age;  # classname -> max age (0 means forever)
+    my %max_connect; # classname -> max connect time
+    my @to_close;
+    while (my $k = each %$sf) {
+        my Danga::TimeoutSocket $v = $sf->{$k};
+        my $ref = ref $v;
+        next unless $v->isa('Danga::TimeoutSocket');
+        unless (defined $max_age{$ref}) {
+            $max_age{$ref}      = $ref->max_idle_time || 0;
+            $max_connect{$ref}  = $ref->max_connect_time || 0;
+        }
+        if (my $t = $max_connect{$ref}) {
+            if ($v->{create_time} < $now - $t) {
+                push @to_close, $v;
+                next;
+            }
+        }
+        if (my $t = $max_age{$ref}) {
+            if ($v->{alive_time} < $now - $t) {
+                push @to_close, $v;
+            }
+        }
+    }
+
+    $_->close("Timeout") foreach @to_close;
+}
+
+1;

Added: branches/0.3x/lib/Qpsmtpd/ConfigServer.pm
==============================================================================
--- (empty file)
+++ branches/0.3x/lib/Qpsmtpd/ConfigServer.pm	Fri Feb  2 15:47:26 2007
@@ -0,0 +1,289 @@
+# $Id$
+
+package Qpsmtpd::ConfigServer;
+
+use base ('Danga::Client');
+use Qpsmtpd::Constants;
+
+use strict;
+
+use fields qw(
+    _auth
+    _commands
+    _config_cache
+    _connection
+    _transaction
+    _test_mode
+    _extras
+    other_fds
+);
+
+my $PROMPT = "Enter command: ";
+
+sub new {
+    my Qpsmtpd::ConfigServer $self = shift;
+    
+    $self = fields::new($self) unless ref $self;
+    $self->SUPER::new( @_ );
+    $self->write($PROMPT);
+    return $self;
+}
+
+sub max_idle_time { 3600 } # one hour
+
+sub process_line {
+    my $self = shift;
+    my $line = shift || return;
+    if ($::DEBUG > 1) { print "$$:".($self+0)."C($self->{mode}): $line"; }
+    local $SIG{ALRM} = sub {
+        my ($pkg, $file, $line) = caller();
+        die "ALARM: $pkg, $file, $line";
+    };
+    my $prev = alarm(2); # must process a command in < 2 seconds
+    my $resp = eval { $self->_process_line($line) };
+    alarm($prev);
+    if ($@) {
+        print STDERR "Error: $@\n";
+    }
+    return $resp || '';
+}
+
+sub respond {
+    my $self = shift;
+    my (@messages) = @_;
+    while (my $msg = shift @messages) {
+        $self->write("$msg\r\n");
+    }
+    return;
+}
+
+sub fault {
+  my $self = shift;
+  my ($msg) = shift || "program fault - command not performed";
+  print STDERR "$0 [$$]: $msg ($!)\n";
+  $self->respond("Error - " . $msg);
+  return $PROMPT;
+}
+
+sub _process_line {
+    my $self = shift;
+    my $line = shift;
+
+    $line =~ s/\r?\n//;
+    my ($cmd, @params) = split(/ +/, $line);
+    my $meth = "cmd_" . lc($cmd);
+    if (my $lookup = $self->can($meth)) {
+        my $resp = eval {
+            $lookup->($self, @params);
+        };
+        if ($@) {
+            my $error = $@;
+            chomp($error);
+            Qpsmtpd->log(LOGERROR, "Command Error: $error");
+            return $self->fault("command '$cmd' failed unexpectedly");
+        }
+        return "$resp\n$PROMPT";
+    }
+    else {
+        # No such method - i.e. unrecognized command
+        return $self->fault("command '$cmd' unrecognised");
+    }
+}
+
+my %helptext = (
+    help => "HELP [CMD] - Get help on all commands or a specific command",
+    status => "STATUS - Returns status information about current connections",
+    list => "LIST [LIMIT] - List the connections, specify limit or negative limit to shrink list",
+    kill => "KILL (\$IP | \$REF) - Disconnect all connections from \$IP or connection reference \$REF",
+    pause => "PAUSE - Stop accepting new connections",
+    continue => "CONTINUE - Resume accepting connections",
+    reload => "RELOAD - Reload all plugins and config",
+    quit => "QUIT - Exit the config server",
+    );
+
+sub cmd_help {
+    my $self = shift;
+    my ($subcmd) = @_;
+    
+    $subcmd ||= 'help';
+    $subcmd = lc($subcmd);
+    
+    if ($subcmd eq 'help') {
+        my $txt = join("\n", map { substr($_, 0, index($_, "-")) } sort values(%helptext));
+        return "Available Commands:\n\n$txt\n";
+    }
+    my $txt = $helptext{$subcmd} || "Unrecognised help option. Try 'help' for a full list.";
+    return "$txt\n";
+}
+
+sub cmd_quit {
+    my $self = shift;
+    $self->close;
+}
+
+sub cmd_shutdown {
+    exit;
+}
+
+sub cmd_pause {
+    my $self = shift;
+    
+    my $other_fds = $self->OtherFds;
+    
+    $self->{other_fds} = { %$other_fds };
+    %$other_fds = ();
+    return "PAUSED";
+}
+
+sub cmd_continue {
+    my $self = shift;
+    
+    my $other_fds = $self->{other_fds};
+    
+    $self->OtherFds( %$other_fds );
+    %$other_fds = ();
+    return "UNPAUSED";
+}
+
+sub cmd_status {
+    my $self = shift;
+ 
+# Status should show:
+#  - Total time running
+#  - Total number of mails received
+#  - Total number of mails rejected (5xx)
+#  - Total number of mails tempfailed (5xx)
+#  - Avg number of mails/minute
+#  - Number of current connections
+#  - Number of outstanding DNS queries
+    
+    my $output = "Current Status as of " . gmtime() . " GMT\n\n";
+    
+    if (defined &Qpsmtpd::Plugin::stats::get_stats) {
+        # Stats plugin is loaded
+        $output .= Qpsmtpd::Plugin::stats->get_stats;
+    }
+    
+    my $descriptors = Danga::Socket->DescriptorMap;
+    
+    my $current_connections = 0;
+    my $current_dns = 0;
+    foreach my $fd (keys %$descriptors) {
+        my $pob = $descriptors->{$fd};
+        if ($pob->isa("Qpsmtpd::PollServer")) {
+            $current_connections++;
+        }
+        elsif ($pob->isa("ParaDNS::Resolver")) {
+            $current_dns = $pob->pending;
+        }
+    }
+    
+    $output .= "Curr Connections: $current_connections / $::MAXconn\n".
+               "Curr DNS Queries: $current_dns";
+    
+    return $output;
+}
+
+sub cmd_list {
+    my $self = shift;
+    my ($count) = @_;
+    
+    my $descriptors = Danga::Socket->DescriptorMap;
+    
+    my $list = "Current" . ($count ? (($count > 0) ? " Oldest $count" : " Newest ".-$count) : "") . " Connections: \n\n";
+    my @all;
+    foreach my $fd (keys %$descriptors) {
+        my $pob = $descriptors->{$fd};
+        if ($pob->isa("Qpsmtpd::PollServer")) {
+            next unless $pob->connection->remote_ip; # haven't even started yet
+            push @all, [$pob+0, $pob->connection->remote_ip,
+                        $pob->connection->remote_host, $pob->uptime];
+        }
+    }
+    
+    @all = sort { $a->[3] <=> $b->[3] } @all;
+    if ($count) {
+        if ($count > 0) {
+            @all = @all[$#all-($count-1) .. $#all];
+        }
+        else {
+            @all = @all[0..(abs($count) - 1)];
+        }
+    }
+    foreach my $item (@all) {
+        $list .= sprintf("%x : %s [%s] Connected %0.2fs\n", map { defined()?$_:'' } @$item);
+    }
+    
+    return $list;
+}
+
+sub cmd_kill {
+    my $self = shift;
+    my ($match) = @_;
+    
+    return "SYNTAX: KILL (\$IP | \$REF)\n" unless $match;
+    
+    my $descriptors = Danga::Socket->DescriptorMap;
+    
+    my $killed = 0;
+    my $is_ip = (index($match, '.') >= 0);
+    foreach my $fd (keys %$descriptors) {
+        my $pob = $descriptors->{$fd};
+        if ($pob->isa("Qpsmtpd::PollServer")) {
+            if ($is_ip) {
+                next unless $pob->connection->remote_ip; # haven't even started yet
+                if ($pob->connection->remote_ip eq $match) {
+                    $pob->write("550 Your connection has been killed by an administrator\r\n");
+                    $pob->disconnect;
+                    $killed++;
+                }
+            }
+            else {
+                # match by ID
+                if ($pob+0 == hex($match)) {
+                    $pob->write("550 Your connection has been killed by an administrator\r\n");
+                    $pob->disconnect;
+                    $killed++;
+                }
+            }
+        }
+    }
+    
+    return "Killed $killed connection" . ($killed > 1 ? "s" : "") . "\n";
+}
+
+sub cmd_dump {
+    my $self = shift;
+    my ($ref) = @_;
+    
+    return "SYNTAX: DUMP \$REF\n" unless $ref;
+    require Data::Dumper;
+    $Data::Dumper::Indent=1;
+    
+    my $descriptors = Danga::Socket->DescriptorMap;
+    foreach my $fd (keys %$descriptors) {
+        my $pob = $descriptors->{$fd};
+        if ($pob->isa("Qpsmtpd::PollServer")) {
+            if ($pob+0 == hex($ref)) {
+                return Data::Dumper::Dumper($pob);
+            }
+        }
+    }
+    
+    return "Unable to find the connection: $ref. Try the LIST command\n";
+}
+
+1;
+__END__
+
+=head1 NAME
+
+Qpsmtpd::ConfigServer - a configuration server for qpsmtpd
+
+=head1 DESCRIPTION
+
+When qpsmtpd runs in multiplex mode it also provides a config server that you
+can connect to. This allows you to view current connection statistics and other
+gumph that you probably don't care about.
+
+=cut



nntp.perl.org: Perl Programming lists via nntp and http.
Comments to Ask Bjørn Hansen at ask@perl.org | Group listing | About