I have two dataframes, each with 2 partitions. The dataframes are small, probably around 100 rows each.
df1:

col1  columnindex
null  1
null  2
null  3
null  4
100   5
101   6
102   7
103   8
104   9
105   10
df2:

col2  columnindex
100   1
200   2
null  3
null  4
100   5
101   6
null  7
103   8
null  9
105   10
My final dataframe should be the join of df1 and df2 on columnindex:

col1  col2  columnindex
null  100   1
null  200   2
null  null  3
null  null  4
100   100   5
101   101   6
102   null  7
103   103   8
104   null  9
105   105   10
But when I join the two dataframes as below, it looks like it is shuffling and giving me incorrect results. Is there any way to avoid the shuffling?
df1.join(df2, df1.columnindex == df2.columnindex, 'inner')
Answer
This depends on what you mean by shuffling.
join1 = spark.createDataFrame(
    [(None, 1), (None, 2), (None, 3), (100, 5), (101, 6), (105, 10)],
    ['col1', 'columnindex'])
join2 = spark.createDataFrame(
    [(100, 1), (200, 2), (None, 3), (100, 5), (101, 6), (None, 10)],
    ['col2', 'columnindex'])

joined = join1.join(join2, ['columnindex'], 'inner').select(['columnindex', 'col1', 'col2'])
joined.show()
results in:
+-----------+----+----+
|columnindex|col1|col2|
+-----------+----+----+
|          2|null| 200|
|          5| 100| 100|
|          3|null|null|
|          6| 101| 101|
|          1|null| 100|
|         10| 105|null|
+-----------+----+----+
This is a correct result: each columnindex corresponds to the proper values from both dataframes. The rows simply come back in a different order after the join, and if you do any further computations, this shouldn't be a problem.
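If by shuffling you mean the shuffle exchange Spark performs for a sort-merge join, dataframes this small can usually be joined without one via a broadcast hint. A minimal sketch using the join1/join2 dataframes above (broadcast is the standard pyspark.sql.functions hint; the exact physical plan depends on your Spark version and settings):

from pyspark.sql.functions import broadcast

# Ship join2 whole to every executor instead of shuffling both sides;
# with ~100 rows per dataframe this is cheap.
joined_bc = join1.join(broadcast(join2), ['columnindex'], 'inner')

# Inspect the physical plan: a BroadcastHashJoin has no Exchange
# (shuffle) nodes for the join itself, unlike a SortMergeJoin.
joined_bc.explain()

Spark also broadcasts small tables automatically when their estimated size is below spark.sql.autoBroadcastJoinThreshold (10 MB by default), so the hint may already be redundant here.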
However, if you want the rows ordered by columnindex, you can do that with orderBy:
joined.orderBy('columnindex').show()
+-----------+----+----+
|columnindex|col1|col2|
+-----------+----+----+
|          1|null| 100|
|          2|null| 200|
|          3|null|null|
|          5| 100| 100|
|          6| 101| 101|
|         10| 105|null|
+-----------+----+----+
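Note that orderBy itself adds a shuffle, since a global sort requires a range-partitioning exchange. If per-partition order is enough for your use case, sortWithinPartitions avoids that extra exchange; a small sketch, using the joined dataframe from above:

# Global sort: adds a range-partitioning Exchange to the plan.
joined.orderBy('columnindex').explain()

# Sorts rows only within each partition: no extra shuffle,
# but also no guarantee of a global ordering in the output.
joined.sortWithinPartitions('columnindex').show()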
A quick note on join: if you use df1.columnindex == df2.columnindex as the condition, the result will contain a duplicated columnindex column, which you would have to resolve before sorting with orderBy. That's why it's easier to pass the column name as a list argument to join, as above.
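For illustration, here is a sketch of what the duplicated column looks like and one way to resolve it (drop accepting a Column reference is standard PySpark; the names come from the example above):

# The expression join keeps columnindex from both sides.
dup = join1.join(join2, join1.columnindex == join2.columnindex, 'inner')
print(dup.columns)  # ['col1', 'columnindex', 'col2', 'columnindex']

# Drop one duplicate by referencing its source dataframe; the
# remaining columnindex can then be sorted on unambiguously.
dup.drop(join2.columnindex).orderBy('columnindex').show()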