I have a Spark DataFrame `sdf` with GPS points that looks like this:
JavaScript
x
29
29
1
# Sample GPS traces: one row per recorded point, keyed by user and date.
import pandas as pd

d = {
    'user': [
        'A', 'A', 'A', 'A', 'A', 'A',
        'B', 'B', 'B',
        'C', 'C', 'C', 'C', 'C',
        'A', 'A',
    ],
    'lat': [
        37.75243634842733,
        37.75344580658182,
        37.75405656449232,
        37.753649393112181,
        37.75409897804892,
        37.753937806404586,
        37.72767062183685,
        37.72710631810977,
        37.72605407110467,
        37.71141865080228,
        37.712199505873926,
        37.713285899241896,
        37.71428740401767,
        37.712810604103346,
        37.75405656449232,
        37.753649393112181,
    ],
    'lon': [
        -122.41924881935118,
        -122.42006421089171,
        -122.419216632843,
        -122.41784334182738,
        -122.4169099330902,
        -122.41549372673035,
        -122.3878937959671,
        -122.3884356021881,
        -122.38841414451599,
        -122.44688630104064,
        -122.44474053382874,
        -122.44361400604248,
        -122.44260549545288,
        -122.44156479835509,
        -122.4169099330902,
        -122.41549372673035,
    ],
    'date': [
        '2018-02-03', '2018-02-03', '2018-02-03', '2018-02-03',
        '2018-02-03', '2018-02-03', '2018-02-03', '2018-02-03',
        '2018-02-03', '2018-02-03', '2018-02-03', '2018-02-03',
        '2018-02-03', '2018-02-03',
        '2018-02-04', '2018-02-04',
    ],
    'radius': [10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10],
}
pdf = pd.DataFrame(data=d)
# NOTE(review): `spark` is assumed to be an already-created SparkSession
# (it is not defined anywhere in this snippet) -- confirm before running.
sdf = spark.createDataFrame(pdf)
8
9
+----+------------------+-------------------+----------+------+
10
|user| lat| lon| date|radius|
11
+----+------------------+-------------------+----------+------+
12
| A| 37.75243634842733|-122.41924881935118|2018-02-03| 10|
13
| A| 37.75344580658182|-122.42006421089171|2018-02-03| 10|
14
| A| 37.75405656449232| -122.419216632843|2018-02-03| 10|
15
| A|37.753649393112184|-122.41784334182738|2018-02-03| 10|
16
| A| 37.75409897804892| -122.4169099330902|2018-02-03| 10|
17
| A|37.753937806404586|-122.41549372673035|2018-02-03| 10|
18
| B| 37.72767062183685| -122.3878937959671|2018-02-03| 10|
19
| B| 37.72710631810977| -122.3884356021881|2018-02-03| 10|
20
| B| 37.72605407110467|-122.38841414451599|2018-02-03| 10|
21
| C| 37.71141865080228|-122.44688630104064|2018-02-03| 10|
22
| C|37.712199505873926|-122.44474053382874|2018-02-03| 10|
23
| C|37.713285899241896|-122.44361400604248|2018-02-03| 10|
24
| C| 37.71428740401767|-122.44260549545288|2018-02-03| 10|
25
| C|37.712810604103346|-122.44156479835509|2018-02-03| 10|
26
| A| 37.75405656449232| -122.4169099330902|2018-02-04| 10|
27
| A|37.753649393112184|-122.41549372673035|2018-02-04| 10|
28
+----+------------------+-------------------+----------+------+
29
Since the Spark DataFrame contains different GPS trajectories generated by different users on different days, I want to write a function that loops through this DataFrame and feeds the corresponding set of coordinates to the OSRM request per date and per user group, not all at once.
JavaScript
1
28
28
1
from typing import Dict, Any, List, Tuple
2
import pyspark.sql.functions as F
3
import requests
4
5
# Format coordinates into a concatenated string formatted for the OSRM server
6
def format_coords(df):
    """Return every point of ``df`` as one ``;``-separated coordinate string.

    Each row is rendered as ``lon,lat`` with ``%f`` formatting, and the
    rendered rows are joined with ``;``.

    Args:
        df: Spark DataFrame exposing ``lon`` and ``lat`` columns.

    Returns:
        The value of the single aggregated cell.
    """
    # NOTE(review): collect_list makes no ordering promise once rows have been
    # shuffled -- confirm the points still arrive in trace order.
    lon_lat = F.format_string('%f,%f', 'lon', 'lat')
    joined = F.concat_ws(';', F.collect_list(lon_lat))
    # The aggregate is a single row with a single column; take that cell.
    return df.agg(joined).head()[0]
9
10
# Format dictionary of additional options to the OSRM request into a concatenated string format.
11
def format_options(options: Dict[str, str]) -> str:
    """Render ``options`` as a ``key=value`` list joined with ``&``.

    Args:
        options: Mapping of option name to option value.

    Returns:
        The pairs in the mapping's iteration order, e.g.
        ``"overview=full&geometries=polyline6"``; an empty string for an
        empty mapping. Keys and values are inserted as-is (no URL escaping).
    """
    return "&".join(f"{key}={value}" for key, value in options.items())
14
15
# Format radiuses into a concatenated string formatted for the OSRM server
16
def format_radiuses(df):
    """Return the ``&radiuses=...`` query fragment for the rows of ``df``.

    Every ``radius`` value is rendered with ``%d`` and the values are joined
    with ``;``; the result is prefixed with ``"&radiuses="``.

    Args:
        df: Spark DataFrame exposing a ``radius`` column.

    Returns:
        The fragment as a string, including the leading ``&``.
    """
    # NOTE(review): collect_list makes no ordering promise once rows have been
    # shuffled -- confirm the radii line up with the coordinates.
    joined = F.concat_ws(';', F.collect_list(F.format_string('%d', 'radius')))
    # The aggregate is a single row with a single column; take that cell.
    return "&radiuses=" + df.agg(joined).head()[0]
19
20
# Make request
21
def make_request(coords, radiuses, options, timeout=30):
    """Send one match request to the OSRM server and return the decoded JSON.

    Args:
        coords: Spark DataFrame passed to ``format_coords``.
        radiuses: Spark DataFrame passed to ``format_radiuses``.
        options: Mapping of extra query options; may be empty or ``None``.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The response body parsed by ``Response.json()``.

    Raises:
        requests.exceptions.Timeout: if the server does not answer in time.
    """
    coords = format_coords(coords)
    radiuses = format_radiuses(radiuses)
    options = format_options(options) if options else ""
    # format_radiuses already prefixes "&radiuses="; drop the leading "&" when
    # there are no options so the query string does not start with "?&".
    query = f"{options}{radiuses}".lstrip("&")
    url = f"http://router.project-osrm.org/match/v1/car/{coords}?{query}"
    # Without a timeout a stalled connection would block forever.
    r = requests.get(url, timeout=timeout)
    return r.json()
28
Unfortunately, running the code below raises `TypeError: 'GroupedData' object is not iterable`. What am I missing?
JavaScript
1
8
1
output = {}
2
# NOTE: groupBy() yields a GroupedData object, which cannot be iterated, so
# the `for` statement below is what raises
# "TypeError: 'GroupedData' object is not iterable".
for trip, g in sdf.groupBy('date', 'user'):
3
# NOTE: the loop variable g is never used; the call receives columns of the
# whole sdf instead of the rows of a single (date, user) group.
output[trip] = make_request(coords = sdf[['lat', 'lon']],
4
radiuses = sdf[['radius']],
5
options = {'overview':'full',
6
'geometries': 'polyline6',
7
'annotations': 'nodes'})
8
Advertisement
Answer
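You can aggregate the coordinate and radius strings per group with `groupBy(...).agg(...)`, collect the result, and then make one request per collected row: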
JavaScript
1
247
247
1
import pyspark.sql.functions as F
2
import requests
3
4
def format_options(options):
    """Render ``options`` as a ``key=value`` list joined with ``&``.

    Args:
        options: Mapping of option name to option value.

    Returns:
        The pairs in the mapping's iteration order; an empty string for an
        empty mapping. Keys and values are inserted as-is (no URL escaping).
    """
    return "&".join(f"{key}={value}" for key, value in options.items())
7
8
def make_request(coords, radiuses, options, timeout=30):
    """Send one match request to the OSRM server and return the decoded JSON.

    Args:
        coords: Ready-made coordinate string, inserted into the URL path.
        radiuses: Ready-made radiuses string (values joined with ``;``).
        options: Mapping of extra query options; may be empty or ``None``.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The response body parsed by ``Response.json()``.

    Raises:
        requests.exceptions.Timeout: if the server does not answer in time.
    """
    options = format_options(options) if options else ""
    # Join only the non-empty parts so an empty `options` does not leave the
    # query string starting with "?&".
    query = "&".join(part for part in (options, f"radiuses={radiuses}") if part)
    url = f"http://router.project-osrm.org/match/v1/car/{coords}?{query}"
    # Without a timeout a stalled connection would block forever.
    r = requests.get(url, timeout=timeout)
    print(url)
    return r.json()
14
15
# Build one row per (date, user) group holding the ready-to-send coordinate
# and radius strings, then bring the rows back to the driver.
# NOTE(review): collect_list makes no ordering promise once rows have been
# shuffled -- confirm the points of each trip still arrive in trace order.
coords = sdf.groupBy('date', 'user').agg(
    F.concat_ws(
        ';',
        F.collect_list(F.format_string('%f,%f', 'lon', 'lat')),
    ).alias('coords'),
    F.concat_ws(
        ';',
        F.collect_list(F.format_string('%d', 'radius')),
    ).alias('radius'),
).collect()

options = {'overview': 'full', 'geometries': 'polyline6', 'annotations': 'nodes'}

# One request per collected row, keyed by (date, user). Fields are read by
# name rather than by position so the mapping does not depend on column order.
output = {
    (row['date'], row['user']): make_request(row['coords'], row['radius'], options)
    for row in coords
}
26
27
"""
28
{('2018-02-03', 'A'): {'code': 'Ok',
29
'matchings': [{'confidence': 0.374625,
30
'distance': 325.2,
31
'duration': 50.6,
32
'geometry': 'y{h_gAh~znhF}@k[OmFMoFcAea@IeD[uMAYKsDMsDAe@}@u_@g@aTMwFMwFwAqq@',
33
'legs': [{'annotation': {'nodes': [1974590926,
34
4763953263,
35
65359046,
36
4763953265,
37
5443374298,
38
2007343352]},
39
'distance': 116.7,
40
'duration': 18.8,
41
'steps': [],
42
'summary': '',
43
'weight': 18.8},
44
{'annotation': {'nodes': [5443374298,
45
2007343352,
46
4763953266,
47
65359043,
48
4763953269,
49
2007343354,
50
4763953270]},
51
'distance': 85.6,
52
'duration': 12.2,
53
'steps': [],
54
'summary': '',
55
'weight': 12.2},
56
{'annotation': {'nodes': [2007343354,
57
4763953270,
58
65334199,
59
4763953274,
60
2007343347]},
61
'distance': 122.9,
62
'duration': 19.6,
63
'steps': [],
64
'summary': '',
65
'weight': 19.6}],
66
'weight': 50.6,
67
'weight_name': 'routability'}],
68
'tracepoints': [None,
69
None,
70
{'alternatives_count': 0,
71
'distance': 28.078003,
72
'hint': '20nBh2NdHwA2AAAAOgAAAAwAAAAPAAAAiVMWQq2VIEIAuABB7FgoQTYAAAA6AAAADAAAAA8AAABDRAAACwi0-M0TQALvB7T4yRRAAgEAXwUADb92',
73
'location': [-122.419189, 37.753805],
74
'matchings_index': 0,
75
'name': '23rd Street',
76
'waypoint_index': 0},
77
{'alternatives_count': 0,
78
'distance': 26.825184,
79
'hint': 'Ew3BBzFbH4AdAAAACwAAAA0AAAAAAAAAIxmmQTSs6kCiuRFBAAAAAB0AAAALAAAADQAAAAAAAABDRAAANg20-CIUQAJNDbT4MRNAAgIAnxAADb92',
80
'location': [-122.417866, 37.75389],
81
'matchings_index': 0,
82
'name': '23rd Street',
83
'waypoint_index': 1},
84
{'alternatives_count': 0,
85
'distance': 16.583412,
86
'hint': 'DQ3BBxQNwYcqAAAAQwAAABAAAAANAAAA0i_uQb3SOEKKPC9BG1EaQSoAAABDAAAAEAAAAA0AAABDRAAAABG0-F4UQALyELT48xRAAgEAnxAADb92',
87
'location': [-122.416896, 37.75395],
88
'matchings_index': 0,
89
'name': '23rd Street',
90
'waypoint_index': 2},
91
{'alternatives_count': 7,
92
'distance': 10.013916,
93
'hint': 'Dg3Bh1WcyQBmAAAACAAAABAAAAANAAAAQOKOQg89nkCKPC9BEMcOQWYAAAAIAAAAEAAAAA0AAABDRAAAcha0-KwUQAJ6FrT4UhRAAgEAbwUADb92',
94
'location': [-122.415502, 37.754028],
95
'matchings_index': 0,
96
'name': '23rd Street',
97
'waypoint_index': 3}]},
98
('2018-02-03', 'B'): {'code': 'Ok',
99
'matchings': [{'confidence': 1e-06,
100
'distance': 270.4,
101
'duration': 50,
102
'geometry': 'euu}fAd_~lhFoAlCMTuAvCvC|Bh@`@hXbUnAdADBhDzCzClCXVzZnW\X~CnC~@qBLWnWej@',
103
'legs': [{'annotation': {'nodes': [5443147626,
104
6360865540,
105
6360865536,
106
65307580,
107
6360865535,
108
6360865539,
109
6360865531]},
110
'distance': 84.8,
111
'duration': 17.8,
112
'steps': [],
113
'summary': '',
114
'weight': 17.8},
115
{'annotation': {'nodes': [6360865539,
116
6360865531,
117
6360865525,
118
65343521,
119
6360865527,
120
6360865529,
121
6360865523,
122
6360865520,
123
65321110,
124
6360865519,
125
6360865522,
126
6376329343]},
127
'distance': 185.6,
128
'duration': 32.2,
129
'steps': [],
130
'summary': '',
131
'weight': 32.2}],
132
'weight': 50,
133
'weight_name': 'routability'}],
134
'tracepoints': [{'alternatives_count': 0,
135
'distance': 11.53267,
136
'hint': 'ZpfJAOSXyYALAAAArQAAAA4AAAAsAAAAnpH1QDVG8EJWgBdBa2v0QQsAAACtAAAADgAAACwAAABDRAAA_YG0-GOtPwJKgrT4t60_AgIA3wcADb92',
137
'location': [-122.387971, 37.727587],
138
'matchings_index': 0,
139
'name': 'Underwood Avenue',
140
'waypoint_index': 0},
141
{'alternatives_count': 0,
142
'distance': 13.565054,
143
'hint': 'ZZfJgALywAdPAAAACAAAABMAAAASAAAA7ONaQo4CrUDv7U1BJdFAQU8AAAAIAAAAEwAAABIAAABDRAAArX-0-MerPwIsgLT4gqs_AgIAbw0ADb92',
144
'location': [-122.388563, 37.727175],
145
'matchings_index': 0,
146
'name': 'Jennings Street',
147
'waypoint_index': 1},
148
{'alternatives_count': 1,
149
'distance': 9.601917,
150
'hint': 'WZfJAP7xwIecAAAAbAAAABEAAAALAAAAdujYQqu4lUJXHD1B9-ruQJwAAABsAAAAEQAAAAsAAABDRAAAAoC0-CCnPwJCgLT4Zqc_AgIAHxMADb92',
151
'location': [-122.388478, 37.725984],
152
'matchings_index': 0,
153
'name': 'Wallace Avenue',
154
'waypoint_index': 2}]},
155
('2018-02-03', 'C'): {'code': 'Ok',
156
'matchings': [{'confidence': 7.3e-05,
157
'distance': 420.1,
158
'duration': 64.1,
159
'geometry': 'kuy|fAbyjphFcBxEmE`FqJkKiBqBuP}Qgc@ie@eAiAcB}ArA_Eb@mAjKkDnBo@fe@mOrw@kW',
160
'legs': [{'annotation': {'nodes': [5440513673,
161
5440513674,
162
5440513675,
163
65363070,
164
1229920760,
165
65307726,
166
6906452420,
167
1229920717,
168
65361047,
169
1229920749,
170
554163599,
171
3978809925]},
172
'distance': 235.2,
173
'duration': 37.5,
174
'steps': [],
175
'summary': '',
176
'weight': 40.1},
177
{'annotation': {'nodes': [554163599,
178
3978809925,
179
65345518,
180
8256268328]},
181
'distance': 184.9,
182
'duration': 26.6,
183
'steps': [],
184
'summary': '',
185
'weight': 26.6}],
186
'weight': 66.7,
187
'weight_name': 'routability'}],
188
'tracepoints': [None,
189
None,
190
{'alternatives_count': 0,
191
'distance': 6.968076,
192
'hint': 'KLvAhyu7wAcAAAAANQAAAAAAAAAkAAAAAAAAAOCMMUEAAAAA_Z1yQQAAAAAbAAAAAAAAACQAAABDRAAAXqiz-GZ1PwKiqLP4hnU_AgAAzxIADb92',
193
'location': [-122.443682, 37.713254],
194
'matchings_index': 0,
195
'name': '',
196
'waypoint_index': 0},
197
{'alternatives_count': 0,
198
'distance': 16.488956,
199
'hint': '-rrAB_aPyYAJAAAAIgAAAGgAAAAUAAAA2RnSQL_5uUEPjI9CBTlaQQkAAAAiAAAAaAAAABQAAABDRAAARK2z-J95PwKTrLP4b3k_AgEAXxUADb92',
200
'location': [-122.442428, 37.714335],
201
'matchings_index': 0,
202
'name': 'Allison Street',
203
'waypoint_index': 1},
204
{'alternatives_count': 1,
205
'distance': 17.311636,
206
'hint': '_brAhwC7wAeZAAAANwAAAAAAAAAKAAAAH4vUQgKXFkIAAAAAXtbYQJkAAAA3AAAAAAAAAAoAAABDRAAA6a-z-HlzPwKjsLP4q3M_AgAAHwoADb92',
207
'location': [-122.441751, 37.712761],
208
'matchings_index': 0,
209
'name': 'Allison Street',
210
'waypoint_index': 2}]},
211
('2018-02-04', 'A'): {'code': 'Ok',
212
'matchings': [{'confidence': 0,
213
'distance': 205.5,
214
'duration': 46.4,
215
'geometry': '{di_gAfovnhFg@iTMwFbCMlXmApH[k@iJoB{l@uFH',
216
'legs': [{'annotation': {'nodes': [2007343354,
217
4763953270,
218
65334199,
219
4763953267,
220
5443374265,
221
5443374261,
222
5443374264,
223
5443374263,
224
5544172171]},
225
'distance': 205.5,
226
'duration': 46.4,
227
'steps': [],
228
'summary': '',
229
'weight': 46.4}],
230
'weight': 46.4,
231
'weight_name': 'routability'}],
232
'tracepoints': [{'alternatives_count': 0,
233
'distance': 11.908542,
234
'hint': 'DQ3BBxQNwYcrAAAAQgAAABAAAAANAAAAkv_wQeJqN0KKPC9BG1EaQSsAAABCAAAAEAAAAA0AAABDRAAA_BC0-F4UQALyELT4yRRAAgEAnxAADb92',
235
'location': [-122.4169, 37.75395],
236
'matchings_index': 0,
237
'name': '23rd Street',
238
'waypoint_index': 0},
239
{'alternatives_count': 6,
240
'distance': 11.065027,
241
'hint': 'kQ3Bh____38hAAAAIQAAAMMAAAAAAAAApopaQQAAAADsMaJCAAAAACEAAAAhAAAAwwAAAAAAAABDRAAAlxa0-NASQAJ6FrT4MRNAAgIAbxYADb92',
242
'location': [-122.415465, 37.753552],
243
'matchings_index': 0,
244
'name': '',
245
'waypoint_index': 1}]}}
246
"""
247