Unable to load an S3-hosted CSV into a Spark DataFrame in a Jupyter Notebook.
I believe I loaded the two required packages with the os.environ
line below. If I did it incorrectly, please show me how to install them correctly. The Jupyter Notebook is hosted on an EC2 instance, which is why I'm trying to pull the CSV from an S3 bucket.
Here is my code:
import os
import pyspark
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2 pyspark-shell'
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('test').getOrCreate()
spark
Output:
Then I do:
%%time
df = spark.read.csv(f"s3://{AWS_BUCKET}/prices/{ts}_all_prices_{user}.csv", inferSchema = True, header = True)
And I get this error:
WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: s3://blah-blah-blah/prices/1655999784.356597_blah_blah_blah.csv.
org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
Answer
Here is an example using s3a. The hadoop-aws package provides the S3AFileSystem, which is addressed with the s3a:// scheme rather than s3://, which is why your read fails with "No FileSystem for scheme s3".
import os
import pyspark
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2 pyspark-shell'
from pyspark.sql import SQLContext
from pyspark import SparkContext
sc = SparkContext()
sqlContext = SQLContext(sc)
filePath = "s3a://yourBucket/yourFile.parquet"
df = sqlContext.read.parquet(filePath)  # Parquet file read example
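For the CSV from your question, the same idea applies; here is a minimal sketch, assuming the same hadoop-aws package as above, hypothetical placeholder values for your AWS_BUCKET, ts and user variables, and that the EC2 instance profile (or the default credentials chain) can read the bucket:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2 pyspark-shell'
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('test').getOrCreate()

# Hypothetical placeholders standing in for the question's variables
AWS_BUCKET = "your-bucket"
ts = "1655999784.356597"
user = "your-user"

# Same read as in the question, but with the s3a:// scheme that hadoop-aws implements
df = spark.read.csv(
    f"s3a://{AWS_BUCKET}/prices/{ts}_all_prices_{user}.csv",
    inferSchema=True,
    header=True,
)
df.show(5)  # quick sanity check that the file was read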
Here is a more complete example, taken from here, that builds the Spark session with the required config.
from pyspark.sql import SparkSession

def get_spark():
    spark = SparkSession.builder.master("local[4]").appName('SparkDelta') \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.jars.packages",
                "io.delta:delta-core_2.12:1.1.0,"
                "org.apache.hadoop:hadoop-aws:3.2.2,"
                "com.amazonaws:aws-java-sdk-bundle:1.12.180") \
        .getOrCreate()

    # These configs are mandatory for the Spark session to access AWS S3
    spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
    spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.InstanceProfileCredentialsProvider,com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
    spark._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A")
    # spark.sparkContext.setLogLevel("DEBUG")

    return spark
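And a short usage sketch of the helper above, again with a hypothetical bucket and key, assuming the instance profile or default credentials chain grants read access:
spark = get_spark()

# Hypothetical path; replace with your own bucket and object key
df = spark.read.csv(
    "s3a://your-bucket/prices/1655999784.356597_all_prices_your-user.csv",
    inferSchema=True,
    header=True,
)
df.printSchema()
df.show(5)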