Multiprocessing and multithreading

To begin with, let us clear up some terminology:

    • Concurrency is when two or more tasks can start, run, and complete in overlapping time periods. It doesn’t necessarily mean they’ll ever both be running at the same instant, e.g. multitasking on a single-core machine.

    • Parallelism is when two or more tasks are executed simultaneously.

    • A thread is a sequence of instructions within a process. It can be thought of as a lightweight process. Threads of the same process share that process’s memory space.

    • A process is an instance of a program running on a computer, and it can contain one or more threads. Each process has its own independent memory space.
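To make the shared-memory point concrete, here is a minimal sketch: four threads increment one dictionary entry, and all of their updates land in the same object. The names `counter`, `counter_lock` and `increment` are illustrative only. The lock is needed because `+=` is a read-modify-write that threads could otherwise interleave. Separate processes would each work on their own copy of `counter` instead.

```python
import threading

counter = {"value": 0}           # one object, visible to every thread in this process
counter_lock = threading.Lock()  # guards the read-modify-write below


def increment(times):
    for _ in range(times):
        with counter_lock:
            counter["value"] += 1


threads = [threading.Thread(target=increment, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait until every thread has finished its increments

print(counter["value"])  # 40000: all four threads updated the same dict
```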

The threading module is used for working with threads in Python.

The CPython implementation has a Global Interpreter Lock (GIL), which allows only one thread to execute Python bytecode at a time. This means that threads cannot be used for parallel execution of Python code. While parallel CPU computation is not possible, parallel IO operations are, because blocking IO calls release the GIL while they wait, which lets other threads run in the meantime. To learn more about the GIL, refer here.
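A minimal sketch of the IO point, using `time.sleep()` as a stand-in for a blocking IO call (an assumption for illustration: like a blocking read, `sleep` releases the GIL while it waits). Five threads each wait 0.2 s; because the waits overlap, the total is close to 0.2 s rather than 1 s. The helper names are hypothetical.

```python
import threading
import time


def blocking_call(seconds):
    # Stand-in for blocking IO: time.sleep releases the GIL while it waits
    time.sleep(seconds)


def run_in_threads(n_threads, seconds):
    """Start n_threads threads that each block for `seconds`; return the wall-clock time."""
    threads = [threading.Thread(target=blocking_call, args=(seconds,))
               for _ in range(n_threads)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


elapsed = run_in_threads(5, 0.2)
print("5 x 0.2 s waits took {:.2f} s".format(elapsed))  # typically close to 0.2 s, not 1.0 s
```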

What are threads used for in Python?

    • In GUI applications to keep the UI thread responsive

    • IO tasks (network IO or filesystem IO)

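Threads should not be used for CPU-bound tasks. In CPython, running a CPU-bound task on several threads gives no speed-up and typically ends up slower than a single thread, because the threads take turns holding the GIL and switching between them adds overhead.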

The following example demonstrates the use of threads for filesystem IO.

A queue is used to store the files that need to be processed, and a dictionary holds each input and output file name. The process_queue() function retrieves items from the queue and passes them to copy_op(), which performs the copy using the shutil module.

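```python
import shutil
import threading
import time
from queue import Queue

# Serialises print() calls so output from different threads doesn't interleave
print_lock = threading.Lock()


def copy_op(file_data):
    with print_lock:
        print("Starting thread : {}".format(threading.current_thread().name))
    # file_data is a one-entry dict {input_name: output_name}.
    # Local variables are already private to each thread, so threading.local() isn't needed.
    src, dst = next(iter(file_data.items()))
    shutil.copy(src, dst)
    with print_lock:
        print("Finished thread : {}".format(threading.current_thread().name))


def process_queue():
    # Each worker loops forever, taking one copy job at a time from the queue
    while True:
        file_data = compress_queue.get()
        copy_op(file_data)
        compress_queue.task_done()  # lets compress_queue.join() track completion


compress_queue = Queue()
output_names = [{'v1.mp4': 'v11.mp4'}, {'v2.mp4': 'v22.mp4'}]

# Start two worker threads; daemon threads are killed when the main thread exits
for i in range(2):
    t = threading.Thread(target=process_queue)
    t.daemon = True
    t.start()

start = time.time()
for file_data in output_names:
    compress_queue.put(file_data)
compress_queue.join()  # block until every queued job has been marked task_done()
print("Execution time = {0:.5f}".format(time.time() - start))
```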

Note: v1.mp4 and v2.mp4 were 250 MB each.

Single thread : 7 to 10 seconds

Two threads : 4.5 to 5.5 seconds

So threads can be used for parallel filesystem IO: with two threads, the two copies overlap and the total time drops noticeably.
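The copy example leaves its daemon workers blocked on `get()` forever and relies on them being killed when the program exits. A sketch of an alternative, assuming you want the workers to shut down cleanly: push one sentinel per worker after the real jobs and `join()` the threads. `run_workers`, `handle` and `STOP` are hypothetical names; for the copy example, `handle` could be `lambda job: shutil.copy(*job)`.

```python
import threading
from queue import Queue

STOP = object()  # hypothetical sentinel; a unique object can't be confused with a real job


def worker(tasks, handle, results):
    # iter(tasks.get, STOP) keeps calling tasks.get() until it returns STOP
    for index, job in iter(tasks.get, STOP):
        results[index] = handle(job)  # each index is written by exactly one thread


def run_workers(jobs, handle, n_workers=2):
    """Apply handle() to every job on n_workers threads; return results in job order."""
    tasks = Queue()
    results = [None] * len(jobs)
    threads = [threading.Thread(target=worker, args=(tasks, handle, results))
               for _ in range(n_workers)]
    for t in threads:
        t.start()
    for index, job in enumerate(jobs):
        tasks.put((index, job))
    for _ in threads:
        tasks.put(STOP)  # one stop marker per worker
    for t in threads:
        t.join()  # each worker exits after reading its STOP
    return results


jobs = [("v1.mp4", "v11.mp4"), ("v2.mp4", "v22.mp4")]
# Stand-in handler so the sketch runs without real files
print(run_workers(jobs, lambda job: "{} -> {}".format(*job)))
# ['v1.mp4 -> v11.mp4', 'v2.mp4 -> v22.mp4']
```

Because the workers exit on their own, no daemon flag is needed and the program never ends with threads still blocked on the queue.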

The following example demonstrates the use of threads for network IO, using the requests library to fetch each page and BeautifulSoup (bs4) to extract its title. This is a toy example of using threads for network IO.

```python
import threading
import time
from queue import Queue

import bs4       # BeautifulSoup (third-party)
import requests  # third-party

print_lock = threading.Lock()


def get_url(current_url):
    with print_lock:
        print("\nStarting thread {}".format(threading.current_thread().name))
    # The thread spends most of its time blocked here, waiting for the server
    res = requests.get(current_url, timeout=10)
    res.raise_for_status()
    current_page = bs4.BeautifulSoup(res.text, "html.parser")
    current_title = current_page.select('title')[0].getText()
    with print_lock:
        print("{}\n".format(threading.current_thread().name))
        print("{}\n".format(current_url))
        print("{}\n".format(current_title))
        print("\nFinished fetching : {}".format(current_url))


def process_queue():
    while True:
        current_url = url_queue.get()
        get_url(current_url)
        url_queue.task_done()


url_queue = Queue()
url_list = ["https://www.google.com"] * 5

# Five daemon worker threads, one per URL
for i in range(5):
    t = threading.Thread(target=process_queue)
    t.daemon = True
    t.start()

start = time.time()
for current_url in url_list:
    url_queue.put(current_url)
url_queue.join()
print(threading.enumerate())  # workers are still alive, blocked on url_queue.get()
print("Execution time = {0:.5f}".format(time.time() - start))
```

Single thread : 4 seconds

Two threads : 3 seconds

Five threads : 2 seconds

In network IO, most of the time is spent waiting for the server to respond, so this is another case where using threads improves performance.
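The hand-written queue-and-worker setup can also be expressed with the standard library’s `concurrent.futures.ThreadPoolExecutor`, which creates and shuts down the threads for you. A minimal sketch, assuming a hypothetical `fake_fetch()` that sleeps 0.3 s in place of `requests.get()` so it runs offline; in real code you would pass a function like `get_url()` from the example above, changed to return the title instead of printing it.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_fetch(url):
    """Hypothetical stand-in for requests.get(url): blocks for 0.3 s like a network call."""
    time.sleep(0.3)
    return "<title>{}</title>".format(url)


urls = ["https://www.google.com"] * 5

start = time.perf_counter()
# The executor starts the worker threads and joins them when the with-block ends
with ThreadPoolExecutor(max_workers=5) as pool:
    pages = list(pool.map(fake_fetch, urls))  # results come back in input order
elapsed = time.perf_counter() - start
print("Fetched {} pages in {:.2f} s".format(len(pages), elapsed))
```

Five sequential calls would take about 1.5 s; with five threads the waits overlap.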

Let me demonstrate why it’s a bad idea to use threads for CPU-bound tasks. In the following program, a queue holds numbers, and the task is to find the sum of the prime numbers less than or equal to each number. This is clearly a CPU-bound task.

```python
import threading
import time
from queue import Queue

list_lock = threading.Lock()  # guards sum_primes_list, which every worker appends to


def find_rand(num):
    # Sum every prime in [2, num]. This is pure Python arithmetic, so under the
    # GIL only one such thread can make progress at any moment.
    sum_of_primes = 0
    ix = 2
    while ix <= num:
        if is_prime(ix):
            sum_of_primes += ix
        ix += 1
    with list_lock:
        sum_primes_list.append(sum_of_primes)


def is_prime(num):
    # Trial division by 2 and 3, then by candidates of the form 6k - 1 and 6k + 1
    if num <= 1:
        return False
    elif num <= 3:
        return True
    elif num % 2 == 0 or num % 3 == 0:
        return False
    i = 5
    while i * i <= num:
        if num % i == 0 or num % (i + 2) == 0:
            return False
        i += 6
    return True


def process_queue():
    while True:
        rand_num = min_nums.get()
        find_rand(rand_num)
        min_nums.task_done()


min_nums = Queue()
rand_list = [1000000, 2000000, 3000000]
sum_primes_list = list()

# Change range(2) to range(1) to reproduce the single-thread timing
for i in range(2):
    t = threading.Thread(target=process_queue)
    t.daemon = True
    t.start()

start = time.time()
for rand_num in rand_list:
    min_nums.put(rand_num)
min_nums.join()
end_time = time.time()
sum_primes_list.sort()
print(sum_primes_list)
print("Execution time = {0:.5f}".format(end_time - start))
```

Single thread : 25.5 seconds

Two threads : 28 seconds

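The results are very clear: don’t use threads to improve the performance of CPU-bound tasks. In CPython you will end up with the same or worse performance, because the GIL lets only one thread execute Python code at a time and switching between threads adds overhead.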

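For parallel execution of CPU-bound tasks, the multiprocessing module can be used. Each process runs its own Python interpreter with its own GIL, so several processes can execute Python code on different cores at the same time.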

In the following example we take the same task used above and process the inputs in parallel using the multiprocessing module.

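```python
import time
from multiprocessing import Pool


def sum_prime(num):
    # Same CPU-bound task as above, but the result is returned to the parent process
    sum_of_primes = 0
    ix = 2
    while ix <= num:
        if is_prime(ix):
            sum_of_primes += ix
        ix += 1
    return sum_of_primes


def is_prime(num):
    if num <= 1:
        return False
    elif num <= 3:
        return True
    elif num % 2 == 0 or num % 3 == 0:
        return False
    i = 5
    while i * i <= num:
        if num % i == 0 or num % (i + 2) == 0:
            return False
        i += 6
    return True


# The guard is required: with the "spawn" start method (the default on
# Windows and macOS) each child process re-imports this module.
if __name__ == '__main__':
    start = time.time()
    # Pool(1), Pool(2) and Pool(3) give the three timings below
    with Pool(1) as p:
        print(p.map(sum_prime, [1000000, 2000000, 3000000]))
    print("Time taken = {0:.5f}".format(time.time() - start))
```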

Using a single process : 27 seconds

Using two processes : 19 seconds

Using three processes : 18 seconds

We see a huge improvement from using a single process to using two processes, but the jump from two processes to three is minimal. The reason is my hardware: I have a dual-core laptop CPU with hyperthreading, which the OS detects as four CPUs. Two physical cores can run two processes at full speed, while a third process mostly competes with them for the same cores. On a side note, I already knew that Intel’s hyperthreading is not a replacement for more cores; the above example is just a noob verification of that.
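To see what your own machine reports, and how run time changes with pool size, a sketch along these lines can help. `os.cpu_count()` returns the number of logical CPUs (hyperthreads included), and `Pool()` with no argument also defaults to one worker per logical CPU. `count_primes` and `time_pool` are hypothetical helpers; `count_primes` is a lighter CPU-bound task than the one above so the loop finishes quickly. On a machine like the one described, you would expect the timings to stop improving once the pool size passes the number of physical cores.

```python
import os
import time
from multiprocessing import Pool


def count_primes(limit):
    """Hypothetical lighter CPU-bound task: count the primes <= limit by trial division."""
    count = 0
    for n in range(2, limit + 1):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def time_pool(processes, inputs):
    """Run count_primes over inputs with a pool of the given size; return (results, seconds)."""
    start = time.perf_counter()
    with Pool(processes) as pool:
        results = pool.map(count_primes, inputs)
    return results, time.perf_counter() - start


if __name__ == "__main__":
    print("Logical CPUs reported by the OS:", os.cpu_count())
    inputs = [100_000] * 4
    # Try pool sizes 1..4 (fewer on a machine with fewer logical CPUs)
    for size in range(1, min(os.cpu_count() or 1, len(inputs)) + 1):
        _, seconds = time_pool(size, inputs)
        print("{} process(es): {:.2f} s".format(size, seconds))
```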

So the multiprocessing module lets CPU-bound Python code make full use of all the available cores.

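Inter-process communication can be achieved using queues or pipes. The Queue in the multiprocessing module works similarly to the queue module used in the threading examples, so I won’t cover it again. One difference: task_done() and join() are only available on multiprocessing.JoinableQueue, not on the plain multiprocessing.Queue.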
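A minimal sketch of two worker processes fed through `multiprocessing.Queue`. Since the plain `Queue` has no `task_done()`/`join()`, this sketch stops each worker with a `None` sentinel and collects answers from a second queue. `square_worker` and `square_in_processes` are hypothetical names.

```python
import multiprocessing as mp


def square_worker(tasks, results):
    # Keep taking numbers until the None sentinel arrives, then return (ending the process)
    for num in iter(tasks.get, None):
        results.put((num, num * num))


def square_in_processes(numbers, n_workers=2):
    """Square each number in worker processes; return sorted (number, square) pairs."""
    tasks, results = mp.Queue(), mp.Queue()
    workers = [mp.Process(target=square_worker, args=(tasks, results))
               for _ in range(n_workers)]
    for w in workers:
        w.start()
    for num in numbers:
        tasks.put(num)
    for _ in workers:
        tasks.put(None)  # one sentinel per worker
    # Read every result before joining: a process that has put items on a queue
    # may not exit until those items have been flushed to the underlying pipe.
    pairs = [results.get() for _ in numbers]
    for w in workers:
        w.join()
    return sorted(pairs)


if __name__ == "__main__":
    print(square_in_processes([1, 2, 3]))  # [(1, 1), (2, 4), (3, 9)]
```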

Another useful communication mechanism between processes is a pipe. By default, a pipe is a duplex (two-way) communication channel. Note: data in a pipe can become corrupted if two processes or threads read from or write to the same end of the pipe at the same time.

The following is a basic example:

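```python
import multiprocessing as mp
import os


def info(conn):
    # Runs in the child: send a message back through the child's end of the pipe
    conn.send("Hello from {}\nppid = {}\npid={}".format(
        mp.current_process().name, os.getppid(), os.getpid()))
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = mp.Pipe()  # duplex by default: both ends can send and receive
    p = mp.Process(target=info, args=(child_conn,))
    p.daemon = True
    p.start()
    print(parent_conn.recv())  # blocks until the child has sent its message
    p.join()
```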

Output:

```
Hello from Process-1
ppid = 18621
pid=18622
```
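Since the pipe is duplex, the parent can also send data to the child and read the replies on the same connection. A minimal sketch with a hypothetical `echo_upper` child that upper-cases each message until it receives `None`. Each end of the pipe is used by only one process, which avoids the corruption issue mentioned earlier.

```python
import multiprocessing as mp


def echo_upper(conn):
    """Child side: reply to each message with its upper-cased form until None arrives."""
    while True:
        msg = conn.recv()
        if msg is None:
            break
        conn.send(msg.upper())
    conn.close()


def converse(messages):
    """Hypothetical helper: send each message to the child and collect its replies."""
    parent_conn, child_conn = mp.Pipe()  # duplex=True by default
    p = mp.Process(target=echo_upper, args=(child_conn,))
    p.start()
    replies = []
    for msg in messages:
        parent_conn.send(msg)                # parent writes on its end...
        replies.append(parent_conn.recv())   # ...and reads the reply on the same end
    parent_conn.send(None)  # tell the child to stop
    p.join()
    parent_conn.close()
    return replies


if __name__ == "__main__":
    print(converse(["hello", "pipe"]))  # ['HELLO', 'PIPE']
```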
