
Multi-processing in Azure Databricks

I have recently been tasked with ingesting JSON responses into a Databricks Delta Lake. I have to call a REST API endpoint 6500 times with different parameters and collect the responses.

I have tried two classes from the multiprocessing library, ThreadPool and Pool, to make each execution a little quicker.

ThreadPool:

  1. How should I choose the number of threads for ThreadPool when the Azure Databricks cluster is set to autoscale from 2 to 13 worker nodes?

Right now I've set n_pool = multiprocessing.cpu_count(). Will it make any difference if the cluster autoscales?
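For reference, a minimal sketch of the ThreadPool setup described above. The `fetch` function and `param_list` are hypothetical stand-ins for the real REST call and its 6500 parameter sets. Note that `multiprocessing.cpu_count()` is evaluated in the Python process that runs the code, so it reports the cores of that one machine and does not change when worker nodes are added or removed.

```python
import multiprocessing
from multiprocessing.pool import ThreadPool


def fetch(params: str) -> dict:
    # Hypothetical placeholder for the real REST call.
    return {"params": params, "status": 200}


# Hypothetical parameter list standing in for the 6500 real ones.
param_list = [f"id={i}" for i in range(20)]

# cpu_count() reports the cores of the machine running this process.
n_pool = multiprocessing.cpu_count()

with ThreadPool(n_pool) as pool:
    # map() returns results in the same order as the inputs.
    results = pool.map(fetch, param_list)
```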

Pool:

  1. When I use Pool to run processes instead of threads, I see one of the following errors at random on each execution. I understand from the error that the Spark session/conf is missing and that I need to set it in each process. But I am on Databricks, where a default Spark session is provided, so why do I see these errors?
Py4JError: SparkConf does not exist in the JVM 
**OR** 
py4j.protocol.Py4JError: org.apache.spark.api.python.PythonUtils.getEncryptionEnabled does not exist in the JVM
  2. Lastly, I am planning to replace multiprocessing with concurrent.futures.ProcessPoolExecutor. Would that make any difference?


Answer

If you're using thread pools, they will run only on the driver node, and the executors will sit idle. Instead, you need to use Spark itself to parallelize the requests. This is usually done by creating a DataFrame with the list of URLs (or the URL parameters, if the base URL is the same) and then using a Spark user-defined function to make the actual requests. Something like this:

import urllib.request

from pyspark.sql.functions import col, udf

# One row per request: the base URL plus its query string.
df = spark.createDataFrame([("url1", "params1"), ("url2", "params2")],
                           ("url", "params"))

# The UDF returns a struct with the response body and the HTTP status.
@udf("body string, status int")
def do_request(url: str, params: str):
  full_url = url + "?" + params # adjust this as required
  with urllib.request.urlopen(full_url) as f:
    status = f.status
    body = f.read().decode("utf-8")

  return {'status': status, 'body': body}

res = df.withColumn("result", do_request(col("url"), col("params")))

This returns a DataFrame with a new column called result that has two fields: status and body (the JSON response as a string).
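One thing the UDF above does not cover is failure handling: urllib.request.urlopen raises on 4xx/5xx responses and on network errors, and an uncaught exception inside a UDF fails the Spark task. Below is a minimal sketch of a request helper that returns the same status/body shape instead of raising. The name `safe_request` and its `opener` and `timeout` parameters are hypothetical, introduced only for illustration, and only `HTTPError` and `URLError` are handled here.

```python
import urllib.error
import urllib.request


def safe_request(full_url: str, opener=urllib.request.urlopen,
                 timeout: float = 30.0) -> dict:
    """Return {'status': ..., 'body': ...} instead of raising on HTTP/URL errors."""
    try:
        with opener(full_url, timeout=timeout) as f:
            return {"status": f.status, "body": f.read().decode("utf-8")}
    except urllib.error.HTTPError as e:
        # The server answered with 4xx/5xx: keep the code and the error body.
        # HTTPError must be caught before URLError, which is its parent class.
        return {"status": e.code,
                "body": e.read().decode("utf-8", errors="replace")}
    except urllib.error.URLError as e:
        # No HTTP response at all (DNS failure, refused connection, ...).
        return {"status": None, "body": str(e.reason)}
```

Inside the UDF, the body of do_request could then be reduced to returning safe_request(url + "?" + params), so failed calls show up as rows with an error status rather than aborting the job.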

User contributions licensed under: CC BY-SA