Create Pandera / PySpark utilities graphlet.etl to transform / validate multiple datasets into a uniform ontology #1

Open
rjurney opened this issue Aug 1, 2022 · 0 comments
Labels: enhancement (New feature or request), etl (Extract, Transform, Load (ETL))


rjurney commented Aug 1, 2022

Summary

This ticket is to create utilities, including node and edge base classes, that provide an object-oriented interface with runtime validation for defining the data types of a property graph ontology into which multiple datasets are transformed via ETL. These utilities should work with Spark schemas so that code can be reused, rather than writing UDFs for each dataset individually.

How do datasets that map to a single class in an ontology vary?

Datasets that map to the same class or set of classes in a knowledge graph (property graph) ontology can vary in different ways:

  • Naming - Field names vary for the same properties of a given entity.

  • Structure - Records have different structures. Schemas can be flat or nested. One may use lists of objects while another uses one object with column names mapping to lists of values.


  • Formats - Datasets implement different standard data formats at the file or field level. One file is CSV, another Parquet. One timestamp field uses *nix timestamps and another ISO datetimes.

  • Values - The same field name and format may have values with different errors that require cleaning. Fixes for these errors must not break correct records in other datasets.

  • Logical - Datasets may exist in different logical forms when ingested. An application ontology may have the concept of (company)—officership—>(person), while a dataset at ingestion may be two files: companies and their officers (both an officership and a person).
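A minimal sketch of handling the Naming and Formats variations, using pandas as a small stand-in for PySpark: each dataset gets a rename map onto canonical field names, and one shared parser accepts both Unix epoch seconds and ISO 8601 strings. The dataset keys, the column names (film_title, release_ts, name, released_at) and the parse_timestamp and normalize helpers are hypothetical.

```python
import pandas as pd

# Hypothetical per-dataset mappings from source column names to canonical field names.
RENAMES = {
    "horror": {"film_title": "title", "release_ts": "released"},
    "comedy": {"name": "title", "released_at": "released"},
}


def parse_timestamp(value):
    """Parse Unix epoch seconds or an ISO 8601 string into a UTC Timestamp."""
    text = str(value).strip()
    if text.isdigit():
        return pd.to_datetime(int(text), unit="s", utc=True)
    ts = pd.Timestamp(text)
    # Assumption: ISO strings without an offset are already in UTC.
    return ts.tz_localize("UTC") if ts.tzinfo is None else ts.tz_convert("UTC")


def normalize(df, dataset):
    """Rename one dataset's columns and parse its timestamps into the shared format."""
    out = df.rename(columns=RENAMES[dataset])
    out["released"] = out["released"].map(parse_timestamp)
    return out[["title", "released"]]


horror = pd.DataFrame({"film_title": ["Alien"], "release_ts": [296438400]})
comedy = pd.DataFrame({"name": ["Airplane!"], "released_at": ["1980-07-02T00:00:00Z"]})
movies = pd.concat(
    [normalize(horror, "horror"), normalize(comedy, "comedy")], ignore_index=True
)
```

In PySpark the same idea would map onto DataFrame.withColumnRenamed plus pyspark.sql.functions.from_unixtime or to_timestamp. The design choice is that per-dataset differences live in small mappings while the parsing logic is shared.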

Even in a distributed system like Spark, large datasets can be relatively slow to process. This creates a slow feedback loop for cleaning data and resolving format disparities. Sampling helps, but a problem can occur in any single record, so validating requires a full run over all the data. Fixing the issues in each dataset we ingest is time-consuming.

What is pandera and how can it help?

Pandera describes itself as:

data validation library for scientists, engineers, and analysts seeking correctness.

pandera provides a flexible and expressive API for performing data validation on dataframe-like objects to make data processing pipelines more readable and robust.

Dataframes contain information that pandera explicitly validates at runtime. This is useful in production-critical data pipelines or reproducible research settings.

@pandera.decorators.check_types

The check_types decorator uses a function's type annotations to define, and validate at runtime, the input and output schemas of a function that transforms a DataFrame.

Sources of improvements in productivity from our ETL

  1. Code re-use for validation and transformation: Even without any help from Pandera, if we JUST create one class for each type in the ontology, with validation and ETL code written manually for each class, rather than custom ETL scripts for each file we ingest (like awards.csv and comedy.csv, which contain Movies), that alone makes things more efficient. The gain comes from organizing code behind a standard interface and, more importantly, from code reuse. Fortunately, we can do better than this with the features Pandera provides :)

  2. Pandera validation tools have powerful features: Pandera can define a Schema Model for each class in an ontology, with Checks that validate its fields. Things get more efficient because the Pandera checks that validate your data are written once per class, rather than once per file.

  3. Pandera transformation functions

  4. Pandera's column validation tools are built on vectorized pd.DataFrame and pd.Series methods, which can run efficiently on Spark through pandera's PySpark integration and PySpark pandas_udfs. This is much faster than writing your own validation code at the record or row level.

  5. Pandera makes debugging data much more efficient with lazy validation: when you call schema.validate(df, lazy=True), every check runs before anything is raised, and all failures are reported together in a single SchemaErrors exception.

  6. Factor in that with Pandera you don't just get ONE error and then the entire PySpark job dies... you get ALL errors in ALL fields in ALL records. Now you are talking about a SERIOUS upgrade to building a knowledge graph, because right now the ENTIRE JOB fails and you get information about a single row.

Without this, you go through that over and over. You miss problems because the data in the new drama.csv file isn't really verified, and further down your data pipeline, when incorporating new custom movie reviews into the dataset, you find a problem you missed during ETL of drama.csv. By building up capability in the Movie class, you get faster and faster as you add datasets to the class! Each one makes the ones that come after faster.

Let's say you start a Movie class and load horror.csv. Then someone sends you comedies.csv to add to the graph. You don't even have to start out by doing any ETL... you can just write code that fills out the fields for the Movie class with values from comedies.csv and... see if it validates. You do no work by default.

Let's say you find a problem... first, you solve it by improving validation. That improvement now also works for action.csv, as does any improvement to the robustness of your transformations.

Pydantic --> Pandera

Note: While an initial attempt at implementing this used Pydantic, there were serialization issues with PySpark and it lacked features this problem needs. Pandera, on the other hand, is a much better fit, as it already integrates with Pandas, PySpark and Dask. Not all references in the issues and code have been updated yet, but they will be.

ETL is Hard: Problems and Solutions

Several challenges arise when transforming multiple large datasets into a single schema using PySpark, and they make code re-use difficult to accomplish.

PROBLEM: Ingested Datasets Vary for the same Class in an Ontology

Datasets destined for the same entity in an ontology may vary greatly and require different transformation logic.

For example, a Movie class in your ontology might look like this:

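    # TODO: update from Pydantic to Pandera!
    from pydantic import validator  # Pydantic v1-style validator API

    # NodeBase and text_runtime_to_minutes are defined elsewhere and not shown here.
    class Movie(NodeBase):
        """A film node in hollywood."""

        entity_type: str = "movie"
        genre: str
        title: str
        year: str
        length: int = 0
        gross: int = 0
        rating: str

        @validator("length", pre=True)
        def convert_hours_minutes_to_int_minutes(cls, x):
            # Runs before type validation: turn a text runtime into integer minutes.
            if x and isinstance(x, str):
                x = text_runtime_to_minutes(x)
            return x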

But two datasets you load in PySpark to add to your knowledge graph might look like this. They contain movies, but they are organized around different concepts: awards, and a sub-class of movies called comedies.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Movie awards
awards = spark.read.option("header", "true").csv("tests/data/awards.csv")
awards.show()

# A genre of movies
comedies = spark.read.option("header", "true").csv("tests/data/comedy.csv")
comedies.show()

You need to get Movies out of these things, but you need to write custom ETL code to do so! What a bummer :( Or do you...

SOLUTION: Create an Object-Oriented Interface for Code Reuse

While datasets at ingestion can vary dramatically, the goal of ETL for a property graph is to transform them into a single object-oriented form. We can optimize the process by centralizing all ETL for a type in instances of a single class that handles transformation, validation and summarization in a fully reusable way, so that each dataset added makes the class more robust. That means less work for each additional dataset you add to a given type!

PROBLEM: What good are classes for ETL if they aren’t usable in PySpark?

Extending a base class for ETL for every entity in an ontology isn’t very useful if those classes can’t work with PySpark’s APIs.

SOLUTION: Pandera has PySpark/Pandas Support!

Fortunately, Pandera has Pandas and PySpark support. For PySpark, check out pandera.typing.pyspark, which provides DataFrame types for the pandas API on Spark (pyspark.pandas). This seems like the way to go... but I am not sure how it will work with Pandera Schema Models.

For an interface to essentially map a Python function into Spark to run it, check out Fugue.

PROBLEM: We had 10 nodes, now we did entity resolution and we have 2 nodes made up of 5 nodes each. How do we work with and represent this data efficiently?

You DEFINITELY need to re-validate the aggregate nodes when you summarize them.

SOLUTION: We extend Pandas classes with an interface for summarizing records using PySpark/Pandas/Pandera GROUP BY / aggregate functions!

Once you perform entity resolution and you GROUP BY, you start out with a list of nodes inside a big master node... the ones inside are all the same Person, say. Names of Russell Jurney, Russ Jurney, Russell H Jurney, Russ H Jurney, Russell Journey - these are actually real variations of my name in business registries across the US 🙂

So you need to take each field - sometimes multiple fields at once if you need access to the others for logical reasons - and summarize them in a way that creates a more accessible format for the master record. Sometimes you do want a list of records... but more commonly you DO NOT want a list of 5 records that have name fields with various versions of a name, you want a single record with a list field for name.

{
  "name": ["Russell Jurney", "Russell H. Jurney", "Russ H Jurney", ...],
  "address": [{"street": "365 ...", ...}, {"street": "102 ...", ...}, {"street": "365 ...", ...}],
  ...
}

How do we use or extend Pandera classes for our ontology with an interface and process to accomplish this?
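One way to build the list-valued master record, sketched in pandas: after a hypothetical entity resolution step has assigned an entity_id to every source record, a GROUP BY with a list aggregation collapses each group into one row. The column names and sample rows are assumptions.

```python
import pandas as pd

# Hypothetical output of entity resolution: one row per source record,
# with entity_id naming the resolved (master) node it belongs to.
people = pd.DataFrame(
    {
        "entity_id": [1, 1, 1, 2],
        "name": ["Russell Jurney", "Russ Jurney", "Russell H Jurney", "Jane Doe"],
        "source": ["registry_a", "registry_b", "registry_c", "registry_a"],
    }
)


def summarize(df, key="entity_id"):
    """Collapse each resolved group into one record whose fields are lists."""
    return df.groupby(key).agg(list).reset_index()


master = summarize(people)
```

In PySpark the corresponding aggregate is pyspark.sql.functions.collect_list, used inside groupBy(...).agg(...).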

PROBLEM: You don't actually just want to create lists of the values, you want to de-duplicate the values!

But you don't want duplicates in fields like address; you want to de-duplicate the values!

{
  "name": ["Russell Jurney", "Russell H. Jurney", "Russ H Jurney", ...],
  "address": [{"street": "365 ...", ...}, {"street": "102 ...", ...}],
  ...
}

How do we use or extend Pandera classes for our ontology with an interface and process to accomplish this?
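A sketch of order-preserving de-duplication that also works for dict values such as addresses, which are not hashable. It assumes the values are JSON-serializable and uses a canonical JSON string as the comparison key; dedupe and the sample addresses are hypothetical.

```python
import json


def dedupe(values):
    """Drop duplicates but keep first-seen order; also handles unhashable dicts."""
    seen = set()
    unique = []
    for value in values:
        # A canonical JSON string makes equal dicts compare equal regardless of key order.
        key = json.dumps(value, sort_keys=True)
        if key not in seen:
            seen.add(key)
            unique.append(value)
    return unique


addresses = [
    {"street": "1 Example Rd", "city": "Springfield"},
    {"street": "2 Sample Ave", "city": "Shelbyville"},
    {"city": "Springfield", "street": "1 Example Rd"},
]
unique_addresses = dedupe(addresses)
```

In PySpark, array_distinct can be applied to the output of collect_list, or collect_set used directly, although collect_set does not guarantee element order.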

PROBLEM: Not all summarizations are just list-building or deduplication... some involve creating a single, summarized value of the same or a different type!

Sometimes you need a more complicated summarization method, such as when you really need ONE value for a field in an aggregate node. One such case is creating a single value for a field so you can easily compare one aggregate node to another, for example by computing the cosine similarity between two vectors, or embeddings.

What if you want to combine 5 names into a single vector representation using the average of 5 Word2Vec embedding vectors of the names? Before you balk, remember that they are the same name... this might work even across languages with the right embedding :)

What about a more sophisticated embedding technique such as using sentence transformers and mean or max pooling to create a single representation for a name?
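A small sketch of mean pooling followed by cosine similarity with NumPy. The three-dimensional vectors are made up and merely stand in for real name embeddings (such as Word2Vec or sentence transformer outputs); mean_pool and cosine_similarity are hypothetical helper names.

```python
import numpy as np


def mean_pool(vectors):
    """Average several embedding vectors into one vector for an aggregate node."""
    return np.asarray(vectors, dtype=float).mean(axis=0)


def cosine_similarity(a, b):
    """Cosine of the angle between two vectors: 1.0 means the same direction."""
    a = np.asarray(a, dtype=float)
    b = np.asarray(b, dtype=float)
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))


# Made-up 3-dimensional "embeddings" of name variants for two aggregate nodes.
node_a = mean_pool([[1.0, 0.0, 0.0], [1.0, 1.0, 0.0]])
node_b = mean_pool([[2.0, 1.0, 0.0]])
similarity = cosine_similarity(node_a, node_b)
```

Swapping the mean for np.max(..., axis=0) gives max pooling; which one works better for names is an empirical question.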

PROBLEM: Do the individual and aggregate, summarized nodes have the same schema? The same class? How do you handle the fact that some are singular in their values and some are lists? Shit?

Are the aggregate nodes different classes? Shouldn't be. Crap. Should be?

SOLUTION: How do we handle this?

One strategy is to allow every field of nodes and edges to contain either an individual item or a list: for example, an int field would use the type hint typing.Union[int, typing.List[int]] instead of int. Complex type hints can have chains of consequences, however, and we would need to consider this carefully.
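A sketch of one consequence of the Union[int, List[int]] strategy: every consumer of a field has to normalize it before use, so a small helper like the hypothetical as_list below would be needed throughout the code.

```python
from typing import List, Optional, TypeVar, Union

T = TypeVar("T")


def as_list(value: Optional[Union[T, List[T]]]) -> List[T]:
    """Normalize a field typed Union[T, List[T]] so callers always get a list."""
    if value is None:
        return []
    if isinstance(value, list):
        return value
    return [value]


# A single node and an aggregate node can now share the same downstream code.
single = {"length": 117}
aggregate = {"length": [117, 116]}
longest = [max(as_list(node["length"])) for node in (single, aggregate)]
```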

What else do we need from these ontology classes?

See #3 for more information, but we need to be able to use the Schema Model to generate a "Ditto format" text version of a node or edge as in ditto/ditto_light/summarize.py:

    def transform(self, row, max_len=128):
        """Summarize one single example.
        Only retain tokens of the highest tf-idf
        Args:
            row (str): a matching example of two data entries and a binary label, separated by tab
            max_len (int, optional): the maximum sequence length to be summarized to
        Returns:
            str: the summarized example
        """
        sentA, sentB, label = row.strip().split('\t')
        res = ''
        cnt = Counter()
        for sent in [sentA, sentB]:
            tokens = sent.split(' ')
            for token in tokens:
                if token not in ['COL', 'VAL'] and \
                   token not in stopwords:
                    if token in self.vocab:
                        cnt[token] += self.idf[self.vocab[token]]

For the full code, see https://github.com/megagonlabs/ditto/blob/master/ditto_light/summarize.py#L63-L84

Ditto text encoding of structured records for embedding with a pre-trained language model like a sentence transformer
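A sketch of serializing a node into the text form that the transform() excerpt above consumes. The excerpt splits each row on tabs into sentA, sentB and label, and skips the COL and VAL tokens, which suggests records are written as alternating COL field VAL value tokens. to_ditto and to_ditto_pair are hypothetical, and joining list-valued fields with spaces is an assumption.

```python
def to_ditto(record):
    """Serialize a flat record as 'COL <field> VAL <value> ...' text."""
    parts = []
    for field, value in record.items():
        if isinstance(value, list):
            # Assumption: list-valued fields of aggregate nodes are space-joined.
            value = " ".join(str(item) for item in value)
        parts.append(f"COL {field} VAL {value}")
    return " ".join(parts)


def to_ditto_pair(left, right, label):
    """One tab-separated 'sentA<TAB>sentB<TAB>label' line, as transform() expects."""
    return "\t".join([to_ditto(left), to_ditto(right), str(label)])


row = to_ditto_pair(
    {"title": "Alien", "year": "1979"},
    {"title": "Alien (1979)", "year": "1979"},
    1,
)
```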

Building Knowledge Graphs as Property Graphs

Building a knowledge graph as a property graph from multiple datasets requires a lot of ETL as a pre-processing step. Several schemas representing the same thing must be transformed into a single schema within a uniform ontology, so that records can be merged into a pair of tables, one for nodes and one for edges, and processed with code or statistical models specific to each type of entity rather than to each dataset that makes it up.
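A sketch of the Logical case from the list at the top of this issue: a companies file and an officers file become one nodes table and one edges table for (company)-officership->(person). Using a person's name as an ID is a simplification for illustration only; in practice entity resolution decides identity. All column names and sample rows are hypothetical.

```python
import pandas as pd

# Hypothetical raw files: companies, and officers (each row is a person AND an officership).
companies = pd.DataFrame({"company_id": ["c1", "c2"], "company_name": ["Acme Corp", "Globex"]})
officers = pd.DataFrame(
    {
        "company_id": ["c1", "c1", "c2"],
        "person_name": ["Jane Doe", "John Roe", "Jane Doe"],
        "title": ["CEO", "CFO", "Director"],
    }
)


def to_graph(companies, officers):
    """Split two logical source files into one nodes table and one edges table."""
    company_nodes = pd.DataFrame(
        {
            "node_id": "company:" + companies["company_id"],
            "entity_type": "company",
            "name": companies["company_name"],
        }
    )
    people = officers["person_name"].drop_duplicates()
    person_nodes = pd.DataFrame(
        {"node_id": "person:" + people, "entity_type": "person", "name": people}
    )
    nodes = pd.concat([company_nodes, person_nodes], ignore_index=True)
    # (company)-officership->(person): the edge carries the officer's title.
    edges = pd.DataFrame(
        {
            "src": "company:" + officers["company_id"],
            "dst": "person:" + officers["person_name"],
            "edge_type": "officership",
            "title": officers["title"],
        }
    )
    return nodes, edges


nodes, edges = to_graph(companies, officers)
```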

Bro, why not RDF? SPARQL!

While there are systems and algorithms specific to RDF triples that avoid ETL up front, they put ETL off until query time, where it must be repeated once per query per dataset involved. They also require property-oriented query languages like SPARQL, which many people find difficult to work with: humans think most easily in terms of objects with properties, while SPARQL requires reification, or restoration of objects from properties. RDF is metadata, not data.


While some machine learning is specific to triples, machine learning at scale as part of knowledge graph construction is easier once a graph is stored as objects, because most tools, models and algorithms do not support triple form.

A Note on Medallion Tables

Calling Medallion Tables a Medallion Architecture is a bit of an exaggeration, but the concepts are useful. Data is ingested into bronze tables, where a unique identifier might be added but the data is otherwise raw, so it can be accessed in its original form at any time, for example to debug downstream tables by examining the unaltered data. Bronze tables are cleaned, enriched, combined and transformed into one or more intermediate silver tables, which may be clean versions of the original data type or entirely new concepts. Silver tables chain into other silver tables. When data is prepared for display to a user or for storage in an external system, it is stored in a gold table. Finally, there are palladium tables: the external tables in other systems that gold tables are transferred to, such as an OpenSearch index in Graphlet AI.

Dude… that’s a LOT of ETL

Part of building a property graph database out of myriad sources of data is transforming disparate formats representing the same type of entity into the schema of an entity in a uniform ontology.

That means several bronze tables for each entity in our ontology…


And at least one silver table for each entity type in our ontology.


Ok but… hey, that’s a lot of ETL! I want to play with graphs! Don’t you know that ETL of large datasets is hard! Now you get the motivation for this issue :)

@rjurney rjurney added enhancement New feature or request etl Extract, Transform, Load (ETL) labels Aug 1, 2022
@rjurney rjurney self-assigned this Aug 1, 2022
@rjurney rjurney changed the title Create a Pydantic/PySpark base class for transforming multiple entities into a uniform ontology Create a Pydantic / PySpark base class for transforming multiple entities into a uniform ontology Aug 1, 2022
@rjurney rjurney changed the title Create a Pydantic / PySpark base class for transforming multiple entities into a uniform ontology Create a Pydantic / PySpark base class in graphlet.etl for transforming multiple entities into a uniform ontology Aug 9, 2022
@rjurney rjurney changed the title Create a Pydantic / PySpark base class in graphlet.etl for transforming multiple entities into a uniform ontology Create Pandera / PySpark utilities graphlet.etl for transforming multiple datasets into a uniform ontology Aug 21, 2022
@rjurney rjurney changed the title Create Pandera / PySpark utilities graphlet.etl for transforming multiple datasets into a uniform ontology Create Pandera / PySpark utilities graphlet.etl for transforming / validating multiple datasets into a uniform ontology Aug 25, 2022
@rjurney rjurney changed the title Create Pandera / PySpark utilities graphlet.etl for transforming / validating multiple datasets into a uniform ontology Create Pandera / PySpark utilities graphlet.etl to transform / validate multiple datasets into a uniform ontology Aug 25, 2022