import pyspark.sql.functions as F
from pyspark.sql.window import Window

# Normalise open-ended intervals: missing first_seen/last_seen are filled with
# sentinel dates so the window functions below never see nulls. This assumes
# first_seen/last_seen are ISO-format date strings (fillna with a string value
# only touches string columns).
a = (
    domains_nameservers
    .fillna("1970-01-01", subset=["first_seen"])
    .fillna("2022-12-31", subset=["last_seen"])
    .groupBy("domain_id", "last_seen", "first_seen")
    .agg(F.countDistinct("*").alias("counter"))  # distinct rows per interval
    .orderBy(F.asc("counter"))
)

# Order each domain's intervals chronologically.
windowSpec = Window.partitionBy("domain_id").orderBy("last_seen", "first_seen")

# Gap in days between this interval's start and the previous interval's end.
b = a.withColumn(
    "grp",
    F.datediff(F.col("first_seen"), F.lag("last_seen", 1).over(windowSpec)),
)

# Flag the start of a new "island": floor(gap / 91) > 1 holds once the gap
# reaches two 91-day quarters, i.e. 182 days or more. Using integer 1/0 here
# (instead of the strings "1"/"0") avoids an implicit cast in the sum below.
c = b.withColumn(
    "match",
    F.when(F.floor(F.col("grp") / 91) > 1, 1).otherwise(0),
)

# Running sum of the flags assigns an island id (val_sum) within each domain.
w = (
    Window.partitionBy("domain_id")
    .orderBy("last_seen", "first_seen")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
newDF = c.withColumn("val_sum", F.sum("match").over(w))

# Collapse each island to a single interval, then drop intervals that still
# touch the sentinel fill dates.
nom = (
    newDF
    .groupBy("domain_id", "val_sum")
    .agg(
        F.min("first_seen").alias("first_seen"),
        F.max("last_seen").alias("last_seen"),
    )
    .where("first_seen != '1970-01-01' and last_seen != '2022-12-31'")
)
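# --- Minimal smoke test (sketch) -------------------------------------------
# A hypothetical example of running the pipeline above on toy data. The
# nameserver column and all concrete rows are illustrative assumptions; only
# domain_id/first_seen/last_seen are taken from the code above. Define
# domains_nameservers first, execute the pipeline, then inspect nom.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("islands-demo").getOrCreate()

domains_nameservers = spark.createDataFrame(
    [
        # domain 1: two intervals 10 days apart -> merged into one island
        (1, "ns1.example.com", "2021-01-01", "2021-03-01"),
        (1, "ns2.example.com", "2021-03-11", "2021-06-01"),
        # domain 1: reappears 202 days later (>= 182) -> starts a new island
        (1, "ns1.example.com", "2021-12-20", "2022-02-01"),
        # domain 2: missing first_seen -> filled with the 1970-01-01 sentinel
        # and therefore removed by the final where() filter
        (2, "ns1.example.com", None, "2022-01-01"),
    ],
    ["domain_id", "nameserver", "first_seen", "last_seen"],
)

# ...run the pipeline above, then:
nom.orderBy("domain_id", "first_seen").show()
# Expected: domain 1 collapses to (2021-01-01, 2021-06-01) and
# (2021-12-20, 2022-02-01); domain 2 is dropped entirely.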