I have a scenario where I need to copy files from Azure Blob Storage
to SFTP
location in Databricks
Is there a way to achieve this scenario using pySpark
or Scala
?
Advertisement
Answer
Regarding the issue, please refer to the following steps (I use scala)
- Mount Azure Blob storage containers to DBFS
JavaScript
x
7
1
// Mount the Azure Blob Storage container at /mnt/blob in DBFS.
// The mount is skipped when /mnt/blob is already mounted, because mounting an
// existing mount point fails; this keeps the cell safe to re-run.
if (!dbutils.fs.mounts().exists(_.mountPoint == "/mnt/blob")) {
  dbutils.fs.mount(
    // Blob Storage sources must use the wasbs:// scheme.
    source = "wasbs://<container-name>@<storage-account-name>.blob.core.windows.net",
    mountPoint = "/mnt/blob",
    // NOTE(review): avoid pasting the account key in plain text; prefer reading it
    // from a secret scope, e.g. dbutils.secrets.get(scope = "<scope>", key = "<key-name>").
    extraConfigs = Map("fs.azure.account.key.<storage-account-name>.blob.core.windows.net" -> "<key>"))
}

// List the mounted container to confirm the mount works.
dbutils.fs.ls("/mnt/blob")
7
- Copy these files to the cluster's local file system
JavaScript
1
5
1
%sh
# Stop at the first failing command so a failed copy is not hidden by the listing below.
set -e

# Copy the mounted container (exposed under /dbfs) onto the driver's local disk.
cp -R /dbfs/mnt/blob /databricks/driver

# Show what was copied.
ls -R /databricks/driver/blob
5
- Code. Before running the code, please add library
com.jcraft.jsch
via Maven in Databricks
JavaScript
1
32
32
1
import java.io.File
2
import scala.sys.process._
3
import com.jcraft.jsch._
4
/**
 * Recursively lists every entry (files and directories) beneath `f`.
 *
 * @param f the directory to walk
 * @return the direct children of `f` followed by the contents of each child
 *         directory; empty when `f` is not a directory or cannot be read
 */
def recursiveListFiles(f: File): Array[File] = {
  // File.listFiles returns null (not an empty array) for a non-directory or on an
  // I/O error, so wrap it to avoid a NullPointerException.
  val these = Option(f.listFiles).getOrElse(Array.empty[File])
  these ++ these.filter(_.isDirectory).flatMap(recursiveListFiles)
}
8
// Upload every regular file found under /databricks/driver/blob to the SFTP server.
val jsch = new JSch()
val session = jsch.getSession("<username>", "<host>", 22) // Set your username, host and port (22 is the usual SFTP port)
session.setPassword("<password>") // Set your password
val config = new java.util.Properties()
// NOTE(review): "no" disables host-key verification, so the server's identity is not
// checked. Fine for a quick test; for real use register the host key with JSch instead.
config.put("StrictHostKeyChecking", "no")
session.setConfig(config)

// Listing the local files needs no connection, so do it before opening the session.
val files = recursiveListFiles(new File("/databricks/driver/blob"))

// try/finally guarantees the channel and session are closed even if an upload fails.
try {
  session.connect()
  val channelSftp = session.openChannel("sftp").asInstanceOf[ChannelSftp]
  try {
    channelSftp.connect()
    // Every file is sent to the same remote directory, so the local folder structure
    // is flattened and same-named files from different folders overwrite each other.
    files.filter(_.isFile).foreach { file =>
      println(file.getPath())
      channelSftp.put(file.getPath(), "/home/testqw/upload")
    }
  } finally {
    channelSftp.disconnect()
  }
} finally {
  session.disconnect()
}
32
- Check with FileZilla
#Update
After mounting the Azure Blob container, we can access the files directly through the mount and upload them without first copying them to the driver.
For example
JavaScript
1
33
33
1
import java.io.File
2
import scala.sys.process._
3
import com.jcraft.jsch._
4
/**
 * Recursively lists every entry (files and directories) beneath `f`.
 *
 * @param f the directory to walk
 * @return the direct children of `f` followed by the contents of each child
 *         directory; empty when `f` is not a directory or cannot be read
 */
def recursiveListFiles(f: File): Array[File] = {
  // File.listFiles returns null (not an empty array) for a non-directory or on an
  // I/O error, so wrap it to avoid a NullPointerException.
  val these = Option(f.listFiles).getOrElse(Array.empty[File])
  these ++ these.filter(_.isDirectory).flatMap(recursiveListFiles)
}
8
// Upload every regular file found under the DBFS mount (/dbfs/mnt/blob) to the SFTP
// server, reading straight from the mount instead of a local copy.
val jsch = new JSch()
val session = jsch.getSession("", "", 22) // Set your username and host
session.setPassword("") // Set your password
val config = new java.util.Properties()
// NOTE(review): "no" disables host-key verification, so the server's identity is not
// checked. Fine for a quick test; for real use register the host key with JSch instead.
config.put("StrictHostKeyChecking", "no")
session.setConfig(config)

// Listing the mounted files needs no connection, so do it before opening the session.
val files = recursiveListFiles(new File("/dbfs/mnt/blob"))

// try/finally guarantees the channel and session are closed even if an upload fails.
try {
  session.connect()
  val channelSftp = session.openChannel("sftp").asInstanceOf[ChannelSftp]
  try {
    channelSftp.connect()
    // Every file is sent to the same remote directory, so the folder structure is
    // flattened and same-named files from different folders overwrite each other.
    files.filter(_.isFile).foreach { file =>
      println(file.getPath())
      channelSftp.put(file.getPath(), "/home/testqw/upload")
    }
  } finally {
    channelSftp.disconnect()
  }
} finally {
  session.disconnect()
}
33