Unable to load S3-hosted CSV into Spark Dataframe on Jupyter Notebook.
I believe I loaded the two required packages with the os.environ line below. If I did it incorrectly, please show me how to install them correctly. The Jupyter Notebook is hosted on an EC2 instance, which is why I'm trying to pull the CSV from an S3 bucket.
Here is my code:
import os
import pyspark
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2 pyspark-shell'
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('test').getOrCreate()
spark
Output:
Then I do:
%%time
df = spark.read.csv(f"s3://{AWS_BUCKET}/prices/{ts}_all_prices_{user}.csv", inferSchema=True, header=True)
And I get this error:
WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: s3://blah-blah-blah/prices/1655999784.356597_blah_blah_blah.csv.
org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
Answer
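The error says that no filesystem is registered for the s3 scheme. The hadoop-aws package provides the s3a connector, so the path should start with s3a:// instead of s3://. Here is an example using s3a.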
import os
import pyspark
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2 pyspark-shell'
from pyspark.sql import SQLContext
from pyspark import SparkContext
sc = SparkContext()
sqlContext = SQLContext(sc)
filePath = "s3a://yourBucket/yourFile.parquet"
df = sqlContext.read.parquet(filePath)  # Parquet file read example
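On the "did I install it correctly" part: the PYSPARK_SUBMIT_ARGS value is only read when the JVM is launched, so it has to be set before the first SparkSession or SparkContext is created in the notebook kernel. Below is a minimal sketch of assembling that value. The helper name build_submit_args is hypothetical, and the coordinates are simply the ones from the question, not a recommendation; the hadoop-aws version generally needs to match the Hadoop version your Spark build ships with.

```python
import os


def build_submit_args(packages):
    """Join Maven coordinates into a PYSPARK_SUBMIT_ARGS value (hypothetical helper)."""
    if not packages:
        raise ValueError("at least one package coordinate is required")
    # --packages expects a comma-separated list with no spaces;
    # the value ends with 'pyspark-shell', as in the question.
    return "--packages " + ",".join(packages) + " pyspark-shell"


# Coordinates copied from the question; adjust the versions to your Spark/Hadoop build.
submit_args = build_submit_args([
    "com.amazonaws:aws-java-sdk-pom:1.10.34",
    "org.apache.hadoop:hadoop-aws:2.7.2",
])

# Must run before any SparkSession/SparkContext is created in this kernel.
os.environ["PYSPARK_SUBMIT_ARGS"] = submit_args
```

If a session already exists in the notebook, restart the kernel and run this cell first; otherwise the packages are not picked up.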
Here is a more complete example (taken from an external source) that builds the Spark session with the required configuration.
from pyspark.sql import SparkSession

def get_spark():
    # Parentheses allow the builder chain to span multiple lines
    spark = (
        SparkSession.builder.master("local[4]").appName('SparkDelta')
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config("spark.jars.packages",
                "io.delta:delta-core_2.12:1.1.0,"
                "org.apache.hadoop:hadoop-aws:3.2.2,"
                "com.amazonaws:aws-java-sdk-bundle:1.12.180")
        .getOrCreate()
    )
    # This config is mandatory on the Spark session to use AWS S3
    spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
    spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.InstanceProfileCredentialsProvider,com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
    spark._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A")
    # spark.sparkContext.setLogLevel("DEBUG")
    return spark
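To apply this to the CSV read from the question, the only change needed in the path is the scheme. Here is a small sketch of a helper that rewrites it; the name to_s3a is hypothetical, and the bucket and file names are placeholders.

```python
def to_s3a(uri):
    """Rewrite an s3:// or s3n:// URI to s3a:// (hypothetical helper)."""
    for prefix in ("s3://", "s3n://"):
        if uri.startswith(prefix):
            return "s3a://" + uri[len(prefix):]
    # Anything else (already s3a://, local paths, ...) is returned unchanged.
    return uri


path = to_s3a("s3://yourBucket/prices/yourFile.csv")
print(path)  # s3a://yourBucket/prices/yourFile.csv

# With a session from get_spark(), the read from the question would become:
# df = spark.read.csv(path, inferSchema=True, header=True)
```

Writing the path with s3a:// directly in the f-string works just as well; the helper is only convenient if the URIs come from somewhere else.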