
Examples
********

dispy can be used to distribute standalone programs or Python program
fragments to nodes and execute them in parallel. It supports various
options to handle fairly comprehensive cases (such as fault tolerance,
sharing nodes among multiple programs simultaneously, using nodes in
multiple networks etc.); however, in common setups, the usage is
simple, as shown in the demonstrative examples below.

1. dispy can be used as a command line tool (for simple cases); in
   this case the computations can only be programs and the
   dependencies can only be files:

      dispy.py -f /some/file1 -f file2 -a "arg11 arg12" -a "arg21 arg22" -a "arg3" /some/program

   will distribute '/some/program' with dependencies '/some/file1' and
   'file2' and then execute '/some/program' in parallel with 3
   instances: a) arg11 and arg12 (two arguments to the program), b)
   arg21 and arg22 (two arguments), and c) arg3 (one argument).

2. A simple script that distributes a program (say,
   '/path/to/program') to all the nodes on a local network running
   *dispynode* and executes it with a sequence of numbers is:

      import dispy
      cluster = dispy.JobCluster('/path/to/program')
      for i in range(50):
          cluster.submit(i)

   The program '/path/to/program' on the client computer is
   transferred to each of the nodes, so if the program is a binary
   program then all the nodes must have the same architecture as the
   client.

   In the cases above we assume that the programs execute and save the
   computation results in a database, file system etc. If we are
   interested in the exit status, output from each run etc., then we
   need to keep the submitted jobs so that the attributes of interest
   can be retrieved from them, as done in the example below.

3. A canonical program that distributes the computation 'compute'
   (a Python function) to nodes (running *dispynode* on a local
   network), schedules jobs with the cluster, gets the jobs' results
   and prints them is:

      # function 'compute' is distributed and executed with arguments
      # supplied with 'cluster.submit' below
      def compute(n):
          import time, socket
          time.sleep(n)
          host = socket.gethostname()
          return (host, n)

      if __name__ == '__main__':
          import dispy, random
          cluster = dispy.JobCluster(compute)
          jobs = []
          for i in range(20):
              job = cluster.submit(random.randint(5,20))
              job.id = i # associate an ID to identify jobs (if needed later)
              jobs.append(job)
          # cluster.wait() # wait until all jobs finish
          for job in jobs:
              host, n = job() # waits for job to finish and returns results
              print('%s executed job %s at %s with %s' % (host, job.id, job.start_time, n))
              # other fields of 'job' that may be useful:
              # job.stdout, job.stderr, job.exception, job.ip_addr, job.end_time
          cluster.stats()

4. If the computation has any dependencies, such as classes,
   objects or files, they can be specified with the 'depends' argument
   and dispy will distribute them along with the computation.

   Continuing with trivial but illustrative examples, the program
   below distributes a computation to be executed with instances of a
   class:

      class C(object):
          def __init__(self, i, n):
              self.i = i
              self.n = n

          def show(self):
              print('%s: %.2f' % (self.i, self.n))

      def compute(obj):
          # obj is an instance of C
          import time
          time.sleep(obj.n)
          obj.show()
          return obj.n

      if __name__ == '__main__':
          import random, dispy
          cluster = dispy.JobCluster(compute, depends=[C])
          jobs = []
          for i in range(10):
              c = C(i, random.uniform(1, 3)) # create object of C
              job = cluster.submit(c) # it is sent to a node for executing 'compute'
              job.id = c # store this object for later use
              jobs.append(job)
          for job in jobs:
              job() # wait for job to finish
              print('%s: %.2f / %s' % (job.id.i, job.result, job.stdout))

   Note that class C is given in 'depends' so the code for it is
   transferred to the nodes automatically and the objects created in
   the client program work transparently in 'compute' on remote nodes.
   The objects are serialized using Pickle and sent over the network
   to the nodes, so the objects must be serializable. If they are not
   serializable (e.g., they contain references to locks), then the
   class must provide **__getstate__** and **__setstate__** methods
   (see Python object serialization for details), as sketched below.
   In addition, the objects shouldn't contain file descriptors,
   references to other objects not being transferred etc., which are
   not valid on remote nodes.
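
   For illustration, a minimal sketch of a class that carries a
   (non-picklable) lock and provides these methods could look like the
   following; the class and attribute names are hypothetical, not part
   of dispy:

      import threading

      class Record(object):
          # hypothetical class whose instances are passed to a computation
          def __init__(self, i, n):
              self.i = i
              self.n = n
              self.lock = threading.Lock()  # locks can't be pickled

          def __getstate__(self):
              # drop the lock before the object is serialized
              state = self.__dict__.copy()
              del state['lock']
              return state

          def __setstate__(self, state):
              # recreate the lock when the object is deserialized on a node
              self.__dict__.update(state)
              self.lock = threading.Lock()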

5. *setup* and *cleanup* parameters to cluster can be used to
   initialize/de-initialize a node for running jobs of that
   computation, e.g., to manipulate transferred files, read data into
   memory so jobs can process data efficiently, or set/unset global
   variables on that node.

   This feature works on Posix systems (Linux, OS X and other Unix
   variants) without limitations - any variable can be declared global
   and initialized in *setup*, as the operating system's *fork* is
   used to create the child process, which shares the address space of
   the parent process (where the *setup* function is executed) with
   copy-on-write. This feature can be used, for example, to read a
   large amount of data from file(s) so computations (jobs) can
   directly access the data in memory, instead of reading the same
   data from a file each time.

   Under Windows, though, *fork* is not available, so the global
   variables are pickled (see multiprocessing's Programming guidelines
   for Windows) and passed to the child process. Thus, under Windows,
   modules, for example, can't be used as global variables in *setup*.
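
   For instance, a *setup* that stores only picklable objects in
   global variables works under Windows as well; a minimal sketch
   (with a hypothetical file 'input.txt' transferred via 'depends')
   is:

      def setup():
          # only picklable objects are stored in globals, so this
          # also works under Windows
          global lines
          lines = open('input.txt').readlines()
          return 0

      def cleanup():
          del globals()['lines']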

   In the example below, data in the file "file.dat" is read and
   stored in a global variable in the *setup* function. The jobs
   compute checksums of the data in memory. (This example does not
   work under Windows for the reasons just mentioned.) The *cleanup*
   function deletes the global variable:

      def setup():
          # read data in file to global variable
          global data, algorithms
          import hashlib, sys
          data = open('file.dat').read()
          if sys.version_info.major > 2:
              data = data.encode()
              algorithms = list(hashlib.algorithms_guaranteed)
          else:
              algorithms = hashlib.algorithms
          return 0

      def cleanup():
          del globals()['data']

      def compute(n):
          import hashlib
          # 'data' and 'algorithms' global variables are initialized in 'setup'
          alg = algorithms[n % len(algorithms)]
          csum = getattr(hashlib, alg)()
          csum.update(data)
          return (alg, csum.hexdigest())

      if __name__ == '__main__':
          import dispy
          cluster = dispy.JobCluster(compute, depends=['file.dat'], setup=setup, cleanup=cleanup)
          jobs = []
          for n in range(10):
              job = cluster.submit(n)
              job.id = n
              jobs.append(job)

          for job in jobs:
              job()
              if job.status == dispy.DispyJob.Finished:
                  print('%s: %s : %s' % (job.id, job.result[0], job.result[1]))
              else:
                  print(job.exception)
          cluster.stats()
          cluster.close()

6. *setup* and *cleanup* can also be used to create shared
   variables so jobs executing on a node can share state (in a limited
   way, using the multiprocessing module's shared memory support). The
   program below creates an integer shared variable that is updated by
   jobs running on that node:

      def setup():
          import multiprocessing, multiprocessing.sharedctypes
          global shvar
          lock = multiprocessing.Lock()
          shvar = multiprocessing.sharedctypes.Value('i', 1, lock=lock)
          return 0

      def cleanup():
          del globals()['shvar']

      def compute():
          import random
          r = random.randint(1, 10)
          global shvar
          # hold the lock so the read-modify-write of the shared value
          # is atomic across jobs running on this node
          with shvar.get_lock():
              shvar.value += r
              return shvar.value

      if __name__ == '__main__':
          import dispy
          cluster = dispy.JobCluster(compute, setup=setup, cleanup=cleanup)
          jobs = []
          for n in range(10):
              job = cluster.submit()
              job.id = n
              jobs.append(job)

          for job in jobs:
              job()
              if job.status != dispy.DispyJob.Finished:
                  print('job %s failed: %s' % (job.id, job.exception))
              else:
                  print('%s: %s' % (job.id, job.result))
          cluster.stats()
          cluster.close()

7. *dispy_send_file* can be used to transfer file(s) to the client.
   Assume that the computation creates files named with the parameter
   given (in this case *n*) so different runs create different files
   (otherwise, file(s) sent by one computation will overwrite files
   sent by other computations). Such files can be sent to the client
   with:

      def compute(n):
          import time
          time.sleep(n)
          # assume that computation saves data in file n.dat
          dispy_send_file(str(n) + '.dat') # send file to client
          # ... continue further computations
          return n

      if __name__ == '__main__':
          import dispy, random
          cluster = dispy.JobCluster(compute)
          jobs = []
          for i in range(20):
              job = cluster.submit(random.randint(5,20))
              job.id = i
              jobs.append(job)
          for job in jobs:
              job()
              print('job %s results in file %s' % (job.id, str(job.id) + '.dat'))

   If the client needs to process the files as soon as they are
   transferred, the Provisional/Intermediate Results feature along
   with a callback can be used to notify the client, as in the sketch
   below.
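
   The following is only a rough sketch of that approach; it assumes
   the *callback* parameter of JobCluster and the
   *dispy_provisional_result* function described in the
   Provisional/Intermediate Results documentation (names may differ in
   other dispy versions):

      def compute(n):
          import time
          time.sleep(n)
          # assume that computation saves data in file n.dat
          dispy_send_file(str(n) + '.dat')  # send file to client
          dispy_provisional_result(n)       # notify client that the file is available
          # ... continue further computations
          return n

      def job_callback(job):  # runs in the client whenever a job's status changes
          if job.status == dispy.DispyJob.ProvisionalResult:
              # the file sent by this job should now be available on the client
              print('file %s.dat has been received' % job.result)

      if __name__ == '__main__':
          import dispy, random
          cluster = dispy.JobCluster(compute, callback=job_callback)
          jobs = [cluster.submit(random.randint(5, 20)) for _ in range(20)]
          for job in jobs:
              job()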

This framework can be customized for various use cases; some examples
are:

* **cluster = dispy.JobCluster(compute, depends=[ClassA, moduleB,
  'file1'])** distributes 'compute' along with ClassA (Python object),
  moduleB (Python object) and 'file1', a file on the client computer.
  Presumably ClassA, moduleB and file1 are needed by 'compute'.

* **cluster = dispy.JobCluster(compute, nodes=['node20',
  '192.168.2.21', 'node24'])** sends computation to nodes 'node20',
  'node24' and node with IP address '192.168.2.21'.  These nodes could
  be in different networks, as explicit names / IP addresses are
  listed.

* If nodes are on a remote network, then certain ports need to be
  forwarded, as the nodes connect to the client to send status /
  results of jobs; see NAT/Firewall Forwarding. If port forwarding is
  not possible, then ssh tunneling can be used. To use this, ssh to
  each node with **ssh -R 51347:127.0.0.1:51347 node** (possibly to
  execute the *dispynode* program on the node if it is not already
  running), then specify *ip_addr=127.0.0.1* to JobCluster; dispy
  issues a warning about using the localhost address, but in this case
  the warning is harmless. A short sketch of this setup is given after
  this list.

* **cluster = dispy.JobCluster(compute, nodes=['192.168.2.*'])**
  sends computation to all nodes whose IP address starts with
  '192.168.2'. In this case, it is assumed that '192.168.2' is the
  local network (since UDP broadcast is used to discover nodes in a
  network and broadcast packets don't cross networks).

* **cluster = dispy.JobCluster(compute, nodes=['192.168.3.5',
  '192.168.3.22', '172.16.11.22', 'node39', '192.168.2.*'])** sends
  computation to nodes with IP addresses '192.168.3.5',
  '192.168.3.22' and '172.16.11.22', to node 'node39' (since explicit
  names / IP addresses are listed, they could be on different
  networks), and to all nodes whose IP address starts with
  '192.168.2' (local network).

* **cluster = dispy.JobCluster(compute, nodes=['192.168.3.5',
  '192.168.3.*', '192.168.2.*'])** In this case, dispy will send
  discovery messages to node with IP address '192.168.3.5'. If this
  node is running 'dispynetrelay', then all the nodes on that network
  are eligible for executing this computation, as wildcard
  '192.168.3.*' matches IP addresses of those nodes. In addition,
  computation is also sent to all nodes whose IP address starts with
  '192.168.2' (local network).

* **cluster = dispy.JobCluster(compute, nodes=['192.168.3.5',
  '192.168.8.20', '172.16.2.99', '*'])** In this case, dispy will send
  discovery messages to nodes with IP addresses '192.168.3.5',
  '192.168.8.20' and '172.16.2.99'. If all these nodes are running
  dispynetrelay, then all the nodes on those networks are eligible for
  executing this computation, as wildcard '*' matches IP addresses of
  those nodes. In addition, computation is also sent to all nodes on
  the local network (since they also match wildcard '*' and the
  discovery message is broadcast on the local network).

* Assuming that 192.168.1.39 is the (private) IP address where the
  dispy client runs, a.b.c.d is the (public) IP address of the NAT
  firewall/gateway (that can be reached from outside) and dispynode is
  running at another public IP address e.f.g.h (so that a.b.c.d and
  e.f.g.h can communicate, but e.f.g.h can't communicate with
  192.168.1.39), **cluster = dispy.JobCluster(compute,
  ip_addr='192.168.1.39', ext_ip_addr='a.b.c.d', nodes=['e.f.g.h'])**
  would work if the NAT firewall/gateway forwards UDP and TCP port
  51347 to 192.168.1.39.

* **cluster = dispy.JobCluster(compute, secret='super')**
  distributes 'compute' to nodes that also use secret 'super' (i.e.,
  nodes started with **dispynode.py -s super**). Note that the secret
  is used only for establishing communication; it is not used to
  encrypt programs or Python objects. This can be useful to prevent
  other users from (inadvertently) using the nodes. If encryption is
  needed, SSL can be used; see below.

* **cluster = dispy.JobCluster(compute, certfile='mycert',
  keyfile='mykey')** distributes 'compute' and encrypts all
  communication using the SSL certificate stored in file 'mycert' and
  the key stored in file 'mykey'. In this case, dispynode must also
  use the same certificate and key; i.e., each dispynode must be
  invoked with **dispynode --certfile="mycert" --keyfile="mykey"**.

  If both the certificate and key are stored in the same file, say,
  'mycertkey', only *certfile* needs to be given: **cluster =
  dispy.JobCluster(compute, certfile='mycertkey')**

* **cluster1 = dispy.JobCluster(compute1, nodes=['192.168.3.2',
  '192.168.3.5'])** and **cluster2 = dispy.JobCluster(compute2,
  nodes=['192.168.3.10', '192.168.3.11'])** distribute 'compute1' to
  nodes 192.168.3.2 and 192.168.3.5, and 'compute2' to nodes
  192.168.3.10 and 192.168.3.11. With this setup, specific
  computations can be scheduled on certain node(s).
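
As an illustration of the ssh tunneling case mentioned in the list
above, a minimal sketch (with a hypothetical remote node 'node1')
might look like:

   # on the client, first open a reverse tunnel to the remote node, e.g.:
   #   ssh -R 51347:127.0.0.1:51347 node1
   # (and start dispynode on 'node1' if it is not already running)

   def compute(n):
       import socket
       return (socket.gethostname(), n)

   if __name__ == '__main__':
       import dispy
       # the node reaches the client through the tunnel at 127.0.0.1:51347
       cluster = dispy.JobCluster(compute, ip_addr='127.0.0.1', nodes=['node1'])
       job = cluster.submit(1)
       print(job())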

A simple version of the word count example from MapReduce:

   # a version of word frequency example from mapreduce tutorial

   def mapper(doc):
       # input reader and map function are combined
       import os
       words = []
       with open(os.path.join('/tmp', doc)) as fd:
           for line in fd:
               words.extend((word.lower(), 1) for word in line.split() \
                            if len(word) > 3 and word.isalpha())
       return words

   def reducer(words):
       # we should generate sorted lists which are then merged,
       # but to keep things simple, we use dicts
       word_count = {}
       for word, count in words:
           if word not in word_count:
               word_count[word] = 0
           word_count[word] += count
       # print('reducer: %s to %s' % (len(words), len(word_count)))
       return word_count

   if __name__ == '__main__':
       import dispy, logging
       # assume nodes node1 and node2 have 'doc1', 'doc2' etc. on their
       # local storage, so no need to transfer them
       map_cluster = dispy.JobCluster(mapper, nodes=['node1', 'node2'], reentrant=True)
       # any node can work on reduce
       reduce_cluster = dispy.JobCluster(reducer, nodes=['*'], reentrant=True)
       map_jobs = []
       for f in ['doc1', 'doc2', 'doc3', 'doc4', 'doc5']:
           job = map_cluster.submit(f)
           map_jobs.append(job)
       reduce_jobs = []
       for map_job in map_jobs:
           words = map_job()
           if not words:
               print(map_job.exception)
               continue
           # simple partition
           n = 0
           while n < len(words):
               m = min(len(words) - n, 1000)
               reduce_job = reduce_cluster.submit(words[n:n+m])
               reduce_jobs.append(reduce_job)
               n += m
       # reduce
       word_count = {}
       for reduce_job in reduce_jobs:
           words = reduce_job()
           if not words:
               print(reduce_job.exception)
               continue
           for word, count in words.items():
               if word not in word_count:
                   word_count[word] = 0
               word_count[word] += count
       # sort words by frequency and print
       for word in sorted(word_count, key=lambda x: word_count[x], reverse=True):
           count = word_count[word]
           print(word, count)
       reduce_cluster.stats()
