Python multiprocessing

From Colettapedia

multiprocessing Constants

  1. SUBDEBUG = 5
  2. SUBWARNING = 25
  3. ProcessError - inherits from Exception; the following inherit from ProcessError:
    1. BufferTooShort
    2. TimeoutError
    3. AuthenticationError

multiprocessing Classes

  1. Process

multiprocessing Functions

  1. Array() - Returns a synchronized shared array
  2. BoundedSemaphore() - Returns object of type multiprocessing.synchronize.BoundedSemaphore
  3. Condition() - Returns object of type multiprocessing.synchronize.Condition
  4. Event() - Returns object of type multiprocessing.synchronize.Event
  5. JoinableQueue() - Returns object of type multiprocessing.queues.JoinableQueue
  6. Lock() - Returns object of type multiprocessing.synchronize.Lock
  7. Manager() - Returns object of type multiprocessing.managers.SyncManager
  8. Pipe() - Returns a tuple of two objects of type _multiprocessing.Connection connected by a pipe
  9. Pool() - Returns object of type multiprocessing.pool.Pool
  10. Queue() - Returns object of type multiprocessing.queues.Queue
  11. RLock() - Returns recursive lock object of type multiprocessing.synchronize.RLock
  12. RawArray() - Returns a shared array
  13. RawValue() - Returns a shared object
  14. Semaphore() - Returns object of type multiprocessing.synchronize.Semaphore
  15. Value() - Returns a synchronized shared object
  16. active_children() - Return list of process objects corresponding to live child processes
  17. allow_connection_pickling() - Install support for sending connections and sockets between processes
  18. cpu_count() - returns int of number of cpus detected
  19. current_process() - Return process object representing the current process
  20. freeze_support() - Check whether this is a fake forked process in a frozen executable. If so then run code specified by commandline and exit.
  21. get_logger() - Return object of type logging.Logger -- if it does not already exist then it is created
  22. log_to_stderr() - Turn on logging and add a handler which prints to stderr

multiprocessing Imported Modules

  1. os
  2. sys
  3. multiprocessing.process
  4. multiprocessing.util
  5. multiprocessing.synchronize

multiprocessing.pool Globals

  1. job_counter - of type itertools.count
  2. CLOSE
  3. RUN
  4. TERMINATE

multiprocessing.pool Classes

  1. ApplyResult
    1. MapResult
  2. Finalize
  3. IMapIterator
    1. IMapUnorderedIterator

multiprocessing.pool.Pool

  • apply() - Equivalent of the `apply()` builtin (which is deprecated)
  • apply_async() - Asynchronous equivalent of the `apply()` builtin
  • close()
  • imap() - Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()`
  • imap_unordered() - Like `imap()` method but ordering of results is arbitrary
  • join()
  • map() - Equivalent of `map()` builtin
  • map_async() - Asynchronous equivalent of `map()` builtin
  • terminate()
  • ThreadPool - a Pool subclass that uses worker threads instead of processes

multiprocessing.pool Functions

  1. cpu_count() - same as above
  2. debug()
  3. mapstar()
  4. worker()

multiprocessing.pool Imported Modules

  1. collections
  2. itertools
  3. threading
  4. time

Reference

General

  • The child process may need to re-import the parent script (notably under the spawn start method), so wrap the main part of the program in an if __name__ == '__main__': guard

Process

  • Create an instance of a multiprocessing.Process class with a target function and start it
p = multiprocessing.Process( target = SomeCostlyFunction )
p.start()
  • To pass arguments to a Process, the argument must be pickleable
p = multiprocessing.Process( target=SomeCostlyFunction, args=(42,) )
p.start()
  • Can give a Process instance a name, or stick with default:
def my_service():
    name = multiprocessing.current_process().name
    print(name, 'Starting')

service = multiprocessing.Process(name='my_service', target=my_service)
  • Allow a main program to exit before child process has finished by setting Process member daemon = True
d = multiprocessing.Process(name='daemon', target=daemon)
d.daemon = True
  • Wait for a process to finish even if it's a daemon by calling Process member function join()
    • By default, join() blocks indefinitely. It is also possible to pass a timeout argument (a float representing the number of seconds to wait for the process to become inactive). If the process does not complete within the timeout period, join() returns anyway.
  • It is important to join() the process after terminating it in order to give the background machinery time to update the status of the object to reflect the termination.
  • Terminate a process using member function terminate()
  • Ask if a process is alive using member function is_alive()
  • After the process has completed, get its exit status by looking at the exitcode member
  • Can derive a subclass from multiprocessing.Process and override its run() method

Logging

import logging
import multiprocessing

if __name__ == '__main__':
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    ...

Queue

The saga of having a shared Pychrm numpy array

  1. ctypes gets you a C-style double
import ctypes
thing = ctypes.c_double()
thing.value = 8
  2. multiprocessing.sharedctypes.RawArray(ctypes.c_double, N) gets you an array of N doubles allocated in shared memory without synchronization
    • This would be good, except the only way to pass these around is through inheritance at process-creation time (e.g. as Process constructor args, or as a global re-declared in the child); they cannot be sent through a Queue or Pipe
  3. Can we have a manager take care of it?

more multiprocessing objects

multiprocessing.managers.BaseManager

In [28]: from multiprocessing.managers import BaseManager

In [29]: manager = BaseManager(address=('', 50000), authkey=b'abc')  # authkey must be bytes in Python 3

In [33]: type(manager)
Out[33]: multiprocessing.managers.BaseManager

In [34]: dir(manager)
Out[34]: 
['_Client',
 '_Listener',
 '_Server',
 '__class__',
 '__delattr__',
 '__dict__',
 '__doc__',
 '__enter__',
 '__exit__',
 '__format__',
 '__getattribute__',
 '__hash__',
 '__init__',
 '__module__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_address', # address on which the manager process listens for new connections
 '_authkey', # authentication key which will be used to check the validity of incoming connections to the server process
 '_create',
 '_debug_info',
 '_finalize_manager',
 '_number_of_objects',
 '_registry',
 '_run_server',
 '_serializer',
 '_state',
 'address',
 'connect', # Connect a local manager object to a remote manager process
 'get_server',
 'join',
 'register',
 'shutdown', # Stop the process used by the manager. Only available if start() has been used to start the server process.
 'start']

multiprocessing.managers.Server

In [28]: from multiprocessing.managers import BaseManager

In [29]: manager = BaseManager(address=('', 50000), authkey=b'abc')

In [30]: server = manager.get_server()

In [31]: type(server)
Out[31]: multiprocessing.managers.Server

In [32]: dir(server)
Out[32]: 
['__class__',
 '__delattr__',
 '__dict__',
 '__doc__',
 '__format__',
 '__getattribute__',
 '__hash__',
 '__init__',
 '__module__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 'accept_connection',
 'address',
 'authkey',
 'create',
 'debug_info',
 'decref',
 'dummy',
 'fallback_getvalue',
 'fallback_mapping',
 'fallback_repr',
 'fallback_str',
 'get_methods',
 'handle_request',
 'id_to_obj',
 'id_to_refcount',
 'incref',
 'listener',
 'mutex',
 'number_of_objects',
 'public',
 'registry',
 'serve_client',
 'serve_forever',
 'shutdown',
 'stop']

Miscellaneous

Tests for my own edification

  • Test concurrent writes (without synchronization) to a referent in shared memory governed by a manager.