What are alternative methods for pandas quantile and cut in pyspark 1.6

I’m a newbie to pyspark. I have pandas code like the one below.

bindt = df[df[var].notnull()][var].quantile([0,.1,.2,.3,.4,.5,.6,.7,.8,.9,1]).unique()

df['{0}_quartile'.format(var)] = pd.cut(df[var], bindt, labels=False, include_lowest=True )

I have found ‘approxQuantile’ in pyspark 2.x, but I couldn’t find anything similar in pyspark 1.6.0.

My sample input:

df.show()

+-----------+----------+---------------+--------------+------------------------+
|  id       | col_1    |col_2          |col_3         |col_4                   |
+-----------+----------+---------------+--------------+------------------------+
|1.10919E+16|3988487.35|     -236751.43|    -362208.07|                0.660000|
|1.10919E+16|3988487.35|     -236751.43|    -362208.07|                0.900000|
|1.10919E+16|3988487.35|     -236751.43|    -362208.07|                0.660000|
|1.10919E+16|  36718.55|           null|          null|                0.860000|
|1.10919E+16|  36718.55|           null|          null|                0.780000|
|1.10919E+16|  36718.55|           null|          null|                0.660000|
|1.10919E+16|  36718.55|           null|          null|                0.900000|
|1.10919E+16|  36718.55|           null|          null|                0.660000|

df.collect()

[Row(id=u'1.11312E+16', col_1=Decimal('367364.44'), col_2=Decimal('-401715.23'), col_3=Decimal('-1649917.53'), col_4=Decimal('0.080000')), 
Row(id=u'1.11312E+16', col_1=Decimal('367364.44'), col_2=Decimal('-401715.23'), col_3=Decimal('-1649917.53'), col_4=Decimal('0.780000')), 
Row(id=u'1.11312E+16', col_1=Decimal('367364.44'), col_2=Decimal('-401715.23'), col_3=Decimal('-1649917.53'), col_4=Decimal('0.780000')), 
Row(id=u'1.11312E+16', col_1=Decimal('367364.44'), col_2=Decimal('-401715.23'), col_3=Decimal('-1649917.53'), col_4=Decimal('0.860000')), 
Row(id=u'1.11312E+16', col_1=Decimal('367364.44'), col_2=Decimal('-401715.23'), col_3=Decimal('-1649917.53'), col_4=Decimal('0.330000'))]

I have to loop the above logic for all input columns.

for var in df.columns:
    bindt = df[df[var].notnull()][var].quantile([0,.1,.2,.3,.4,.5,.6,.7,.8,.9,1]).unique()    
    df['{0}_quartile'.format(var)] = pd.cut(df[var], bindt, labels=False, include_lowest=True )

Could anyone please suggest how to rewrite the above code for a pyspark 1.6 DataFrame?

Thanks in advance


Answer

If you’re using pyspark 2.x, you can use QuantileDiscretizer from the ML library, which uses approxQuantile() and Bucketizer under the hood.
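
For reference, here is a minimal sketch of that 2.x approach (pyspark 2.x only), assuming a numeric input column named col_4 from the sample data above; the column names are placeholders for your own:

from pyspark.ml.feature import QuantileDiscretizer

# pyspark 2.x only: bucket col_4 into (roughly) deciles
discretizer = QuantileDiscretizer(numBuckets=10,
                                  inputCol="col_4",
                                  outputCol="col_4_quantile")
df_deciles = discretizer.fit(df).transform(df)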

However, since you’re using pyspark 1.6.x, you need to:

1. Find the quantile values of a column

You can find the quantile values in two ways:

  1. Compute the percentile of each row in the column using percent_rank() and extract the column values whose percentile is closest to the quantile that you want

  2. Follow the methods in this answer, which explains how to perform quantile approximations with pyspark < 2.0.0

Here’s my example implementation of approximating quantile values:

from pyspark.sql import functions as F
from pyspark.sql import Window

def compute_quantiles(df, col, quantiles):
  quantiles = sorted(quantiles)

  # 1. compute percentile
  df = df.withColumn("percentile", F.percent_rank().over(Window.orderBy(col)))

  # 2. categorize quantile based on the desired quantile and compute errors
  df = df.withColumn("percentile_cat1", F.lit(-1.0))
  df = df.withColumn("percentile_err1", F.lit(-1.0))
  df = df.withColumn("percentile_cat2", F.lit(-1.0))
  df = df.withColumn("percentile_err2", F.lit(-1.0))

  # check percentile with the lower boundaries
  for idx in range(0, len(quantiles)-1):
    q = quantiles[idx]
    df = df.withColumn("percentile_cat1", F
                       .when( (F.col("percentile_cat1") == -1.0) & 
                             (F.col("percentile") <= q), q)
                       .otherwise(F.col("percentile_cat1")))
    df = df.withColumn("percentile_err1", F
                       .when( (F.col("percentile_err1") == -1.0) & 
                             (F.col("percentile") <= q), 
                             F.pow(F.col("percentile") - q, 2))
                       .otherwise(F.col("percentile_err1")))

  # assign the remaining -1 values in the error to the largest squared error of 1
  df = df.withColumn("percentile_err1", F
                     .when(F.col("percentile_err1") == -1.0, 1)
                     .otherwise(F.col("percentile_err1")))

  # check percentile with the upper boundaries
  for idx in range(1, len(quantiles)):
    q = quantiles[idx]
    df = df.withColumn("percentile_cat2", F
                       .when((F.col("percentile_cat2") == -1.0) & 
                             (F.col("percentile") <= q), q)
                       .otherwise(F.col("percentile_cat2")))
    df = df.withColumn("percentile_err2",F
                       .when((F.col("percentile_err2") == -1.0) & 
                             (F.col("percentile") <= q), 
                             F.pow(F.col("percentile") - q, 2))
                       .otherwise(F.col("percentile_err2")))

  # assign the remaining -1 values in the error to the largest squared error of 1
  df = df.withColumn("percentile_err2", F
                     .when(F.col("percentile_err2") == -1.0, 1)
                     .otherwise(F.col("percentile_err2")))

  # select the nearest quantile to the percentile
  df = df.withColumn("percentile_cat", F
                     .when(F.col("percentile_err1") < F.col("percentile_err2"), 
                           F.col("percentile_cat1"))
                     .otherwise(F.col("percentile_cat2")))
  df = df.withColumn("percentile_err", F
                     .when(F.col("percentile_err1") < F.col("percentile_err2"), 
                           F.col("percentile_err1"))
                     .otherwise(F.col("percentile_err2")))

  # 3. approximate quantile values by choosing the value with the lowest error at each percentile category
  df = df.withColumn("approx_quantile", F
                     .first(col).over(Window
                                      .partitionBy("percentile_cat")
                                      .orderBy(F.asc("percentile_err"))))

  return df

def extract_quantiles(df):
  df_quantiles = df.select("percentile_cat", "approx_quantile").distinct()
  rows = df_quantiles.collect()
  quantile_values = [ row.approx_quantile for row in rows ]

  return quantile_values
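
For example, a minimal usage sketch for a single column (the column name and quantile list here are just examples):

desired_quantiles = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]

# compute the per-row percentile/quantile columns for col_4,
# then pull out the approximated quantile boundary values
df_with_quantiles = compute_quantiles(df, "col_4", desired_quantiles)
quantile_values = sorted(extract_quantiles(df_with_quantiles))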

What I wanted to achieve above is to compute the percentile of each row in the column and to categorize it into the nearest quantile. Assigning a percentile to its nearest quantile is done by choosing the quantile category with the lowest difference (squared error) from the percentile.

1. Computing Percentile

First, I compute the percentile of a column using percent_rank(), a Window function in pyspark. You can think of a Window as a partition specification for your data. Since percent_rank() is a Window function, you need to pass in the Window.
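
In isolation, that first step looks something like this (a sketch on one of the sample columns; nulls are not filtered out here):

from pyspark.sql import functions as F
from pyspark.sql import Window

# percent_rank() returns a value between 0.0 and 1.0 based on each row's
# position in the ordering, so it can be read as the row's percentile
w = Window.orderBy("col_4")
df_pct = df.withColumn("percentile", F.percent_rank().over(w))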

2. Categorize percentile to quantile boundaries and compute errors

The nearest quantile category to a percentile can be below, equal to, or above it. Hence, I need to compute the errors twice: first to compare the percentile with the lower quantile boundaries, and a second time to compare it with the upper quantile boundaries. Note that the ≤ operator is used to check whether the percentile is less than or equal to a boundary. After the direct upper and lower quantile boundaries of a percentile are known, the percentile is assigned to the nearest quantile category by choosing whichever of the two boundary categories has the lower error.
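
As a plain-Python illustration of that idea (the numbers are made up): a percentile of 0.37 sits between the decile boundaries 0.3 and 0.4, and the squared errors decide which one it is assigned to:

quantiles = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
percentile = 0.37  # made-up example value

lower = max(q for q in quantiles if q <= percentile)  # 0.3
upper = min(q for q in quantiles if q >= percentile)  # 0.4

# pick the boundary with the smaller squared error
err_lower = (percentile - lower) ** 2                 # 0.0049
err_upper = (percentile - upper) ** 2                 # 0.0009
nearest = lower if err_lower < err_upper else upper   # 0.4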

3. Approximate quantile values

Once we know the closest quantile category for each percentile, we can approximate the quantile values: each one is the column value with the lowest error in its quantile category. These approximate quantile values can be computed using the first() function over a Window partitioned by quantile category and ordered by error. Then, to extract the quantile values, we just select the unique percentileCategory-approxQuantileValue pairs from the dataframe.


After testing on my data (~10000 rows) with desired_quantiles = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0], I found that my example implementation gets quite close to the approxQuantile results. The two sets of values get even closer as I decrease the relative error supplied to approxQuantile.

Using extract_quantiles(compute_quantiles(df, col, quantiles)):

[screenshot of the resulting quantile values]

Using approxQuantile:

[screenshot of the resulting quantile values]
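
For reference, the pyspark 2.x call used for this comparison is along these lines (2.x only; the relative error of 0.01 is just an example value):

desired_quantiles = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]

# approxQuantile(column, probabilities, relativeError) -- pyspark >= 2.0
quantile_values = df.approxQuantile("col_4", desired_quantiles, 0.01)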

2. Use Bucketizer

After finding the quantile values, you can use pyspark’s Bucketizer to bucketize values based on the quantile boundaries. Bucketizer is available in both pyspark 1.6.x [1][2] and 2.x [3][4].

Here is an example of how you can perform bucketization:

from pyspark.ml.feature import Bucketizer

bucketedData = df
desired_quantiles = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0] # must be sorted

for col in df.columns:
  quantile_values = extract_quantiles(compute_quantiles(df, col, desired_quantiles))
  splits = sorted(quantile_values) # or any bucket split boundaries you desire

  bucketizer = (Bucketizer()
                .setInputCol(col)
                .setOutputCol("{}_quantile".format(col))
                .setSplits(splits))
  bucketedData = bucketizer.transform(bucketedData)

You can use the quantile values you found in step 1 as the splits, or any bucket split range that you desire. When you’re using Bucketizer, the whole value range of the column must be covered by the splits. Otherwise, values outside the specified splits will be treated as errors. If you’re unsure about the value boundaries of your data, infinite values such as -float("inf") and float("inf") must be explicitly provided as the outermost splits to cover all the floating-point values.
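
For example, a sketch of padding the splits with infinities (assuming quantile_values is the output of step 1):

# cover the whole value range so out-of-range values don't error out;
# Bucketizer requires the splits to be strictly increasing
splits = [-float("inf")] + sorted(set(quantile_values)) + [float("inf")]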
