
How to assign values that are available to threads

I'm currently working on a scraper and trying to figure out how I can assign proxies that are available to use, meaning that if I use 5 threads and thread-1 uses proxy A, no other thread should be able to access proxy A; each thread should instead pick randomly from the remaining available proxies in the pool.

import random
import time
from threading import Thread

import requests

# proxy pool entries as host:port (the scheme is added where the proxy is used)
list_op_proxy = [
    "test.io:12345",
    "test.io:123456",
    "test.io:1234567",
    "test.io:12345678"
]

session = requests.Session()


def handler(name):
    while True:
        try:
            session.proxies = {
                'https': f'http://{random.choice(list_op_proxy)}'
            }
            with session.get("https://stackoverflow.com"):
                print(f"{name} - Yay request made!")

            time.sleep(random.randint(5, 10))
        except requests.exceptions.RequestException as err:
            print(f"Error! Let's try again! {err}")
            continue

        except Exception as err:
            print(f"Error! Let's debug! {err}")
            raise


for i in range(5):
    Thread(target=handler, args=(f'Thread {i}',)).start()

How can I make each thread use only a proxy that is currently available, "blocking" that proxy so no other thread can use it, and releasing it again once the request is finished?


Answer

One way to go about this would be to use a global shared list that holds the currently active proxies, or to remove a proxy from the list and re-add it after the request is finished. You do not have to worry much about concurrent access to the list itself, since CPython's GIL makes the individual remove() and append() calls atomic; note, however, that picking a proxy and removing it are still two separate steps.

proxy = random.choice(list_op_proxy)
list_op_proxy.remove(proxy)
session.proxies = {
    'https': f'http://{proxy}'
}
# ... do request

list_op_proxy.append(proxy)
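For illustration, here is a minimal sketch of how that could look inside a handler, reusing the imports and list_op_proxy from the question. The acquire_proxy/release_proxy helpers and the threading.Lock around the pick-and-remove step are additions of this sketch (not part of the snippet above), guarding against two threads choosing the same proxy between the choice and the remove; the try/finally makes sure the proxy is put back even if the request fails.

import threading

pool_lock = threading.Lock()

def acquire_proxy():
    # pick a free proxy and take it out of the shared pool
    # note: assumes there are at least as many proxies as threads
    with pool_lock:
        proxy = random.choice(list_op_proxy)
        list_op_proxy.remove(proxy)
    return proxy

def release_proxy(proxy):
    # put the proxy back so other threads can use it again
    list_op_proxy.append(proxy)

def handler(name):
    proxy = acquire_proxy()
    try:
        with requests.get("https://stackoverflow.com",
                          proxies={'https': f'http://{proxy}'}):
            print(f"{name} - Yay request made!")
    finally:
        release_proxy(proxy)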

You could also do this with a queue, getting and putting proxies, to make it more efficient, as shown in the next section.

Using a Proxy Queue

Another option is to put the proxies into a queue and get() a proxy before each request, removing it from the available proxies, and then put() it back after the request has finished. This is a more efficient version of the list approach mentioned above.

First we need to initialize the proxy queue.

import queue

# "proxies" is the pool of available proxies (list_op_proxy in the question)
proxy_q = queue.Queue()
for proxy in proxies:
    proxy_q.put(proxy)

Within the handler we then get a proxy from the queue before performing a request, perform the request, and put the proxy back into the queue.
We use block=True so that the queue blocks the thread if no proxy is currently available. Otherwise the thread would terminate with a queue.Empty exception once all proxies are in use and a new one needs to be acquired.

def handler(name):
    global proxy_q
    while True:
        proxy = proxy_q.get(block=True) # we want blocking behaviour
        # ... do request
        proxy_q.put(proxy)
        # ... response handling can be done after proxy put to not
        # block it longer than required
        # do not forget to define a break condition
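Filled in, the handler might look like the sketch below, using the imports from the question and the proxy_q initialized above. The target URL, sleep, and print message are taken from the question; passing the proxy per request (instead of mutating a shared Session) and the try/finally that returns the proxy even when the request fails are additions of this sketch.

def handler(name):
    while True:
        proxy = proxy_q.get(block=True)  # wait until a proxy is free
        try:
            # pass the proxy per request instead of mutating a shared session
            with requests.get("https://stackoverflow.com",
                              proxies={'https': f'http://{proxy}'}) as resp:
                print(f"{name} - Yay request made! ({resp.status_code})")
        except requests.exceptions.RequestException as err:
            print(f"Error! Let's try again! {err}")
        finally:
            proxy_q.put(proxy)  # release the proxy for other threads
        time.sleep(random.randint(5, 10))
        # remember to define a real break condition for the loop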

Using Queue and Multiprocessing

First you would initialize the manager, put all your data into the queue, and initialize another structure for collecting your results (here, a shared list).

import multiprocessing

manager = multiprocessing.Manager()
q = manager.Queue()
for e in entities:
    q.put(e)
print(q.qsize())
results = manager.list()

Then you initialize the scraping processes:

processes = []
for proxy in proxies:
    processes.append(multiprocessing.Process(
        target=scrape_function,
        args=(q, results, proxy),
        daemon=True))

And then start each of them:

for w in processes:
    w.start()

Lastly, you join every process to ensure that the main process does not terminate before the subprocesses are finished:

for w in processes:
    w.join()
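Put together, the driver code might look like the sketch below. The proxies and entities values are placeholders, scrape_function is the function defined in the next step, and the if __name__ == '__main__' guard is an addition that multiprocessing needs on platforms that spawn rather than fork processes (e.g. Windows and recent macOS).

import multiprocessing

if __name__ == '__main__':
    proxies = ["test.io:12345", "test.io:123456"]   # placeholder proxy pool
    entities = ["https://stackoverflow.com"] * 10   # placeholder work items (URLs)

    manager = multiprocessing.Manager()
    q = manager.Queue()
    for e in entities:
        q.put(e)
    results = manager.list()

    processes = []
    for proxy in proxies:
        processes.append(multiprocessing.Process(
            target=scrape_function,
            args=(q, results, proxy),
            daemon=True))

    for w in processes:
        w.start()
    for w in processes:
        w.join()

    print(f"Collected {len(results)} results")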

Inside the scrape_function you then simply get one item at a time and perform the request. When called with block=False, get() raises a queue.Empty error as soon as the queue is empty, so we use an infinite while loop and break out of it when that exception is caught.

def scrape_function(q, results, proxy):
    session = requests.Session()
    session.proxies = {
        'https': f'http://{proxy}'
    }
    while True:
        try:
            request_uri = q.get(block=False)
            with session.get(request_uri) as response:
                print(f"{proxy} - Yay request made!")
                results.append(response.text)
            time.sleep(random.randint(5, 10))
        except queue.Empty:
            break

The results of each query are appended to the results list, which is also shared among the different processes.
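Back in the main process, after all workers have been joined, the shared list can be read like a normal list. Copying it into a plain list first (an optional step, not part of the answer above) avoids repeated round-trips to the manager process:

all_results = list(results)  # copy the shared list out of the manager proxy
print(f"Scraped {len(all_results)} pages")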

User contributions licensed under: CC BY-SA