Python multiprocessing
Contents
- 1 multiprocessing Constants
- 2 multiprocessing Classes
- 3 multiprocessing Functions
- 4 multiprocessing Imported Modules
- 5 multiprocessing.pool Globals
- 6 multiprocessing.pool Classes
- 7 multiprocessing.pool Functions
- 8 multiprocessing.pool Imported Modules
- 9 Reference
- 10 General
- 11 Process
- 12 Logging
- 13 Queue
- 14 The saga of having a shared Pychrm numpy array
- 15 more multiprocessing objects
- 16 Miscellaneous
- 17 Tests for my own edification
multiprocessing
Constants
- SUBDEBUG = 5
- SUBWARNING = 25
- ProcessError - inherits from Exception
- BufferTooShort - inherits from ProcessError
- TimeoutError - inherits from ProcessError
- AuthenticationError - inherits from ProcessError
multiprocessing
Classes
- Process
multiprocessing
Functions
Array()
- Returns a synchronized shared array
BoundedSemaphore()
- Returns object of type multiprocessing.synchronize.BoundedSemaphore
Condition()
- Returns object of type multiprocessing.synchronize.Condition
Event()
- Returns object of type multiprocessing.synchronize.Event
JoinableQueue()
- Returns object of type multiprocessing.queues.JoinableQueue
Lock()
- Returns object of type multiprocessing.synchronize.Lock
Manager()
- Returns object of type multiprocessing.managers.SyncManager
Pipe()
- Returns a tuple of two objects of type _multiprocessing.Connection connected by a pipe
Pool()
- Returns object of type multiprocessing.pool.Pool
Queue()
- Returns object of type multiprocessing.queues.Queue
RLock()
- Returns recursive lock object of type multiprocessing.synchronize.RLock
RawArray()
- Returns a shared array (no synchronization)
RawValue()
- Returns a shared object (no synchronization)
Semaphore()
- Returns object of type multiprocessing.synchronize.Semaphore
Value()
- Returns a synchronized shared object
active_children()
- Returns a list of process objects corresponding to live child processes
allow_connection_pickling()
- Installs support for sending connections and sockets between processes
cpu_count()
- Returns the number of CPUs detected, as an int
current_process()
- Returns the process object representing the current process
freeze_support()
- Checks whether this is a fake forked process in a frozen executable; if so, runs code specified by the command line and exits
get_logger()
- Returns object of type logging.Logger, creating it if it does not already exist
log_to_stderr()
- Turns on logging and adds a handler which prints to stderr
multiprocessing
Imported Modules
- os
- sys
- multiprocessing.process
- multiprocessing.util
- multiprocessing.synchronize
multiprocessing.pool Globals
- job_counter - of type itertools.count
- CLOSE
- RUN
- TERMINATE
multiprocessing.pool Classes
- ApplyResult
- MapResult
- Finalize
- IMapIterator
- IMapUnorderedIterator
multiprocessing.pool.Pool
- apply() - Equivalent of "apply()" builtin, which is deprecated
- apply_async() - Asynchronous equivalent of "apply()" builtin, which is deprecated
- close()
- imap() - Equivalent of "itertools.imap()" -- can be MUCH slower than "Pool.map()"
- imap_unordered() - Like `imap()` method but ordering of results is arbitrary
- join()
- map() - Equivalent of `map()` builtin
- map_async() - Asynchronous equivalent of `map()` builtin
- terminate()
- ThreadPool
multiprocessing.pool Functions
- cpu_count() - same as above
- debug()
- mapstar()
- worker()
multiprocessing.pool
Imported Modules
- collections
- itertools
- threading
- time
Reference
- python Module of the Week spotlight, part 1
- python Module of the Week spotlight, part 2
- Python's concurrent.futures
General
- The child process needs to be able to import the script containing the target function, so wrap the main part of the program in
if __name__ == '__main__':
Process
- Create an instance of a
multiprocessing.Process
class with a target function and start it
p = multiprocessing.Process(target=SomeCostlyFunction)
p.start()
- To pass arguments to a Process, the argument must be pickleable
p = multiprocessing.Process(target=SomeCostlyFunction, args=(42,))
p.start()
- Can give a Process instance a name, or stick with default:
def my_service():
    name = multiprocessing.current_process().name
    print(name, 'Starting')

service = multiprocessing.Process(name='my_service', target=my_service)
- Allow a main program to exit before child process has finished by setting Process member
daemon = True
d = multiprocessing.Process(name='daemon', target=daemon)
d.daemon = True
- Wait for a process to finish even if it's a daemon by calling Process member function join()
- By default, join() blocks indefinitely. It is also possible to pass a timeout argument (a float representing the number of seconds to wait for the process to become inactive). If the process does not complete within the timeout period, join() returns anyway.
- It is important to join() the process after terminating it in order to give the background machinery time to update the status of the object to reflect the termination.
- Terminate a process using member function
terminate()
- Ask if a process is alive using member function is_alive()
- After the process has completed, can get its exit status by looking at member
exitcode
- can derive subclass from
multiprocessing.Process
Logging
import multiprocessing
import logging

if __name__ == '__main__':
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    ...
Queue
- ctypes gets you a C-style double:
import ctypes
thing = ctypes.c_double()
thing.value = 8
- multiprocessing.sharedctypes.RawArray(ctypes.c_double, N) gets you an array of N doubles allocated in shared memory, without synchronization
- This would be good, except the only way to pass these around is through inheritance, i.e., somehow declaring it again in the child process code as a global
- Can we have a manager take care of it?
more multiprocessing objects
multiprocessing.managers.BaseManager
In [28]: from multiprocessing.managers import BaseManager

In [29]: manager = BaseManager(address=('', 50000), authkey='abc')

In [33]: type(manager)
Out[33]: multiprocessing.managers.BaseManager

In [34]: dir(manager)
Out[34]:
['_Client', '_Listener', '_Server',
 '__class__', '__delattr__', '__dict__', '__doc__', '__enter__', '__exit__',
 '__format__', '__getattribute__', '__hash__', '__init__', '__module__',
 '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__',
 '__sizeof__', '__str__', '__subclasshook__', '__weakref__',
 '_address',    # address on which the manager process listens for new connections
 '_authkey',    # authentication key used to check the validity of incoming connections to the server process
 '_create', '_debug_info', '_finalize_manager', '_number_of_objects',
 '_registry', '_run_server', '_serializer', '_state',
 'address',
 'connect',     # connect a local manager object to a remote manager process
 'get_server', 'join', 'register', 'start',
 'shutdown']    # stop the process used by the manager; only available if start() has been used to start the server process
multiprocessing.managers.Server
In [28]: from multiprocessing.managers import BaseManager

In [29]: manager = BaseManager(address=('', 50000), authkey='abc')

In [30]: server = manager.get_server()

In [31]: type(server)
Out[31]: multiprocessing.managers.Server

In [32]: dir(server)
Out[32]:
['__class__', '__delattr__', '__dict__', '__doc__', '__format__',
 '__getattribute__', '__hash__', '__init__', '__module__', '__new__',
 '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__',
 '__str__', '__subclasshook__', '__weakref__',
 'accept_connection', 'address', 'authkey', 'create', 'debug_info',
 'decref', 'dummy', 'fallback_getvalue', 'fallback_mapping',
 'fallback_repr', 'fallback_str', 'get_methods', 'handle_request',
 'id_to_obj', 'id_to_refcount', 'incref', 'listener', 'mutex',
 'number_of_objects', 'public', 'registry', 'serve_client',
 'serve_forever', 'shutdown', 'stop']
Miscellaneous
Tests for my own edification
- Test concurrent writes (without synchronization) to a referent in shared memory governed by a manager.