I have recently been tasked with ingesting JSON responses into a Databricks Delta Lake. I have to hit the REST API endpoint URL 6,500 times with different parameters and pull the responses.
I have tried two modules from the multiprocessing library, ThreadPool and Pool, to make each run a little quicker.
ThreadPool:
- How do I choose the number of threads for ThreadPool when the Azure Databricks cluster is set to autoscale from 2 to 13 worker nodes?
Right now I’ve set n_pool = multiprocessing.cpu_count(), roughly as in the sketch below. Will it make any difference if the cluster auto-scales?
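For context, a minimal sketch of this setup follows; the endpoint URL, the parameter list, and the fetch_one helper are illustrative placeholders, not the real ones:

```python
import multiprocessing
from multiprocessing.pool import ThreadPool

import requests

BASE_URL = "https://api.example.com/data"  # placeholder endpoint

def fetch_one(params: dict) -> dict:
    """Hit the REST endpoint once with one parameter set and return the JSON body."""
    resp = requests.get(BASE_URL, params=params, timeout=30)
    resp.raise_for_status()
    return resp.json()

param_sets = [{"id": i} for i in range(6500)]  # 6500 different parameter sets

# Current sizing: one thread per CPU core of the machine running this code.
n_pool = multiprocessing.cpu_count()

with ThreadPool(n_pool) as pool:
    responses = pool.map(fetch_one, param_sets)
```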
Pool:
- When I use Pool to use processes instead of threads, I see the following errors randomly on each execution. I understand from the error that the Spark session/conf is missing and I need to set it from each process, but I am on Databricks with the default Spark session enabled, so why do I see these errors?
Py4JError: SparkConf does not exist in the JVM **OR** py4j.protocol.Py4JError: org.apache.spark.api.python.PythonUtils.getEncryptionEnabled does not exist in the JVM
- Lastly, I am planning to replace multiprocessing with concurrent.futures.ProcessPoolExecutor. Does it make any difference? (Both the Pool and ProcessPoolExecutor variants are sketched below.)
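For reference, the two process-based variants look roughly like this, reusing the placeholder fetch_one helper and param_sets list from the sketch above; the worker count of 8 is arbitrary:

```python
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Pool

# multiprocessing.Pool variant -- each worker is a separate Python process.
# This is the variant where the Py4J / SparkConf errors described above appear.
with Pool(processes=8) as pool:
    responses = pool.map(fetch_one, param_sets)

# concurrent.futures variant -- a nicer API, but it still forks worker
# processes in much the same way as multiprocessing.Pool.
with ProcessPoolExecutor(max_workers=8) as executor:
    responses = list(executor.map(fetch_one, param_sets))
```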
Answer
If you’re using thread pools, they will run only on the driver node; the executors will be idle. Instead you need to use Spark itself to parallelize the requests. This is usually done by creating a DataFrame with the list of URLs (or the parameters for the URL if the base URL is the same), and then using a Spark user-defined function to do the actual requests. Something like this:
```python
from pyspark.sql.functions import col, udf
import urllib.request

df = spark.createDataFrame([("url1", "params1"), ("url2", "params2")], ("url", "params"))

@udf("body string, status int")
def do_request(url: str, params: str):
    full_url = url + "?" + params  # adjust this as required
    with urllib.request.urlopen(full_url) as f:
        status = f.status
        body = f.read().decode("utf-8")
    return {'status': status, 'body': body}

res = df.withColumn("result", do_request(col("url"), col("params")))
```
This will return a DataFrame with a new column called `result` that has two fields: `status` and `body` (the JSON answer as a string).
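Since the goal is to land these responses in Delta Lake, a possible follow-up is to repartition the input so the requests actually spread across the executors, parse the JSON body, and write the result out. This is only a sketch: the partition count, the body schema, and the target table name are assumptions to be replaced with the real ones.

```python
from pyspark.sql.functions import col, from_json

# Spread the 6500 rows across the cluster so the UDF calls run in parallel
# on the executors rather than in a single task.
res = (df.repartition(64)  # tune to the cluster size and the API's rate limits
         .withColumn("result", do_request(col("url"), col("params"))))

# Hypothetical schema for the JSON body -- replace with the real response schema.
body_schema = "id STRING, value DOUBLE"

parsed = (res
          .withColumn("status", col("result.status"))
          .withColumn("payload", from_json(col("result.body"), body_schema))
          .drop("result"))

# Hypothetical target table name.
parsed.write.format("delta").mode("append").saveAsTable("raw.api_responses")
```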