I have a list of origins and destinations along with their geo coordinates. For each origin, I need to calculate the minimum distance to the destinations (that is, the distance to its nearest destination).
Below is my code:
```python
import pyspark.sql.functions as F
from haversine import haversine_vector, Unit

data1 = [("A", (45.7597, 4.8422)), ("B", (46.7431, 5.8422))]
columns1 = ["Origin", "Origin_Geo"]
df1 = spark.createDataFrame(data=data1, schema=columns1)

data2 = [("Destin1", (48.8567, 2.3508)), ("Destin2", (40.7033962, -74.2351462))]
columns2 = ["Destination", "Destination_Geo"]
df2 = spark.createDataFrame(data=data2, schema=columns2)

df = df1.crossJoin(df2)
df.withColumn(
    "Distance", haversine_vector(F.col("Origin_Geo"), F.col("Destination_Geo"))
).groupBy("Origin").agg(F.min("Distance").alias("Min_Distance")).show()
```
I got the following error:

```
IndexError: too many indices for array: array is 0-dimensional, but 2 were indexed
```
My questions are:

First, something seems to be wrong with `withColumn('Distance', haversine_vector(F.col('Origin_Geo'), F.col('Destination_Geo')))`, but I do not know why. (I'm new to PySpark.)

Second, I have long lists of origins and destinations (both over 30K), so the cross join generates a huge number of origin-destination combinations. Is there a more efficient way to get the minimum distance?
Thanks a lot in advance.
Answer
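You are passing Spark `Column` objects to `haversine_vector`, but it expects tuples or arrays of coordinates. `F.col(...)` is only a reference to a column that Spark evaluates later, not the coordinate values themselves, so NumPy ends up with a 0-dimensional array, which is exactly what the IndexError reports.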
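To see where the IndexError comes from, here is a minimal sketch that reproduces the same NumPy message without Spark. It assumes, as the error text suggests, that `haversine_vector` converts its inputs with `numpy.array` and then slices them as an (n, 2) array of (lat, lon) rows. `FakeColumn` and `first_two_columns` are hypothetical stand-ins, not part of PySpark or the haversine package.

```python
import numpy as np


class FakeColumn:
    """Hypothetical stand-in for a pyspark Column: an opaque object that is
    not a sequence of coordinates."""

    def __repr__(self):
        return "Column<'Origin_Geo'>"


def first_two_columns(points):
    # Assumed behaviour: convert the input to an array, then slice it
    # as an (n, 2) array of (lat, lon) rows.
    arr = np.array(points)
    return arr[:, 0], arr[:, 1]


# Works: a list of (lat, lon) tuples becomes a 2-D float array.
lats, lons = first_two_columns([(45.7597, 4.8422), (46.7431, 5.8422)])
print(lats, lons)

# Fails: an opaque object becomes a 0-dimensional object array,
# so indexing it with [:, 0] raises IndexError.
try:
    first_two_columns(FakeColumn())
except IndexError as exc:
    print("IndexError:", exc)
```

A list of tuples slices cleanly, whereas an object that is not a sequence of coordinates becomes a 0-dimensional array, so the two-index slice fails in the same way as in the question.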
If you want to use this library, you need to create a UDF and install the haversine package on all your Spark nodes.
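```python
from haversine import haversine
from pyspark.sql import functions as F, types as T

# Wrap the library function in a UDF: Spark calls it once per row with the
# two coordinate pairs; haversine() takes (lat, lon) points and returns km by default.
haversine_udf = F.udf(haversine, T.FloatType())

df.withColumn(
    "Distance", haversine_udf(F.col("Origin_Geo"), F.col("Destination_Geo"))
).groupBy("Origin").agg(F.min("Distance").alias("Min_Distance")).show()
```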
If you cannot install the package on every node, you can implement the formula yourself in plain Python inside the UDF (cf. Haversine Formula in Python (Bearing and Distance between two GPS points)). Note that the result is proportional to the Earth radius you choose, so different radius values give slightly different distances.
```python
from math import radians, cos, sin, asin, sqrt
from pyspark.sql import functions as F, types as T


@F.udf(T.FloatType())
def haversine_udf(point1, point2):
    """
    Calculate the great circle distance in kilometers between two points
    on the earth (specified as (lat, lon) in decimal degrees)
    """
    # unpack (lat, lon) pairs, the order used in data1 and data2
    lat1, lon1 = point1
    lat2, lon2 = point2
    # convert decimal degrees to radians
    lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])

    # haversine formula
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    c = 2 * asin(sqrt(a))
    r = 6372.8  # Radius of earth in kilometers. Use 3956 for miles
    return c * r


df.withColumn(
    "Distance", haversine_udf(F.col("Origin_Geo"), F.col("Destination_Geo"))
).groupBy("Origin").agg(F.min("Distance").alias("Min_Distance")).show()
```

```
+------+------------+
|Origin|Min_Distance|
+------+------------+
|     B|   351.08905|
|     A|   392.32755|
+------+------------+
```
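The UDFs above still evaluate every origin-destination pair produced by the cross join, one Python call per row. For the second question (30K × 30K), one option, assuming both coordinate lists fit in driver memory, is to collect them and compute the same haversine formula with NumPy broadcasting, processing origins in chunks to bound memory. This is a sketch, not part of the answer above: it is still a brute-force O(n·m) computation, only vectorized, and `haversine_matrix`, `min_distances` and `chunk_size` are hypothetical names.

```python
import numpy as np

EARTH_RADIUS_KM = 6372.8  # same radius as the UDF; 6371.0088 is another common choice


def haversine_matrix(origins, destinations, r=EARTH_RADIUS_KM):
    """Pairwise great-circle distances in km.

    origins: (n, 2) array-like of (lat, lon) in degrees
    destinations: (m, 2) array-like of (lat, lon) in degrees
    returns: (n, m) array of distances
    """
    o = np.radians(np.asarray(origins, dtype=float))
    d = np.radians(np.asarray(destinations, dtype=float))
    lat1, lon1 = o[:, 0:1], o[:, 1:2]  # shape (n, 1)
    lat2, lon2 = d[:, 0], d[:, 1]      # shape (m,)
    dlat = lat2 - lat1                 # broadcasts to (n, m)
    dlon = lon2 - lon1
    a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
    # clip guards against rounding pushing a slightly outside [0, 1]
    return 2 * r * np.arcsin(np.sqrt(np.clip(a, 0.0, 1.0)))


def min_distances(origins, destinations, chunk_size=1000, r=EARTH_RADIUS_KM):
    """Minimum distance from each origin to any destination.

    Origins are processed chunk_size at a time, so at most
    chunk_size x m distances are held in memory at once.
    """
    origins = np.asarray(origins, dtype=float)
    out = np.empty(len(origins))
    for start in range(0, len(origins), chunk_size):
        block = origins[start:start + chunk_size]
        out[start:start + chunk_size] = haversine_matrix(block, destinations, r).min(axis=1)
    return out


origin_names = ["A", "B"]
origin_geo = [(45.7597, 4.8422), (46.7431, 5.8422)]
destination_geo = [(48.8567, 2.3508), (40.7033962, -74.2351462)]

for name, dist in zip(origin_names, min_distances(origin_geo, destination_geo)):
    print(name, round(float(dist), 2))
```

With the sample data this gives the same minima as the Spark output above (about 392.33 km for A and 351.09 km for B). If brute force is still too slow, a spatial index such as scikit-learn's `BallTree` with `metric="haversine"` (inputs in radians) avoids computing every pair, at the cost of another dependency.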