
Unable to load S3-hosted CSV into Spark Dataframe on Jupyter Notebook


I believe I loaded the two required packages with the os.environ line below. If I did this incorrectly, please show me how to install them correctly. The Jupyter Notebook is hosted on an EC2 instance, which is why I'm trying to pull the CSV from an S3 bucket.

Here is my code:

import os
import pyspark
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2 pyspark-shell'
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('test').getOrCreate()
spark

Output:

(screenshot of the SparkSession summary)

Then I do:

%%time
df = spark.read.csv(f"s3://{AWS_BUCKET}/prices/{ts}_all_prices_{user}.csv", inferSchema = True, header = True)

And I get this error:

WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: s3://blah-blah-blah/prices/1655999784.356597_blah_blah_blah.csv.
org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"


Answer

Here is an example using s3a. The hadoop-aws package provides a filesystem implementation for the s3a:// scheme, not for plain s3://, which is why your read fails with "No FileSystem for scheme "s3"".

import os
import pyspark

# The packages must be set before the SparkContext is created
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2 pyspark-shell'

from pyspark.sql import SQLContext
from pyspark import SparkContext

sc = SparkContext()
sqlContext = SQLContext(sc)

filePath = "s3a://yourBucket/yourFile.parquet"
df = sqlContext.read.parquet(filePath)  # Parquet file read example
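
Since you are reading a CSV rather than Parquet, the same s3a path works with the CSV reader. A minimal sketch, assuming the AWS_BUCKET, ts and user variables from your notebook are defined:

# Same s3a scheme, but with the CSV reader
# (AWS_BUCKET, ts and user are assumed to be defined as in the question)
df = sqlContext.read.csv(
    f"s3a://{AWS_BUCKET}/prices/{ts}_all_prices_{user}.csv",
    inferSchema=True,
    header=True,
)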

Here is a more complete example, taken from here, that builds the Spark session with the required config.

from pyspark.sql import SparkSession

def get_spark():
    spark = SparkSession.builder.master("local[4]").appName('SparkDelta') \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.jars.packages",
                "io.delta:delta-core_2.12:1.1.0,"
                "org.apache.hadoop:hadoop-aws:3.2.2,"
                "com.amazonaws:aws-java-sdk-bundle:1.12.180") \
        .getOrCreate()

    # This is mandatory config on the Spark session to use AWS S3
    spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
    spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.InstanceProfileCredentialsProvider,com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
    spark._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A")

    # spark.sparkContext.setLogLevel("DEBUG")
    return spark
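
With that helper in place, a usage sketch (again assuming the AWS_BUCKET, ts and user variables from your notebook) would look like this:

spark = get_spark()

# Read the same file as in the question, but over s3a
df = spark.read.csv(
    f"s3a://{AWS_BUCKET}/prices/{ts}_all_prices_{user}.csv",
    inferSchema=True,
    header=True,
)
df.printSchema()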

User contributions licensed under: CC BY-SA