When you remove an item from a JoinableQueue and process it without calling JoinableQueue.task_done(), a semaphore overflow exception might be thrown. This happens when the semaphore used to count the number of unfinished tasks overflows.
def post_tasks_noncompliant(jobs, es_url):
    """Post every queued image to its formatted URL (noncompliant example).

    This function intentionally demonstrates the defect of consuming items
    from a JoinableQueue without ever calling ``task_done()``; that omission
    is the point of the example and is preserved on purpose.

    The loop has no exit condition: it only stops when ``jobs.get()`` or the
    HTTP request raises, in which case the exception propagates to the caller.

    Args:
        jobs: Queue to consume from (expected to be a
            ``multiprocessing.JoinableQueue``). Each item must unpack into
            ``(image, image_name, tag)``, where ``image`` exposes a
            ``content`` attribute.
        es_url: Format string with one positional placeholder that is filled
            with ``image_name``.
    """
    import requests

    # Consume the queue supplied by the caller. Rebinding ``jobs`` to a new,
    # empty JoinableQueue here would make ``jobs.get()`` block forever and
    # silently ignore the caller's tasks.
    while True:
        try:
            # Noncompliant: fails to call JoinableQueue.task_done()
            # for each task removed from the JoinableQueue.
            image, image_name, tag = jobs.get()
            formatted_es_url = es_url.format(image_name)
            files = {'file': image.content, 'tag': tag}
            requests.post(formatted_es_url, files=files)
        finally:
            print("Task Done!!")
def post_tasks_compliant(jobs, es_url):
    """Post every queued image to its formatted URL, acknowledging each task.

    Each item taken from ``jobs`` is acknowledged with ``jobs.task_done()``
    exactly once, whether or not posting it succeeded, so the queue's count
    of unfinished tasks stays accurate and ``jobs.join()`` can return.

    The loop has no exit condition: it only stops when ``jobs.get()`` or the
    processing of an item raises, in which case the exception propagates to
    the caller (after the item has been acknowledged).

    Args:
        jobs: Queue to consume from (expected to be a
            ``multiprocessing.JoinableQueue``). Each item must unpack into
            ``(image, image_name, tag)``, where ``image`` exposes a
            ``content`` attribute.
        es_url: Format string with one positional placeholder that is filled
            with ``image_name``.
    """
    import requests

    # Consume the queue supplied by the caller. Rebinding ``jobs`` to a new,
    # empty JoinableQueue here would make ``jobs.get()`` block forever and
    # silently ignore the caller's tasks.
    while True:
        # Fetch outside the try block: if get() itself fails, no item was
        # removed, and calling task_done() anyway would raise ValueError
        # ("called too many times") and hide the original error.
        job = jobs.get()
        try:
            # Unpack inside the try so that a malformed item is still
            # acknowledged before the error propagates.
            image, image_name, tag = job
            formatted_es_url = es_url.format(image_name)
            files = {'file': image.content, 'tag': tag}
            requests.post(formatted_es_url, files=files)
        finally:
            # Compliant: calls JoinableQueue.task_done()
            # for each task removed from the JoinableQueue.
            jobs.task_done()