I have a scenario where I need to copy files from Azure Blob Storage to an SFTP location in Databricks.
Is there a way to achieve this using PySpark or Scala?
Answer
You can do this with the following steps (I use Scala):
- Mount Azure Blob storage containers to DBFS
    dbutils.fs.mount(
      source = "wasbs://<container-name>@<storage-account-name>.blob.core.windows.net",
      mountPoint = "/mnt/blob",
      extraConfigs = Map(
        "fs.azure.account.key.<storage-account-name>.blob.core.windows.net" -> "<key>"))

    // List the mounted container to confirm the mount worked
    dbutils.fs.ls("/mnt/blob")
- Copy these files to the cluster's local file system
    %sh
    cp -R /dbfs/mnt/blob /databricks/driver
    ls -R /databricks/driver/blob
- Run the upload code. Before running it, add the com.jcraft.jsch library via Maven in Databricks
    import java.io.File
    import scala.sys.process._
    import com.jcraft.jsch._

    // Return every file and directory under f, recursing into subdirectories
    def recursiveListFiles(f: File): Array[File] = {
      val these = f.listFiles
      these ++ these.filter(_.isDirectory).flatMap(recursiveListFiles)
    }

    val jsch = new JSch()
    val session = jsch.getSession("<username>", "<host>", <port>) // Set your username, host and port
    session.setPassword("<password>") // Set your password

    // "StrictHostKeyChecking" = "no" skips host key verification
    val config = new java.util.Properties()
    config.put("StrictHostKeyChecking", "no")
    session.setConfig(config)
    session.connect()

    val channelSftp = session.openChannel("sftp").asInstanceOf[ChannelSftp]
    channelSftp.connect()

    // Upload each regular file from the local copy to the remote directory
    val files = recursiveListFiles(new File("/databricks/driver/blob"))
    files.foreach { file =>
      if (file.isFile()) {
        println(file.getPath())
        channelSftp.put(file.getPath(), "/home/testqw/upload")
      }
    }

    channelSftp.disconnect()
    session.disconnect()
- Verify the upload with FileZilla
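One caveat with recursiveListFiles in the code above: File.listFiles returns null when the path is not a directory or cannot be read, so the helper throws a NullPointerException if, for example, the mount path is mistyped. A minimal null-safe sketch follows; the name listFilesSafe is hypothetical and not part of the original answer.

```scala
import java.io.File

// Hypothetical null-safe variant of recursiveListFiles.
// File.listFiles returns null for non-directories or on I/O errors,
// so wrap it in Option and fall back to an empty array.
def listFilesSafe(f: File): Array[File] = {
  val children = Option(f.listFiles).getOrElse(Array.empty[File])
  children ++ children.filter(_.isDirectory).flatMap(listFilesSafe)
}
```

It can be swapped in for recursiveListFiles without other changes; a missing directory then simply yields no files to upload.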
#Update
After mounting the Azure Blob container, we can read the files directly through the /dbfs/mnt/blob path and upload them, without first copying them to the driver.
For example:
    import java.io.File
    import scala.sys.process._
    import com.jcraft.jsch._

    // Return every file and directory under f, recursing into subdirectories
    def recursiveListFiles(f: File): Array[File] = {
      val these = f.listFiles
      these ++ these.filter(_.isDirectory).flatMap(recursiveListFiles)
    }

    val jsch = new JSch()
    val session = jsch.getSession("", "", 22) // Set your username and host
    session.setPassword("") // Set your password

    val config = new java.util.Properties()
    config.put("StrictHostKeyChecking", "no")
    session.setConfig(config)
    session.connect()

    val channelSftp = session.openChannel("sftp").asInstanceOf[ChannelSftp]
    channelSftp.connect()
    val home = channelSftp.getHome() // Remote home directory (not used below)

    // Read straight from the DBFS mount instead of a local copy
    val files = recursiveListFiles(new File("/dbfs/mnt/blob"))
    files.foreach { file =>
      if (file.isFile()) {
        println(file.getPath())
        channelSftp.put(file.getPath(), "/home/testqw/upload")
      }
    }

    channelSftp.disconnect()
    session.disconnect()
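Note that the upload loop puts every file directly into /home/testqw/upload, so two files with the same name in different subfolders would overwrite each other. If the folder layout matters, one option is to derive the remote path from the file's position under the local root. The sketch below covers only that path mapping; remotePathFor and its parameters are hypothetical names introduced for illustration.

```scala
import java.io.File
import java.nio.file.Paths

// Hypothetical helper: map a local file under localRoot to a path under
// remoteRoot, keeping the relative subdirectory layout and "/" separators.
def remotePathFor(localRoot: String, remoteRoot: String, file: File): String = {
  val relative = Paths.get(localRoot).relativize(Paths.get(file.getPath))
  val parts = (0 until relative.getNameCount).map(i => relative.getName(i).toString)
  (remoteRoot.stripSuffix("/") +: parts).mkString("/")
}
```

The result could be passed as the second argument to channelSftp.put. This assumes the remote subdirectories already exist; JSch's ChannelSftp.mkdir creates one directory level at a time, so nested folders would need to be created before uploading into them.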