
How to get attribute-level lineage "across" Spark jobs through Spline UI / AQL? #1088

Closed
adcb6gt opened this issue May 13, 2022 · 4 comments

adcb6gt commented May 13, 2022

I have an application that sources data from upstream, processes it, and then passes it downstream.
The application does this using an in-house concept called "Workflows".
A workflow is nothing but a set of activities. There is a start activity, then a set of transformation activities (which can run sequentially or in parallel), and then an end activity.
Behind the scenes, each activity translates to a spark-submit job.

As an example, one of our simple workflows looks like:
Source a file --> Select a few columns from the file --> Apply a filter condition --> Modify the value of a column based on a condition --> Save the modified file

Each of the activities above is submitted as a PySpark job. At each step we create a DataFrame and then persist it to HDFS.


Activity 1: Source a file
    Source action: df = spark.read.csv('/path/to/source/file/on/hdfs/sourceA.csv')
    Target action: df.write.csv('/path/to/workflow-run-folder/step1.csv')

Activity 2: Select a few columns from the file
    Source action: df = df.select("col1", "col2")
    Target action: df.write.csv('/path/to/workflow-run-folder/step2.csv')

Activity 3: Apply a filter condition
    Source action: df = df.filter(df.col1 == 'xyz')
    Target action: df.write.csv('/path/to/workflow-run-folder/step3.csv')

Activity 4: Modify the value of a column based on a condition
    Source action: df = df.withColumn("col2", df.col2 * 3)
    Target action: df.write.csv('/path/to/workflow-run-folder/step4.csv')

Activity 5: Save the modified file
    Target action: df.write.csv('/path/to/workflow-run-folder/step5.csv')
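To make the pattern concrete, here is a minimal sketch of what activity 3 might look like as a standalone PySpark job (the app name, header option, and write mode are illustrative, not our exact code):

from pyspark.sql import SparkSession

# Each activity runs as its own spark-submit: read the previous step's
# output from HDFS, transform, persist. This is also why Spline records
# one execution plan per activity.
spark = SparkSession.builder.appName("workflow-activity-3").getOrCreate()

df = spark.read.option("header", True).csv("/path/to/workflow-run-folder/step2.csv")
df = df.filter(df.col1 == "xyz")
df.write.mode("overwrite").csv("/path/to/workflow-run-folder/step3.csv")

spark.stop()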

We then had a requirement to get data lineage.

To achieve this objective, I explored the Spline framework and incorporated it into my application.

We placed the Spline Spark agent bundle on the Spark driver (my --deploy-mode is client and --master is yarn), under the jars folder of Spark home.
We also brought up the Spline REST gateway and the Spline UI, along with ArangoDB (using Spline version 0.7).

On submitting our Spark jobs, we initialized lineage tracking in code by calling enableLineageTracking on SparkLineageInitializer (for some reason, codeless initialization via --packages didn't work).
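For reference, the programmatic initialization from PySpark looks roughly like this (the producer URL is a placeholder; the Py4J gateway call follows the pattern described in the Spline agent documentation):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("workflow-activity")
         # Producer endpoint of the Spline REST gateway (placeholder URL)
         .config("spark.spline.producer.url", "http://localhost:8080/producer")
         .getOrCreate())

# Programmatic (non-codeless) initialization: call the Scala initializer
# through the Py4J gateway.
sc = spark.sparkContext
sc._jvm.za.co.absa.spline.harvester.SparkLineageInitializer \
    .enableLineageTracking(spark._jsparkSession)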

I can see the Spark jobs listed in the Spline UI on the Execution Events screen (one record per Spark job; so for the above example, we have 5 Spark jobs listed).

On clicking the Execution Plan hyperlink against each job, I can see the lineage that the Spline UI fetches from ArangoDB.

Also, in ArangoDB I can see the vertices and edges in graph view.

However, what I'm looking for is:

We now have a requirement to figure out data lineage "across" the Spark jobs we submit.
For the above example, I need some way where I provide "col2" as input, and it gives me a lineage that logically translates to:
col2 (Activity-1) -> col2 (Activity-2) -> col2 (Activity-3) -> col2*3 (Activity-4) -> col2*3 (Activity-5)

So if my col2 value at source is, say, 1, then the lineage should be:
1 -> 1 -> 1 -> 3 -> 3

Also, against each value, I need to know the activity applied: whether it was a pass-through (i.e., no transformation applied) or whether there was a transformation on it (in the above example, at activity 4 the column value was multiplied by 3).

Is there a way I can achieve this using AQL?

I understand every Spark job gets created as a document in the executionPlan document collection, but if I have to get the lineage of a specific attribute across these collections, how can I do it? In other words, how can I get the lineage of an attribute across different Spark jobs (i.e., across different execution IDs)?

Also, can I capture the runtime value of the attribute as it goes through different transformations?
I read through ArangoDB's graph traversal documentation, but I was not able to figure out how to achieve this via AQL (or through any other approach in Spline).

Any pointers on this will be highly appreciated.

Thanks !


wajda commented May 13, 2022

From what I understand, you are looking for attribute-level lineage. Spline collects all the information (#114) required to build attribute-level lineage, and it actually shows the path within the boundaries of a single execution plan (go to the execution plan detail and click on any attribute in the operation details pane). However, the AQL query, API, and UI to build attribute-level lineage across jobs have yet to be implemented. The discussion #937 might give you a few hints on how it could be achieved.

Basically, every operation has an output schema that consists of attributes. Each attribute holds a reference to the expression or the attribute from which it was created. There is no link between attributes from different execution plans. So, to build an end-to-end attribute-level lineage, you can reuse the same query that builds the high-level lineage, build a partial attribute-level graph for every visited execution plan, and then connect the input attributes of one graph with the output attributes of another at a later stage of the traversal.
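To sketch the stitching step in AQL (untested; the dataSource collection and the writesTo/readsFrom edge names are assumptions that should be verified against your actual 0.7 database), the idea is to pair plans through the data sources they share, and then match attributes by name or position:

// Pair execution plans through shared data sources: one plan's write
// target is another plan's read source. The collection/edge names here
// are assumptions to be checked against the actual Spline 0.7 schema.
FOR ds IN dataSource
    LET writers = (FOR op IN 1 INBOUND ds writesTo RETURN op._id)
    LET readers = (FOR op IN 1 INBOUND ds readsFrom RETURN op._id)
    FILTER LENGTH(writers) > 0 AND LENGTH(readers) > 0
    RETURN {
        "dataSource": ds.uri,
        "writers": writers,
        "readers": readers
    }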

Also can I capture the runtime value of the attribute as it goes through different transformations ?

No, Spark doesn't expose this to the listener, nor does the Spline project aim to capture actual data.


adcb6gt commented May 19, 2022

Thanks @wajda for the pointers.
I did a detailed study of the data model you described, submitted multiple PySpark jobs, and tried to understand how the documents get stored in ArangoDB across the different collections.
Here is what I did, along with my follow-up questions:

I have an input dataset consisting of 3 columns, say c1 (string), c2 (string) and c3 (int).
I've applied the following chain of transformations on it:
df=spark.read(<input dataset path>) --> df.select(<columns c1,c2,c3>) --> df.withColumn("c2",F.concat(F.col("c2"),F.lit("_ABC"))) --> df.withColumn("c3", df.c3*3) --> df.write
As expected, I have:
1 progress (event) document
1 executionPlan document
5 operation documents
5 attribute documents
and documents in the other vertex and edge collections

For the above lineage, the following are the IDs of my operation collection documents:

operation/f3c32a85-ae94-5978-9611-187a7440c694:op-4 --> Read
operation/f3c32a85-ae94-5978-9611-187a7440c694:op-3 --> Transformation (SELECT)
operation/f3c32a85-ae94-5978-9611-187a7440c694:op-2 --> Transformation (on column c2)
operation/f3c32a85-ae94-5978-9611-187a7440c694:op-1 --> Transformation (on column c3)
operation/f3c32a85-ae94-5978-9611-187a7440c694:op-0 --> Write

and the following are the IDs of my attribute collection documents:

attribute/f3c32a85-ae94-5978-9611-187a7440c694:attr-0 --> Original c1 column
attribute/f3c32a85-ae94-5978-9611-187a7440c694:attr-1 --> Original c2 column
attribute/f3c32a85-ae94-5978-9611-187a7440c694:attr-2 --> Original c3 column
attribute/f3c32a85-ae94-5978-9611-187a7440c694:attr-3 --> Modified c2 column
attribute/f3c32a85-ae94-5978-9611-187a7440c694:attr-4 --> Modified c3 column
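For reference, these listings can be pulled with a simple lookup, since each document _key is prefixed with the execution plan UUID (the same works for the operation collection):

LET planId = "f3c32a85-ae94-5978-9611-187a7440c694"
FOR a IN attribute
    FILTER STARTS_WITH(a._key, planId)
    RETURN { "id": a._id, "name": a.name }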

I'm now in the process of building attribute-level lineage, where the user provides the attribute key for which the lineage needs to be figured out. My query so far:

// Starting point: the attribute whose lineage we want (modified column c3)
LET intake_attr = DOCUMENT("attribute", "f3c32a85-ae94-5978-9611-187a7440c694:attr-4")

// Operation of origin: the operation that `produces` the starting attribute
LET intake_op = FIRST(
    FOR op IN 1
        INBOUND intake_attr produces
        RETURN op._id
)

// All upstream operations via the `follows` edges (not used below yet)
LET ancestors_op = (
    FOR op IN 1..999999
        OUTBOUND intake_op follows
        RETURN DISTINCT op._id
)

// All ancestor attributes via the attribute-to-attribute `derivesFrom` edges
LET ancestors_attrs = (
    FOR v, e IN 1..999999
        OUTBOUND intake_attr derivesFrom
        LET this_ancestor_attr = {
            "_id": v._id,
            "name": v.name
        }

        RETURN [this_ancestor_attr]
)

// For each ancestor attribute, resolve its own operation of origin
LET nodes = (
    FOR a IN UNIQUE(ancestors_attrs[*][0])
        LET ancestor_op_id = FIRST(
            FOR op IN 1
                INBOUND a produces
                RETURN op._id
        )

        RETURN {
            "parent_attribute_id" : PARSE_IDENTIFIER(a._id).key,
            "parent_attribute_name" : a.name,
            "from_operation_id" : PARSE_IDENTIFIER(ancestor_op_id).key
        }
)

// Append the starting (intake) attribute itself to the lineage list
RETURN {
    "lineage": PUSH(nodes, {
        "intake_attribute_id" : "f3c32a85-ae94-5978-9611-187a7440c694:attr-4",
        "intake_attribute_name" : intake_attr.name,
        "from_operation_id" : PARSE_IDENTIFIER(intake_op).key
    })
}

Which gives me this result:

[
  {
    "lineage": [
      {
        "parent_attribute_id": "f3c32a85-ae94-5978-9611-187a7440c694:attr-2",
        "parent_attribute_name": "c3",
        "from_operation_id": "f3c32a85-ae94-5978-9611-187a7440c694:op-4"
      },
      {
        "intake_attribute_id": "f3c32a85-ae94-5978-9611-187a7440c694:attr-4",
        "intake_attribute_name": "c3",
        "from_operation_id": "f3c32a85-ae94-5978-9611-187a7440c694:op-1"
      }
    ]
  }
]

So here my starting point is f3c32a85-ae94-5978-9611-187a7440c694:attr-4 (modified column c3), and I'm able to get its lineage up to the top, which is f3c32a85-ae94-5978-9611-187a7440c694:attr-2 (original column c3). I'm interested only in the stages where an attribute is produced. So even though attr-4 is used by op-0, its "operation of origin" is op-1; I'm interested in capturing only that operation, and I believe the above query captures it correctly.

I now need to extend this query to capture the expressions/attributes that were used in each operation of origin in the lineage pipeline.

I understand that we have the uses edge collection, which points either to an expression or to an attribute node (depending on how the column value was calculated).
So I need to capture that information in this lineage as well. Something like:

[
  {
    "lineage": [
      {
        "parent_attribute_id": "f3c32a85-ae94-5978-9611-187a7440c694:attr-2",
        "parent_attribute_name": "c3",
        "from_operation_id": "f3c32a85-ae94-5978-9611-187a7440c694:op-4",
        "derived_using":""
      },
      {
        "intake_attribute_id": "f3c32a85-ae94-5978-9611-187a7440c694:attr-4",
        "intake_attribute_name": "c3",
        "from_operation_id": "f3c32a85-ae94-5978-9611-187a7440c694:op-1",
        "derived_using":""
      }
    ]
  }
]

In the above, I need to fill in the "derived_using" property with a value similar to the format the Spline UI uses when showing a lambda against a column (so if uses points to an attribute, I need the attribute name, and if it points to an expression, I need the expression in the lambda format displayed in the Spline UI).
I understand that expressions are split into multiple documents based on operands, and that these need to be collated using the takes edge.
Also, if I understand correctly, an operation can produce multiple attributes from other attributes/expressions. So I need a way to know "how" an attribute was produced by its operation of origin (i.e., which expressions/attributes were used by the operation of origin, via the computedBy and takes (and any other) edges, and how to rearrange all of them into a nice, readable lambda).

Any help/pointers on enriching the above AQL will be highly appreciated.

Thanks !


wajda commented Jun 8, 2022

@adcb6gt, I'm sorry for the late response.

Regarding your question:

"how" an attribute was produced by its operation of origin

In the Spline graph model there is an edge, computedBy, that gives you exactly the answer you are looking for.
While the derivesFrom edge shows the attribute-to-attribute dependency, skipping (or jumping over) the expressions on the path, the computedBy edge points to the very expression that was used to calculate the values for the given attribute. So you need to traverse attribute -> computedBy -> expression -> takes -> ... -> takes -> attribute to see the detailed (expression-level) attribute lineage.


wajda commented Jun 8, 2022

Regarding the uses edge: it is (almost) irrelevant for your task. The uses edge depicts the expressions involved in the operation-level computation, e.g. the filtering expression in a "Filter" operation, or the "orderBy" expressions in a "Sort" operation, etc.
I said almost irrelevant because, strictly speaking, it depends on whether you are interested in data or control lineage. Currently, Spline focuses on data lineage, i.e. it only tracks dependencies between attributes that are directly involved in the computation. This type of lineage does not take aggregations into account. See the feature request #791.
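For completeness, inspecting what a given operation uses is a one-hop traversal; a sketch against one of the operations listed above (the result may be empty for operations that carry no such expressions):

// Expressions/attributes referenced by the operation-level computation,
// e.g. a filter condition or sort keys.
FOR x IN 1 OUTBOUND DOCUMENT("operation", "f3c32a85-ae94-5978-9611-187a7440c694:op-2") uses
    RETURN x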

@wajda wajda transferred this issue from AbsaOSS/spline-spark-agent Jun 8, 2022
@AbsaOSS AbsaOSS locked and limited conversation to collaborators Jun 8, 2022
@wajda wajda converted this issue into discussion #1089 Jun 8, 2022
Repository owner moved this from New to Done in Spline Jun 8, 2022
