ThreadPoolExecutor

Executors and Futures

The ThreadPoolExecutor class, provided by the concurrent.futures module, is used to create and manage thread pools. It extends the Executor class, and its submit() function returns a Future object for each task that is submitted.

  • Executor: Parent class for the ThreadPoolExecutor that defines basic lifecycle operations for the pool.
  • Future: Object returned when submitting tasks to the thread pool that may complete later.

Executors

The ThreadPoolExecutor class extends the abstract Executor class.

The Executor class defines three methods used to control our thread pool:

  • submit(): Dispatch a function to be executed and return a future object.
  • map(): Apply a function to an iterable of elements.
  • shutdown(): Shut down the executor.

The Executor is ready to accept tasks as soon as an instance is created. It should be shut down by calling shutdown(), either explicitly or by using the Executor as a context manager (a with block), which will release any resources held by the Executor.

The submit() and map() functions are used to submit tasks to the Executor for asynchronous execution.

  • The map() function operates just like the built-in map() function and is used to apply a function to each element in an iterable object (e.g., list). Unlike the built-in map() function, each application of the function to an element will happen asynchronously instead of sequentially.
  • The submit() function takes a function, as well as any arguments, and schedules it for asynchronous execution. The call returns immediately and provides a Future object for the task.
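As a minimal sketch of the difference between the two calls (the square() task and the pool size of 4 are illustrative assumptions, not part of the original text):

```python
from concurrent.futures import ThreadPoolExecutor

def square(x):
    # hypothetical task used only for illustration
    return x * x

with ThreadPoolExecutor(max_workers=4) as executor:
    # submit() schedules a single call and returns a Future immediately
    future = executor.submit(square, 3)
    # map() schedules one call per element and yields results in input order
    squares = list(executor.map(square, [1, 2, 3, 4]))
    # result() blocks until the submitted task has finished
    single = future.result()

print(single)   # 9
print(squares)  # [1, 4, 9, 16]
```

submit() suits individual tasks whose Future you want to inspect, while map() is convenient when the same function is applied to every element of an iterable.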

Futures

A future is an object that represents a delayed result for an asynchronous task.

  • It is also sometimes called a promise or a delay.
  • It provides a context for the result of a task that may or may not be executing and a way of getting a result once it is available.
  • In Python, the Future object is returned from an Executor, such as a ThreadPoolExecutor when calling the submit() function to dispatch a task to be executed asynchronously.
  • In general, we do not create Future objects; we only receive them and we may need to call functions on them. There is always one Future object for each task sent into the ThreadPoolExecutor via a call to submit().

The Future object provides a number of helpful functions for inspecting the status of the task:

  • cancelled(): Returns True if the task was cancelled before being executed.
  • running(): Returns True if the task is currently running.
  • done(): Returns True if the task has completed or was cancelled.

A running task cannot be cancelled, and a task that reports done may have been cancelled rather than completed.
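The status functions can be observed with a small sketch. It assumes a pool with a single worker, so the second task stays queued, and uses two hypothetical Event objects (started, release) to hold the first task in the running state:

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Event

started = Event()  # set by the task once it is actually running
release = Event()  # set by the main thread to let the task finish

def blocking_task():
    started.set()
    release.wait()
    return "finished"

with ThreadPoolExecutor(max_workers=1) as executor:
    first = executor.submit(blocking_task)
    second = executor.submit(blocking_task)  # queued behind the first task
    started.wait()  # make sure the first task has been picked up

    first_running = first.running()   # True: the task is executing
    first_cancel = first.cancel()     # False: a running task cannot be cancelled
    second_cancel = second.cancel()   # True: the task had not started yet
    second_state = (second.cancelled(), second.done())  # (True, True)

    release.set()  # let the first task complete
    first_result = first.result()

first_state = (first.cancelled(), first.done())  # (False, True)
print(first_running, first_cancel, second_cancel, second_state, first_state)
```

Note that cancel() is also a Future function: it returns True only if the task could still be cancelled.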

A Future object also provides access to the result of the task via the result() function. If an exception was raised while executing the task, it will be re-raised when calling the result() function or can be accessed via the exception() function.

  • result(): Access the result from running the task.
  • exception(): Access any exception raised while running the task.

Both the result() and exception() functions allow a timeout to be specified as an argument, which is the number of seconds to wait for a return value if the task is not yet complete. If the timeout expires, then a TimeoutError will be raised.
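A short sketch of these behaviours follows. The two task functions and the timing values are assumptions chosen only so that the timeout expires reliably:

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError
from time import sleep

def failing_task():
    raise ValueError("bad input")

def slow_task():
    sleep(0.5)
    return "slow result"

with ThreadPoolExecutor(max_workers=2) as executor:
    failed = executor.submit(failing_task)
    slow = executor.submit(slow_task)

    # exception() waits for the task and returns the exception without raising it
    error = failed.exception()

    # result() re-raises the exception that was raised inside the task
    try:
        failed.result()
        reraised = False
    except ValueError:
        reraised = True

    # a short timeout expires before slow_task has finished
    try:
        slow.result(timeout=0.05)
        timed_out = False
    except TimeoutError:
        timed_out = True

    # without a timeout, result() blocks until the value is ready
    slow_value = slow.result()

print(repr(error), reraised, timed_out, slow_value)
```

A timeout does not cancel or stop the task; the same Future can be queried again later, as the last call to result() shows.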

If we want to have the thread pool automatically call a function once the task is completed, we can attach a callback to the Future object for the task via the add_done_callback() function.

  • add_done_callback(): Add a callback function to the task to be executed by the thread pool once the task is completed.
    • We can add more than one callback to each task and they will be executed in the order they were added. If the task has already completed before we add the callback, then the callback is executed immediately.
    • Any exceptions raised in the callback function will not impact the task or thread pool.
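The callback ordering described above can be sketched as follows. The callback names and the ready Event are hypothetical; the Event only keeps the task from finishing before the first two callbacks are attached:

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Event

events = []
ready = Event()  # holds the task back until the callbacks are registered

def task():
    ready.wait()
    return 42

def record_first(future):
    # a callback receives the completed Future as its only argument
    events.append(("first", future.result()))

def record_second(future):
    events.append(("second", future.done()))

def record_late(future):
    events.append(("late", future.result()))

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(task)
    future.add_done_callback(record_first)
    future.add_done_callback(record_second)
    ready.set()  # now let the task complete

# the task is already done here, so this callback runs immediately
future.add_done_callback(record_late)

print(events)  # [('first', 42), ('second', True), ('late', 42)]
```

Callbacks attached before completion run in the worker thread that finished the task, whereas a callback attached afterwards runs in the thread that adds it, so shared state touched by callbacks should be thread-safe.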

ThreadPoolExecutor Lifecycle

There are four main steps in the lifecycle of using the ThreadPoolExecutor class:

  • Create: Create the thread pool by calling the constructor ThreadPoolExecutor().
  • Submit: Submit tasks and get futures by calling submit() or map().
  • Wait: Wait and get results as tasks complete (optional).
  • Shut down: Shut down the thread pool by calling shutdown().
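The four steps can be sketched end to end as follows, assuming a trivial hypothetical task() and an explicit call to shutdown() instead of a with block:

```python
from concurrent.futures import ThreadPoolExecutor

def task(n):
    # hypothetical task used only for illustration
    return n + 1

# 1. create the thread pool
executor = ThreadPoolExecutor(max_workers=2)
try:
    # 2. submit tasks and collect their futures
    futures = [executor.submit(task, n) for n in range(3)]
    # 3. wait for each task and get its result
    results = [future.result() for future in futures]
finally:
    # 4. shut down the pool, even if a task raised an exception
    executor.shutdown()

print(results)  # [1, 2, 3]
```

Using the executor as a context manager (with ThreadPoolExecutor(...) as executor:) performs the shutdown step automatically when the block exits.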

1. Create the Thread Pool

When an instance of a ThreadPoolExecutor is created, it can be configured with:

  • the maximum number of threads in the pool (max_workers)

    • Default Total Threads = min(32, (Total CPUs) + 4), since Python 3.8

      For example, if you have 4 CPUs, each with hyperthreading (which many modern CPUs have), then Python will see 8 logical CPUs and will allow up to (8 + 4) or 12 threads in the pool by default.

    • It is typically not a good idea to have thousands of threads, as this increases memory use and causes a large amount of switching between threads, which may result in worse performance.

  • a prefix used when naming each thread in the pool (thread_name_prefix), and

  • a function to call when initializing each worker thread, along with any arguments for that function (initializer and initargs)

from concurrent.futures import ThreadPoolExecutor

# create a thread pool with the default number of worker threads
executor = ThreadPoolExecutor()

# create a thread pool with 10 worker threads
executor = ThreadPoolExecutor(max_workers=10)
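The thread name prefix and the initializer function can be sketched in the same way. The constructor arguments thread_name_prefix, initializer and initargs are part of the ThreadPoolExecutor API; the init_worker() function, the "worker" prefix and the "demo" label are assumptions made for illustration:

```python
from concurrent.futures import ThreadPoolExecutor
import threading

local = threading.local()  # per-thread storage filled in by the initializer

def init_worker(label):
    # runs once in each worker thread before it executes any task
    local.label = label

def task(n):
    # report which thread ran the task and what the initializer stored
    return threading.current_thread().name, local.label, n

with ThreadPoolExecutor(max_workers=2,
                        thread_name_prefix="worker",
                        initializer=init_worker,
                        initargs=("demo",)) as executor:
    results = list(executor.map(task, range(4)))

for name, label, n in results:
    print(name, label, n)
```

Worker threads are named with the prefix followed by an index, which makes log output and debugger views easier to read.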

2. Submit tasks to the thread pool

Once the thread pool has been created, you can submit tasks for asynchronous execution. There are two main approaches for submitting tasks defined on the Executor parent class: map() and submit().

Submit tasks with map

The map() function is an asynchronous version of the built-in map() function for applying a function to each element in an iterable, like a list. You can call the map() function on the pool and pass it the name of your function and the iterable. One common use case for map() is to convert a for-loop so that each loop iteration runs as a separate task in the thread pool:

# perform all tasks concurrently
results = executor.map(my_task, my_items) # does not block

  • my_task: the name of the function you want to execute
  • my_items: iterable of objects, each to be passed to the my_task function

The tasks will be queued up in the thread pool and executed by worker threads in the pool as they become available. The map() function will return an iterator immediately. This iterator can be used to access the results from the target task function as they become available, in the order that the tasks were submitted (i.e. the order of the iterable you provided).

Even though the tasks are executed concurrently, the executor.map() method ensures that the results are returned in the original order of the input iterable.

Example:

from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor

def task(num):
    sleep(random())
    return num * 2


with ThreadPoolExecutor(10) as executor:
    # execute tasks concurrently and process results in order
    for result in executor.map(task, range(5)):
        # retrieve the result
        print(result)

Output:

0
2
4
6
8

You can also set a timeout when calling map() via the “timeout” argument in seconds if you wish to limit how long you’re willing to wait for results while iterating. The timeout is measured from the original call to map(), not separately for each task; if a result is not available in time, a TimeoutError will be raised.

# perform all tasks concurrently
# iterate over results as they become available
for result in executor.map(my_task, my_items, timeout=5):
    # wait for task to complete or timeout expires
    print(result)
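The effect of the timeout can be sketched with tasks of different durations. The delays and the 0.2 second limit below are assumptions chosen so that only the last task misses the deadline, which is counted from the call to map():

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError
from time import sleep

def task(delay):
    # hypothetical task that sleeps for the given number of seconds
    sleep(delay)
    return delay

collected = []
with ThreadPoolExecutor(max_workers=3) as executor:
    try:
        # the first two results arrive well within the limit, the third does not
        for result in executor.map(task, [0.0, 0.01, 0.6], timeout=0.2):
            collected.append(result)
        timed_out = False
    except TimeoutError:
        timed_out = True

print(collected, timed_out)
```

Results that were ready before the deadline are still delivered; the exception is raised only when the iteration reaches a result that is not yet available.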

Submit tasks with submit()

The submit() function submits one task to the thread pool for execution.

The function takes the name of the function to call and all arguments to the function, then returns a Future object immediately.

  • The Future object is a promise to return the results from the task (if any) and provides a way to determine if a specific task has been completed or not.

with ThreadPoolExecutor(10) as executor:
    # submit a task with arguments and get a future object
    future = executor.submit(my_task, arg1, arg2) # does not block

  • my_task: the name of the function you want to execute
  • arg1, arg2: the first and second arguments to pass to the my_task function

You can access the result of the task via the result() function on the returned Future object. This call will block until the task is completed.

# get the result from a future
result = future.result() # blocks

You can also set a timeout when calling result() via the “timeout” argument in seconds if you wish to limit how long you’re willing to wait for the task to complete, after which a TimeoutError will be raised.

# wait for task to complete or timeout expires
result = future.result(timeout=5) # blocks

3. Wait for Tasks to Complete (Optional)

The concurrent.futures module provides two module-level utility functions for waiting for tasks via their Future objects, which are only created when we call submit() to push tasks into the thread pool.

  • wait(): Wait on one or more Future objects until they are completed.
  • as_completed(): Returns Future objects from a collection as they complete their execution.

(These wait functions are optional to use, as you can wait for results directly after calling map() or submit() or wait for all tasks in the thread pool to finish.)

Both functions are useful with the idiom of dispatching multiple tasks into the thread pool via submit() in a list comprehension:

# dispatch tasks into the thread pool and create a list of futures
futures = [executor.submit(my_task, my_data) for my_data in my_datalist]

Example:

from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name

# start the thread pool
with ThreadPoolExecutor(10) as executor:
    # submit tasks and collect futures
    futures = [executor.submit(task, i) for i in range(10)]
    # process task results as they are available
    for future in as_completed(futures):
        # retrieve the result
        print(future.result())

Output:

6
7
9
8
4
0
3
2
5
1

Note: the output may vary from run to run, as the task() calls are executed concurrently in different threads and the order of completion cannot be guaranteed. Using as_completed() yields each result as soon as its task completes, regardless of the order in which the tasks were submitted.
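The wait() function can be sketched in the same style. The delays are illustrative assumptions; return_when and the FIRST_COMPLETED constant are part of the concurrent.futures module:

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
from time import sleep

def task(delay):
    # hypothetical task that sleeps for the given number of seconds
    sleep(delay)
    return delay

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(task, delay) for delay in (0.01, 0.3, 0.4)]

    # return as soon as at least one future has finished
    done_first, pending_first = wait(futures, return_when=FIRST_COMPLETED)

    # by default, wait() returns only once every future has finished
    done_all, pending_all = wait(futures)
    results = sorted(future.result() for future in done_all)

print(len(done_first), len(pending_all), results)
```

wait() returns two sets, the completed futures and those still pending, so it fits cases where you need a checkpoint rather than a stream of results as provided by as_completed().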

ThreadPoolExecutor Example

Reference