
dispy
*****

dispy module provides API for the client (program) to create
cluster(s) that define which nodes can execute computations. These
clusters can then be used to schedule jobs (execution of computations
used to create cluster with different parameters). The jobs can later
be examined for result of execution, output, error messages and
exception trace (if the execution failed).

While dispy and other components have various options that cover
rather comprehensive use cases, making it seem complex, most of the
options have default values that likely work for common cases. For
example, starting *dispynode* program on each of the nodes on a local
network and using JobCluster with computation, and possibly *depends*
parameters, as done with example in *dispy: Distributed and Parallel
Computing with/for Python*, may be sufficient.

There are two ways to create clusters with dispy: JobCluster and
SharedJobCluster. If only one instance of client program may be
running at anytime, JobCluster is simple to use; it already contains a
scheduler that will schedule jobs to nodes running *dispynode*. If,
however, multiple programs using dispy may be running simultaneously,
JobCluster cannot be used - each of the schedulers in each instance of
dispy will assume the nodes are controlled exclusively by each,
causing conflicts. Instead, SharedJobCluster must be used. In this
case, *dispyscheduler* program must also be running on some computer
and SharedJobCluster must set *scheduler_node* parameter with the node
running dispyscheduler (default is the host that calls
SharedJobCluster).


JobCluster
==========

class class dispy.JobCluster(computation, nodes=['*'], depends=[], callback=None, cluster_status=None, ip_addr=None, ext_ip_addr=None, port=51347, node_port=51348, recover_file=None, dest_path=None, loglevel=logging.WARNING, setup=None, cleanup=True, pulse_interval=None, ping_interval=None, reentrant=False, secret='', keyfile=None, certfile=None)

   Creates and returns a cluster for given computation, which can be
   used to schedule executions, as explained above, and Cluster. The
   parameters are:

* *computation* should be either a Python function or a string. If
  it is a string, it must be path to executable program. This
  computation is sent to nodes in the given cluster. When a job is
  submitted (to invoke computation with arguments), dispynode executes
  the computation with those arguments in isolation - the computation
  should not depend on global state, such as modules imported in the
  main program or global variables etc., except as described below for
  *setup* parameter (see also *Examples*).

* *nodes* must be a list, each element of which must be either a
  string or NodeAllocate object. If element is a string, it must be
  either IP address or host name; this string is converted to
  NodeAllocate object.

  This list serves two purposes: dispy initially sends a request to
  all the nodes listed to find out information about them (e.g.,
  number of processing units available for dispy), then sends given
  computation to only those nodes that match the listed nodes (dispy
  may know about nodes not listed in this computation, as it also
  broadcasts identification request).  Wildcard '*' can be used to
  match (part of) any IP address; e.g., '192.168.3.*' matches any node
  whose IP address starts with '192.168.3'.  If there are any nodes
  beyond local network, then all such nodes should be mentioned in
  *nodes*. If there are many such nodes (on outside local network), it
  may be cumbersome to list them all (it is not possible to send
  identification request to outside networks with wildcard in them);
  in that case, *dispynetrelay* may be started on one of the nodes on
  that network and the node running dispynetrelay should be added to
  nodes list (and a wildcard for that network, so that other nodes on
  that network match that wildcard); see *Examples*.

* *depends* is list of dependencies needed for *computation*. If
  computation is a Python function, each element of this list can be
  either Python function, Python class, an instance of class (object),
  a Python module, or path to a file. If computation is a program,
  each element can be either a Python module (useful if computation is
  a Python program), or path to a file.  Only Python modules that are
  not present on nodes already need be listed; standard modules that
  are present on all nodes do not need to be listed here.

  These dependencies are available to all the jobs executed by nodes.
  Unless *cleanup* parameter is set to **False**, they are removed
  after the computation is done. See *cluster.submit* in Cluster on
  how to distribute dependencies for each job, instead of for the
  computation.

* *callback* is a user provided function. When a job's results
  become available, dispy will call provided callback function with
  that job as the argument. If a job sends provisional results with
  **dispy_provisional_result** multiple times, then dispy will call
  provided callback each such time. The (provisional) results of
  computation can be retrieved with 'result' field of job, etc. While
  computations are run on nodes in isolated environments, callbacks
  are run in the context of user programs from which
  (Shared)JobCluster is called - for example, callbacks can access
  global variables in programs that created cluster(s).

* *cluster_status* is a user provided function. dispy calls this
  function whenever there is a change in a job's status or a node's
  status. The function is called with the arguments status (either
  DispyJob's status, such as DispyJob.Created, DispyJob.Running, or
  DispyNode's status, such as DispyNode.Initialized), an instance of
  DispyNode if the status is about a node, and an instance of DispyJob
  if the status is about a job. This feature is used by *Monitor and
  Manage Cluster* so the cluster status can be monitored in a web
  browser.

* *ip_addr* is IP address to use for (client) communication. If it
  is not set, all configured network interfaces are used. If it is a
  string, it must be either a host name or IP address, in which case
  corresponding interface address is used. If it is a list, each must
  be a string of host name or IP address, in which case interface
  addresses for each of those is used.

* *ext_ip_addr* is a string or list of strings of host names or IP
  addresses to use as return address for node to client communication.
  This may be needed in case the client is behind a NAT
  firewall/gateway and (some) nodes are outside. Typically, in such a
  case, ext_ip_addr must be the address of NAT firewall/gateway and
  the NAT firewall/gateway must forward ports to ip_addr
  appropriately. See below for more on NAT/firewall information.

* *port* is port to use for (client) communication. Usually not
  necessary. If not given, dispy will request socket library to choose
  any available port.

* *node_port* is port to use for communicating with nodes (servers).
  If this is different from default, **dispynode.py** programs must be
  run with the same port.

* *dest_path* is directory on the nodes where files are transferred
  to. Default is to create a separate directory for each computation.
  If a computation transfers files (dependencies) and same computation
  is run again with same files, the transfer can be avoided by
  specifying same *dest_path*, along with the option 'cleanup=False'.

* *recover_file*, if given, must be a string. If it is a string,
  dispy uses it as path to file in which it stores information about
  cluster. If it is **None**, then a file of the form
  '_dispy_YYYYMMDDHHMMSS' in the current directory is used. In case
  user program terminates unexpectedly (for example, because of
  network failure, uncaught exception), this file can be used to
  retrive results of jobs that were scheduled but not yet finished at
  the time of crash. See (Fault) Recover Jobs below.

* *loglevel* is message priority for logging module.

* *setup*, if given, must be a Python function or partial function
  that takes no (further) arguments. This function is transferred,
  along with dependencies and computation, to each node and executed
  on each node after all dependencies have been copied to *dest_path*
  and working directory has also been set to *dest_path* (which would
  also be the working directory when computation jobs are executed).

  The setup function is executed on a node exactly once before any
  jobs are scheduled on that node; the function should return 0 to
  indicate successful initialization. This function can be used, for
  example, to prepare data as needed to execute jobs (efficiently).
  Under Linux, OS X (and other Unix variants, but not Windows), child
  processes (that execute computation jobs) inherit parent's address
  space (among other things), so the setup function can initialize any
  data in to global variables; these variables can be used (as read-
  only data) in jobs.

  Under Windows, *fork* is not available, so the global variables are
  pickled and sent as arguments to new process; see multiprocessing's
  Programming guidelines for Windows. Thus, under Windows, modules,
  for example, can't be declared global variables in *setup* function,
  as modules are not serializable.

  If this feature is used, care must be taken not to share same node
  with other computations, as one computation's global variables may
  interfere with another computation's, and *cleanup* function may be
  provided, for example, to delete the global variables initialized.
  See *Examples* where *setup* and *cleanup* functions are used to
  initialize and de-initialize global variables.

* *cleanup*: If True (default value), any files (dependencies)
  transferred will be deleted after the computation is done. If it is
  False, the files are left on the nodes; this may speedup if same
  files are needed for another cluster later. However, this can be
  security risk and/or require manual cleanup. If same files are used
  for multiple clusters, then cleanup may be set to False and same
  *dest_path* used.

  If *cleanup* is neither True nor False, must be a Python function or
  partial function that takes no (further) arguments. This function is
  transferred to each of the nodes and executed on the nodes before
  deleting files as mentiond above. If *setup* function creates global
  variables, for example, then *cleanup* function may delete those
  variables.

* *pulse_interval* is number of seconds between 'pulse' messages
  that nodes send to indicate they are alive and computing submitted
  jobs. If this value is given as an integer or floating number
  between 1 and 600, then a node is presumed dead if 5*pulse_interval
  seconds elapse without a pulse message. See 'reentrant' below.

* *reentrant* must be either True or False. This value is used only
  if 'pulse_interval' is set for any of the clusters. If
  pulse_interval is given and reentrant is False (default), jobs
  scheduled for a dead node are automatically cancelled (for such
  jobs, execution result, output and error fields are set to None,
  exception field is set to 'Cancelled' and status is set to
  Cancelled); if reentrant is True, then jobs scheduled for a dead
  node are resubmitted to other available nodes.

* *ping_interval* is number of seconds.  Normally dispy can locate
  nodes running dispynode by broadcasting UDP ping messages on local
  network and point-to-point UDP messages to nodes on remote networks.
  However, UDP messages may get lost.  Ping interval is number of
  seconds between repeated ping messages to find any nodes that have
  missed previous ping messages.

* *secret* is a string that is (hashed and) used for handshaking of
  communication with nodes; i.e., this cluster will only work with
  nodes that use same secret (see "secret" option to *dispynode*).
  This prevents unauthorized use of nodes. However, the hashed string
  (not the secret itself) is passed in clear text, so an unauthorized,
  determined person may be able to figure out how to circumvent. This
  feature can be used, for example, to easily create a private cluster
  with small number of machines in a large network.

* *keyfile* is path to file containing private key for SSL
  communication, same as 'keyfile' parameter to ssl.wrap_socket of
  Python ssl module. This key may be stored in 'certfile' itself, in
  which case this should be *None*.  Same file must be used as
  "keyfile" option for *dispynode* in the case of JobCluster or for
  *dispyscheduler* in the case of SharedJobCluster.

* *certfile* is path to file containing SSL certificate, same as
  'certfile' parameter to ssl.wrap_socket of Python ssl module. Same
  file must be used as "certfile" option for *dispynode* in the case
  of JobCluster or for *dispyscheduler* in the case of
  SharedJobCluster.


SharedJobCluster
================

   SharedJobCluster (along with running *dispyscheduler* program)
   should be used when multiple dispy client programs run
   simultaneously.  SharedJobCluster has almost the same syntax as
   JobCluster, except as noted below.

class class dispy.SharedJobCluster(computation, nodes=['*'], depends=[], ip_addr=None, port=None, scheduler_node=None, scheduler_port=None, ext_ip_addr=None dest_path=None, loglevel=logging.WARNING, cleanup=True, reentrant=False, secret='', keyfile=None, certfile=None)

   where all arguments common to JobCluster are same, and

* *port* is client's port (which is 51347 in the case of JobCluster)
  and by default choosen randomly to an unused port. This is so that
  in case dispyscheduler is running on the same computer, the client
  port would be different from the port used by dispyscheduler. In
  case the client needs to get results from dispyscheduler in remote
  network and port forwarding is used, a specific port (e.g., 51347)
  can be given.

* *scheduler_node* is either IP address or host name where
  *dispyscheduler* is running; if it is not given, the node where
  "SharedJobCluster" is invoked is used

* *pulse_interval* is not used in case of "SharedJobCluster";
  instead, *dispyscheduler* program must be started with
  "pulse_interval" option appropriately.

* *secret* is a string that is (hashed and) used for handshaking of
  communication with dispyscheduler; i.e., this cluster will only work
  with dispyscheduler that uses same secret (see "clustersecret"
  option to *dispyscheduler*).


Cluster
=======

A cluster created by either JobCluster or SharedJobCluster has
following methods:

cluster.submit(*args, **kwargs[, dispy_job_depends=[]])

   Creates a DispyJob object (see below for useful fields of this
   object), schedules it for execution on an eligible node (whenever
   one becomes available) and returns the job. Results from execution
   of computation with given arguments will be available in the job
   object after execution finishes.

   This method should be called with the arguments exactly as expected
   by the *computation* given to JobCluster or SharedJobCluster. If
   *computation* is a Python function, the arguments may also contain
   keyword arguments. All arguments must be serializable (picklable),
   as these are sent over the network to the nodes. If an argument is
   a class object that contains non-serializable members, then the
   classes may provide **__getstate__** method for this purpose (see
   '_Job' class in dispy.py for an example). If *computation* is a
   standalone program, then all arguments must be strings.

   As noted in JobCluster, dispy can distribute dependencies to nodes
   when the cluster is created. All the jobs created by computations
   can use those dependencies. If, however, each job has its own
   dependencies (that are needed only for that job's execution), then
   such dependencies can be passed with optional keyword parameter
   *dispy_job_depends* list to *cluster.submit*. The format of this
   dependencies list is same as that for *depends* parameter of
   JobCluster.  The keyword parameter *dispy_job_depends* is used by
   *cluster.submit* and removed before job is created, so that the
   computation itself doesn't get this parameter. Dependencies
   distributed by *dispy_job_depeneds* are automaticaly removed after
   the job is done (even if *cleanup* is set to **False** when
   creating cluster).

cluster.cancel(job)

   Terminates given job (returned from previous "cluster.submit()").
   If the job is queued for execution, it is removed from the
   scheduler, or terminated (killed) it if it has started execution at
   a node. Note that if the job execution has any side effects (such
   as updating database, files etc.), cancelling a job may leave
   unpredictable side effects, depending on at what point job is
   terminated.

cluster.allocate_node(node)

   Allocate given node for the cluster. 'node' may be host name or IP
   address, or an instance of NodeAllocate.

   If the request is successful, the return value is 0; otherwise, it
   will be -1.

cluster.node_jobs(node)

   Returns list of jobs currently running on given node, given as host
   name or IP address.

cluster.set_node_cpus(node, cpus)

   Sets (alters) CPUs managed by dispy on a node, given as host name
   or IP address, to given number of CPUs. If the number of CPUs given
   is negative then that many CPUs are not used (from the available
   CPUs).

   If the cluster is SharedJobCluster, *set_node_cpus* does not change
   the CPUs (as the node is shared by other users/programs).

   If the request is successful, the return value indicates number of
   CPUs used by dispy for given node. Otherwise, the method returns
   -1.

cluster.wait()
cluster()

   Waits for all submitted jobs to complete.

cluster.close()

   Waits for all submitted jobs to complete and then closes the
   computation (such as removing any transferred files at the nodes,
   deleting *computation* itself from the nodes).

cluster.stats()

   Prints statistics about nodes, time each node spent executing jobs
   etc.


DispyJob
========

The result of "cluster.submit()" call of a cluster is an instance of
DispyJob (see dispy.py), which can be used to examine status of job
execution, retrieve job results etc. It has following attributes:

* *id* can be used by client program to set to any value
  appropriate. For example, *id* field can be set to a unique value to
  distinguish one job from another, as done in the example in *dispy:
  Distributed and Parallel Computing with/for Python*. This is the
  only field that can be written to by the client. Rest of the
  attributes are read- only.

  While any Python object can be assigned to this attribute, dispy
  debug log (if enabled) prints this as string and node information in
  Manage Cluster (Nodes) shows *id* attribute as string as well. So if
  *id* is an instance of a class, providing appropriate *__str__*
  method for the class, as done in the example in Example, will give
  useful information about the job; otherwise, *id* may be shown
  simply as reference to class instance.

* *status* indicates current status of job. Its value is one of
  *Created*, *Running*, *Finished*, *Cancelled* or *Terminated*
  (DispyJob class properties).

* *ip_addr* is IP address of node where the job is currently
  executing, or executed.

When a submitted job is called with **job()**, it returns that job's
execution result, waiting until the job is finished if it has not
finished yet. After a job is finished, following attributes can be
read:

* *result* will have computation's result - return value if
  computation is a function and exit status if it is a program.
  **job.result** is same as return value of **job()**

* *stdout* and *stderr* will have stdout and stderr strings of the
  execution of computation at server node

* *exception* will have exception trace if computation raises any
  exception; in this case **job.result** will be *None*

* *start_time* will be the time when job was scheduled for execution
  on a node

* *end_time* will be time when results became available

Job's result, stdout and stderr should not be large - these are
buffered and hence will consume memory (not stored on disk). Moreover,
like *args* and *kwargs*, result should be serializable (picklable
object). If result is (or has) an instance of Python class, that class
may have to provide **__getstate__** function to serialize the object.
If the result is not serializable, or it is too big, then the data can
be saved to a file and transferred to the client, as explained in
Transferring Files.

After jobs are submitted, "cluster.wait()" can be used to wait until
all submitted jobs for that cluster have finished. If necessary,
results of execution can be retrieved by either **job()** or
**job.result**, as described above.


NodeAllocate
============

As mentioned in JobCluster above, *nodes* must be a list, each element
of which must be either a string or NodeAllocate object. If it is a
string, it is converted to NodeAllocate object. When dispy finds a
node, each NodeAllocate's *allocate* method is called with the
cluster, IP address of node, name of node and CPUs available on the
node. The method should return a positive number indicating number of
CPUs on the node to use for given cluster. This class can be inherited
to override *allocate* method, for example, to allocate fewer CPUs
during certain hours of the day.


Provisional/Intermediate Results
================================

dispy_provisional_result(result)

      Computations (if they are Python functions) can use this
      function to send provisional or intermediate results to the
      client. For example, in optimization computations, there may be
      many (sub) optimal results that the computations can send to the
      client. The client may ignore, cancel computations or create
      additional computations based on these results. When computation
      calls **dispy_provisional_result(result)**, the Python object
      *result* (which must be serializable) is sent back to the client
      and computation continues to execute. The client should use
      *callback* option to process the information, as shown in the
      example:

         import random, dispy

         def compute(n): # executed on nodes
             import random, time, socket
             name = socket.gethostname()
             cur_best = 1
             for i in xrange(0, n):
                 r = random.uniform(0, 1)
                 if r <= cur_best:
                     # possible result
                     dispy_provisional_result((name, r))
                     cur_best = r
                 time.sleep(0.1)
             # final result
             return (name, cur_best)

         def job_callback(job): # executed at the client
             if job.status == dispy.DispyJob.ProvisionalResult:
                 if job.result[1] < 0.005:
                     # acceptable result; terminate jobs
                     print('%s computed: %s' % (job.result[0], job.result[1]))
                     # 'jobs' and 'cluster' are created in '__main__' below
                     for j in jobs:
                         if j.status in [dispy.DispyJob.Created, dispy.DispyJob.Running,
                                         dispy.DispyJob.ProvisionalResult]:
                             cluster.cancel(j)

         if __name__ == '__main__':
             cluster = dispy.JobCluster(compute, callback=job_callback)
             jobs = []
             for n in range(4):
                 job = cluster.submit(random.randint(50,100))
                 if job is None:
                     print('creating job %s failed!' % n)
                     continue
                 job.id = n
                 jobs.append(job)
             cluster.wait()
             cluster.stats()
             cluster.close()

   In the above example, computations send provisional result if
   computed number is &lt;= threshold (0.2). If the number computed is
   less than 0.005, job_callback deems it acceptable and terminates
   computations.


Transferring Files
==================

dispy_send_file(path)

   Computations (if they are Python functions) can use this function
   to send files (on server nodes) to the client. This is useful if
   the nodes and client don't share a file system and computations
   generate files.

   As explained in DispyJob, computation results (return value in the
   case of Python functions and exit status in the case of programs),
   along with any output, errors and exception trace are transferred
   to the client after a job is finished executing. As these are
   stored in memory (and sent back as serializable objects), they
   should not be too big in size. If computations generate large
   amount of data, the data can be saved in file(s) (on the nodes) and
   then transfer to the client with **dispy_send_file(path)**. The
   files are saved on the client with the same path under the
   directory where the client is running.

   For example, a computation that saves data in a file named
   "file.dat" and transferrs to the client is:

      def compute(n):
          # ... generate "file.dat"
          dispy_send_file("file.dat")
          # ... computation may continue, possibly send more files
          return result

   **dispy_send_file** returns 0 if the file is transferred
   successfully. The maximum size of the file transferred is as per
   *max_file_size* option to *dispynode*; its default value is 10MB.

   All the files sent by the computations are saved under the
   directory where client program is running. If more than one file is
   sent (generated on multiple nodes, say), care must be taken to use
   different paths/names to prevent overwriting files.

   Note that *depends* parameter to JobCluster and SharedJobCluster
   can be used to transfer files (and other types of dependencies)
   from client to nodes.


(Fault) Recover Jobs
====================

As mentioned above, dispy stores information about cluster in a file
as per *recover_file* option. If user program terminates unexpectedly,
the nodes that execute those jobs can't send the results back to
client. In such a case, the results for the jobs that were scheduled
at the time of crash can be retrieved from the nodes with the function
in dispy module:

dispy.recover_jobs(recover_file, timeout=None, terminate_pending=False)

* *recover_file* is path to the file used when cluster is created.
  See *recover_file* option in JobCluster above .

* *timeout* is number of seconds to wait for job results from nodes.
  If *timeout* seconds is not *None*, then results for jobs that have
  finished before *timeout* are returned; other jobs continue to
  execute.

* *terminate_pending*, if *True*, will terminate jobs on the nodes
  that have not finished. This can be used in combination with
  *timeout*

This function reads the information about cluster in the
*recover_file*, inquires nodes about jobs, retrieves results of jobs,
waiting until they are all finished. The result of this function is
list of DispyJob instances with certain attributes filled, such as
result, stdout, stderr, exception, status etc. This function also
releases the nodes from the computation after retrieving the results
of all pending jobs, so the nodes will respond to other clients.


NAT/Firewall Forwarding
=======================

By default dispy client uses UDP and TCP ports 51347, dispynode uses
UDP and TCP ports 51348, and dispyscheduler uses UDP and TCP pots
51347 and TCP port 51348. If client/node/scheduler are behind a NAT
firewall/gateway, then these ports must be forwarded appropriately and
*ext_ip_addr* option must be used. For example, if dispy client is
behdind NAT firewall/gateway, JobCluster / SharedJobCluster must set
*ext_ip_addr* to the NAT firewall/gateway address and forward UDP and
TCP ports 51347 to the IP address where client is running. Similarly,
if dispynode is behind NAT firewall/gateway, *ext_ip_addr* option must
be used.


Cloud Computing (with Amazon EC2)
=================================

*ext_ip_addr* of JobCluster and *dispynode* can be used to work with
Amazon EC2 cloud computing service. With EC2 service, a node has a
private IP address (called 'Private DNS Address') that uses private
network of the form 10.x.x.x and public address (called 'Public DNS
Address') that is of the form ec2-x-x-x-x.x.amazonaws.com. After
launching instance(s), one can copy dispy files to the node(s) and run
dispynode as:

   dispynode.py --ext_ip_addr ec2-x-x-x-x.y.amazonaws.com

(this address can't be used with *-i*/*--ip_addr* option, as the
network interface is configured with private IP address only). This
node can then be used by dispy client from outside EC2 network by
specifying ec2-x-x-x-x.x.amazonaws.com in the *nodes* list (thus,
using EC2 servers to augment processing units). Roughly, dispy uses
*ext_ip_addr* similar to NAT - it announces *ext_ip_addr* to other
services instead of the configured *ip_addr* so that external services
send requests to *ext_ip_addr* and if firewall/gateway forwards them
appropriately, dispy will process them.
