Multiprocessing in Python

Key Components of Multiprocessing in Python

Okan Yenigün
Level Up Coding


This blog post focuses on applying the multiprocessing module in Python. It builds on a previous introduction to concurrent and parallel work in Python, emphasizing practical application over conceptual explanation.

Multiprocessing enables parallel programming by sidestepping the Global Interpreter Lock (GIL): each process runs its own interpreter, so a program can use multiple processors on a machine at once.

Beginning with a basic example, consider a cube function that calculates the cube of a provided value. We will execute this function in parallel for various values.

import os
from multiprocessing import Process, current_process

def cube(x: int):
    #the operation
    value = x * x * x

    #get the process id from the OS
    process_id = os.getpid()
    #get the process name from multiprocessing
    process_name = current_process().name
    print(f"Process ID: {process_id} Process Name: {process_name} x: {x} value: {value}")

if __name__ == '__main__':
    process_list = []
    value_list = [1, 2, 3, 4, 5]

    for value in value_list:
        #create a process for each value
        process = Process(target=cube, args=(value,))
        process_list.append(process)

        process.start()

The multiprocessing package facilitates concurrency with an API that deliberately mirrors Python's threading package, despite not being based on it: threads share a single interpreter, while each process gets its own. The official documentation provides more detailed insights.

To ensure multiprocessing functions correctly, the if __name__ == '__main__' guard must be used to prevent errors such as the bootstrapping RuntimeError that mentions freeze_support(). Processes should be created inside this conditional block.

The point is that processes must not be started as a side effect of importing a module; embedding multiprocessing within a function or class method, for example, is perfectly fine. Spawning processes at a module's top level causes the child process, which re-imports the module, to spawn another child, and each subsequent process to spawn more, resulting in an endless loop of process creation.
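A minimal sketch of the import-safe pattern (the function names here are illustrative): process creation lives inside a function that is only called from the guard, so importing the module never starts processes.

from multiprocessing import Process

def work():
    print("child running")

def main():
    # processes are created only when main() is explicitly called
    p = Process(target=work)
    p.start()
    p.join()

if __name__ == '__main__':
    main()  # runs when executed as a script, never on import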

In the first example at the top of the post, we created a new process for each value within a loop. Now, let's examine the Process class in more detail.

Process

Constructor: (group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

The constructor should always be called with keyword arguments.

  • group: is always None; it exists only for compatibility with the threading API.
  • target: the callable object that the process runs.
  • name: the name of the process.
  • args: the tuple of positional arguments passed to the target.
  • kwargs: the dictionary of keyword arguments passed to the target.
  • daemon: a flag that marks the process as a daemon. A daemon process is terminated automatically when the main process exits (a short demo follows below).

Above, we set the cube function as the target and send the values as arguments.
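As a quick illustration of the daemon flag (a minimal sketch; the sleep durations are arbitrary), a daemonic child does not outlive the main process:

import time
from multiprocessing import Process

def background():
    # loops forever; only the daemon flag ends it
    while True:
        time.sleep(0.1)

if __name__ == '__main__':
    p = Process(target=background, daemon=True)
    p.start()
    time.sleep(0.5)
    print("main exiting; the daemon child is terminated automatically")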

Methods:

  • run(): invokes the target function. Calling run() directly does not create a new process: everything executes sequentially inside the calling process, as the output below shows. Normally start() is called instead.
#replace process.start() with process.run() in the example above
process.run()

"""
Process ID: 14316 Process Name: MainProcess x: 1 value: 1
Process ID: 14316 Process Name: MainProcess x: 2 value: 8
Process ID: 14316 Process Name: MainProcess x: 3 value: 27
Process ID: 14316 Process Name: MainProcess x: 4 value: 64
Process ID: 14316 Process Name: MainProcess x: 5 value: 125
"""
  • start(): creates a new process and invokes the run method inside it.
  • join([timeout]): waits for the process to complete. Without join, the parent's remaining code runs immediately and may finish before its child processes do, as the second example below shows. When a timeout (in seconds) is specified, the wait is limited to at most that long.
import time
from multiprocessing import Process

def sleeping():
    print("Before sleep")
    time.sleep(2)
    print("After sleep")

if __name__ == '__main__':
    p = Process(target=sleeping)
    print('Before start')
    p.start()
    print('After start')
    p.join()
    print("After join")

"""
Before start
After start
Before sleep
After sleep
After join
"""

if __name__ == '__main__':
    p = Process(target=sleeping)
    print('Before start')
    p.start()
    print('After start')
    #p.join()
    print("After join")

"""
Before start
After start
After join
Before sleep
After sleep
"""
  • name: the name of the process (an attribute rather than a method).
  • is_alive(): returns a boolean indicating whether the process is still running.
if __name__ == '__main__':
    p = Process(target=sleeping, daemon=False)
    print('Before start')
    p.start()
    print(p.is_alive())
    print('After start')
    p.join(timeout=3)
    print("After join")
    print(p.is_alive())

"""
Before start
True
After start
Before sleep
After sleep
After join
False
"""
  • terminate(): terminates the process. On Unix this sends the SIGTERM signal.
  • kill(): the same as terminate(), except that on Unix the SIGKILL signal is used.
  • close(): closes the Process object and releases its resources; calling most of its methods afterwards raises a ValueError.
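A brief sketch of terminate (the sleep durations are arbitrary); note how the exit code reflects the signal, as described in the attribute list below:

import time
from multiprocessing import Process

def long_task():
    time.sleep(60)

if __name__ == '__main__':
    p = Process(target=long_task)
    p.start()
    time.sleep(0.5)
    p.terminate()  # sends SIGTERM on Unix
    p.join()
    print(p.exitcode)  # a negative signal number, e.g. -15 for SIGTERM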

Attributes:

  • daemon: a boolean indicating whether the process is a daemon.
  • pid: the process ID.
  • exitcode: None while the process has not terminated yet; 0 if it terminated normally; N if it exited via sys.exit(N); and a negative value -N if it was killed by signal N.
  • authkey: the authentication key of the process. Something like: b’\xce\xeay\x9a\xcb\xa6;D\xcb\xa3\x9bn\xbdy\xe2\xd4{\x04\x955b\xc9_t\x89\xdd\xe9\xe5*\x84N\x19'
  • sentinel: a numeric handle of a system object that becomes ready when the process ends. It can be passed to multiprocessing.connection.wait() to wait on several processes at once (a sketch follows after the two join examples below); for a single process, join is simpler.
import time
from multiprocessing import Process

def cube(numbers):
    for number in numbers:
        time.sleep(0.5)
        value = number * number * number
        print(f"{value} for {number}")

if __name__ == '__main__':
    process_list = []
    numbers = range(10)

    for i in range(5):
        p = Process(target=cube, args=(numbers,))
        process_list.append(p)
        p.start()

    for process in process_list:
        process.join()

    print("completed")

"""
0 for 0
0 for 0
0 for 0
0 for 0
0 for 0
1 for 1
1 for 1
1 for 1
1 for 1
1 for 1
8 for 2
8 for 2
8 for 2
...
"""

#If we invoked join immediately after starting each process, we would wait
#for each one to finish before starting the next
if __name__ == '__main__':
    process_list = []
    numbers = range(10)

    for i in range(5):
        p = Process(target=cube, args=(numbers,))
        process_list.append(p)
        p.start()
        p.join()

"""
0 for 0
1 for 1
8 for 2
27 for 3
64 for 4
125 for 5
216 for 6
343 for 7
512 for 8
729 for 9
0 for 0
1 for 1
8 for 2
27 for 3
"""

Locks

A lock (also called a mutex) is a synchronization mechanism used to prevent race conditions.

“When threads or processes attempt to simultaneously access a shared resource, and the accesses can result in an error, we often say the program has a race condition because the threads or processes are in a “race” to carry out an operation.” An Introduction to Parallel Programming

When processes share a variable, we use a lock so that no process can alter it while another process is still working with it.

We will explore this through two functions: one adds to a value, while the other subtracts from it.

import time

def add(x: int) -> int:
    for _ in range(50):
        time.sleep(0.01)
        x += 10
    return x

def sub(x: int) -> int:
    for _ in range(50):
        time.sleep(0.01)
        x -= 10
    return x

if __name__ == '__main__':
    x = 1000
    print(f"Initial x: {x}")
    x = add(x)
    print(f"Added x: {x}")
    x = sub(x)
    print(f"Subtracted x: {x}")

"""
Initial x: 1000
Added x: 1500
Subtracted x: 1000
"""

Now, I will demonstrate using two processes for these functions without employing a lock.

import time
from multiprocessing import Process, Value, sharedctypes


def add(x: sharedctypes.Synchronized) -> int:
    for _ in range(50):
        time.sleep(0.01)
        x.value += 10
    return x.value

def sub(x: sharedctypes.Synchronized) -> int:
    for _ in range(50):
        time.sleep(0.01)
        x.value -= 10
    return x.value

if __name__ == '__main__':
    x = Value('i', 1000)
    add_p = Process(target=add, args=(x,))
    sub_p = Process(target=sub, args=(x,))

    add_p.start()
    sub_p.start()

    add_p.join()
    sub_p.join()
    print(x.value)

"""
840
"""

Here a shared ctypes variable is created with Value, which allocates shared memory; the underlying number is accessed through the value attribute.

When the two processes run in parallel without a lock, the result differs from the serial example because x.value += 10 is not atomic: each update is a read-modify-write, and concurrent updates can overwrite each other.

Next, we will implement a lock to prevent this race condition:

import time
from multiprocessing import Process, Value, sharedctypes, Lock


def add(x: sharedctypes.Synchronized, lock: Lock) -> int:
    for _ in range(50):
        time.sleep(0.01)
        lock.acquire()
        x.value += 10
        lock.release()
    return x.value

def sub(x: sharedctypes.Synchronized, lock: Lock) -> int:
    for _ in range(50):
        time.sleep(0.01)
        lock.acquire()
        x.value -= 10
        lock.release()
    return x.value

if __name__ == '__main__':
    x = Value('i', 1000)
    lock = Lock()
    add_p = Process(target=add, args=(x, lock))
    sub_p = Process(target=sub, args=(x, lock))

    add_p.start()
    sub_p.start()

    add_p.join()
    sub_p.join()
    print(x.value)

"""
1000
"""

The Lock class has two methods:

  • acquire(block=True, timeout=None): acquires the lock, blocking by default until it becomes available. A timeout can limit how long to wait for it.
  • release(): releases the lock so that other processes can acquire it.
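Lock also supports the context-manager protocol, which is usually preferable because the lock is released even if the block raises an exception. A one-line sketch, reusing x and lock from the example above:

with lock:
    x.value += 10  # acquire() and release() happen automatically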

Pool

A pool manages worker processes.

The Pool class manages a fixed number of workers for simple situations where the work can be fragmented and distributed among them independently.

The return values from the jobs are collected and returned as a list.

Pool's arguments include the number of worker processes and an initializer function to run when each worker process starts (invoked once per child).

  • Large data -> Pool | Small data -> Process
  • A pool keeps only the currently executing tasks in memory, whereas creating one Process per task keeps all of them in memory regardless of their execution status. Consequently, a pool is more efficient for handling numerous tasks.
import time
import os
from multiprocessing import Pool

def cube(number):
    total = 0
    for i in range(number):
        total += i * i * i
    return total

if __name__ == '__main__':
    cpus = os.cpu_count()  #number of CPUs available on the machine
    numbers = range(5)
    p = Pool(cpus - 1)  #a pool with cpu_count - 1 workers (default is all cores)
    result = p.map(cube, numbers)
    print(result)
    p.close()
    p.join()

"""
[0, 0, 1, 9, 36]
"""
import time
import os
from multiprocessing import Pool

def cube(number):
    total = 0
    for i in range(number):
        total += i * i * i
    return total

def cubing_multiprocess(numbers):
    cpus = os.cpu_count()
    start = time.time()
    p = Pool(cpus - 1)
    result = p.map(cube, numbers)
    p.close()
    p.join()
    end = time.time() - start
    print("Multiprocess time: ", end)

def cubing_serial(numbers):
    start = time.time()
    result = []
    for i in numbers:
        result.append(cube(i))
    end = time.time() - start
    print("Serial time: ", end)


if __name__ == '__main__':
    #numbers = range(100)
    numbers = range(10000)
    cubing_multiprocess(numbers)
    cubing_serial(numbers)

"""
for range 100:
Multiprocess time: 0.1426091194152832
Serial time: 0.0005009174346923828

for range 10000:
Multiprocess time: 0.15067601203918457
Serial time: 5.112032175064087

As the workload increases, the benefit of working in parallel
becomes more apparent.
"""

Methods:

  • apply(): submits a single task to the pool and blocks until its result is ready; it returns the result of the task.
import os
from multiprocessing import Pool

def cube(number):
    print(f"number: {number} ; process id: {os.getpid()}")
    total = 0
    for i in range(number):
        total += i * i * i
    return total

if __name__ == '__main__':
    p = Pool()
    result = p.apply(cube, args=(50,))
    print("result: ", result)
    result = p.apply(cube, args=(10,))
    print("result: ", result)
    p.close()
    print("end")

"""
number: 50 ; process id: 19202
result:  1500625
number: 10 ; process id: 19202
result:  2025
end
"""
  • apply_async(): the asynchronous variant of apply. Unlike apply, it doesn't block: it returns an AsyncResult object immediately, and an optional callback runs once the task completes.
import os
from multiprocessing import Pool

def cube(number):
    print(f"number: {number} ; process id: {os.getpid()}")
    total = 0
    for i in range(number):
        total += i * i * i
    return total

def custom_callback(result):
    print(f'Got result: {result}')

if __name__ == '__main__':
    p = Pool()
    result = p.apply_async(cube, callback=custom_callback, args=(50,))
    print("result: ", result)
    result = p.apply_async(cube, callback=custom_callback, args=(10,))
    print("result: ", result)
    p.close()
    p.join()
    print("end")

"""
result: <multiprocessing.pool.ApplyResult object at 0x7fb510360b80>
result: <multiprocessing.pool.ApplyResult object at 0x7fb510360bb0>
number: 50 ; process id: 19304
number: 10 ; process id: 19304
Got result: 1500625
Got result: 2025
end
"""

We can call the same function n times with apply_async.

#let's generate a dummy dataset using multiprocessing and apply_async


import os
import pandas as pd
from faker import Faker
from multiprocessing import Pool

N = 1_000_000
fake = Faker()

def get_a_dummy_record():
    name = fake.name()
    city = fake.city()
    plate = fake.license_plate()
    job = fake.job()
    return [name, city, plate, job]

if __name__ == '__main__':
    cpus = os.cpu_count()
    pool = Pool(cpus - 1)
    results = []
    for _ in range(N):
        results.append(pool.apply_async(get_a_dummy_record))
    pool.close()
    pool.join()
    data = []
    for result in results:
        data.append(result.get())
    df = pd.DataFrame(data=data)
  • map(): while apply submits one task at a time, map applies the same function to every item of an iterable, distributing the calls across the pool workers. However, map can pass only a single argument to each call (a workaround is sketched after the snippet below).
result = p.map(cube, numbers)
#the same function is applied to each item of the iterable:
#cube runs once for every number in numbers
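When each call needs extra, fixed arguments, one common workaround (a hedged sketch; the power helper here is illustrative, not from the examples above) is to freeze them with functools.partial so that map still passes a single varying argument:

from functools import partial
from multiprocessing import Pool

def power(number, exponent):
    return number ** exponent

if __name__ == '__main__':
    with Pool() as p:
        #freeze exponent=3 so that map only has to supply one argument
        result = p.map(partial(power, exponent=3), range(5))
    print(result)

"""
[0, 1, 8, 27, 64]
"""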
  • map_async(): a variant of map that returns an AsyncResult object immediately. A callback function can be specified; it receives a single argument, the full list of results.
from multiprocessing import Pool

def custom_callback(result):
    print(f'Got result: {result}')

def cube(number):
    total = 0
    for i in range(number):
        total += i * i * i
    return total

if __name__ == '__main__':
    numbers = range(5)
    p = Pool()
    result = p.map_async(cube, numbers, callback=custom_callback)
    print(result)
    for value in result.get():
        print(value)
    p.close()
    p.join()

"""
<multiprocessing.pool.MapResult object at 0x7faef0278a30>
Got result: [0, 0, 1, 9, 36]
0
0
1
9
36
"""
  • imap(): a lazy version of map. It returns an iterator, and results become available one at a time as they are completed.
if __name__ == '__main__':
    numbers = range(5)
    p = Pool()
    r = p.imap(cube, numbers)
    print(r)
    print(next(r))
    print(next(r))
    print(next(r))
    print("___")
    for result in p.imap(cube, numbers):
        print(result)
    p.close()
    p.join()

"""
<multiprocessing.pool.IMapIterator object at 0x7feea8238b20>
0
0
1
___
0
0
1
9
36
"""
  • imap_unordered(): The ordering of the returned results is arbitrary. Other than that it is the same as imap.
  • starmap(): just like map, but allows multiple arguments; each item of the iterable is unpacked into the target's argument list.
from random import random
from multiprocessing import Pool

def multiply(a, b):
    return a * b

if __name__ == '__main__':
    p = Pool()
    items = [(i, random()) for i in range(10)]
    result = p.starmap(multiply, items)
    print(result)
    p.close()
    p.join()
"""
[0.0, 0.5344124538146042, 0.32835463076056914, 2.4787556253795424, 3.591264897479123, 0.18079992820166868, 5.686081327214689, 0.936017685360898, 6.241723690027337, 0.47898336456594226]
"""

#if we used map instead of starmap here, we would get:
#TypeError: multiply() missing 1 required positional argument: 'b'
  • starmap_async(): Asynchronous variant of starmap method.
if __name__ == '__main__':
    p = Pool()
    items = [(i, random()) for i in range(10)]
    result = p.starmap_async(multiply, items)
    print(result)
    for value in result.get():
        print(value)
    p.close()
    p.join()

"""
<multiprocessing.pool.MapResult object at 0x7fc370196760>
0.0
0.15467946959662104
1.848298292394195
0.8187329778385587
0.35192885312433964
1.5011637932143362
0.46087324835219023
6.432101499360136
2.970120169297701
1.8696971399038436
"""
  • close(): prevents any more tasks from being submitted to the pool; the workers exit once the outstanding tasks are completed.
  • terminate(): stops the workers immediately without waiting for them to complete their tasks.
  • join(): waits for the workers to exit. It can only be called after close() or terminate().
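Pool can also be used as a context manager; per the documentation, leaving the with block calls terminate(), so it suits short-lived pools whose work finishes inside the block. A minimal sketch:

from multiprocessing import Pool

def cube(number):
    return number ** 3

if __name__ == '__main__':
    with Pool() as p:
        print(p.map(cube, range(5)))

"""
[0, 1, 8, 27, 64]
"""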

Queue

We use queues to share data between processes: multiprocessing.Queue is a process-safe FIFO channel through which one process can pass objects to another.

from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import Queue

def producer(queue):
    print('Producer is running')
    for i in range(10):
        value = random()
        sleep(value)
        # add to the queue
        print("Producer generated: ", value)
        queue.put(value)
    # all done: signal the consumer to stop
    queue.put(None)
    print('Producer finished')

def consumer(queue):
    print('Consumer is running')
    while True:
        item = queue.get()
        # check for the stop signal
        if item is None:
            break
        value = random()
        new_val = item * value
        print(f'>from producer: {item} and new value: {new_val}')
    print('Consumer finished')

# entry point
if __name__ == '__main__':
    queue = Queue()
    consumer_p = Process(target=consumer, args=(queue,))
    consumer_p.start()
    producer_p = Process(target=producer, args=(queue,))
    producer_p.start()
    # wait for both processes to finish
    producer_p.join()
    consumer_p.join()

"""
Consumer is running
Producer is running
Producer generated: 0.23572440272864092
>from producer: 0.23572440272864092 and new value: 0.048785819023252984
Producer generated: 0.7656479584438751
>from producer: 0.7656479584438751 and new value: 0.5556933624110151
Producer generated: 0.45935918843841095
>from producer: 0.45935918843841095 and new value: 0.1940662477009978
Producer generated: 0.3775766218597175
>from producer: 0.3775766218597175 and new value: 0.31882165082562186
Producer generated: 0.2210393194445769
>from producer: 0.2210393194445769 and new value: 0.010097210955178933
Producer generated: 0.3365449280075862
>from producer: 0.3365449280075862 and new value: 0.3066809374677486
Producer generated: 0.031468177390468255
>from producer: 0.031468177390468255 and new value: 0.012702824353497563
Producer generated: 0.6236820349083204
>from producer: 0.6236820349083204 and new value: 0.026716990276386285
Producer generated: 0.012352625457066502
>from producer: 0.012352625457066502 and new value: 0.006689869352644401
Producer generated: 0.6995692026265108
Producer finished
>from producer: 0.6995692026265108 and new value: 0.09095476181355872
Consumer finished
"""

Pipe

Pipe() returns a pair of connection objects that are duplex (two-way) by default. Each end has send and recv methods for transmitting messages.

from multiprocessing import Process, Pipe

def foo(conn):
    conn.send(["Hello", "World", None])
    conn.close()

if __name__ == '__main__':
    parent, child = Pipe()
    p = Process(target=foo, args=(child,))
    p.start()
    print(parent.recv())
    p.join()

"""
['Hello', 'World', None]
"""

Manager

Lists, dictionaries, and queues can be shared through a Manager object. It controls a server process that holds Python objects and allows other processes to manipulate them using proxies.

from multiprocessing import Process, Manager, current_process

def worker(liste, x):
    liste.append(x)
    print("list by: ", current_process().name, liste)

if __name__ == '__main__':
    m = Manager()
    liste = m.list()

    p1 = Process(name='worker1', target=worker, args=(liste, 30))
    p2 = Process(name='worker2', target=worker, args=(liste, 70))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print("final list: ", liste)

"""
list by: worker1 [30]
list by: worker2 [30, 70]
final list: [30, 70]
"""

Conclusion

Multiprocessing lets a program use multiple processors simultaneously, offering a way to bypass the GIL and make full use of a machine's computational power. Python's built-in multiprocessing module supports this style of concurrent programming through the diverse features covered above.


Sources

https://docs.python.org/3/library/multiprocessing.html

https://superfastpython.com/

https://www.youtube.com/watch?v=TQx3IfCVvQ0

https://www.youtube.com/watch?v=GT10PnUFLlE&t=485s

https://www.youtube.com/watch?v=tKdolYuydVE
