
Calculate the minimum distance to destinations for each origin in pyspark

I have a list of origins and destinations along with their geo coordinates. I need to calculate the minimum distance for each origin to the destinations.

Below is my code:

import pyspark.sql.functions as F
from haversine import haversine_vector, Unit

data1 = [("A", (45.7597, 4.8422)), ("B", (46.7431, 5.8422))]
columns1 = ["Origin", "Origin_Geo"]
df1 = spark.createDataFrame(data=data1, schema=columns1)
data2 = [("Destin1", (48.8567, 2.3508)), ("Destin2", (40.7033962, -74.2351462))]
columns2 = ["Destination", "Destination_Geo"]
df2 = spark.createDataFrame(data=data2, schema=columns2)

df = df1.crossJoin(df2)

df.withColumn(
    "Distance", haversine_vector(F.col("Origin_Geo"), F.col("Destination_Geo"))
).groupBy("Origin").agg(F.min("Distance").alias("Min_Distance")).show()

I get the following error:

IndexError: too many indices for array: array is 0-dimensional, but 2 were indexed

My questions are:

  1. It seems that something is wrong with withColumn('Distance', haversine_vector(F.col('Origin_Geo'), F.col('Destination_Geo'))), but I do not know what. (I’m new to pyspark.)

  2. I have long lists of origins and destinations (both over 30K). A cross join generates a huge number of origin-destination combinations. Is there a more efficient way to get the minimum distance?

Thanks a lot in advance.

Answer

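You are passing Spark Column objects to haversine_vector, but it expects actual coordinates (tuples or NumPy arrays). A Column is only an unevaluated expression, so NumPy most likely wraps it as a 0-dimensional array, which would explain the IndexError.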

If you want to use this library, you need to wrap it in a UDF and install the haversine package on all your Spark nodes.

from haversine import haversine
from pyspark.sql import functions as F, types as T

haversine_udf = F.udf(haversine, T.FloatType())

df.withColumn(
    "Distance", haversine_udf(F.col("Origin_Geo"), F.col("Destination_Geo"))
).groupBy("Origin").agg(F.min("Distance").alias("Min_Distance")).show()

If you cannot install the package on every node, you can implement the formula yourself in a UDF (cf. Haversine Formula in Python (Bearing and Distance between two GPS points)). Note that the result scales directly with the Earth radius you choose.

from math import radians, cos, sin, asin, sqrt
from pyspark.sql import functions as F, types as T

@F.udf(T.FloatType())
def haversine_udf(point1, point2):
    """
    Calculate the great circle distance between two points 
    on the earth (specified in decimal degrees)
    """
    # each point is a (latitude, longitude) pair, as in the sample data
    lat1, lon1 = point1
    lat2, lon2 = point2
    # convert decimal degrees to radians
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])

    # haversine formula 
    dlon = lon2 - lon1 
    dlat = lat2 - lat1 
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a)) 
    r = 6372.8  # Earth radius in kilometers; use about 3959.9 for miles
    return c * r

df.withColumn(
    "Distance", haversine_udf(F.col("Origin_Geo"), F.col("Destination_Geo"))
).groupBy("Origin").agg(F.min("Distance").alias("Min_Distance")).show()
+------+------------+                                                           
|Origin|Min_Distance|
+------+------------+
|     B|   351.08905|
|     A|   392.32755|
+------+------------+
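
The formula can also be checked without Spark. The following is a minimal plain-Python sketch, assuming each point is a (latitude, longitude) pair as in the sample data and using the same radius of 6372.8 km. The names haversine_km and min_distance_km are hypothetical helpers introduced here for illustration; they are not part of the haversine package or of PySpark.

```python
from math import radians, sin, cos, asin, sqrt

EARTH_RADIUS_KM = 6372.8  # assumption: same radius as the UDF in the answer


def haversine_km(point1, point2):
    """Great-circle distance in km between two (lat, lon) points in decimal degrees."""
    lat1, lon1 = point1
    lat2, lon2 = point2
    # convert decimal degrees to radians
    lat1, lon1, lat2, lon2 = map(radians, (lat1, lon1, lat2, lon2))
    # haversine formula
    a = sin((lat2 - lat1) / 2) ** 2 + cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2) ** 2
    return 2 * EARTH_RADIUS_KM * asin(sqrt(a))


def min_distance_km(origin, destinations):
    """Smallest distance from one origin to any destination (hypothetical helper)."""
    return min(haversine_km(origin, dest) for dest in destinations)


# sample data from the question
origins = {"A": (45.7597, 4.8422), "B": (46.7431, 5.8422)}
destinations = [(48.8567, 2.3508), (40.7033962, -74.2351462)]

min_distances = {name: min_distance_km(geo, destinations) for name, geo in origins.items()}
for name, dist in min_distances.items():
    print(f"{name}: {dist:.2f} km")
```

This should agree with the Min_Distance values in the table above (roughly 392.33 km for A and 351.09 km for B). For the second question, one possible direction is to collect the destination coordinates, share them with spark.sparkContext.broadcast(...), and call a function like min_distance_km from a UDF once per origin, so that neither the cross join nor the groupBy is needed. Whether that is actually faster for 30K x 30K points is untested here.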
User contributions licensed under: CC BY-SA