# Per-(domain_id, first_seen, last_seen) distinct-row counts, smallest first.
# Sentinel dates fill open-ended intervals; they are filtered back out at the
# end of the pipeline (see the final `where` on `nom`).
a = (
    domains_nameservers.select("*")
    .fillna("1970-01-01", ["first_seen"])
    .fillna("2022-12-31", ["last_seen"])
    .groupBy("domain_id", "last_seen", "first_seen")
    .agg(F.countDistinct("*").alias("counter"))
    .orderBy("counter")  # ascending is the default sort direction
)
import sys
import pyspark.sql.functions as func
from pyspark.sql.window import Window
# One partition per domain, rows ordered chronologically by interval end then
# start — the frame used to look at each row's previous interval.
windowSpec = Window.partitionBy("domain_id").orderBy("last_seen", "first_seen")
# "grp" = gap in days between this interval's start and the previous
# interval's end for the same domain (null on each domain's first row,
# since lag has nothing to look back at).
b = a.withColumn(
    "grp",
    F.datediff("first_seen", F.lag("last_seen", 1).over(windowSpec)),
)
# "match" flags the start of a new interval group: "1" when the gap to the
# previous interval satisfies floor(gap / 91) > 1 (i.e. gap >= 182 days),
# "0" otherwise — including the first row per domain, whose gap is null and
# therefore falls through to otherwise().
#
# Fix: the gap was already computed as column "grp" in `b` (same datediff +
# lag over the same window); reuse it instead of re-evaluating the window
# function a second time.
c = b.withColumn(
    "match",
    F.when(F.floor(F.col("grp") / 91) > 1, "1").otherwise("0"),
)
# Cumulative frame: everything from the start of the domain's partition up to
# and including the current row, in chronological order.
w = (
    Window.partitionBy("domain_id")
    .orderBy("last_seen", "first_seen")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
# Running total of the break flags; rows that share a "val_sum" belong to the
# same merged interval group. ("match" holds "0"/"1" strings — presumably
# relying on Spark's implicit numeric cast inside sum(); verify upstream.)
newDF = c.withColumn("val_sum", F.sum("match").over(w))
# Collapse each (domain_id, val_sum) group into one merged interval spanning
# the earliest start to the latest end, then drop rows still carrying the
# sentinel fill dates introduced at the top of the pipeline.
nom = (
    newDF.groupBy("domain_id", "val_sum")
    .agg(
        F.min("first_seen").alias("first_seen"),
        F.max("last_seen").alias("last_seen"),
    )
    .where(
        (F.col("first_seen") != "1970-01-01")
        & (F.col("last_seen") != "2022-12-31")
    )
)