Databricks – Autoloader – Not Terminating?

I’m new to Databricks. I have several Azure Blob .parquet locations that I’m pulling data from, and I want to put them through Auto Loader so that I can run “create table … using delta location ''” in SQL in a later step. (Each parquet file is in its own directory under the parent blob directory, so we iterate over all directories in the blob location and grab their .parquet contents.)

But the problem is, even though my test files only have 3 records in them, this process never terminates. It should basically finish almost instantly, but will never end unless I set a timeout. The script below works in loading data, but it will never ‘end’ even though it has reached the end of my files.

I was originally just calling .start() in the for loop. I then moved to storing each stream in a list, starting them, and waiting on them, thinking that would fix the issue, but each one just hangs around until the timeout expires, which won’t scale once I start pushing real data through. I’m just guessing that 15s is more than enough time to finish.

I can manually terminate the script, or make it time out as I’ve done here, but right now the data probably loads in a few seconds and then the script hangs around doing nothing. I know the load has finished because I can cancel the script after 10 seconds of execution and then select from my freshly recreated tables. Why doesn’t it know to terminate? For some reason .start() doesn’t terminate on its own, and I shouldn’t need to tell it to. What am I doing wrong?

Any suggestions or advice would be appreciated. Having looked over the DataQuery/Stream classes provided by Databricks and several tutorials, I think I’m doing it mostly right, but I’m clearly missing something obvious here.

from pathlib import Path
import os
import time
import types
configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<azure client id>",
           "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="<scope>",key="<your key>"),
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenantId>/oauth2/token"}

#commented out things really only have to be run once
#dbutils.fs.unmount("/mounted/data_location")
#dbutils.fs.mount(
#  source = "abfss://data_location@<blob>.dfs.core.windows.net/",
#  mount_point = "/mounted/data_location",
#  extra_configs = configs)

#dbutils.fs.mount(
#  source = "abfss://<location>@<blob>.dfs.core.windows.net/",
#  mount_point = "/mounted/dbfs/",
#  extra_configs = configs)

#dbutils.fs.mount(
#  source = "abfss://<location>@<blob>.dfs.core.windows.net/",
#  mount_point = "/mounted/tmp/",
#  extra_configs = configs)

#create a unique dir for each schema load attempt
snapshot = str(int(time.time()))
#source directory for data
info = dbutils.fs.ls("/mounted/data_location")
block_sz = 1024
threads = []

#played with these & re-using schema location instead of unique location
#.option("overwriteSchema", "true") 
#.option("mergeSchema", "true") 
#.option("allowOverwrites", "true") 

for dir in info:
  display('Starting:',dir.name[:-1])
  #autoload destination
  target_location = dir.path.replace('/mounted/data_location','/mounted/dbfs/autoload')
  display('Clearing:', target_location)
  #assume it already exists and wipe it out for now for testing
  dbutils.fs.rm(target_location.replace('dbfs:',''), True)
  schema_location = dir.path.replace('/mounted/data_location','/mounted/dbfs/schema/' + snapshot)
  display(target_location, ' ', schema_location)
  Path(schema_location).mkdir(parents=True, exist_ok=True)
  Path(target_location).mkdir(parents=True, exist_ok=True)
  #parentheses let the chained calls span multiple lines
  thread = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.inferColumnTypes", "True")
    .option("cloudFiles.schemaLocation", schema_location)
    .option("pathGlobfilter", "*.parquet")
    .option('cloudFiles.useIncrementalListing', 'false')
    .load(dir.path)
    .writeStream
    .option("checkpointLocation", schema_location)
    .option("parquet.block.size", block_sz))

  item = types.SimpleNamespace()
  item.thread = thread
  item.path = target_location
  threads.append(item)
  display('Constructed',dir)

for t in threads:
  work = t.thread.start(t.path)
  t.work = work

for t in threads:
  display(t)
  t.work.awaitTermination(15) #it should already be done in a few seconds, but waits the full 15
  t.work.stop()

display('Script end.')
#dbutils.notebook.exit("complete")

Answer

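The problem is that you don’t specify a trigger on your streams, so by default they run continuously and never terminate on their own. Add .trigger(availableNow=True) (available from Spark 3.3, as far as I remember) or .trigger(once=True) (earlier versions) to the write stream before calling .start(). The stream then processes whatever data is currently available and stops.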
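
As a rough sketch of where the trigger goes, something like the following could replace the stream construction in the loop. The helper name `load_directory` and its parameters are made up for illustration; `spark` is assumed to be the session Databricks provides in a notebook, and only a subset of the options from the question is repeated here.

```python
def load_directory(spark, source_path, target_path, checkpoint_path,
                   use_available_now=True):
    """Start one Auto Loader stream that stops after the current backlog.

    Hypothetical helper: the name and parameters are illustrative only.
    """
    reader = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .option("cloudFiles.schemaLocation", checkpoint_path)
        .load(source_path)
    )

    # Without an explicit trigger the query keeps polling for new files
    # indefinitely. availableNow (newer runtimes) or once (older ones)
    # makes it process what is already there and then terminate.
    trigger_args = {"availableNow": True} if use_available_now else {"once": True}

    query = (
        reader.writeStream
        .option("checkpointLocation", checkpoint_path)
        .trigger(**trigger_args)
        .start(target_path)
    )
    return query
```

With the trigger in place, calling `awaitTermination()` on the returned query without a timeout should return as soon as the available files have been processed, so the guessed 15-second wait and the explicit `stop()` should no longer be needed.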

User contributions licensed under: CC BY-SA