I have a Spark dataframe sdf with GPS points that looks like this:
# Sample GPS points: user A (6 points), B (3) and C (5) on 2018-02-03,
# followed by a second trip of user A (2 points) on 2018-02-04.
d = {
    'user': ['A'] * 6 + ['B'] * 3 + ['C'] * 5 + ['A'] * 2,
    'lat': [
        37.75243634842733,
        37.75344580658182,
        37.75405656449232,
        37.753649393112181,
        37.75409897804892,
        37.753937806404586,
        37.72767062183685,
        37.72710631810977,
        37.72605407110467,
        37.71141865080228,
        37.712199505873926,
        37.713285899241896,
        37.71428740401767,
        37.712810604103346,
        37.75405656449232,
        37.753649393112181,
    ],
    'lon': [
        -122.41924881935118,
        -122.42006421089171,
        -122.419216632843,
        -122.41784334182738,
        -122.4169099330902,
        -122.41549372673035,
        -122.3878937959671,
        -122.3884356021881,
        -122.38841414451599,
        -122.44688630104064,
        -122.44474053382874,
        -122.44361400604248,
        -122.44260549545288,
        -122.44156479835509,
        -122.4169099330902,
        -122.41549372673035,
    ],
    'date': ['2018-02-03'] * 14 + ['2018-02-04'] * 2,
    'radius': [10] * 16,
}
# Build the pandas frame first, then hand it to Spark.
pdf = pd.DataFrame(data=d)
sdf = spark.createDataFrame(pdf)
+----+------------------+-------------------+----------+------+
|user| lat| lon| date|radius|
+----+------------------+-------------------+----------+------+
| A| 37.75243634842733|-122.41924881935118|2018-02-03| 10|
| A| 37.75344580658182|-122.42006421089171|2018-02-03| 10|
| A| 37.75405656449232| -122.419216632843|2018-02-03| 10|
| A|37.753649393112184|-122.41784334182738|2018-02-03| 10|
| A| 37.75409897804892| -122.4169099330902|2018-02-03| 10|
| A|37.753937806404586|-122.41549372673035|2018-02-03| 10|
| B| 37.72767062183685| -122.3878937959671|2018-02-03| 10|
| B| 37.72710631810977| -122.3884356021881|2018-02-03| 10|
| B| 37.72605407110467|-122.38841414451599|2018-02-03| 10|
| C| 37.71141865080228|-122.44688630104064|2018-02-03| 10|
| C|37.712199505873926|-122.44474053382874|2018-02-03| 10|
| C|37.713285899241896|-122.44361400604248|2018-02-03| 10|
| C| 37.71428740401767|-122.44260549545288|2018-02-03| 10|
| C|37.712810604103346|-122.44156479835509|2018-02-03| 10|
| A| 37.75405656449232| -122.4169099330902|2018-02-04| 10|
| A|37.753649393112184|-122.41549372673035|2018-02-04| 10|
+----+------------------+-------------------+----------+------+
Since the Spark DataFrame contains GPS trajectories generated by different users on different days, I want to write a function that loops through it and sends one (OSRM) request per (date, user) group, each containing only that group's coordinates, instead of sending all points at once.
from typing import Dict, Any, List, Tuple
import pyspark.sql.functions as F
import requests
# Format coordinates into a concatenated string formatted for the OSRM server
def format_coords(df):
    """Return every point of *df* as a single 'lon,lat;lon,lat;...' string.

    Each row's `lon` and `lat` columns are rendered with '%f,%f' and the
    resulting pieces are joined with ';' in one aggregation over the whole
    DataFrame.
    """
    point = F.format_string('%f,%f', 'lon', 'lat')
    joined = F.concat_ws(';', F.collect_list(point))
    # The aggregation yields one row; its only column is the joined string.
    first_row = df.agg(joined).head()
    return first_row[0]
# Format dictionary of additional options to the OSRM request into a concatenated string format.
def format_options(options: Dict[str, str]) -> str:
    """Render *options* as 'key=value' pairs joined by '&'.

    Pairs appear in the dictionary's iteration order; an empty dictionary
    yields an empty string. No URL-escaping is applied.
    """
    pairs = []
    for key, value in options.items():
        pairs.append(f"{key}={value}")
    return "&".join(pairs)
# Format radiuses into a concatenated string formatted for the OSRM server
def format_radiuses(df):
    """Return the '&radiuses=r1;r2;...' query fragment for the rows of *df*.

    Each row's `radius` column is rendered with '%d'; the values are joined
    with ';' in one aggregation and prefixed with '&radiuses='.
    """
    prefix = "&radiuses="
    joined = F.concat_ws(';', F.collect_list(F.format_string('%d', 'radius')))
    # The aggregation yields one row; its only column is the joined string.
    radius_values = df.agg(joined).head()[0]
    return prefix + radius_values
# Make request
def make_request(coords, radiuses, options, timeout=30):
    """Send one request to the OSRM match service and return the decoded JSON.

    Args:
        coords: DataFrame handed to ``format_coords`` (which reads its
            ``lon`` and ``lat`` columns).
        radiuses: DataFrame handed to ``format_radiuses`` (which reads its
            ``radius`` column and returns a fragment starting with
            ``&radiuses=``).
        options: Dictionary of extra query options handed to
            ``format_options``; any falsy value means "no extra options".
        timeout: Seconds passed to ``requests.get`` as its ``timeout``.
            Without it a stalled connection would block the caller forever.

    Returns:
        The response body decoded with ``Response.json()``.

    Raises:
        requests.exceptions.Timeout: If the server does not answer within
            ``timeout`` seconds.
    """
    coord_str = format_coords(coords)
    radius_str = format_radiuses(radiuses)
    option_str = format_options(options) if options else ""
    # radius_str already carries its own leading "&radiuses=".
    url = f"http://router.project-osrm.org/match/v1/car/{coord_str}?{option_str}{radius_str}"
    r = requests.get(url, timeout=timeout)
    return r.json()
Unfortunately, running the code below raises TypeError: 'GroupedData' object is not iterable. What am I missing?
# Intended: one OSRM response per (date, user) group, stored under the group key.
output = {}
# NOTE: this is the line that raises "TypeError: 'GroupedData' object is not iterable":
# the result of sdf.groupBy(...) is a GroupedData, which cannot be looped over.
for trip, g in sdf.groupBy('date', 'user'):
# NOTE(review): the call below selects columns from the whole sdf, not from the
# per-group value g, so g is never used.
output[trip] = make_request(coords = sdf[['lat', 'lon']],
radiuses = sdf[['radius']],
options = {'overview':'full',
'geometries': 'polyline6',
'annotations': 'nodes'})
Advertisement
Answer
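A GroupedData object (the result of groupBy) cannot be iterated over like a pandas groupby. Instead, you can build the coordinate and radius strings with an aggregation after the groupBy, collect one row per (date, user), and make one request per collected row: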
import pyspark.sql.functions as F
import requests
def format_options(options):
    """Render *options* as 'key=value' pairs joined by '&'.

    Pairs follow the dictionary's iteration order; an empty dictionary gives
    an empty string. No URL-escaping is applied.
    """
    query = "&".join("{}={}".format(name, setting) for name, setting in options.items())
    return query
def make_request(coords, radiuses, options, timeout=30):
    """Send one request to the OSRM match service and return the decoded JSON.

    Args:
        coords: Ready-made coordinate string, inserted into the URL path
            as-is.
        radiuses: Ready-made radius string, inserted after ``&radiuses=``
            as-is.
        options: Dictionary of extra query options handed to
            ``format_options``; any falsy value means "no extra options".
        timeout: Seconds passed to ``requests.get`` as its ``timeout``.
            Without it a stalled connection would block the caller forever.

    Returns:
        The response body decoded with ``Response.json()``.

    Raises:
        requests.exceptions.Timeout: If the server does not answer within
            ``timeout`` seconds.
    """
    option_str = format_options(options) if options else ""
    url = f"http://router.project-osrm.org/match/v1/car/{coords}?{option_str}&radiuses={radiuses}"
    r = requests.get(url, timeout=timeout)
    # Echo the requested URL, as the original snippet does.
    print(url)
    return r.json()
# One collected row per (date, user): the group's points as a single
# 'lon,lat;lon,lat;...' string plus the matching ';'-joined radius string.
# NOTE(review): the order of values gathered by collect_list is presumably not
# guaranteed after the groupBy shuffle; there is no timestamp column here to
# sort by, so confirm the point order if it matters for matching.
coords = (
    sdf.groupBy('date', 'user')
    .agg(
        F.concat_ws(
            ';', F.collect_list(F.format_string('%f,%f', 'lon', 'lat'))
        ).alias('coords'),
        F.concat_ws(
            ';', F.collect_list(F.format_string('%d', 'radius'))
        ).alias('radius'),
    )
    .collect()
)
options = {'overview': 'full', 'geometries': 'polyline6', 'annotations': 'nodes'}
# Each collected row unpacks positionally as (date, user, coords, radius).
output = {
    (date, user): make_request(coord_str, radius_str, options)
    for date, user, coord_str, radius_str in coords
}
"""
{('2018-02-03', 'A'): {'code': 'Ok',
'matchings': [{'confidence': 0.374625,
'distance': 325.2,
'duration': 50.6,
'geometry': 'y{h_gAh~znhF}@k[OmFMoFcAea@IeD[uMAYKsDMsDAe@}@u_@g@aTMwFMwFwAqq@',
'legs': [{'annotation': {'nodes': [1974590926,
4763953263,
65359046,
4763953265,
5443374298,
2007343352]},
'distance': 116.7,
'duration': 18.8,
'steps': [],
'summary': '',
'weight': 18.8},
{'annotation': {'nodes': [5443374298,
2007343352,
4763953266,
65359043,
4763953269,
2007343354,
4763953270]},
'distance': 85.6,
'duration': 12.2,
'steps': [],
'summary': '',
'weight': 12.2},
{'annotation': {'nodes': [2007343354,
4763953270,
65334199,
4763953274,
2007343347]},
'distance': 122.9,
'duration': 19.6,
'steps': [],
'summary': '',
'weight': 19.6}],
'weight': 50.6,
'weight_name': 'routability'}],
'tracepoints': [None,
None,
{'alternatives_count': 0,
'distance': 28.078003,
'hint': '20nBh2NdHwA2AAAAOgAAAAwAAAAPAAAAiVMWQq2VIEIAuABB7FgoQTYAAAA6AAAADAAAAA8AAABDRAAACwi0-M0TQALvB7T4yRRAAgEAXwUADb92',
'location': [-122.419189, 37.753805],
'matchings_index': 0,
'name': '23rd Street',
'waypoint_index': 0},
{'alternatives_count': 0,
'distance': 26.825184,
'hint': 'Ew3BBzFbH4AdAAAACwAAAA0AAAAAAAAAIxmmQTSs6kCiuRFBAAAAAB0AAAALAAAADQAAAAAAAABDRAAANg20-CIUQAJNDbT4MRNAAgIAnxAADb92',
'location': [-122.417866, 37.75389],
'matchings_index': 0,
'name': '23rd Street',
'waypoint_index': 1},
{'alternatives_count': 0,
'distance': 16.583412,
'hint': 'DQ3BBxQNwYcqAAAAQwAAABAAAAANAAAA0i_uQb3SOEKKPC9BG1EaQSoAAABDAAAAEAAAAA0AAABDRAAAABG0-F4UQALyELT48xRAAgEAnxAADb92',
'location': [-122.416896, 37.75395],
'matchings_index': 0,
'name': '23rd Street',
'waypoint_index': 2},
{'alternatives_count': 7,
'distance': 10.013916,
'hint': 'Dg3Bh1WcyQBmAAAACAAAABAAAAANAAAAQOKOQg89nkCKPC9BEMcOQWYAAAAIAAAAEAAAAA0AAABDRAAAcha0-KwUQAJ6FrT4UhRAAgEAbwUADb92',
'location': [-122.415502, 37.754028],
'matchings_index': 0,
'name': '23rd Street',
'waypoint_index': 3}]},
('2018-02-03', 'B'): {'code': 'Ok',
'matchings': [{'confidence': 1e-06,
'distance': 270.4,
'duration': 50,
'geometry': 'euu}fAd_~lhFoAlCMTuAvCvC|Bh@`@hXbUnAdADBhDzCzClCXVzZnW\X~CnC~@qBLWnWej@',
'legs': [{'annotation': {'nodes': [5443147626,
6360865540,
6360865536,
65307580,
6360865535,
6360865539,
6360865531]},
'distance': 84.8,
'duration': 17.8,
'steps': [],
'summary': '',
'weight': 17.8},
{'annotation': {'nodes': [6360865539,
6360865531,
6360865525,
65343521,
6360865527,
6360865529,
6360865523,
6360865520,
65321110,
6360865519,
6360865522,
6376329343]},
'distance': 185.6,
'duration': 32.2,
'steps': [],
'summary': '',
'weight': 32.2}],
'weight': 50,
'weight_name': 'routability'}],
'tracepoints': [{'alternatives_count': 0,
'distance': 11.53267,
'hint': 'ZpfJAOSXyYALAAAArQAAAA4AAAAsAAAAnpH1QDVG8EJWgBdBa2v0QQsAAACtAAAADgAAACwAAABDRAAA_YG0-GOtPwJKgrT4t60_AgIA3wcADb92',
'location': [-122.387971, 37.727587],
'matchings_index': 0,
'name': 'Underwood Avenue',
'waypoint_index': 0},
{'alternatives_count': 0,
'distance': 13.565054,
'hint': 'ZZfJgALywAdPAAAACAAAABMAAAASAAAA7ONaQo4CrUDv7U1BJdFAQU8AAAAIAAAAEwAAABIAAABDRAAArX-0-MerPwIsgLT4gqs_AgIAbw0ADb92',
'location': [-122.388563, 37.727175],
'matchings_index': 0,
'name': 'Jennings Street',
'waypoint_index': 1},
{'alternatives_count': 1,
'distance': 9.601917,
'hint': 'WZfJAP7xwIecAAAAbAAAABEAAAALAAAAdujYQqu4lUJXHD1B9-ruQJwAAABsAAAAEQAAAAsAAABDRAAAAoC0-CCnPwJCgLT4Zqc_AgIAHxMADb92',
'location': [-122.388478, 37.725984],
'matchings_index': 0,
'name': 'Wallace Avenue',
'waypoint_index': 2}]},
('2018-02-03', 'C'): {'code': 'Ok',
'matchings': [{'confidence': 7.3e-05,
'distance': 420.1,
'duration': 64.1,
'geometry': 'kuy|fAbyjphFcBxEmE`FqJkKiBqBuP}Qgc@ie@eAiAcB}ArA_Eb@mAjKkDnBo@fe@mOrw@kW',
'legs': [{'annotation': {'nodes': [5440513673,
5440513674,
5440513675,
65363070,
1229920760,
65307726,
6906452420,
1229920717,
65361047,
1229920749,
554163599,
3978809925]},
'distance': 235.2,
'duration': 37.5,
'steps': [],
'summary': '',
'weight': 40.1},
{'annotation': {'nodes': [554163599,
3978809925,
65345518,
8256268328]},
'distance': 184.9,
'duration': 26.6,
'steps': [],
'summary': '',
'weight': 26.6}],
'weight': 66.7,
'weight_name': 'routability'}],
'tracepoints': [None,
None,
{'alternatives_count': 0,
'distance': 6.968076,
'hint': 'KLvAhyu7wAcAAAAANQAAAAAAAAAkAAAAAAAAAOCMMUEAAAAA_Z1yQQAAAAAbAAAAAAAAACQAAABDRAAAXqiz-GZ1PwKiqLP4hnU_AgAAzxIADb92',
'location': [-122.443682, 37.713254],
'matchings_index': 0,
'name': '',
'waypoint_index': 0},
{'alternatives_count': 0,
'distance': 16.488956,
'hint': '-rrAB_aPyYAJAAAAIgAAAGgAAAAUAAAA2RnSQL_5uUEPjI9CBTlaQQkAAAAiAAAAaAAAABQAAABDRAAARK2z-J95PwKTrLP4b3k_AgEAXxUADb92',
'location': [-122.442428, 37.714335],
'matchings_index': 0,
'name': 'Allison Street',
'waypoint_index': 1},
{'alternatives_count': 1,
'distance': 17.311636,
'hint': '_brAhwC7wAeZAAAANwAAAAAAAAAKAAAAH4vUQgKXFkIAAAAAXtbYQJkAAAA3AAAAAAAAAAoAAABDRAAA6a-z-HlzPwKjsLP4q3M_AgAAHwoADb92',
'location': [-122.441751, 37.712761],
'matchings_index': 0,
'name': 'Allison Street',
'waypoint_index': 2}]},
('2018-02-04', 'A'): {'code': 'Ok',
'matchings': [{'confidence': 0,
'distance': 205.5,
'duration': 46.4,
'geometry': '{di_gAfovnhFg@iTMwFbCMlXmApH[k@iJoB{l@uFH',
'legs': [{'annotation': {'nodes': [2007343354,
4763953270,
65334199,
4763953267,
5443374265,
5443374261,
5443374264,
5443374263,
5544172171]},
'distance': 205.5,
'duration': 46.4,
'steps': [],
'summary': '',
'weight': 46.4}],
'weight': 46.4,
'weight_name': 'routability'}],
'tracepoints': [{'alternatives_count': 0,
'distance': 11.908542,
'hint': 'DQ3BBxQNwYcrAAAAQgAAABAAAAANAAAAkv_wQeJqN0KKPC9BG1EaQSsAAABCAAAAEAAAAA0AAABDRAAA_BC0-F4UQALyELT4yRRAAgEAnxAADb92',
'location': [-122.4169, 37.75395],
'matchings_index': 0,
'name': '23rd Street',
'waypoint_index': 0},
{'alternatives_count': 6,
'distance': 11.065027,
'hint': 'kQ3Bh____38hAAAAIQAAAMMAAAAAAAAApopaQQAAAADsMaJCAAAAACEAAAAhAAAAwwAAAAAAAABDRAAAlxa0-NASQAJ6FrT4MRNAAgIAbxYADb92',
'location': [-122.415465, 37.753552],
'matchings_index': 0,
'name': '',
'waypoint_index': 1}]}}
"""