
Tag: pyspark

Median and quantile values in PySpark

In my dataframe I have an age column. The total number of rows is approximately 77 billion. I want to calculate the quantile values of that column using PySpark. I have some code, but the computation time is huge (maybe my process is very bad). Is there any good way to improve this? Dataframe example: What I have done so …
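A minimal sketch of the usual low-cost route, DataFrame.approxQuantile, which computes approximate percentiles in a single pass instead of an exact global sort (the age column and the 1% error tolerance below are illustrative, not taken from the question):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Illustrative dataframe with an "age" column; on tens of billions of rows
    # an approximate, single-pass quantile is usually far cheaper than an exact one.
    df = spark.range(0, 1000000).selectExpr("id % 100 AS age")

    # 25th, 50th (median) and 75th percentiles, allowing 1% relative error.
    quantiles = df.approxQuantile("age", [0.25, 0.5, 0.75], 0.01)
    print(quantiles)

Tightening relativeError towards 0 gives exact results but costs more memory and time, so it is the main knob for trading accuracy against runtime.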

PySpark Dataframe melt columns into rows

As the subject describes, I have a PySpark Dataframe in which I need to melt three columns into rows. Each column essentially represents a single fact in a category. The ultimate goal is to aggregate the data into a single total per category. There are tens of millions of rows in this dataframe, so I need a way to do the …
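One way to unpivot without a UDF is SQL’s stack generator; a sketch assuming three hypothetical fact columns cat_a, cat_b, cat_c:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Hypothetical wide dataframe: one fact column per category.
    df = spark.createDataFrame(
        [(1, 10, 20, 30), (2, 40, 50, 60)],
        ["id", "cat_a", "cat_b", "cat_c"],
    )

    # stack(3, label1, col1, ...) emits one (category, value) row per column,
    # i.e. a melt/unpivot, which can then be aggregated per category.
    melted = df.selectExpr(
        "id",
        "stack(3, 'cat_a', cat_a, 'cat_b', cat_b, 'cat_c', cat_c) AS (category, value)",
    )
    melted.groupBy("category").sum("value").show()

Because stack runs inside a single projection, the melt itself adds no shuffle; only the final aggregation does.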

PySpark 2.4 – Read CSV file with custom line separator

Support for custom line separators (for various text file formats) was added to Spark in 2017 (see: https://github.com/apache/spark/pull/18581) … or maybe it wasn’t added in 2017 – or ever (see: https://github.com/apache/spark/pull/18304). Today, with PySpark 2.4.0, I am unable to use custom line separators to parse CSV files. Here’s some code: Here are two sample CSV files: one.csv – lines are separated …
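The excerpt’s code isn’t shown, but a common workaround on Spark versions whose CSV reader ignores a custom record delimiter is to set the delimiter on the Hadoop input format and hand the resulting records to the CSV parser; a sketch (the one.csv path and the '|' delimiter are made up):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    # Split records on a custom delimiter at the InputFormat level.
    conf = {"textinputformat.record.delimiter": "|"}
    records = sc.newAPIHadoopFile(
        "one.csv",
        "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "org.apache.hadoop.io.Text",
        conf=conf,
    ).map(lambda kv: kv[1])

    # spark.read.csv also accepts an RDD of CSV row strings.
    df = spark.read.csv(records)
    df.show()

Newer releases (Spark 3.0+) expose a lineSep option on the CSV reader itself, which makes this workaround unnecessary.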

PySpark 2.x: Programmatically adding Maven JAR Coordinates to Spark

The following is my PySpark startup snippet, which is pretty reliable (I’ve been using it a long time). Today I added the two Maven coordinates shown in the spark.jars.packages option (effectively “plugging in” Kafka support). That normally triggers dependency downloads (performed by Spark automatically). However, the packages aren’t being downloaded and/or loaded when I run the snippet (e.g. ./python -i …
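The original startup snippet isn’t reproduced here, but the key detail when adding Maven coordinates programmatically is that spark.jars.packages has to be set before the SparkSession (and its SparkContext) is created; a sketch with illustrative coordinates:

    from pyspark.sql import SparkSession

    # Example coordinates only; a comma-separated list goes into spark.jars.packages.
    packages = ",".join([
        "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0",
        "org.apache.kafka:kafka-clients:2.0.0",
    ])

    spark = (
        SparkSession.builder
        .appName("kafka-demo")
        .config("spark.jars.packages", packages)
        .getOrCreate()
    )

If a SparkContext already exists (common in interactive sessions), the setting has no effect because dependency resolution happens at context launch, which is one frequent reason the downloads never start.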

PySpark: how to duplicate a row n times in a dataframe?

I’ve got a dataframe like this, and I want to duplicate the row n times if the value in column n is bigger than one: And transform it like this: I think I should use explode, but I don’t understand how it works… Thanks. Answer: The explode function returns a new row for each element in the given array or map. One way …
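The answer’s full code is cut off, but one explode-based variant uses array_repeat (available from Spark 2.4) to build an array of length n; a sketch with illustrative column names:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import expr

    spark = SparkSession.builder.getOrCreate()

    # Hypothetical input: column "n" says how many copies of each row we want.
    df = spark.createDataFrame([("A", 1), ("B", 3)], ["value", "n"])

    # array_repeat builds an array with n elements, and explode then emits one
    # output row per element, i.e. n copies of the original row.
    duplicated = (
        df.select("*", expr("explode(array_repeat(n, cast(n as int))) AS _dup"))
          .drop("_dup")
    )
    duplicated.show()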

PySpark – Subquery in a case statement

I am trying to run a subquery inside a case statement in PySpark and it is throwing an exception. I am trying to create a new flag indicating whether the id in one table is present in a different table. Is this even possible in PySpark? Here is the error: I am using Spark 2.2.1. Answer: This appears to be the latest …
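The error text is cut off above, but in Spark 2.x IN/EXISTS predicate subqueries are generally only allowed in the WHERE clause, so a common equivalent is a left join followed by a null check; a sketch with made-up tables:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, when

    spark = SparkSession.builder.getOrCreate()

    # Hypothetical tables: flag rows of df1 whose id also appears in df2.
    df1 = spark.createDataFrame([(1,), (2,), (3,)], ["id"])
    df2 = spark.createDataFrame([(2,), (3,)], ["id"])

    flagged = (
        df1.join(df2.select(col("id").alias("id2")).distinct(),
                 col("id") == col("id2"), "left")
           .withColumn("flag", when(col("id2").isNotNull(), 1).otherwise(0))
           .drop("id2")
    )
    flagged.show()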

collect_list by preserving order based on another variable

I am trying to create a new column of lists in PySpark using a groupby aggregation on an existing set of columns. An example input data frame is provided below: The expected output is: The values within a list are sorted by the date. I tried using collect_list as follows: But collect_list doesn’t guarantee order even if I sort the input …
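The input and expected output are cut off above, but one order-preserving pattern collects (date, value) structs and sorts the array afterwards, so the aggregation itself no longer has to guarantee order; a sketch with illustrative column names id, date, value:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()

    # Hypothetical input: one value per (id, date).
    df = spark.createDataFrame(
        [("a", "2020-01-02", 2), ("a", "2020-01-01", 1), ("b", "2020-01-01", 3)],
        ["id", "date", "value"],
    )

    # Collect structs, sort the array by date (the struct's first field), then
    # keep only the value field of each struct.
    result = (
        df.groupBy("id")
          .agg(F.sort_array(F.collect_list(F.struct("date", "value"))).alias("pairs"))
          .withColumn("values", F.col("pairs.value"))
          .drop("pairs")
    )
    result.show(truncate=False)

Sorting after collection sidesteps the fact that a plain collect_list over shuffled input makes no ordering guarantee.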
