I have a Spark DataFrame `sdf`
with GPS points that looks like this:
# Sample GPS traces: users A, B and C on 2018-02-03, plus a second, short
# trace for user A on 2018-02-04. Every point carries a radius of 10.
d = {
    'user': ['A'] * 6 + ['B'] * 3 + ['C'] * 5 + ['A'] * 2,
    'lat': [
        # A, 2018-02-03
        37.75243634842733, 37.75344580658182, 37.75405656449232,
        37.753649393112181, 37.75409897804892, 37.753937806404586,
        # B, 2018-02-03
        37.72767062183685, 37.72710631810977, 37.72605407110467,
        # C, 2018-02-03
        37.71141865080228, 37.712199505873926, 37.713285899241896,
        37.71428740401767, 37.712810604103346,
        # A, 2018-02-04
        37.75405656449232, 37.753649393112181,
    ],
    'lon': [
        # A, 2018-02-03
        -122.41924881935118, -122.42006421089171, -122.419216632843,
        -122.41784334182738, -122.4169099330902, -122.41549372673035,
        # B, 2018-02-03
        -122.3878937959671, -122.3884356021881, -122.38841414451599,
        # C, 2018-02-03
        -122.44688630104064, -122.44474053382874, -122.44361400604248,
        -122.44260549545288, -122.44156479835509,
        # A, 2018-02-04
        -122.4169099330902, -122.41549372673035,
    ],
    'date': ['2018-02-03'] * 14 + ['2018-02-04'] * 2,
    'radius': [10] * 16,
}
pdf = pd.DataFrame(data=d)
sdf = spark.createDataFrame(pdf)

# sdf.show():
# +----+------------------+-------------------+----------+------+
# |user|               lat|                lon|      date|radius|
# +----+------------------+-------------------+----------+------+
# |   A| 37.75243634842733|-122.41924881935118|2018-02-03|    10|
# |   A| 37.75344580658182|-122.42006421089171|2018-02-03|    10|
# |   A| 37.75405656449232|  -122.419216632843|2018-02-03|    10|
# |   A|37.753649393112184|-122.41784334182738|2018-02-03|    10|
# |   A| 37.75409897804892| -122.4169099330902|2018-02-03|    10|
# |   A|37.753937806404586|-122.41549372673035|2018-02-03|    10|
# |   B| 37.72767062183685| -122.3878937959671|2018-02-03|    10|
# |   B| 37.72710631810977| -122.3884356021881|2018-02-03|    10|
# |   B| 37.72605407110467|-122.38841414451599|2018-02-03|    10|
# |   C| 37.71141865080228|-122.44688630104064|2018-02-03|    10|
# |   C|37.712199505873926|-122.44474053382874|2018-02-03|    10|
# |   C|37.713285899241896|-122.44361400604248|2018-02-03|    10|
# |   C| 37.71428740401767|-122.44260549545288|2018-02-03|    10|
# |   C|37.712810604103346|-122.44156479835509|2018-02-03|    10|
# |   A| 37.75405656449232| -122.4169099330902|2018-02-04|    10|
# |   A|37.753649393112184|-122.41549372673035|2018-02-04|    10|
# +----+------------------+-------------------+----------+------+
Since the Spark DataFrame contains GPS trajectories generated by different users on different days, I want to write a function that loops through it and sends the corresponding set of coordinates to the OSRM `match` service once per (date, user) group, rather than sending all points at once.
from typing import Dict, Any, List, Tuple
import pyspark.sql.functions as F
import requests


def format_coords(df):
    """Return the rows of *df* as an OSRM coordinate string ``"lon,lat;lon,lat;..."``.

    Each row's ``lon`` and ``lat`` columns are rendered with ``%f`` (six decimal
    places), and the rows are joined with ``;``.

    NOTE(review): ``collect_list`` does not guarantee row order once the data
    spans several partitions or has been shuffled. The point order may therefore
    differ from the DataFrame's row order.
    """
    coords = df.agg(
        F.concat_ws(';', F.collect_list(F.format_string('%f,%f', 'lon', 'lat')))
    ).head()[0]
    return coords


def format_options(options: Dict[str, str]) -> str:
    """Join *options* into an ``&``-separated ``key=value`` query string.

    Keys and values are inserted as-is, without URL escaping.
    """
    return "&".join(f"{k}={v}" for k, v in options.items())


def format_radiuses(df):
    """Return ``"&radiuses=r;r;..."`` built from the ``radius`` column of *df*.

    Each value is rendered with ``%d``. The leading ``&`` lets the result be
    appended directly after other query parameters.
    """
    radiuses = df.agg(
        F.concat_ws(';', F.collect_list(F.format_string('%d', 'radius')))
    ).head()[0]
    return "&radiuses=" + radiuses


def make_request(coords, radiuses, options, *, timeout=30.0,
                 base_url="http://router.project-osrm.org/match/v1/car/"):
    """Send one OSRM ``match`` request and return the decoded JSON response.

    Args:
        coords: DataFrame with ``lon`` and ``lat`` columns, one row per point.
        radiuses: DataFrame with a ``radius`` column, or ``None`` to omit the
            ``radiuses`` parameter.
        options: Extra query parameters, e.g. ``{'overview': 'full'}``. May be
            empty or ``None``.
        timeout: Seconds to wait for the server, passed to ``requests.get``.
            ``None`` waits indefinitely, which matches the previous behavior.
        base_url: Service URL that the formatted coordinates are appended to.

    Returns:
        The response body parsed with ``Response.json()``. HTTP error statuses
        are not raised, so callers should inspect the returned ``code`` field.
    """
    params = []
    if options:
        params.append(format_options(options))
    if radiuses is not None:
        # format_radiuses() returns "&radiuses=...". Drop the leading "&" and
        # join the parts below, so an empty `options` no longer yields "?&".
        params.append(format_radiuses(radiuses).lstrip("&"))
    query = "&".join(params)
    url = f"{base_url}{format_coords(coords)}"
    if query:
        url = f"{url}?{query}"
    # Without a timeout, requests can block forever on an unresponsive server.
    r = requests.get(url, timeout=timeout)
    return r.json()
Unfortunately, running the code below raises `TypeError: 'GroupedData' object is not iterable`. What am I missing?
# groupBy() returns a GroupedData object, which only supports aggregations.
# Unlike a pandas GroupBy, it cannot be iterated. Instead, collect the distinct
# (date, user) keys to the driver and pass make_request() the rows of one trip
# at a time. The previous loop also passed the whole `sdf` on every iteration
# instead of the group.
# Each iteration triggers its own Spark jobs, which is fine for a handful of
# trips. For many trips, build all coordinate strings in one groupBy().agg().
osrm_options = {'overview': 'full', 'geometries': 'polyline6', 'annotations': 'nodes'}
output = {}
for key in sdf.select('date', 'user').distinct().orderBy('date', 'user').collect():
    trip = (key['date'], key['user'])
    g = sdf.filter((F.col('date') == key['date']) & (F.col('user') == key['user']))
    output[trip] = make_request(
        coords=g.select('lat', 'lon'),
        radiuses=g.select('radius'),
        options=osrm_options,
    )
Advertisement
Answer
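`groupBy()` returns a `GroupedData` object. Unlike a pandas `GroupBy`, it cannot be iterated; it only supports aggregations such as `agg()`. So aggregate each (date, user) group into its coordinate and radius strings after `groupBy`, collect the (small) result to the driver, and send one request per row: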
import pyspark.sql.functions as F
import requests


def format_options(options):
    """Join *options* into an ``&``-separated ``key=value`` query string (no URL escaping)."""
    options = "&".join([f"{k}={v}" for k, v in options.items()])
    return options


def make_request(coords, radiuses, options, *, timeout=30.0):
    """Send one OSRM ``match`` request for a pre-formatted trace and return the decoded JSON.

    Args:
        coords: ``"lon,lat;lon,lat;..."`` string, one pair per trace point.
        radiuses: ``"r;r;..."`` string with one radius per point in *coords*.
        options: Extra query parameters, e.g. ``{'overview': 'full'}``. May be
            empty or ``None``.
        timeout: Seconds to wait for the server, passed to ``requests.get``.
            ``None`` waits indefinitely.
    """
    params = [format_options(options)] if options else []
    # Joining the parts avoids a dangling "?&radiuses=" when options is empty.
    params.append(f"radiuses={radiuses}")
    url = f"http://router.project-osrm.org/match/v1/car/{coords}?" + "&".join(params)
    print(url)
    r = requests.get(url, timeout=timeout)
    return r.json()


# collect_list() gives no ordering guarantee after the groupBy() shuffle, but
# map matching needs each trace's points in order. Tag every row with
# monotonically_increasing_id(), which increases along sdf's partition order,
# and sort each group's points by that id. Coordinates and radii are collected
# in the same struct, so the two strings stay aligned point-for-point.
coords = (
    sdf.withColumn('_row_id', F.monotonically_increasing_id())
    .groupBy('date', 'user')
    .agg(
        F.array_sort(
            F.collect_list(
                F.struct(
                    F.col('_row_id'),
                    F.format_string('%f,%f', 'lon', 'lat').alias('coord'),
                    F.format_string('%d', 'radius').alias('radius'),
                )
            )
        ).alias('points')
    )
    .select(
        'date',
        'user',
        F.concat_ws(';', F.col('points.coord')).alias('coords'),
        F.concat_ws(';', F.col('points.radius')).alias('radius'),
    )
    .orderBy('date', 'user')
    .collect()
)

options = {'overview': 'full', 'geometries': 'polyline6', 'annotations': 'nodes'}
output = {
    (row['date'], row['user']): make_request(row['coords'], row['radius'], options)
    for row in coords
}

# Example value of `output` for the sample data. Raw string, so the backslash
# inside one polyline is kept literally.
r""" {('2018-02-03', 'A'): {'code': 'Ok', 'matchings': [{'confidence': 0.374625, 'distance': 325.2, 'duration': 50.6, 'geometry': 'y{h_gAh~znhF}@k[OmFMoFcAea@IeD[uMAYKsDMsDAe@}@u_@g@aTMwFMwFwAqq@', 'legs': [{'annotation': {'nodes': [1974590926, 4763953263, 65359046, 4763953265, 5443374298, 2007343352]}, 'distance': 116.7, 'duration': 18.8, 'steps': [], 'summary': '', 'weight': 18.8}, {'annotation': {'nodes': [5443374298, 2007343352, 4763953266, 65359043, 4763953269, 2007343354, 4763953270]}, 'distance': 85.6, 'duration': 12.2, 'steps': [], 'summary': '', 'weight': 12.2}, {'annotation': {'nodes': [2007343354, 4763953270, 65334199, 4763953274, 2007343347]}, 'distance': 122.9, 'duration': 19.6, 'steps': [], 'summary': '', 'weight': 19.6}], 'weight': 50.6, 'weight_name': 'routability'}], 'tracepoints': [None, None, {'alternatives_count': 0, 'distance': 28.078003, 'hint': '20nBh2NdHwA2AAAAOgAAAAwAAAAPAAAAiVMWQq2VIEIAuABB7FgoQTYAAAA6AAAADAAAAA8AAABDRAAACwi0-M0TQALvB7T4yRRAAgEAXwUADb92', 'location': [-122.419189, 37.753805], 'matchings_index': 0, 'name': '23rd Street', 'waypoint_index': 0}, {'alternatives_count': 0, 'distance': 26.825184, 'hint': 
'Ew3BBzFbH4AdAAAACwAAAA0AAAAAAAAAIxmmQTSs6kCiuRFBAAAAAB0AAAALAAAADQAAAAAAAABDRAAANg20-CIUQAJNDbT4MRNAAgIAnxAADb92', 'location': [-122.417866, 37.75389], 'matchings_index': 0, 'name': '23rd Street', 'waypoint_index': 1}, {'alternatives_count': 0, 'distance': 16.583412, 'hint': 'DQ3BBxQNwYcqAAAAQwAAABAAAAANAAAA0i_uQb3SOEKKPC9BG1EaQSoAAABDAAAAEAAAAA0AAABDRAAAABG0-F4UQALyELT48xRAAgEAnxAADb92', 'location': [-122.416896, 37.75395], 'matchings_index': 0, 'name': '23rd Street', 'waypoint_index': 2}, {'alternatives_count': 7, 'distance': 10.013916, 'hint': 'Dg3Bh1WcyQBmAAAACAAAABAAAAANAAAAQOKOQg89nkCKPC9BEMcOQWYAAAAIAAAAEAAAAA0AAABDRAAAcha0-KwUQAJ6FrT4UhRAAgEAbwUADb92', 'location': [-122.415502, 37.754028], 'matchings_index': 0, 'name': '23rd Street', 'waypoint_index': 3}]}, ('2018-02-03', 'B'): {'code': 'Ok', 'matchings': [{'confidence': 1e-06, 'distance': 270.4, 'duration': 50, 'geometry': 'euu}fAd_~lhFoAlCMTuAvCvC|Bh@`@hXbUnAdADBhDzCzClCXVzZnW\X~CnC~@qBLWnWej@', 'legs': [{'annotation': {'nodes': [5443147626, 6360865540, 6360865536, 65307580, 6360865535, 6360865539, 6360865531]}, 'distance': 84.8, 'duration': 17.8, 'steps': [], 'summary': '', 'weight': 17.8}, {'annotation': {'nodes': [6360865539, 6360865531, 6360865525, 65343521, 6360865527, 6360865529, 6360865523, 6360865520, 65321110, 6360865519, 6360865522, 6376329343]}, 'distance': 185.6, 'duration': 32.2, 'steps': [], 'summary': '', 'weight': 32.2}], 'weight': 50, 'weight_name': 'routability'}], 'tracepoints': [{'alternatives_count': 0, 'distance': 11.53267, 'hint': 'ZpfJAOSXyYALAAAArQAAAA4AAAAsAAAAnpH1QDVG8EJWgBdBa2v0QQsAAACtAAAADgAAACwAAABDRAAA_YG0-GOtPwJKgrT4t60_AgIA3wcADb92', 'location': [-122.387971, 37.727587], 'matchings_index': 0, 'name': 'Underwood Avenue', 'waypoint_index': 0}, {'alternatives_count': 0, 'distance': 13.565054, 'hint': 'ZZfJgALywAdPAAAACAAAABMAAAASAAAA7ONaQo4CrUDv7U1BJdFAQU8AAAAIAAAAEwAAABIAAABDRAAArX-0-MerPwIsgLT4gqs_AgIAbw0ADb92', 'location': [-122.388563, 37.727175], 'matchings_index': 0, 
'name': 'Jennings Street', 'waypoint_index': 1}, {'alternatives_count': 1, 'distance': 9.601917, 'hint': 'WZfJAP7xwIecAAAAbAAAABEAAAALAAAAdujYQqu4lUJXHD1B9-ruQJwAAABsAAAAEQAAAAsAAABDRAAAAoC0-CCnPwJCgLT4Zqc_AgIAHxMADb92', 'location': [-122.388478, 37.725984], 'matchings_index': 0, 'name': 'Wallace Avenue', 'waypoint_index': 2}]}, ('2018-02-03', 'C'): {'code': 'Ok', 'matchings': [{'confidence': 7.3e-05, 'distance': 420.1, 'duration': 64.1, 'geometry': 'kuy|fAbyjphFcBxEmE`FqJkKiBqBuP}Qgc@ie@eAiAcB}ArA_Eb@mAjKkDnBo@fe@mOrw@kW', 'legs': [{'annotation': {'nodes': [5440513673, 5440513674, 5440513675, 65363070, 1229920760, 65307726, 6906452420, 1229920717, 65361047, 1229920749, 554163599, 3978809925]}, 'distance': 235.2, 'duration': 37.5, 'steps': [], 'summary': '', 'weight': 40.1}, {'annotation': {'nodes': [554163599, 3978809925, 65345518, 8256268328]}, 'distance': 184.9, 'duration': 26.6, 'steps': [], 'summary': '', 'weight': 26.6}], 'weight': 66.7, 'weight_name': 'routability'}], 'tracepoints': [None, None, {'alternatives_count': 0, 'distance': 6.968076, 'hint': 'KLvAhyu7wAcAAAAANQAAAAAAAAAkAAAAAAAAAOCMMUEAAAAA_Z1yQQAAAAAbAAAAAAAAACQAAABDRAAAXqiz-GZ1PwKiqLP4hnU_AgAAzxIADb92', 'location': [-122.443682, 37.713254], 'matchings_index': 0, 'name': '', 'waypoint_index': 0}, {'alternatives_count': 0, 'distance': 16.488956, 'hint': '-rrAB_aPyYAJAAAAIgAAAGgAAAAUAAAA2RnSQL_5uUEPjI9CBTlaQQkAAAAiAAAAaAAAABQAAABDRAAARK2z-J95PwKTrLP4b3k_AgEAXxUADb92', 'location': [-122.442428, 37.714335], 'matchings_index': 0, 'name': 'Allison Street', 'waypoint_index': 1}, {'alternatives_count': 1, 'distance': 17.311636, 'hint': '_brAhwC7wAeZAAAANwAAAAAAAAAKAAAAH4vUQgKXFkIAAAAAXtbYQJkAAAA3AAAAAAAAAAoAAABDRAAA6a-z-HlzPwKjsLP4q3M_AgAAHwoADb92', 'location': [-122.441751, 37.712761], 'matchings_index': 0, 'name': 'Allison Street', 'waypoint_index': 2}]}, ('2018-02-04', 'A'): {'code': 'Ok', 'matchings': [{'confidence': 0, 'distance': 205.5, 'duration': 46.4, 'geometry': 
'{di_gAfovnhFg@iTMwFbCMlXmApH[k@iJoB{l@uFH', 'legs': [{'annotation': {'nodes': [2007343354, 4763953270, 65334199, 4763953267, 5443374265, 5443374261, 5443374264, 5443374263, 5544172171]}, 'distance': 205.5, 'duration': 46.4, 'steps': [], 'summary': '', 'weight': 46.4}], 'weight': 46.4, 'weight_name': 'routability'}], 'tracepoints': [{'alternatives_count': 0, 'distance': 11.908542, 'hint': 'DQ3BBxQNwYcrAAAAQgAAABAAAAANAAAAkv_wQeJqN0KKPC9BG1EaQSsAAABCAAAAEAAAAA0AAABDRAAA_BC0-F4UQALyELT4yRRAAgEAnxAADb92', 'location': [-122.4169, 37.75395], 'matchings_index': 0, 'name': '23rd Street', 'waypoint_index': 0}, {'alternatives_count': 6, 'distance': 11.065027, 'hint': 'kQ3Bh____38hAAAAIQAAAMMAAAAAAAAApopaQQAAAADsMaJCAAAAACEAAAAhAAAAwwAAAAAAAABDRAAAlxa0-NASQAJ6FrT4MRNAAgIAbxYADb92', 'location': [-122.415465, 37.753552], 'matchings_index': 0, 'name': '', 'waypoint_index': 1}]}} """