Semaphore overflow prevention Medium

When you remove an item from the JoinableQueue without calling JoinableQueue.task_done(), and then process that item, a semaphore overflow exception might be thrown. This is caused when the semaphore used to count the number of unfinished tasks overflows.

Detector ID
python/semaphore-overflow-prevention@v1.0
Category
Common Weakness Enumeration (CWE) external icon
-

Noncompliant example

1def post_tasks_noncompliant(jobs, es_url):
2    import multiprocessing
3    import requests
4    jobs = multiprocessing.JoinableQueue()
5    while True:
6        try:
7            # Noncompliant: fails to call JoinableQueue.task_done()
8            # for each task removed from the JoinableQueue.
9            image, image_name, tag = jobs.get()
10            formatted_es_url = es_url.format(image_name)
11            files = {'file': image.content, 'tag': tag}
12            r = requests.post(formatted_es_url, files=files)
13        finally:
14            print("Task Done!!")

Compliant example

1def post_tasks_compliant(jobs, es_url):
2    import multiprocessing
3    import requests
4    jobs = multiprocessing.JoinableQueue()
5    while True:
6        try:
7            image, image_name, tag = jobs.get()
8            formatted_es_url = es_url.format(image_name)
9            files = {'file': image.content, 'tag': tag}
10            r = requests.post(formatted_es_url, files=files)
11        finally:
12            # Compliant: calls JoinableQueue.task_done()
13            # for each task removed from the JoinableQueue.
14            jobs.task_done()