#!/usr/bin/perl -w

# Copyright 2004 Andreas Bernauer.  All rights reserved except:
#
# You are free to use this software privately or in a non-profit
# organization.  I would like to hear from you if you want to use this
# software.  You can reach me at andreas.bernauer@gmx.de.

# Starts a data server for distributed computing on various computer
# nodes.  Contacts them via ssh and public-key authentication.

# The only function you need is runDistribution.  It is described below.


package Distributor;
require Exporter;

use strict;
use Net::Server::Single;	# Implementation depends on this being
                                #  a Single Server as no locking is
                                #  performed.
use Errno qw(EAGAIN);

our @ISA       = ('Net::Server::Single', 'Exporter');
our @EXPORT    = ('runDistribution');
our @EXPORT_OK = ();
our $VERSION   = 0.1;


# Configuration shared between runDistribution (which assigns all of
# these from its arguments), the helper subs it defines and
# process_request (which reads the password, batch size and the data
# function).
my ($serverHost,		# server host name
    $serverPort,		# server host port
    $serverPassword,		# server host shutdown password
    $dataPerCon,		# data items to deliver per request
    $nextData,			# function that returns next data string
    $username,			# name on every remote machine
    $nodes,			# reference to the array of node names
    $remoteCommand);		# command to launch on remote machine (node)

# Forward declaration: calls to myfork that appear before its
# definition must already be parsed with its prototype.
sub myfork($\@$\@);

# Launches a server that delivers data items to clients.  The server
# also connects to each node and launches the client program there.
# The client program is supposed to connect to the server to get the
# data items.
#
# The idea is that the server is running on a machine and the clients
# are running on the nodes.  The clients periodically connect to the
# server and get a set of next data items they should handle, until
# the server indicates there are no more data items left.  See below
# for examples.
#
# Args: hostname the server is running on, port server is listening
# on, password to shutdown the server, amount of data items to be
# delivered per client connection, function that returns the next data
# item (a string without linebreaks), username to login in to each
# node, array of node names on which the server will launch the client
# programs, command to execute on node that launches the client.
#
# Returns: nothing.  Only returns when the server is shut down via a
# client.  Prints some output at startup and reports "All data items
# delivered." on STDERR once all data items have been delivered.
#
# Example:
#
# use Distributor;
# @data = qw ( 3 6 9 12);
# @nodes = qw ( c1n1.math.uconn.edu );
# sub nextItem { return shift @data; }
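# runDistribution( "hgt.mcb.uconn.edu", 49876, "neynex", 3,
# 		 \&nextItem, "gogarten", @nodes,
#                "perl -I andreas/research/bin andreas/research/bin/readData.pl");
#
# Note the backslash: \&nextItem passes the function itself, whereas
# &nextItem would call it and pass its result instead.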
#
# You probably want to create a client called readData.pl and
# distribute it to the nodes.  The client can use the Collector class
# to ease the data fetching.  readData.pl can look like this:
#
# use Collector;
# exit (!wrapper("echo", "hgt.mcb.uconn.edu",  49876));
#
# This runs the "echo" program on the remote machine for each data
# item that the function gets from a server on
# hgt.mcb.uconn.edu:49876.
#
# readData.pl can also look like this:
#
# use Collector;
# my @data;
# while (@data = readData($remote_host, $remote_port)) {
#   for (@data) {
#     # perform operations on data item here
#   }
# }
#
# Notes:
#
# The password is transmitted in plain text and is supposed to
# prevent a shutdown by accident, hardly more.
#
# The nextData function that provides the next data item must return
# undef to indicate that there are no more data items.  The server
# might continue to call the nextData function even after it has
# returned undef once.
#
# The server does not buffer any data, nor does it ensure the client
# got the data.  So if the connection fails after the server has
# called nextData but before the client actually got the data, the
# data is just lost.  You have to come up with a mechanism by yourself
# to ensure that all data has been processed.

sub runDistribution ($$$$$$\@$) {
  ($serverHost, $serverPort, $serverPassword,
   $dataPerCon , $nextData, $username, $nodes, $remoteCommand) = @_;

  # Starts the data server.  Distributor inherits run() from
  # Net::Server::Single (see @ISA).
  sub server () {
    Distributor->run(syslog_ident => "PerlDistributer",
		     port => $serverPort,
		     host => $serverHost,
		     listen => 10,
		     group => "nobody",
		     user => "nobody");
  }

  # Launches the client command on every node in @$nodes, one forked
  # ssh process per node.
  # Args: the placeholder that myfork prepends (ignored).
  sub clients ($) {
    my $pid = shift;		# placeholder from myfork (always 0)

    # Runs in a forked child.  Appends a start mark to the node's log
    # files, redirects STDOUT/STDERR to them and then replaces the
    # process with ssh running $remoteCommand on the node, passing the
    # server host and port as arguments.
    # Args: placeholder from myfork, log file, error log file, node.
    sub ssh ($$$$) {
      my $pid = shift;		# placeholder from myfork (always 0)
      my ($logfile, $errlogfile, $node) = @_;
      my @ssh_command = ( "ssh",
			  "$username\@$node",
			  $remoteCommand,
			  $serverHost,
			  $serverPort);
      my $logfileMark = "*** " . scalar(localtime) . ": New subprocess started.\n";
      # Three-argument open: a node name can no longer inject an open
      # mode into the file name.
      open(my $log,    '>>', $logfile)    or die "Couldn't open $logfile: $!\n";
      open(my $errlog, '>>', $errlogfile) or die "Couldn't open $errlogfile: $!\n";
      print $log $logfileMark;
      print $errlog $logfileMark;
      open(STDOUT, '>>&', $log)           or die "Couldn't dup STDOUT: $!\n";
      open(STDERR, '>>&', $errlog)        or die "Couldn't dup STDERR: $!\n";
      exec {"ssh"} @ssh_command
	or die "Could not exec ssh: $!\n";
    }

    # Parent side of each per-node fork: nothing to do there.
    sub nothing () { }

    for my $node (@$nodes) {
      print "Launching on node: $node\n";
      my @ssh_args = ($node . ".log",
		      $node . ".err.log",
		      $node);
      myfork \&nothing, @{[]}, \&ssh, @ssh_args;
    }
    # NOTE: the exit status of the ssh processes is not collected;
    # check each node's .log / .err.log file for the outcome.
  }

  # Parent runs the server; the child launches the clients on the nodes.
  myfork \&server, @{[]}, \&clients, @{[]};
}

# Does a fork and calls two functions with arguments: one in the
# parent, the other in the child.  While forking and while the parent
# function runs, $SIG{CHLD} is set to "IGNORE", so on systems where
# that auto-reaps children (e.g. POSIX) children exiting during that
# time leave no zombies.  The child never returns into the caller's
# code: it exits once the child function is done, even if that
# function dies.
#
# Args: parent_sub, parent_args, child_sub, child_args.  The prototype
# makes perl pass both argument arrays by reference, so they must be
# written as arrays at the call site (use @{[]} for no arguments).
# The child function receives the value fork returned in the child
# (always 0, a placeholder) followed by child_args.
#
# Returns: nothing.  In the child it does not return at all: the child
# exits with status 0, or warns and exits with status 1 if the child
# function died.  A temporary fork failure (EAGAIN) is retried every
# 5 seconds; any other fork failure dies.
#
# Example: myfork \&server, @{[]}, \&clients, @{[]};
#
# Note: After I'd written this function, I realized I don't need the
# parent function.  But now it's that way and so I keep it that way.


sub myfork ($\@$\@) {
  my ($parentSub, $parentArg, $childSub, $childArg) = @_;
 FORK: {
    local $SIG{CHLD} = "IGNORE";
    my $pid = fork;
    if ($pid) {			# parent
      $parentSub->(@$parentArg);
    } elsif (defined $pid) {	# child
      # Trap dies so the child can never unwind into the caller's code.
      # The placeholder is passed directly instead of being unshifted,
      # so the caller's array is left untouched.
      my $ok = eval { $childSub->($pid, @$childArg); 1 };
      if (!$ok) {
	warn($@ || "Child function died.\n");
	exit 1;
      }
      exit 0;
    } elsif ($! == EAGAIN) {	# recoverable fork error
      sleep 5;
      redo FORK;
    } else {
      die "Can't fork: $!\n";
    }
  }
  return;
}

# Network-aware chomp: strips a trailing line terminator that is
# either "\r\n" or a plain "\n" from the referenced string.
#
# Args: reference to the line; the line is modified in place.
#
# Returns: nothing meaningful; callers rely only on the side effect.
#
# Example:  netChomp \$line;


sub netChomp ($) {
  my ($lineRef) = @_;
  ${$lineRef} =~ s{\r?\n$}{};
}

### over-ridden subs below

# Launched when a request reaches the server.  Implements the Data
# Transfer Protocol invented by me.  The protocol is really simple and
# goes like this.  The first four characters represent a command.
# They are followed by whitespace and some arguments and finished by
# \r\n.
#
#  Client                          Server
#
#              >> HELO <id> >>
#
#              << HELO <id> <text> <<
#
#              >> RECV >>  
#
#              << DATA <data> <<
#
#              <repeated as often as necessary>
#
#              << DEND <<
#
#
# The host then closes the connection.  If there is no more data, the
# host sends
#
#              << QUIT <<
#
# after the RECV of the subprocess or instead of DEND and closes the
# connection.
#
# This protocol probably can't handle raw binary data.  You have to
# encode the binary data first, e.g. with uuencode/uudecode.
#
# The sub process (or rather a human being) can also say the following
# instead of the initial HELO:
#
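#  >> STAT >>
#
#  This makes the host print lines of statistics and then wait for
#  further instructions, like the following:
#
#  STAT 3 data items delivered so far.
#  STAT All data items delivered.
#  ENDS
#
#  (The second STAT line only appears once nextData has signalled
#  that no data is left.)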
#
#  The stats are always terminated by the ENDS line.  Afterwards, the
#  server is in the same state as if the connection had just opened,
#  i.e. it still accepts another STAT or a HELO, etc.
#
#  The client can also pass
#
#  >> DOWN <password> >>
#
#  as the first command (only).  If the password is the same as
#  delivered upon startup, the server shuts down.  
#
#  Every child process is supposed to shut down eventually, too, after
#  all data items have been distributed and the server indicates that
#  there is no more data left, but the protocol gives no explicit
#  command that could make the clients shutdown.
#
#  There is no way of shutting down every child immediately, as the
#  children don't listen (hehe); except for shutting them down
#  explicitly.
#
#  If at any point the server encounters an invalid command or a line
#  it cannot understand, it replies with an ABRT message and closes
#  the connection.  The proper way for a client to close the
#  connection prematurely is to use this mechanism.
#
#  The server closes the connection after a timeout, currently 30
#  seconds.  If the server closes a connection, it tries to deliver the
#  reason for the closing, which is one of:
#
#  DOWN    Server is shutting down because a client told it to.
#  ABRT    Server is closing connection because of illegal command.
#          This mechanism is supposed to be used by the client to prematurely 
#          close the connection.
#  QUIT    All data items have been delivered.  There are no more data
#          items the server could deliver.
#  DEND    All data items of the current set have been delivered.  There are data
#          items left, but the client needs to reconnect for that.


my $dataDone = 0;		# data items delivered so far
my $finished = 0;		# $nextData has reported "no more data" (undef)

# Net::Server request hook: serves one client connection following the
# Data Transfer Protocol described in the comment above.  Client lines
# are read from STDIN and replies are printed to STDOUT.  Each line must
# arrive within $timeout seconds; while a data set is being sent, the
# timer is restarted after every item.
#
# Returns: nothing.  Timeouts, a closed connection and internal errors
# (e.g. a dying $nextData) are reported to the client rather than
# propagated.
sub process_request {
  my $self = shift;
  my $timeout = 30;		# give the user 30 seconds to type a line
  my $previous_alarm = alarm(0);	# pause any outer alarm; restored below

  eval {
    # Install the handler before arming the alarm, so SIGALRM can never
    # hit the default (process-terminating) action.
    local $SIG{ALRM} = sub { die "Timed Out!\n" };

  BLOCK:
    {	    # block to break out via last, or restart via redo (STAT)
      alarm($timeout);		# fresh timer for every command line
      my $input = <STDIN>;
      die "No input\n" unless defined $input;
      netChomp \$input;
      if ($input =~ /^HELO\s+(\d+)$/i) {       # HELO
	my $clientID = $1;
	print "HELO $clientID Go ahead.\r\n";
      } elsif ($input =~ /^STAT$/i) {	       # STAT
	print "STAT $dataDone data items delivered so far.\r\n";
	print "STAT All data items delivered.\r\n" if ($finished);
	print "ENDS\r\n";
	redo BLOCK;
      } elsif ($input =~ /^DOWN\s+(\S+)$/i) {  # DOWN
	# \S+ rather than \w+, so passwords containing punctuation work.
	if ($1 eq $serverPassword) {
	  print "DOWN Server shutting down.\r\n";
	  $self->server_close;
	  last BLOCK;		# never go on to expect RECV after DOWN
	} else {
	  print "ABRT Not a valid command ($input) or intended quit.\r\n";
	  last BLOCK;
	}
      } else {
	print "ABRT Not a valid command ($input) or intended quit.\r\n";
	last BLOCK;
      }

      alarm($timeout);
      $input = <STDIN>;
      die "No input\n" unless defined $input;
      netChomp \$input;
      if ($input =~ /^RECV\s*/i) {	       # RECV
	for (1..$dataPerCon) {
	  my $next_arg = $nextData->();
	  # Only undef means "no more data"; items such as "0" or "" are
	  # legitimate data and must still be delivered.
	  if (!defined $next_arg) {	       # no more data: QUIT
	    print STDERR "All data items delivered.\r\n" if (!$finished);
	    $finished = 1;
	    print "QUIT All data items delivered.\r\n";
	    last BLOCK;
	  }
	  print "DATA $next_arg\r\n";	       # next DATA
	  $dataDone++;
	  alarm($timeout);
	}
	print "DEND\r\n";		       # DEND: end of current
                                               # data set
      } else {
	print "ABRT Not a valid command ($input) or intended quit.\r\n";
	last BLOCK;
      }
    }	    # end of block

    alarm(0);			# finished normally: disarm before leaving
  };
  my $error = $@;		# copy before anything can clobber $@
  alarm($previous_alarm);

  if ($error =~ /timed out/i) {
    print STDOUT "Timed Out.\r\n";
  } elsif ($error =~ /no input/i) {
    print STDOUT "ABRT empty line\r\n";
  } elsif ($error ne "") {
    chomp $error;		# die messages usually end in "\n" already
    print STDOUT "Internal error: $error\r\n";
  }
  return;
}

1;
