Unable to load S3-hosted CSV into Spark Dataframe on Jupyter Notebook.
I believe I uploaded the 2 required packages with the os.environ line below. If I did it incorrectly please show me how to correctly install it. The Jupyter Notebook is hosted on an EC2 instance, which is why I’m trying to pull the CSV from a S3 bucket.
Here is my code:
import os
import pyspark
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2 pyspark-shell'
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('test').getOrCreate()
spark
Output:
Then i do:
%%time
df = spark.read.csv(f"s3://{AWS_BUCKET}/prices/{ts}_all_prices_{user}.csv", inferSchema = True, header = True)
And i get an error of:
WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: s3://blah-blah-blah/prices/1655999784.356597_blah_blah_blah.csv. org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
Advertisement
Answer
Here is an example using s3a.
import os import pyspark os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2 pyspark-shell' from pyspark.sql import SQLContext from pyspark import SparkContext sc = SparkContext() sqlContext = SQLContext(sc) filePath = "s3a://yourBucket/yourFile.parquet" df = sqlContext.read.parquet(filePath) # Parquet file read example
Here is a more complete example taken from here to build the spark session with required config.
from pyspark.sql import SparkSession
def get_spark():
    spark = SparkSession.builder.master("local[4]").appName('SparkDelta') 
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") 
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") 
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") 
        .config("spark.jars.packages", 
                "io.delta:delta-core_2.12:1.1.0,"
                "org.apache.hadoop:hadoop-aws:3.2.2,"
                "com.amazonaws:aws-java-sdk-bundle:1.12.180") 
        .getOrCreate()
# This is mandate config on spark session to use AWS S3
       spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider","com.amazonaws.auth.InstanceProfileCredentialsProvider,com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
    spark._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A")
# spark.sparkContext.setLogLevel("DEBUG")
return spark
 
						