pprocess - Reference

The pprocess module defines a simple parallel processing API for Python, inspired somewhat by the thread module, slightly less by pypar, and slightly less still by pypvm.

This document complements the tutorial by providing an overview of the different styles of parallel programming supported by the module. For an introduction and in order to get a clearer idea of the most suitable styles for your own programs, consult the tutorial.

Thread-style Processing

To create new processes to run a function or any callable object, specify the "callable" and any arguments as follows:

channel = pprocess.start(fn, arg1, arg2, named1=value1, named2=value2)

This returns a channel which can then be used to communicate with the created process. Meanwhile, in the created process, the given callable will be invoked with another channel as its first argument followed by the specified arguments:

def fn(channel, arg1, arg2, named1, named2):
    # Read from and write to the channel.
    # Return value is ignored.
    ...

Fork-style Processing

To create new processes in a similar way to that employed when using os.fork (ie. the fork system call on various operating systems), use the following method:

channel = pprocess.create()
if channel.pid == 0:
    # This code is run by the created process.
    # Read from and write to the channel to communicate with the
    # creating/calling process.
    # An explicit exit of the process may be desirable to prevent the process
    # from running code which is intended for the creating/calling process.
    ...
    pprocess.exit(channel)
else:
    # This code is run by the creating/calling process.
    # Read from and write to the channel to communicate with the created
    # process.
    ...

Message Exchanges

When creating many processes, each providing results for the consumption of the main process, the collection of those results in an efficient fashion can be problematic: if some processes take longer than others, and if we decide to read from those processes when they are not ready instead of other processes which are ready, the whole activity will take much longer than necessary.

One solution to the problem of knowing when to read from channels is to create an Exchange object, optionally initialising it with a list of channels through which data is expected to arrive:

exchange = pprocess.Exchange()           # populate the exchange later
exchange = pprocess.Exchange(channels)   # populate the exchange with channels

We can add channels to the exchange using the add method:

exchange.add(channel)

To test whether an exchange is active - that is, whether it is actually monitoring any channels - we can use the active method which returns all channels being monitored by the exchange:

channels = exchange.active()

We may then check the exchange to see whether any data is ready to be received; for example:

for channel in exchange.ready():
    # Read from and write to the channel.
    ...

If we do not wish to wait indefinitely for a list of channels, we can set a timeout value as an argument to the ready method (as a floating point number specifying the timeout in seconds, where 0 means a non-blocking poll as stated in the select module's select function documentation).

Convenient Message Exchanges

A convenient form of message exchanges can be adopted by defining a subclass of the Exchange class and defining a particular method:

class MyExchange(pprocess.Exchange):
    def store_data(self, channel):
        data = channel.receive()
        # Do something with data here.

The exact operations performed on the received data might be as simple as storing it on an instance attribute. To make use of the exchange, we would instantiate it as usual:

exchange = MyExchange()         # populate the exchange later
exchange = MyExchange(limit=10) # set a limit for later population

The exchange can now be used in a simpler fashion than that shown above. We can add channels as before using the add method, or we can choose to only add channels if the specified limit of channels is not exceeded:

exchange.add(channel)           # add a channel as normal
exchange.add_wait(channel)      # add a channel, waiting if the limit would be
                                # exceeded

Or we can request that the exchange create a channel on our behalf:

channel = exchange.create()

We can even start processes and monitor channels without ever handling the channel ourselves:

exchange.start(fn, arg1, arg2, named1=value1, named2=value2)

We can explicitly wait for "free space" for channels by calling the wait method, although the start and add_wait methods make this less interesting:

exchange.wait()

Finally, when finishing the computation, we can choose to merely call the finish method and have the remaining data processed automatically:

exchange.finish()

Clearly, this approach is less flexible but more convenient than the raw message exchange API as described above. However, it permits much simpler and clearer code.

Exchanges as Queues

Instead of having to subclass the pprocess.Exchange class and to define the store_data method, it might be more desirable to let the exchange manage the communications between created and creating processes and to let the creating process just consume received data as it arrives, without particular regard for the order of the received data - perhaps the creating process has its own way of managing such issues.

For such situations, the Queue class may be instantiated and channels added to the queue using the various methods provided:

queue = pprocess.Queue(limit=10)
channel = queue.create()
if channel:
    # Do some computation.
    pprocess.exit(channel)

The results can then be consumed by treating the queue like an iterator:

for result in queue:
    # Capture each result.

This approach does not, of course, require the direct handling of channels. One could instead use the start method on the queue to create processes and to initiate computations (since a queue is merely an enhanced exchange with a specific implementation of the store_data method).

Exchanges as Maps

Where the above Queue class appears like an attractive solution for the management of the results of computations, but where the order of their consumption by the creating process remains important, the Map class may offer a suitable way of collecting and accessing results:

results = pprocess.Map(limit=10)
for value in inputs:
    results.start(fn, args)

The results can then be consumed in an order corresponding to the order of the computations which produced them:

for result in results:
    # Process each result.

Internally, the Map object records a particular ordering of channels, ensuring that the received results can be mapped to this ordering, and that the results can be made available with this ordering preserved.

Managed Callables

A further simplification of the above convenient use of message exchanges involves the creation of callables (eg. functions) which are automatically monitored by an exchange. We create such a callable by calling the manage method on an exchange:

myfn = exchange.manage(fn)

This callable can then be invoked instead of using the exchange's start method:

myfn(arg1, arg2, named1=value1, named2=value2)

The exchange's finish method can be used as usual to process incoming data.

Making Existing Functions Parallel

In making a program parallel, existing functions which only return results can be manually modified to accept and use channels to communicate results back to the main process. However, a simple alternative is to use the MakeParallel class to provide a wrapper around unmodified functions which will return the results from those functions in the channels provided. For example:

fn = pprocess.MakeParallel(originalfn)

Map-style Processing

In situations where a callable would normally be used in conjunction with the Python built-in map function, an alternative solution can be adopted by using the pmap function:

pprocess.pmap(fn, sequence)

Here, the sequence would have to contain elements that each contain the required parameters of the specified callable, fn. Note that the callable does not need to be a parallel-aware function which has a channel argument: the pmap function automatically wraps the given callable internally.

Reusing Processes and Channels

So far, all parallel computations have been done with newly-created processes. However, this can seem somewhat inefficient, especially if processes are being continually created and destroyed (although if this happens too often, the amount of work done by each process may be too little, anyway). One solution is to retain processes after they have done their work and request that they perform more work for each new parallel task or invocation. To enable the reuse of processes in this way, a special keyword argument may be specified when creating Exchange instances (and instances of subclasses such as Map and Queue). For example:

exchange = MyExchange(limit=10, reuse=1) # reuse up to 10 processes

Code invoked through such exchanges must be aware of channels and be constructed in such a way that it does not terminate after sending a result back to the creating process. Instead, it should repeatedly wait for subsequent sets of parameters (compatible with those either in the signature of a callable or with the original values read from the channel). Reusable code is terminated when the special value of None is sent from the creating process to the created process, indicating that no more parameters will be sent; this should cause the code to terminate.

Making Existing Functions Parallel and Reusable

An easier way of making reusable code sections for parallel use is to employ the MakeReusable class to wrap an existing callable:

fn = pprocess.MakeReusable(originalfn)

This wraps the callable in a similar fashion to MakeParallel, but provides the necessary mechanisms described above for reusable code.

Continuous Processes and Channels

Much of the usage of exchanges so far has concentrated on processes which are created, whose callables are invoked, and then, once those callables have returned, either they are invoked again in the same process (when reused) or in a new process (when not reused). However, the underlying mechanisms actually support processes whose callables not only receive input at the start of their execution and send output at the end of their execution, but may provide output on a continuous basis (similar to iterator or generator objects).

To enable support for continuous communications between processes, a keyword argument must be specified when creating an Exchange instance (or an instance of a subclass of Exchange such as Map or Queue):

exchange = MyExchange(limit=10, continuous=1) # support up to 10 processes

Code invoked in this mode of communication must be aware of channels, since it will need to explicitly send data via a channel to the creating process, instead of terminating and sending data only once (as would be done automatically using convenience classes such as MakeParallel).

Background Processes and Callables

So far, all parallel computations have involved created processes which depend on the existence of the created process to collect results and to communicate with these created processes, preventing the created process from terminating, even if the created processes actually perform work and potentially create output which need not concern the process which created them. In order to separate creating and created processes, the concept of a background process (also known as a daemon process) is introduced.

The BackgroundCallable class acts somewhat like the manage method on exchange-based objects, although no exchange is immediately involved, and instances of BackgroundCallable provide wrappers around existing parallel-aware callables which then be invoked in order to initiate a background computation in a created process. For example:

backgroundfn = pprocess.BackgroundCallable(address, fn)

This wraps the supplied callable (which can itself be the result of using MakeParallel), with the resulting wrapper lending itself to invocation like any other function. One distinguishing feature is that of the address: in order to contact the background process after invocation to (amongst other things) receive any result, a specific address must be given to define the contact point between the created process and any processes seeking to connect to it. Since these "persistent" communications employ special files (specifically UNIX-domain sockets), the address must be a suitable filename.

Background and Persistent Queues

Background processes employing persistent communications require adaptations of the facilities described in the sections above. For a single background process, the BackgroundQueue function is sufficient to create a queue-like object which can monitor the communications channel between the connecting process and a background process. For example:

queue = pprocess.BackgroundQueue(address)

This code will cause the process reachable via the given address to be contacted and any results made available via the created queue-like object.

Where many background processes have been created, a single PersistentQueue object can monitor their communications by being connected to them all, as in the following example:

queue = pprocess.PersistentQueue()
for address in addresses:
    queue.connect(address)

Here, the queue monitors all previously created processes whose addresses reside in the addresses sequence. Upon iterating over the queue, results will be taken from whichever process happens to have data available in no particular pre-defined order.

Implementation Notes

Signals and Waiting

When created/child processes terminate, one would typically want to be informed of such conditions using a signal handler. Unfortunately, Python seems to have issues with restartable reads from file descriptors when interrupted by signals:

Select and Poll

The exact combination of conditions indicating closed pipes remains relatively obscure. Here is a message/thread describing them (in the context of another topic):

It would seem, from using sockets and from studying the asyncore module, that sockets are more predictable than pipes.

Notes about poll implementations can be found here: