pprocess - Tutorial

The pprocess module provides several mechanisms for running Python code concurrently in several processes. The most straightforward way of making a program parallel-aware - that is, where the program can take advantage of more than one processor to simultaneously process data - is to use the pmap function.

For a brief summary of each of the features of pprocess, see the reference document.

A Note on Parallel Processes

The way pprocess uses multiple processes to perform work in parallel involves the fork system call, which on modern operating systems involves what is known as "copy-on-write" semantics. In plain language, when pprocess creates a new child process to perform work in parallel with other work that needs to be done, this new process will be a near-identical copy of the original parent process, and the running code will be able to access data resident in that parent process.

However, when a child process modifies data, the parent process does not see the modification and remains oblivious to it. As soon as the child process attempts to modify the data, it obtains its own separate copy, which is then modified independently of the original. In other words, data is copied only when an attempt is made to write to it. Meanwhile, the parent's copy of that data is left untouched by the activities of the child.
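
This effect can be observed without pprocess at all. The following minimal sketch uses only os.fork from the Python standard library, on the assumption (taken from the description above) that this is the underlying mechanism; it is not pprocess's own code and it only runs on UNIX-like systems. The child reports the length of its modified list through its exit status:

```python
import os

data = [1, 2]

pid = os.fork()
if pid == 0:
    # Child process: this append affects only the child's private copy.
    data.append(3)
    # Report the child's view of the list length via the exit status.
    os._exit(len(data))

# Parent process: wait for the child and compare both views.
_, status = os.waitpid(pid, 0)
child_length = os.WEXITSTATUS(status)

print("child saw %d items; parent still has %r" % (child_length, data))
```

The child sees three items in its list, whereas the parent's list still holds only the original two.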

It is therefore essential to note that any data distributed to other processes, and which is then modified by those processes, will not appear to change in the parent process even if the objects employed are mutable. This is rather different to the behaviour of a normal Python program: a function which is passed a list, for example, can mutate that list in such a way that the modifications are still present upon returning from the function. For example:

def mutator(l):
    l.append(3)

l = [1, 2]
mutator(l) # l is now [1, 2, 3]

In contrast, passing a list to a child process will cause the list to mutate in the child process, but the parent process will not see the list change. For example:

def mutator(l):
    l.append(3)

results = pprocess.Map()
mutator = results.manage(pprocess.MakeParallel(mutator))

l = [1, 2]
mutator(l) # l is still [1, 2] in the parent process

To communicate changes to data between processes, the modified objects must be explicitly returned from child processes using the mechanisms described in this documentation. For example:

def mutator(l):
    l.append(3)
    return l       # the modified object is explicitly returned

results = pprocess.Map()
mutator = results.manage(pprocess.MakeParallel(mutator))

l = [1, 2]
mutator(l)

all_l = results[:] # there are potentially many results, not just one
l = all_l[0]       # l is now [1, 2, 3], taken from the first result

It is perhaps easiest to think of the communications mechanisms as providing a gateway between processes through which information can be passed, with the rest of a program's data being private and hidden from the other processes (even if that data initially resembles what the other processes also see within themselves).
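
The gateway idea can be sketched with standard library facilities alone. The run_in_child helper below is hypothetical and is not part of pprocess: it assumes a UNIX-like system, forks a child, and uses a pipe carrying pickled data as the only route by which the child's result reaches the parent:

```python
import os
import pickle

def run_in_child(func, *args):
    """Run func(*args) in a forked child and return its pickled result."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: compute, send the result through the pipe, then exit.
        os.close(read_fd)
        try:
            with os.fdopen(write_fd, "wb") as out:
                pickle.dump(func(*args), out)
        finally:
            os._exit(0)
    # Parent: the pipe is the only route by which the child's data returns.
    os.close(write_fd)
    with os.fdopen(read_fd, "rb") as inp:
        result = pickle.load(inp)
    os.waitpid(pid, 0)
    return result

def mutator(l):
    l.append(3)
    return l

l = [1, 2]
returned = run_in_child(mutator, l)
print("returned: %r, original: %r" % (returned, l))
```

Only the returned object reflects the child's modification; the list held by the parent keeps its original contents.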

Converting Map-Style Code

Consider a program using the built-in map function and a sequence of inputs:

    t = time.time()

    # Initialise an array.

    sequence = []
    for i in range(0, N):
        for j in range(0, N):
            sequence.append((i, j))

    # Perform the work.

    results = map(calculate, sequence)

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t

(This code in context with import statements and functions is found in the examples/simple_map.py file.)

The principal features of this program involve the preparation of an array for input purposes, and the use of the map function to iterate over the combinations of i and j in the array. Even if the calculate function could be invoked independently for each input value, we have to wait for each computation to complete before initiating a new one. The calculate function may be defined as follows:

def calculate(t):

    "A supposedly time-consuming calculation on 't'."

    i, j = t
    time.sleep(delay)
    return i * N + j

In order to reduce the processing time - to speed the code up, in other words - we can make this code use several processes instead of just one. Here is the modified code:

    t = time.time()

    # Initialise an array.

    sequence = []
    for i in range(0, N):
        for j in range(0, N):
            sequence.append((i, j))

    # Perform the work.

    results = pprocess.pmap(calculate, sequence, limit=limit)

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t

(This code in context with import statements and functions is found in the examples/simple_pmap.py file.)

By replacing usage of the map function with the pprocess.pmap function, and specifying the limit on the number of processes to be active at any given time (the value of the limit variable is defined elsewhere), several calculations can now be performed in parallel.
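
As a rough guide to the speed-up that might be expected, the following sketch estimates idealised running times. It assumes that every call to calculate takes exactly delay seconds and it ignores the cost of creating processes and communicating results, so real timings will be somewhat higher. The estimated_times function and the values used are hypothetical:

```python
def estimated_times(n, delay, limit):
    """Return idealised (sequential, parallel) times for n * n calculations."""
    tasks = n * n
    sequential = tasks * delay
    # With at most 'limit' processes active, the work proceeds in rounds.
    rounds = (tasks + limit - 1) // limit
    parallel = rounds * delay
    return sequential, parallel

sequential, parallel = estimated_times(10, 1.0, 4)
print("sequential: %.1fs, parallel: %.1fs" % (sequential, parallel))
```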

Converting Invocations to Parallel Operations

Although some programs make natural use of the map function, others may employ an invocation in a nested loop. This may also be converted to a parallel program. Consider the following Python code:

    t = time.time()

    # Initialise an array.

    results = []

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            results.append(calculate(i, j))

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t

(This code in context with import statements and functions is found in the examples/simple1.py file.)

Here, a computation in the calculate function is performed for each combination of i and j in the nested loop, returning a result value. However, we must wait for the completion of this function for each element before moving on to the next element, and this means that the computations are performed sequentially. Consequently, on a system with more than one processor, even if we could call calculate for more than one combination of i and j and have the computations executing at the same time, the above program will not take advantage of such capabilities.

We use a slightly modified version of calculate which employs two parameters instead of one:

def calculate(i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j'.
    """

    time.sleep(delay)
    return i * N + j

In order to reduce the processing time - to speed the code up, in other words - we can make this code use several processes instead of just one. Here is the modified code:

    t = time.time()

    # Initialise the results using a map with a limit on the number of
    # channels/processes.

    results = pprocess.Map(limit=limit)

    # Wrap the calculate function and manage it.

    calc = results.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            calc(i, j)

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t

(This code in context with import statements and functions is found in the examples/simple_managed_map.py file.)

The principal changes in the above code involve the use of a pprocess.Map object to collect the results, and a version of the calculate function which is managed by the Map object. The Map object arranges the results of computations such that iterating over the object, or accessing the object using list operations, provides the results in the same order as their corresponding inputs.
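
One way to picture this ordering behaviour is shown below. The sketch is not the implementation used by Map; it merely assumes that each invocation is tagged with its position (a hypothetical scheme), so that results arriving in any order can be slotted into place:

```python
def order_results(arrivals, count):
    """Place (position, value) pairs into a list indexed by position."""
    results = [None] * count
    for position, value in arrivals:
        results[position] = value
    return results

# Results may arrive in any order, depending on when each process finishes.
arrivals = [(2, "third"), (0, "first"), (1, "second")]
ordered = order_results(arrivals, 3)
print(ordered)
```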

Converting Arbitrarily-Ordered Invocations

In some programs, it is not important to receive the results of computations in any particular order, usually because either the order of these results is irrelevant, or because the results provide "positional" information which let them be handled in an appropriate way. Consider the following Python code:

    t = time.time()

    # Initialise an array.

    results = [0] * N * N

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            i2, j2, result = calculate(i, j)
            results[i2*N+j2] = result

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t

(This code in context with import statements and functions is found in the examples/simple2.py file.)

Here, a result array is initialised first and each computation is performed sequentially. A significant difference to the previous examples is the return value of the calculate function: the position details corresponding to i and j are returned alongside the result. This is of limited value in the above code, because the order of the computations and the reception of results is fixed, and the program gets no benefit from parallelisation.

We can bring the benefits of parallel processing to the above program with the following code:

    t = time.time()

    # Initialise the communications queue with a limit on the number of
    # channels/processes.

    queue = pprocess.Queue(limit=limit)

    # Initialise an array.

    results = [0] * N * N

    # Wrap the calculate function and manage it.

    calc = queue.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            calc(i, j)

    # Store the results as they arrive.

    print "Finishing..."
    for i, j, result in queue:
        results[i*N+j] = result

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t

(This code in context with import statements and functions is found in the examples/simple_managed_queue.py file.)

This revised code employs a pprocess.Queue object whose purpose is to collect the results of computations and to make them available in the order in which they were received. The code collecting results has been moved into a separate loop independent of the original computation loop and taking advantage of the more relevant "positional" information emerging from the queue.
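
The idea of consuming results in arrival order can be sketched using the standard multiprocessing module instead of pprocess. This is an analogous mechanism, not the one pprocess uses, and the explicit "fork" context assumes a UNIX-like system. Each result carries its own position, so the order of arrival does not matter:

```python
import multiprocessing
import time
from multiprocessing.connection import wait

N = 2
ctx = multiprocessing.get_context("fork")

def calculate(ch, i, j):
    # Later cells sleep for less time, so they will typically finish first.
    time.sleep(0.05 * (N * N - (i * N + j)))
    ch.send((i, j, i * N + j))
    ch.close()

readers = []
processes = []
for i in range(N):
    for j in range(N):
        reader, writer = ctx.Pipe(duplex=False)
        process = ctx.Process(target=calculate, args=(writer, i, j))
        process.start()
        writer.close()  # the parent only reads from this pipe
        readers.append(reader)
        processes.append(process)

# Store the results as they arrive, using the positional information.
results = [0] * N * N
while readers:
    for reader in wait(readers):
        i, j, value = reader.recv()
        results[i * N + j] = value
        readers.remove(reader)
        reader.close()

for process in processes:
    process.join()

print(results)
```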

Replacing Queues with Exchanges

We can take this example further, illustrating some of the mechanisms employed by pprocess. Instead of collecting results in a queue, we can define a class containing a method which is called when new results arrive:

class MyExchange(pprocess.Exchange):

    "Parallel convenience class containing the array assignment operation."

    def store_data(self, ch):
        i, j, result = ch.receive()
        self.D[i*N+j] = result

This code exposes the channel paradigm which is used throughout pprocess and is available to applications, if desired. The effect of the method is the storage of a result received through the channel in an attribute of the object. The following code shows how this class can be used:

    t = time.time()

    # Initialise the communications exchange with a limit on the number of
    # channels/processes.

    exchange = MyExchange(limit=limit)

    # Initialise an array - it is stored in the exchange to permit automatic
    # assignment of values as the data arrives.

    results = exchange.D = [0] * N * N

    # Wrap the calculate function and manage it.

    calc = exchange.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            calc(i, j)

    # Wait for the results.

    print "Finishing..."
    exchange.finish()

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t

(This code in context with import statements and functions is found in the examples/simple_managed.py file.)

The main visible differences between this and the previous program are the storage of the result array in the exchange, the removal of the queue consumption code from the main program, the placing of the act of storing values in the exchange's store_data method, and the need to call the finish method on the MyExchange object so that we do not try to access the results too soon. One underlying benefit not visible in the above code is that we no longer need to accumulate results in a queue or other structure so that they may be processed and assigned to the correct positions in the result array.
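
To illustrate the callback style, here is a toy stand-in for an exchange built only from os.fork, pipes and select on a UNIX-like system. The MiniExchange and ArrayExchange classes are hypothetical and far simpler than pprocess.Exchange: there is no process limit, and store_data receives the value itself instead of a channel:

```python
import os
import pickle
import select

class MiniExchange:
    """Toy exchange: forks workers and dispatches results to store_data."""

    def __init__(self):
        self.readers = {}  # file descriptor -> (stream, child pid)

    def start(self, func, *args):
        read_fd, write_fd = os.pipe()
        pid = os.fork()
        if pid == 0:
            # Child: send the result of the computation, then exit.
            os.close(read_fd)
            try:
                with os.fdopen(write_fd, "wb") as out:
                    pickle.dump(func(*args), out)
            finally:
                os._exit(0)
        os.close(write_fd)
        self.readers[read_fd] = (os.fdopen(read_fd, "rb"), pid)

    def store_data(self, value):
        raise NotImplementedError

    def finish(self):
        # Wait until every child has delivered its result.
        while self.readers:
            ready, _, _ = select.select(list(self.readers), [], [])
            for fd in ready:
                stream, pid = self.readers.pop(fd)
                with stream:
                    self.store_data(pickle.load(stream))
                os.waitpid(pid, 0)

N = 3

class ArrayExchange(MiniExchange):
    def store_data(self, value):
        i, j, result = value
        self.D[i * N + j] = result

def calculate(i, j):
    return i, j, i * N + j

exchange = ArrayExchange()
exchange.D = [0] * N * N
for i in range(N):
    for j in range(N):
        exchange.start(calculate, i, j)
exchange.finish()
print(exchange.D)
```

As in the program above, the result array is only complete once finish has returned.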

Using Channels in Callables

For the curious, we may remove some of the remaining conveniences of the above program to expose other features of pprocess. First, we define a slightly modified version of the calculate function:

def calculate(ch, i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j', using 'ch' to
    communicate with the parent process.
    """

    time.sleep(delay)
    ch.send((i, j, i * N + j))

This function accepts a channel, ch, through which results will be sent, and through which other values could potentially be received, although we choose not to do so here. The program using this function is as follows:

    t = time.time()

    # Initialise the communications exchange with a limit on the number of
    # channels/processes.

    exchange = MyExchange(limit=limit)

    # Initialise an array - it is stored in the exchange to permit automatic
    # assignment of values as the data arrives.

    results = exchange.D = [0] * N * N

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            exchange.start(calculate, i, j)

    # Wait for the results.

    print "Finishing..."
    exchange.finish()

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t

(This code in context with import statements and functions is found in the examples/simple_start.py file.)

Here, we have discarded two conveniences: the wrapping of callables using MakeParallel, which lets us use functions without providing any channel parameters, and the management of callables using the manage method on queues, exchanges, and so on. The start method still calls the provided callable, but using a different notation from that employed previously.

Converting Inline Computations

Although many programs employ functions and other useful abstractions which can be treated as parallelisable units, some programs perform computations "inline", meaning that the code responsible appears directly within a loop or related control-flow construct. Consider the following code:

    t = time.time()

    # Initialise an array.

    results = [0] * N * N

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            time.sleep(delay)
            results[i*N+j] = i * N + j

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t

(This code in context with import statements and functions is found in the examples/simple.py file.)

To simulate "work", as in the different versions of the calculate function, we use the time.sleep function (which does not actually do work, and which will cause a process to be descheduled in most cases, but which simulates the delay associated with work being done). This inline work, which must be performed sequentially in the above program, can be performed in parallel in a somewhat modified version of the program:

    t = time.time()

    # Initialise the results using a map with a limit on the number of
    # channels/processes.

    results = pprocess.Map(limit=limit)

    # Perform the work.
    # NOTE: Could use the with statement in the loop to package the
    # NOTE: try...finally functionality.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            ch = results.create()
            if ch:
                try: # Calculation work.

                    time.sleep(delay)
                    ch.send(i * N + j)

                finally: # Important finalisation.

                    pprocess.exit(ch)

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t

(This code in context with import statements and functions is found in the examples/simple_create_map.py file.)

Although seemingly more complicated, the bulk of the changes in this modified program are focused on obtaining a channel object, ch, at the point where the computations are performed, and the wrapping of the computation code in a try...finally statement which ensures that the process associated with the channel exits when the computation is complete. In order for the results of these computations to be collected, a pprocess.Map object is used, since it will maintain the results in the same order as the initiation of the computations which produced them.
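
The reason the finalisation matters can be seen in a sketch built directly on os.fork for UNIX-like systems. The InlineMap class here is hypothetical and only imitates the pattern: its create method returns a writable stream in the child and None in the parent. Without the exit in the finally clause, each child would carry on around the loop and start creating children of its own:

```python
import os
import pickle

class InlineMap:
    """Toy imitation of the create pattern: results keep creation order."""

    def __init__(self):
        self.pending = []  # (stream, child pid) in order of creation

    def create(self):
        read_fd, write_fd = os.pipe()
        pid = os.fork()
        if pid == 0:
            # Child: hand back a stream for sending the result.
            os.close(read_fd)
            return os.fdopen(write_fd, "wb")
        # Parent: remember where the result will come from.
        os.close(write_fd)
        self.pending.append((os.fdopen(read_fd, "rb"), pid))
        return None

    def results(self):
        values = []
        for stream, pid in self.pending:
            with stream:
                values.append(pickle.load(stream))
            os.waitpid(pid, 0)
        self.pending = []
        return values

inline = InlineMap()
for i in range(3):
    ch = inline.create()
    if ch:
        try:  # Calculation work, performed only in the child.
            pickle.dump(i * i, ch)
            ch.close()
        finally:  # Important finalisation: the child must not continue.
            os._exit(0)

values = inline.results()
print(values)
```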

Reusing Processes in Parallel Programs

One notable aspect of the above programs when parallelised is that each invocation of a computation in parallel creates a new process in which the computation is to be performed, regardless of whether existing processes had just finished producing results and could theoretically have been asked to perform new computations. In other words, processes were created and destroyed instead of being reused.

However, we can request that processes be reused for computations by enabling the reuse feature of exchange-like objects and employing suitable reusable callables. Consider this modified version of the simple_managed_map program:

    t = time.time()

    # Initialise the results using a map with a limit on the number of
    # channels/processes.

    results = pprocess.Map(limit=limit, reuse=1)

    # Wrap the calculate function and manage it.

    calc = results.manage(pprocess.MakeReusable(calculate))

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            calc(i, j)

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t

(This code in context with import statements and functions is found in the examples/simple_managed_map_reusable.py file.)

By indicating that processes and channels shall be reused, and by wrapping the calculate function with the necessary support, the computations may be performed in parallel using a pool of processes instead of creating a new process for each computation and then discarding it, only to create a new process for the next computation.
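
The difference between creating a process per computation and reusing processes can be sketched with the standard multiprocessing module. This is an analogous mechanism, not pprocess's implementation, and the "fork" context assumes a UNIX-like system. The run_with_reuse and worker functions are hypothetical; each worker loops over incoming tasks until told to stop:

```python
import multiprocessing
import os

ctx = multiprocessing.get_context("fork")

def worker(conn, func):
    # Keep serving tasks in the same process until the sentinel arrives.
    while True:
        args = conn.recv()
        if args is None:
            break
        conn.send((os.getpid(), func(*args)))
    conn.close()

def run_with_reuse(func, inputs, limit):
    """Apply func to each input using a fixed pool of 'limit' processes."""
    pool = []
    for _ in range(limit):
        parent_end, child_end = ctx.Pipe()
        process = ctx.Process(target=worker, args=(child_end, func))
        process.start()
        child_end.close()
        pool.append((process, parent_end))

    results, pids = [], set()
    # Hand out the inputs in rounds, one task per worker per round.
    for start in range(0, len(inputs), limit):
        batch = inputs[start:start + limit]
        for (process, conn), args in zip(pool, batch):
            conn.send(args)
        for (process, conn), args in zip(pool, batch):
            pid, value = conn.recv()
            pids.add(pid)
            results.append(value)

    for process, conn in pool:
        conn.send(None)  # sentinel: no more work
        conn.close()
        process.join()
    return results, pids

def calculate(i, j):
    return i * 3 + j

inputs = [(i, j) for i in range(3) for j in range(3)]
results, pids = run_with_reuse(calculate, inputs, 2)
print("%d results from %d processes" % (len(results), len(pids)))
```

Nine computations are performed here, but only two worker processes are ever created.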

Supporting Continuous Processes in Parallel Programs

Although reusable processes offer the opportunity to invoke a callable over and over within the same created process, they do not fully support the potential of the underlying mechanisms in pprocess: created processes can communicate multiple values to the creating process and can theoretically run within the same callable forever.

Consider this modified form of the calculate function:

def calculate(ch, i):

    """
    A supposedly time-consuming calculation on 'i'.
    """

    for j in range(0, N):
        time.sleep(delay)
        ch.send((i, j, i * N + j))

This function accepts a channel ch together with an argument i corresponding to an entire row of the input array, as opposed to having two arguments (i and j) corresponding to a single cell in the input array. In this function, a series of calculations are performed and a number of values are returned through the channel, without the function terminating until all values have been returned for the row data.

To use this modified function, a modified version of the simple_managed_queue program is used:

    t = time.time()

    # Initialise the communications queue with a limit on the number of
    # channels/processes.

    queue = pprocess.Queue(limit=limit, continuous=1)

    # Initialise an array.

    results = [0] * N * N

    # Manage the calculate function.

    calc = queue.manage(calculate)

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        calc(i)

    # Store the results as they arrive.

    print "Finishing..."
    for i, j, result in queue:
        results[i*N+j] = result

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t

(This code in context with import statements and functions is found in the examples/simple_continuous_queue.py file.)

Although the inner loop in the work section has been relocated to the calculate function, the queue still receives outputs from that function with positional information and a result for the result array. Thus, no change is needed for the retrieval of the results: they arrive in the queue as before.
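
The notion of one process sending many values through its channel can be sketched with the standard multiprocessing module. This is an analogous mechanism and not pprocess itself, and the "fork" context assumes a UNIX-like system. For simplicity, the sketch drains each channel in turn instead of handling values in arrival order; the end of a row is detected when the sending end of the pipe is closed:

```python
import multiprocessing

N = 3
ctx = multiprocessing.get_context("fork")

def calculate(ch, i):
    # Send one value per cell of row i through the same channel.
    for j in range(N):
        ch.send((i, j, i * N + j))
    ch.close()

channels = []
for i in range(N):
    reader, writer = ctx.Pipe(duplex=False)
    process = ctx.Process(target=calculate, args=(writer, i))
    process.start()
    writer.close()  # only the child holds the sending end now
    channels.append((process, reader))

results = [0] * N * N
for process, reader in channels:
    while True:
        try:
            i, j, value = reader.recv()
        except EOFError:
            break  # the child has closed its end: no more values
        results[i * N + j] = value
    reader.close()
    process.join()

print(results)
```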

Performing Computations in Background Processes

Occasionally, it is desirable to initiate time-consuming computations and leave them running in the background, detaching the creating process from them completely and potentially terminating the creating process altogether. The results of the created processes can then be collected at a later time, potentially in a completely different process. For such situations, we can make use of the BackgroundCallable class, which converts a parallel-aware callable into a callable which will run in a background process when invoked.

Consider this excerpt from a modified version of the simple_managed_queue program:

def task():

    # Initialise the communications queue with a limit on the number of
    # channels/processes.

    queue = pprocess.Queue(limit=limit)

    # Initialise an array.

    results = [0] * N * N

    # Wrap the calculate function and manage it.

    calc = queue.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            calc(i, j)

    # Store the results as they arrive.

    print "Finishing..."
    for i, j, result in queue:
        results[i*N+j] = result

    return results

Here, we have converted the main program into a function, and instead of printing out the results, we return the results list from the function.

Now, let us consider the new main program:

    t = time.time()

    if "--reconnect" not in sys.argv:

        # Wrap the computation and manage it.

        ptask = pprocess.BackgroundCallable("task.socket", pprocess.MakeParallel(task))

        # Perform the work.

        ptask()

        # Discard the callable.

        del ptask
        print "Discarded the callable."

    if "--start" not in sys.argv:

        # Open a queue and reconnect to the task.

        print "Opening a queue."
        queue = pprocess.BackgroundQueue("task.socket")

        # Wait for the results.

        print "Waiting for persistent results"
        for results in queue:
            pass # should only be one element

        # Show the results.

        for i in range(0, N):
            for result in results[i*N:i*N+N]:
                print result,
            print

        print "Time taken:", time.time() - t

(This code in context with import statements and functions is found in the examples/simple_background_queue.py file.)

This new main program has two parts: the part which initiates the computation, and the part which connects to the computation in order to collect the results. Both parts can be run in the same process, and this should result in similar behaviour to that of the original simple_managed_queue program.

In the above program, however, we are free to specify --start as an option when running the program, and the result of this is merely to initiate the computation in a background process, using BackgroundCallable to obtain a callable which, when invoked, creates the background process and runs the computation. After doing this, the program will then exit, but it will leave the computation running as a collection of background processes, and a special file called task.socket will exist in the current working directory.

When the above program is run using the --reconnect option, it attempts to reconnect to the background processes already created by contacting them through the previously created task.socket special file (which is, in fact, a UNIX-domain socket). This is done using the BackgroundQueue function, which handles the incoming results in a fashion similar to that of a Queue object. Since only one result is returned by the computation (as defined by the return statement in the task function), we need only expect one element to be collected by the queue: a list containing all of the results produced in the computation.
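
The role of the socket file can be sketched using the standard socket module on a UNIX-like system. The start_background and collect functions are hypothetical and greatly simplified compared with BackgroundCallable and BackgroundQueue: the background process here is an ordinary child that is not detached, and it serves exactly one connection:

```python
import os
import pickle
import socket
import tempfile

def start_background(path, func, *args):
    """Fork a process that computes func(*args) and serves it at 'path'."""
    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    server.bind(path)  # creates the special file
    server.listen(1)
    pid = os.fork()
    if pid == 0:
        try:
            result = func(*args)
            conn, _ = server.accept()  # wait for somebody to reconnect
            with conn:
                conn.sendall(pickle.dumps(result))
        finally:
            os._exit(0)
    server.close()  # the child keeps its own copy of the listening socket
    return pid

def collect(path):
    """Connect to the socket file and read the single pickled result."""
    client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    client.connect(path)
    chunks = []
    with client:
        while True:
            chunk = client.recv(4096)
            if not chunk:
                break
            chunks.append(chunk)
    return pickle.loads(b"".join(chunks))

def task():
    return [i * i for i in range(5)]

workdir = tempfile.mkdtemp()
path = os.path.join(workdir, "task.socket")
pid = start_background(path, task)

# This could equally happen later, in a completely different process.
results = collect(path)
os.waitpid(pid, 0)
os.unlink(path)
os.rmdir(workdir)
print(results)
```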

Managing Several Background Processes

In the above example, a single background process was used to manage a number of other processes, with all of them running in the background. However, it can be desirable to manage more than one background process.

Consider this excerpt from a modified version of the simple_managed_queue program:

def task(i):

    # Initialise the communications queue with a limit on the number of
    # channels/processes.

    queue = pprocess.Queue(limit=limit)

    # Initialise an array.

    results = [0] * N

    # Wrap the calculate function and manage it.

    calc = queue.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    for j in range(0, N):
        calc(i, j)

    # Store the results as they arrive.

    print "Finishing..."
    for i, j, result in queue:
        results[j] = result

    return i, results

Just as in the previous example, a function called task has been defined to hold a background computation, and this function returns a portion of the results. However, unlike the previous example or the original example, the scope of the results collected in the function has been changed: here, only results for calculations involving a certain value of i are collected, with that value of i returned along with the appropriate portion of the results.

Now, let us consider the new main program:

    t = time.time()

    if "--reconnect" not in sys.argv:

        # Wrap the computation and manage it.

        ptask = pprocess.MakeParallel(task)

        for i in range(0, N):

            # Make a distinct callable for each part of the computation.

            ptask_i = pprocess.BackgroundCallable("task-%d.socket" % i, ptask)

            # Perform the work.

            ptask_i(i)

        # Discard the callable.

        del ptask
        print "Discarded the callable."

    if "--start" not in sys.argv:

        # Open a queue and reconnect to the task.

        print "Opening a queue."
        queue = pprocess.PersistentQueue()
        for i in range(0, N):
            queue.connect("task-%d.socket" % i)

        # Initialise an array.

        results = [0] * N

        # Wait for the results.

        print "Waiting for persistent results"
        for i, result in queue:
            results[i] = result

        # Show the results.

        for i in range(0, N):
            for result in results[i]:
                print result,
            print

        print "Time taken:", time.time() - t

(This code in context with import statements and functions is found in the examples/simple_persistent_queue.py file.)

In the first section, the process of making a parallel-aware callable is as expected, but instead of then invoking a single background version of such a callable, as in the previous example, we create a version for each value of i (using BackgroundCallable) and then invoke each one. The result of this is a total of N background processes, each running an invocation of the task function with a distinct value of i (which in turn performs computations), and each communicating through a UNIX-domain socket with a name of the form task-i.socket (task-0.socket, task-1.socket, and so on).

In the second section, since we now have more than one background process, we must find a way to monitor them after reconnecting to them; to achieve this, a PersistentQueue is created, which acts like a regular Queue object but is instead focused on handling persistent communications. Upon connecting the queue to each of the previously created UNIX-domain sockets, the queue acts like a regular Queue and exposes received results through an iterator. Here, the principal difference from previous examples is the structure of results: instead of collecting each individual value in a flat i by j array, a list is returned for each value of i and is stored directly in another list.

Applications of Background Computations

Background computations are useful because they provide flexibility in the way the results can be collected. One area in which they can be useful is Web programming, where a process handling an incoming HTTP request may need to initiate a computation but then immediately send output to the Web client - such as a page indicating that the computation is "in progress" - without having to wait for the computation or to allocate resources to monitor it. Moreover, in some Web architectures, notably those employing the Common Gateway Interface (CGI), it is necessary for a process handling an incoming request to terminate before its output will be sent to clients. By using a BackgroundCallable, a Web server process can initiate a computation, and then subsequent server processes can be used to reconnect to the background computation and to wait efficiently for results.

Summary

The following table indicates the features used in converting each sequential example program into a parallel program:

Sequential Example   Parallel Example          Features Used
simple_map           simple_pmap               pmap
simple1              simple_managed_map        MakeParallel, Map, manage
simple2              simple_managed_queue      MakeParallel, Queue, manage
                     simple_continuous_queue   Queue, manage (continuous)
                     simple_managed            MakeParallel, Exchange (subclass), manage, finish
                     simple_start              Channel, Exchange (subclass), start, finish
                     simple_background_queue   MakeParallel, BackgroundCallable, BackgroundQueue
                     simple_persistent_queue   MakeParallel, BackgroundCallable, PersistentQueue
simple               simple_create_map         Channel, Map, create, exit