I'm currently working on a scraper where I'm trying to figure out how to assign proxies that are available to use, meaning that if I use 5 threads and thread-1 uses proxy A, no other thread should be able to access proxy A; the threads should pick randomly from the remaining available proxy pool.
import random
import time
from threading import Thread

import requests

list_op_proxy = [
    "http://test.io:12345",
    "http://test.io:123456",
    "http://test.io:1234567",
    "http://test.io:12345678"
]

session = requests.Session()


def handler(name):
    while True:
        try:
            session.proxies = {
                'https': random.choice(list_op_proxy)
            }
            with session.get("https://stackoverflow.com"):
                print(f"{name} - Yay request made!")
            time.sleep(random.randint(5, 10))
        except requests.exceptions.RequestException as err:
            print(f"Error! Lets try again! {err}")
            continue
        except Exception as err:
            print(f"Error! Lets debug! {err}")
            raise


for i in range(5):
    Thread(target=handler, args=(f'Thread {i}',)).start()
I wonder how I can set this up so that a thread only uses proxies that are available and not in use by any other thread, “blocks” the proxy so other threads cannot use it, and releases it once the request is finished?
Answer
One way to go about this would be to use a global shared list that holds the currently active proxies, or to remove each proxy from the list of available proxies and re-add it after the request is finished. You do not have to worry about concurrent access on the individual list operations, since CPython's GIL makes operations such as append and remove atomic.
proxy = random.choice(list_op_proxy)
list_op_proxy.remove(proxy)

session.proxies = {
    'https': proxy
}

# ... do request

list_op_proxy.append(proxy)
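For completeness, a minimal threaded sketch of this list approach could look as follows. Since picking and removing a proxy are two separate list operations, a small threading.Lock (an addition on top of the snippet above) guards that step so two threads cannot grab the same proxy; the proxy addresses and target URL are carried over from the question.

import random
import time
from threading import Lock, Thread

import requests

list_op_proxy = [
    "http://test.io:12345",
    "http://test.io:123456",
    "http://test.io:1234567",
    "http://test.io:12345678"
]
list_lock = Lock()


def handler(name):
    session = requests.Session()
    while True:
        # pick a free proxy and take it out of the shared list
        with list_lock:
            proxy = random.choice(list_op_proxy) if list_op_proxy else None
            if proxy is not None:
                list_op_proxy.remove(proxy)
        if proxy is None:
            time.sleep(0.5)  # all proxies are busy, wait and retry
            continue
        try:
            session.proxies = {'https': proxy}
            with session.get("https://stackoverflow.com"):
                print(f"{name} - Yay request made!")
        except requests.exceptions.RequestException as err:
            print(f"{name} - Error! Lets try again! {err}")
        finally:
            # re-add the proxy so other threads can use it again
            with list_lock:
                list_op_proxy.append(proxy)
        time.sleep(random.randint(5, 10))


for i in range(5):
    Thread(target=handler, args=(f'Thread {i}',)).start()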
You could also do this using a queue, just popping and re-adding proxies, to make it more efficient.
Using a Proxy Queue
Another option is to put the proxies into a queue and get() a proxy before each query, removing it from the available proxies, and then put() it back after the request has finished. This is a more efficient version of the above-mentioned list approach.
First we need to initialize the proxy queue.
import queue

proxy_q = queue.Queue()

for proxy in proxies:
    proxy_q.put(proxy)
Within the handler we then get a proxy from the queue before performing a request, perform the request, and put the proxy back into the queue. We are using block=True, so that the queue blocks the thread if there is no proxy currently available. Otherwise the thread would terminate with a queue.Empty exception once all proxies are in use and a new one needs to be acquired.
def handler(name):
    global proxy_q
    while True:
        proxy = proxy_q.get(block=True)  # we want blocking behaviour
        # ... do request
        proxy_q.put(proxy)
        # ... response handling can be done after the proxy is put back, to not
        #     block it longer than required
        # do not forget to define a break condition
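Putting those pieces together, a runnable sketch of the queue-based threading approach could look like this; the fixed number of requests per thread is just an assumed break condition, and the proxy addresses and URL are again taken from the question.

import queue
import random
import time
from threading import Thread

import requests

proxies = [
    "http://test.io:12345",
    "http://test.io:123456",
    "http://test.io:1234567",
    "http://test.io:12345678"
]

proxy_q = queue.Queue()
for proxy in proxies:
    proxy_q.put(proxy)


def handler(name, n_requests=10):
    session = requests.Session()
    for _ in range(n_requests):          # break condition: a fixed number of requests
        proxy = proxy_q.get(block=True)  # blocks until a proxy becomes available
        try:
            session.proxies = {'https': proxy}
            with session.get("https://stackoverflow.com"):
                print(f"{name} - Yay request made!")
        except requests.exceptions.RequestException as err:
            print(f"{name} - Error! Lets try again! {err}")
        finally:
            proxy_q.put(proxy)           # release the proxy for the other threads
        time.sleep(random.randint(5, 10))


threads = [Thread(target=handler, args=(f'Thread {i}',)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()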
Using Queue and Multiprocessing
First you would initialize the manager, put all your data into the queue, and initialize another structure for collecting your results (here we initialize a shared list).
import multiprocessing

manager = multiprocessing.Manager()
q = manager.Queue()

for e in entities:
    q.put(e)

print(q.qsize())

results = manager.list()
Then you initialize the scraping processes:
processes = []

for proxy in proxies:
    processes.append(
        multiprocessing.Process(
            target=scrape_function,
            args=(q, results, proxy),
            daemon=True))
And then start each of them:
for w in processes:
    w.start()
Lastly, you join every process to ensure that the main process is not terminated before the subprocesses are finished:
for w in processes:
    w.join()
Inside the scrape_function you then simply get one item at a time and perform the request. Since get() is called with block=False, the queue raises a queue.Empty exception when it is empty, so we use an infinite while loop and catch that exception as the break condition.
def scrape_function(q, results, proxy):
    session = requests.Session()
    session.proxies = {
        'https': proxy
    }
    while True:
        try:
            request_uri = q.get(block=False)
            with session.get(request_uri) as response:
                print(f"{proxy} - Yay request made!")
                results.append(response.text)  # or whatever result you extract
            time.sleep(random.randint(5, 10))
        except queue.Empty:
            break
The results of each query are appended to the results list, which is also shared among the different processes.
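For reference, a self-contained sketch of the multiprocessing variant could look like this; the entity URLs and proxy addresses are placeholders, and appending response.text to the shared list just stands in for whatever result handling you actually need.

import multiprocessing
import queue
import random
import time

import requests


def scrape_function(q, results, proxy):
    session = requests.Session()
    session.proxies = {'https': proxy}
    while True:
        try:
            request_uri = q.get(block=False)
            with session.get(request_uri) as response:
                print(f"{proxy} - Yay request made!")
                results.append(response.text)
            time.sleep(random.randint(5, 10))
        except queue.Empty:
            break


if __name__ == "__main__":
    entities = ["https://stackoverflow.com"] * 20  # placeholder work items
    proxies = [
        "http://test.io:12345",
        "http://test.io:123456"
    ]  # placeholder proxies, one dedicated proxy per process

    manager = multiprocessing.Manager()
    q = manager.Queue()
    for e in entities:
        q.put(e)
    results = manager.list()

    processes = []
    for proxy in proxies:
        processes.append(
            multiprocessing.Process(
                target=scrape_function,
                args=(q, results, proxy),
                daemon=True))

    for w in processes:
        w.start()
    for w in processes:
        w.join()

    print(f"{len(results)} results collected")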