When threads are created they start executing concurrently, but in some scenarios we may need to prioritize certain threads over others to ensure the critical task complete first.

There are different ways we can do this

  1. Using Queues
  2. Implementing custom scheduler

Using Queues for prioritization

We can use a priority queue to manage task execution. We can use the queue.PriorityQueue class to assign properties to the tasks ensuring that high-priority tasks are executed first followed by the low-priority tasks.

See the following example for more information on this:

import threading
import queue

def worker(task_queue):
    while not task_queue.empty():
        priority, task = task_queue.get()
        task()
        task_queue.task_done()

def high_priority_task():
    print("High priority task is running")
    time.sleep(1)

def low_priority_task():
    print("Low priority task is running")
    time.sleep(1)

task_queue = queue.PriorityQueue()

task_queue.put((1, high_priority_task))  
task_queue.put((2, low_priority_task)) 

worker_thread = threading.Thread(target=worker, args=(task_queue,))
worker_thread.start()

task_queue.join()
worker_thread.join()

Implementing Custom Scheduler

For the advanced thread prioritization, we can also implement the customer scheduler to manage thread execution based on priorities. This involves creating a scheduler mechanism that decides which thread to run based on their priorities.

In the following example, I have written a scheduler which is used to manage task execution based on priorities, ensuring that higher-priority tasks are run before lower-priority ones.

import threading
import time
import queue

class Scheduler:
    def __init__(self):
        self.task_queue = queue.PriorityQueue()
        self.lock = threading.Lock()

    def add_task(self, priority, task):
        with self.lock:
            self.task_queue.put((priority, task))

    def run(self):
        while not self.task_queue.empty():
            priority, task = self.task_queue.get()
            task()
            self.task_queue.task_done()

def high_priority_task():
    print("High priority task is running")
    time.sleep(1)

def low_priority_task():
    print("Low priority task is running")
    time.sleep(1)

scheduler = Scheduler()

scheduler.add_task(1, high_priority_task)
scheduler.add_task(2, low_priority_task)

scheduler_thread = threading.Thread(target=scheduler.run)
scheduler_thread.start()
scheduler_thread.join()

Categorized in:

Tagged in: