
Random duplicates in Kusto table after ingestion from Spark (Azure Databricks) #412

Open
edobrynin-dodo opened this issue Dec 23, 2024 · 0 comments

Describe the bug
I have an external Delta table in Azure Databricks backed by Azure ADLS Gen2. I perform a full ingestion from this table into the corresponding Kusto table daily. The data is first ingested into a temporary Kusto table, and then I move extents from the temp table into the target Kusto table. During this operation, duplicates of some random rows appear in the target Kusto table, even though the Databricks table does not contain these duplicates. The specific duplicated rows can disappear after re-running the job, but other ones can appear instead.
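
For reference, the duplicates can be observed on the Kusto side with a check like the following (a minimal sketch only; OrderId is a placeholder for the actual key column of metrics_MP_CR_to_Menu, and client/database are created the same way as in the snippet below):

# Hypothetical duplicate check against the target table.
# "OrderId" is a placeholder for the real business key column(s).
duplicate_check = """
metrics_MP_CR_to_Menu
| summarize RowsPerKey = count() by OrderId
| where RowsPerKey > 1
"""
res = client.execute(database, duplicate_check)
for row in res.primary_results[0]:
    print(row["OrderId"], row["RowsPerKey"])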

To Reproduce
This is the full code snippet:

import json
import time
from pyspark.sql import SparkSession, functions as F
from pyspark.dbutils import DBUtils
from typing import Dict, Type

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoError
from pyspark.sql import DataFrame
from pyspark.sql.types import (
    ArrayType,
    BinaryType,
    BooleanType,
    ByteType,
    DataType,
    DateType,
    DecimalType,
    DoubleType,
    FloatType,
    IntegerType,
    LongType,
    MapType,
    ShortType,
    StringType,
    StructType,
    TimestampType,
)

reader_options = {}
source_path = "abfss://path_to_source_table"
kusto_connection_string = "azure_secret_with_connection"
kusto_table = "metrics_MP_CR_to_Menu"
table_name = source_path.split("/")[-1]

ingestion_mode = None
clientBatchingLimit = None

SparkTypeToKustoTypeMap: Dict[Type, str] = {
    StringType: "string",
    BooleanType: "bool",
    DateType: "datetime",
    TimestampType: "datetime",
    DecimalType: "decimal",
    DoubleType: "real",
    FloatType: "real",
    ByteType: "int",
    BinaryType: "string",
    IntegerType: "int",
    LongType: "long",
    ShortType: "int",
}


def create_kusto_client(kusto_conf: dict):
    kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
        kusto_conf["kustoCluster"],
        kusto_conf["kustoAadAppId"],
        kusto_conf["kustoAadAppSecret"],
        kusto_conf["kustoAadAuthorityID"],
    )
    return KustoClient(kcsb)


def replace_content(client, database, target_table, source_table):
    # Atomically swap the target table's data: drop its current extents and
    # move in all extents from the source (temp) table.
    params = {"target_table": target_table, "source_table": source_table}
    query = """.replace extents in table {target_table} <|
             {{ .show table {target_table} extents }},
             {{ .show table {source_table} extents }}""".format(**params)

    client.execute(database, query)


def table_exists(client, database, table_name):
    try:
        res = client.execute(database, ".show tables")[0]
    except KustoError:
        return False

    tables = {table[0] for table in res}
    return table_name in tables


def drop_table(client: KustoClient, database: str, table_name: str):
    query = f".drop table {table_name} ifexists"
    client.execute(database, query)


def create_table(
    client: KustoClient,
    database: str,
    source: DataFrame,
    table_name: str,
    hidden: bool = False,
):
    table_schema_builder = []
    for field in source.schema.fields:
        field_type = get_spark_type_to_kusto_type_map(field.dataType)
        table_schema_builder.append(f"['{field.name}']:{field_type}")

    tmp_table_schema = ",".join(table_schema_builder)
    query = f".create table {table_name} ({tmp_table_schema})"
    if hidden:
        query += "with(hidden=true)"
    client.execute(database, query)


def get_spark_type_to_kusto_type_map(field_type: DataType):
    if isinstance(field_type, DecimalType):
        return "decimal"
    if isinstance(field_type, (ArrayType, StructType, MapType)):
        return "dynamic"
    return SparkTypeToKustoTypeMap[type(field_type)]


def get_max_sink_timestamp(client, database, table_name, overlap_column):
    query = f""" {table_name}
              | summarize LatestRecordTimestamp=max({overlap_column})
          """
    try:
        res = client.execute(database, query)
        return res.primary_results[0][0][0]
    except KustoError:
        return None


def get_kusto_options():
    dbutils = DBUtils(spark)
    connection = json.loads(dbutils.secrets.get(scope="keyvault", key=kusto_connection_string))

    return {
        "kustoCluster": connection["cluster"],
        "kustoDatabase": connection["database"],
        "kustoTable": "metrics_MP_CR_to_Menu",
        "kustoAadAppId": connection["aadAppId"],
        "kustoAadAppSecret": connection["aadAppSecret"],
        "kustoAadAuthorityID": connection["aadAuthorityID"],
        "clientBatchingLimit": clientBatchingLimit if clientBatchingLimit else "100",
        "writeEnableAsync": False,
        "tableCreateOptions": "CreateIfNotExist"
    }


spark = (
    SparkSession.builder.appName("serving.metrics_MP_CR_to_Menu")
    .getOrCreate()
)

options = get_kusto_options()
database = options["kustoDatabase"]
origin_table = options["kustoTable"]


try:
    print("Job has been started.")

    client = create_kusto_client(options)
    temp_table = f"_temp_{origin_table}_{int(time.time())}"

    source = (
        spark
        .read
        .format("delta")
        .options(**reader_options)
        .load(source_path)
        .withColumn("LoadDateTime", F.current_timestamp())
        .withColumn("LoadDate", F.current_date())
    )

    create_table(client, database, source, temp_table, hidden=True)

    (
        source
        .write
        .format("com.microsoft.kusto.spark.datasource")
        .options(**options)
        .option("kustoTable", temp_table)
        .mode("Append")
        .save()
    )

    # Recreate the target table, swap in the extents that were ingested into
    # the temp table, then drop the temp table.
    drop_table(client, database, origin_table)
    create_table(client, database, source, origin_table)

    replace_content(client, database, origin_table, temp_table)
    drop_table(client, database, temp_table)

except KeyboardInterrupt:
    print("Job was cancelled by the user or because of re-deployment")
except BaseException as exc:
    import traceback
    tbc = traceback.format_exc()
    print(f"Job failed with an error:\n{exc}\n\n{tbc}")
    raise

Expected behavior
The target Kusto table contains exactly the same rows as the Databricks source table.

Screenshots
(Four screenshots attached: the first two show the row counts of the tables after ingestion, the last two show a single Databricks row and its duplicates in the target Kusto table.)

Desktop (please complete the following information):

  • Databricks cluster: Driver: Standard_D4ads_v5 · Workers: Standard_D4ads_v5 · 1-8 workers · DBR: 14.3 LTS (includes Apache Spark 3.5.0, Scala 2.12)
  • kusto-spark_3.0_2.12-5.0.6-jar-with-dependencies.jar
  • azure-kusto-data==4.2.0

Additional context
The first two screenshots show that the tables have a different number of rows after ingestion.
The last two screenshots show an example of a single row in the Databricks table that has duplicates in the target Kusto table after ingestion.
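
For completeness, this is roughly how the row counts are compared (a minimal sketch reusing source, client, database and origin_table from the snippet above; the KQL is just a plain count):

# Hypothetical count comparison between the Spark source and the Kusto target.
# Reuses `source`, `client`, `database` and `origin_table` defined in the snippet above.
spark_count = source.count()
kusto_count = client.execute(database, f"{origin_table} | count").primary_results[0][0]["Count"]
print(f"Spark source rows: {spark_count}, Kusto target rows: {kusto_count}")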
