I have a CSV file which is comma separated. One of the columns contains data that is itself comma separated, and each row in that column has a different number of words, hence a different number of commas. When I access the data or perform any operation such as filtering (after splitting it), it throws errors in PySpark. How should I handle this kind of data? For example, one of the columns is colors and the data differs per entry: 1. red,blue 2. red,blue,orange. After splitting, the indices for the following columns change/shift for every row.
The data is in tabular form:

| id | category | color            | price |
|----|----------|------------------|-------|
| 1  | a        | red,blue         | 2000  |
| 2  | b        | black            | 5000  |
| 3  | c        | green,black,blue | 3000  |
The data is comma separated, so it appears as shown below when opened in a text editor.

```
id,category,color,price
1,a,red,blue,2000
2,b,black,5000
3,c,green,black,blue,3000
```
I tried the following operations; neither works. How should I handle such data?

```python
a.filter(lambda x: 'id' not in x).filter(lambda x: x.split(",")[4] == '2000').map(lambda x: x.split(",")[1]).collect()
a.filter(lambda x: 'id' not in x).filter(lambda x: x.split(",")[3] == '2000').map(lambda x: x.split(",")[1]).collect()
```
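To make the index shift concrete, here is a minimal plain-Python illustration using the sample rows above (a standalone snippet for illustration only, not part of the Spark job):

```python
# The embedded commas in the color column change how many fields split(",") produces,
# so the position of "price" differs from row to row.
row1 = "1,a,red,blue,2000".split(",")   # 5 fields: price lands at index 4
row2 = "2,b,black,5000".split(",")      # 4 fields: price lands at index 3
print(row1[4], row2[3])                 # 2000 5000 - no single index works for every row
```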
Answer
Something like this should work:
```python
# Read data and add a row index
rdd = sc.textFile("example.txt").zipWithIndex()

# Get first row - columns
columns = rdd.filter(lambda x: x[1] == 0).map(lambda x: x[0].split(",")).collect()[0]

# Get actual data - all the other rows
data = rdd.filter(lambda x: x[1] > 0).map(lambda x: x[0].split(","))

# Split out data rows into fields and convert to a DF:
# the first two fields are id and category, the last is price,
# and everything in between is re-joined as the color value
data = data.map(lambda x: (x[0], x[1], ",".join(x[2:-1]), x[-1])).toDF(schema=columns)
data.show()
```

```
+---+--------+----------------+-----+
| id|category|           color|price|
+---+--------+----------------+-----+
|  1|       a|        red,blue| 2000|
|  2|       b|           black| 5000|
|  3|       c|green,black,blue| 3000|
+---+--------+----------------+-----+
```
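Once the DataFrame is built, the filtering from the question can be done on named columns, so the shifting indices no longer matter. A minimal sketch, assuming the `data` DataFrame from above (note that `price` is still a string here, and the array column name `colors` is just an illustrative choice):

```python
from pyspark.sql import functions as F

# Category of rows whose price is 2000 (string comparison, matching the raw CSV values)
data.filter(data.price == '2000').select('category').collect()
# expected: the category of row 1, i.e. 'a'

# Optionally split the comma-separated color string into an array column
data = data.withColumn("colors", F.split("color", ","))
```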