Handling Duplicates in Spark - Multiple Use Cases

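// .as[Team] needs the implicit encoders from the active SparkSession (here called spark)
import spark.implicits._

val teamsDs = spark.read
  .option("inferSchema", "true")
  .json("src/main/resources/data/teamsWithDuplicates.json")
  .as[Team]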

teamsDs.show()

As we can see, our dataset has 6 rows, and id 0 appears 3 times.

Two of those rows are exact duplicates, and the third has an appointedTime later than the other two.

+-------------+-------+---+-----------------+---------+
|appointedTime|country|id |manager          |name     |
+-------------+-------+---+-----------------+---------+
|2016-10-01   |England|0  |Jurgen Klopp     |Liverpool|
|2016-10-01   |England|0  |Jurgen Klopp     |Liverpool|
|2016-10-02   |England|0  |Jurgen Klopp     |Liverpool|
|2021-05-01   |France |1  |Pochetino        |PSG      |
|2020-10-01   |Italy  |2  |Luciano Spalletti|Napoli   |
|2012-06-01   |England|3  |Pep Guardiola    |Man City |
+-------------+-------+---+-----------------+---------+
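
The loading snippet above relies on a Team case class and a SparkSession that are not shown in this post. The following is a minimal sketch of what they might look like, building the same six rows in memory instead of reading the JSON file. The field types (String for appointedTime, Long for id), the object name TeamsSample, and the local master setting are assumptions made for illustration.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Assumed record shape; the field types are a guess based on the printed table.
case class Team(appointedTime: String, country: String, id: Long, manager: String, name: String)

object TeamsSample {
  // Builds the six sample rows in memory (hypothetical helper, not from the original post).
  def teams(spark: SparkSession): Dataset[Team] = {
    import spark.implicits._
    Seq(
      Team("2016-10-01", "England", 0L, "Jurgen Klopp", "Liverpool"),
      Team("2016-10-01", "England", 0L, "Jurgen Klopp", "Liverpool"),
      Team("2016-10-02", "England", 0L, "Jurgen Klopp", "Liverpool"),
      Team("2021-05-01", "France", 1L, "Pochetino", "PSG"),
      Team("2020-10-01", "Italy", 2L, "Luciano Spalletti", "Napoli"),
      Team("2012-06-01", "England", 3L, "Pep Guardiola", "Man City")
    ).toDS()
  }

  def main(args: Array[String]): Unit = {
    // Local session for experimenting; a real job would get its master from spark-submit.
    val spark = SparkSession.builder()
      .appName("handling-duplicates")
      .master("local[*]")
      .getOrCreate()

    val teamsDs = teams(spark)
    teamsDs.show(truncate = false)

    spark.stop()
  }
}
```

Building the data in memory makes it easy to try each use case below without the JSON file on disk.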

Use case 1a - Removing exact duplicates with distinct()

  // distinct() is an alias for dropDuplicates(), as the Spark source code shows:
  // def distinct(): Dataset[T] = dropDuplicates()
  teamsDs.distinct().show()
+-------------+-------+---+-----------------+---------+
|appointedTime|country| id|          manager|     name|
+-------------+-------+---+-----------------+---------+
|   2016-10-01|England|  0|     Jurgen Klopp|Liverpool|
|   2012-06-01|England|  3|    Pep Guardiola| Man City|
|   2020-10-01|  Italy|  2|Luciano Spalletti|   Napoli|
|   2021-05-01| France|  1|        Pochetino|      PSG|
|   2016-10-02|England|  0|     Jurgen Klopp|Liverpool|
+-------------+-------+---+-----------------+---------+

Use case 1b - Removing exact duplicates with dropDuplicates()

  // returns a new Dataset that contains only unique rows
  // when two rows are identical in every column, only one of them is kept
  teamsDs.dropDuplicates().show()
+-------------+-------+---+-----------------+---------+
|appointedTime|country| id|          manager|     name|
+-------------+-------+---+-----------------+---------+
|   2016-10-01|England|  0|     Jurgen Klopp|Liverpool|
|   2012-06-01|England|  3|    Pep Guardiola| Man City|
|   2020-10-01|  Italy|  2|Luciano Spalletti|   Napoli|
|   2021-05-01| France|  1|        Pochetino|      PSG|
|   2016-10-02|England|  0|     Jurgen Klopp|Liverpool|
+-------------+-------+---+-----------------+---------+
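
Since both calls print the same five rows, a quick way to confirm they agree is to compare row counts. This is only a sanity-check sketch: the Team field types, the object name ExactDuplicates, and the in-memory sample data are assumptions, not part of the original code.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Assumed record shape; the field types are a guess based on the printed table.
case class Team(appointedTime: String, country: String, id: Long, manager: String, name: String)

object ExactDuplicates {
  // Same six rows as the sample dataset (hypothetical in-memory stand-in for the JSON file).
  def teams(spark: SparkSession): Dataset[Team] = {
    import spark.implicits._
    Seq(
      Team("2016-10-01", "England", 0L, "Jurgen Klopp", "Liverpool"),
      Team("2016-10-01", "England", 0L, "Jurgen Klopp", "Liverpool"),
      Team("2016-10-02", "England", 0L, "Jurgen Klopp", "Liverpool"),
      Team("2021-05-01", "France", 1L, "Pochetino", "PSG"),
      Team("2020-10-01", "Italy", 2L, "Luciano Spalletti", "Napoli"),
      Team("2012-06-01", "England", 3L, "Pep Guardiola", "Man City")
    ).toDS()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("exact-duplicates")
      .master("local[*]")
      .getOrCreate()

    val teamsDs = teams(spark)

    // Only the one fully identical row is removed, so both counts are 5.
    val viaDistinct = teamsDs.distinct().count()
    val viaDropDuplicates = teamsDs.dropDuplicates().count()
    println(s"distinct: $viaDistinct, dropDuplicates: $viaDropDuplicates")

    spark.stop()
  }
}
```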

Use case 2 - Removing duplicates based on one or more columns with dropDuplicates(col1: String, cols: String*)

  // keeps one row per distinct id; which of the duplicate rows survives is not guaranteed
  teamsDs.dropDuplicates("id").show()
+-------------+-------+---+-----------------+---------+
|appointedTime|country| id|          manager|     name|
+-------------+-------+---+-----------------+---------+
|   2016-10-01|England|  0|     Jurgen Klopp|Liverpool|
|   2021-05-01| France|  1|        Pochetino|      PSG|
|   2020-10-01|  Italy|  2|Luciano Spalletti|   Napoli|
|   2012-06-01|England|  3|    Pep Guardiola| Man City|
+-------------+-------+---+-----------------+---------+
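
The same method also accepts several column names, either as varargs or as a Seq. The sketch below deduplicates on the pair (id, appointedTime), so the two Liverpool rows from 2016-10-01 collapse into one while the 2016-10-02 row survives. The Team field types, the object name DropDuplicatesByColumns, and the in-memory sample data are assumptions for illustration.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Assumed record shape; the field types are a guess based on the printed table.
case class Team(appointedTime: String, country: String, id: Long, manager: String, name: String)

object DropDuplicatesByColumns {
  // Same six rows as the sample dataset (hypothetical in-memory stand-in for the JSON file).
  def teams(spark: SparkSession): Dataset[Team] = {
    import spark.implicits._
    Seq(
      Team("2016-10-01", "England", 0L, "Jurgen Klopp", "Liverpool"),
      Team("2016-10-01", "England", 0L, "Jurgen Klopp", "Liverpool"),
      Team("2016-10-02", "England", 0L, "Jurgen Klopp", "Liverpool"),
      Team("2021-05-01", "France", 1L, "Pochetino", "PSG"),
      Team("2020-10-01", "Italy", 2L, "Luciano Spalletti", "Napoli"),
      Team("2012-06-01", "England", 3L, "Pep Guardiola", "Man City")
    ).toDS()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("drop-duplicates-by-columns")
      .master("local[*]")
      .getOrCreate()

    val teamsDs = teams(spark)

    // Varargs form: one row is kept per distinct (id, appointedTime) pair.
    val byIdAndTime = teamsDs.dropDuplicates("id", "appointedTime")

    // Seq form: equivalent, and handy when the column list is built at runtime.
    val keyColumns = Seq("id", "appointedTime")
    val byKeyColumns = teamsDs.dropDuplicates(keyColumns)

    println(byIdAndTime.count())  // 5: id 0 keeps one row per appointedTime
    println(byKeyColumns.count()) // 5

    spark.stop()
  }
}
```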

But what if only the latest record in the system is the correct one? Suppose we get 3 duplicates (by id), and one has a later timestamp than the others. Usually, the latest record is the correct version, and dropDuplicates("id") gives us no control over which row is kept. In this case we need a better way to handle these duplicates.

Use case 3 - Keeping only the latest record per id with a Window and row_number()

  import org.apache.spark.sql.expressions.Window
  import org.apache.spark.sql.functions.row_number

  // create a Window partitioned by id, ordered by appointedTime descending (latest first)
  val window = Window.partitionBy("id").orderBy($"appointedTime".desc)
  // non-duplicate rows will have rank = 1
  // duplicate rows are ranked by appointedTime, with the latest appointedTime = 1
  // and earlier duplicate records numbered 2, 3, 4, etc.
  val rankedByDuplicates = teamsDs.withColumn("rank", row_number().over(window))
  // only keep rows that have a rank of 1
  // any row with rank > 1 is an older duplicate and is filtered out,
  // leaving only the latest record for each id
  rankedByDuplicates.filter($"rank" === 1).show()
+-------------+-------+---+-----------------+---------+----+
|appointedTime|country| id|          manager|     name|rank|
+-------------+-------+---+-----------------+---------+----+
|   2016-10-02|England|  0|     Jurgen Klopp|Liverpool|   1|
|   2021-05-01| France|  1|        Pochetino|      PSG|   1|
|   2020-10-01|  Italy|  2|Luciano Spalletti|   Napoli|   1|
|   2012-06-01|England|  3|    Pep Guardiola| Man City|   1|
+-------------+-------+---+-----------------+---------+----+
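
The result above still carries the helper rank column. One possible follow-up, sketched below, wraps the window logic in a function, drops rank, and converts the rows back to a typed Dataset. The function name latestPerId, the Team field types, and the in-memory sample data are assumptions for illustration. Note that row_number() picks an arbitrary winner when two rows of the same id share the same appointedTime, so a secondary sort column may be needed if ties matter.

```scala
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Assumed record shape; the field types are a guess based on the printed table.
case class Team(appointedTime: String, country: String, id: Long, manager: String, name: String)

object LatestPerId {
  // Keeps, for each id, the row with the latest appointedTime (hypothetical helper).
  def latestPerId(ds: Dataset[Team]): Dataset[Team] = {
    val window = Window.partitionBy("id").orderBy(col("appointedTime").desc)
    ds.withColumn("rank", row_number().over(window))
      .filter(col("rank") === 1)
      .drop("rank")                     // remove the helper column again
      .as[Team](Encoders.product[Team]) // back to a typed Dataset
  }

  // Same six rows as the sample dataset (hypothetical in-memory stand-in for the JSON file).
  def teams(spark: SparkSession): Dataset[Team] = {
    import spark.implicits._
    Seq(
      Team("2016-10-01", "England", 0L, "Jurgen Klopp", "Liverpool"),
      Team("2016-10-01", "England", 0L, "Jurgen Klopp", "Liverpool"),
      Team("2016-10-02", "England", 0L, "Jurgen Klopp", "Liverpool"),
      Team("2021-05-01", "France", 1L, "Pochetino", "PSG"),
      Team("2020-10-01", "Italy", 2L, "Luciano Spalletti", "Napoli"),
      Team("2012-06-01", "England", 3L, "Pep Guardiola", "Man City")
    ).toDS()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("latest-per-id")
      .master("local[*]")
      .getOrCreate()

    // Four rows remain; id 0 keeps the 2016-10-02 record.
    latestPerId(teams(spark)).show()

    spark.stop()
  }
}
```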