[FEATURE] Support lateral view explode function to create indexed views #1004

Closed
A-Gray-Cat opened this issue Jan 1, 2025 · 3 comments
Labels
enhancement New feature or request

Comments

@A-Gray-Cat

Is your feature request related to a problem?
When creating indexed views (materialized views), the function that flattens arrays isn't supported. Arrays are hard to work with when building dashboard visualizations, because OpenSearch's visualization tooling doesn't support flattening arrays, extracting values from arrays, or iterating over arrays.

What solution would you like?
Support for the lateral view explode function and sub-queries when creating indexed views.

What alternatives have you considered?
A clear and concise description of any alternative solutions or features you've considered.

Do you have any additional context?
Sample queries

SELECT finding_info.created_time_dt as created_time,
       finding_info.modified_time_dt as modified_time,
       accountid as account_id,
       region,
       class_name,
       finding_info.uid as finding_id,
       severity,
       finding_info.types as finding_types,
       finding_info.title as title,
       finding_info.desc as description,
       case when class_uid = 2002 then to_json(vulnerabilities)
            when class_uid = 2003 then to_json(compliance)
            when class_uid = 2004 then to_json(evidences) end as evidences,
       remediation,
       case when class_uid = 2004 then to_json(r)
            else to_json(resource) end as resource
FROM   table1 a lateral view explode(a.resources) as r
WHERE  time_dt between current_timestamp - interval '7' day
   and current_timestamp

with a as (SELECT finding_info.created_time_dt as created_time,
                  finding_info.modified_time_dt as modified_time,
                  accountid as account_id,
                  region,
                  class_name,
                  finding_info.uid as finding_id,
                  severity,
                  finding_info.types as finding_types,
                  finding_info.title as title,
                  finding_info.desc as description,
                  case when class_uid = 2002 then to_json(vulnerabilities)
                       when class_uid = 2003 then to_json(compliance)
                       when class_uid = 2004 then to_json(evidences) end as evidences,
                  remediation,
                  case when class_uid = 2004 then to_json(r)
                       else to_json(resource) end as resource
           FROM   {regional amazon security lake database name}.amazon_security_lake_table_{security lake region, e.g. us_east_1}_sh_findings_2_0 a lateral view explode(a.resources) as r
           WHERE  accountid in ('{account id}')
              and region = '{region, e.g. us-east-1}'
              and time_dt between current_timestamp - interval '{number of days, e.g. 1,3,7}' day
              and current_timestamp)
SELECT *
FROM   a
WHERE  get_json_object(resource, '$.type') = '{resource type, e.g. AwsEc2Instance}'
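
To make the intent of the two sample queries concrete, here is a minimal sketch in plain Scala collections (no Spark) of what `lateral view explode(a.resources) as r` followed by a filter on the resource type is expected to produce. The case classes, field names, and sample values are hypothetical simplifications, not the real Security Lake schema, and the code models only the row semantics, not how Flint or Spark execute the query.

```scala
// Hypothetical, simplified shapes; the real findings table has many more fields.
final case class Resource(kind: String, uid: String)
final case class Finding(findingId: String, resources: Seq[Resource])
final case class ExplodedRow(findingId: String, resource: Resource)

object ExplodeSketch {
  // Models `FROM findings a LATERAL VIEW explode(a.resources) AS r`:
  // one output row per (finding, resource) pair.
  def explodeResources(findings: Seq[Finding]): Seq[ExplodedRow] =
    findings.flatMap(f => f.resources.map(r => ExplodedRow(f.findingId, r)))

  // Made-up sample data for illustration only.
  val findings: Seq[Finding] = Seq(
    Finding("f-1", Seq(Resource("AwsEc2Instance", "i-1"), Resource("AwsS3Bucket", "b-1"))),
    Finding("f-2", Seq(Resource("AwsEc2Instance", "i-2"))),
    Finding("f-3", Seq.empty) // an empty array contributes no rows
  )

  val exploded: Seq[ExplodedRow] = explodeResources(findings)

  // Models the outer `WHERE get_json_object(resource, '$.type') = '...'` filter.
  val ec2Only: Seq[ExplodedRow] = exploded.filter(_.resource.kind == "AwsEc2Instance")

  def main(args: Array[String]): Unit = {
    println(exploded.size)                            // 3
    println(ec2Only.map(_.findingId).mkString(", ")) // f-1, f-2
  }
}
```

Once each resource is its own row, a dashboard can filter and aggregate on it directly, which is the motivation given in the request.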

A-Gray-Cat added the enhancement and untriaged labels Jan 1, 2025

@YANG-DB
Member

YANG-DB commented Jan 6, 2025

@dai-chen can you please review and comment?

@dai-chen
Collaborator

dai-chen commented Jan 8, 2025

So the problem is that creating an MV with a lateral view is not supported, right? Could you share the error, if any? I assume the indexed view tried to auto-refresh the MV, and Spark Structured Streaming has limitations on the complex queries it supports.

@dai-chen
Collaborator

Confirmed that an MV with a lateral view works with both auto and manual refresh; the two tests and their output are below. I'll close this, but feel free to reopen if needed. Thanks!

  test("materialized view with lateral view manual refresh") {
    sql(
      "CREATE TABLE person (id INT, name STRING, age INT, class INT, address STRING);"
    )
    sql("""
        |INSERT INTO person VALUES
        |    (100, 'John', 30, 1, 'Street 1'),
        |    (200, 'Mary', NULL, 1, 'Street 2'),
        |    (300, 'Mike', 80, 3, 'Street 3'),
        |    (400, 'Dan', 50, 4, 'Street 4');
        |""".stripMargin)

    sql(s"""
           | CREATE MATERIALIZED VIEW $testMvName
           | AS
           | SELECT * FROM person
           |    LATERAL VIEW EXPLODE(ARRAY(30, 60)) tableName AS c_age
           |    LATERAL VIEW EXPLODE(ARRAY(40, 80)) AS d_age
           | WITH (
           |   auto_refresh = false
           | )
           |""".stripMargin)
    sql(s"REFRESH MATERIALIZED VIEW $testMvName")

    flint.queryIndex(testFlintIndex).show
  }

  test("materialized view with lateral view auto refresh") {
    sql(
      "CREATE TABLE person (id INT, name STRING, age INT, class INT, address STRING);"
    )
    sql("""
        |INSERT INTO person VALUES
        |    (100, 'John', 30, 1, 'Street 1'),
        |    (200, 'Mary', NULL, 1, 'Street 2'),
        |    (300, 'Mike', 80, 3, 'Street 3'),
        |    (400, 'Dan', 50, 4, 'Street 4');
        |""".stripMargin)

    withTempDir { checkpointDir =>
      sql(s"""
           | CREATE MATERIALIZED VIEW $testMvName
           | AS
           | SELECT * FROM person
           |    LATERAL VIEW EXPLODE(ARRAY(30, 60)) tableName AS c_age
           |    LATERAL VIEW EXPLODE(ARRAY(40, 80)) AS d_age
           | WITH (
           |   auto_refresh = true,
           |   checkpoint_location = '${checkpointDir.getAbsolutePath}'
           | )
           |""".stripMargin)

      // Wait for the streaming job to finish the current micro-batch
      val job = spark.streams.active.find(_.name == testFlintIndex)
      job shouldBe defined
      failAfter(streamingTimeout) {
        job.get.processAllAvailable()
      }

      flint.queryIndex(testFlintIndex).show
    }
  }

+----+-----+----+---+-----+--------+-----+
|name|c_age| age| id|d_age| address|class|
+----+-----+----+---+-----+--------+-----+
|John|   30|  30|100|   40|Street 1|    1|
|John|   30|  30|100|   80|Street 1|    1|
|John|   60|  30|100|   40|Street 1|    1|
|John|   60|  30|100|   80|Street 1|    1|
|Mary|   30|NULL|200|   40|Street 2|    1|
|Mary|   30|NULL|200|   80|Street 2|    1|
|Mary|   60|NULL|200|   40|Street 2|    1|
|Mary|   60|NULL|200|   80|Street 2|    1|
|Mike|   30|  80|300|   40|Street 3|    3|
|Mike|   30|  80|300|   80|Street 3|    3|
|Mike|   60|  80|300|   40|Street 3|    3|
|Mike|   60|  80|300|   80|Street 3|    3|
| Dan|   30|  50|400|   40|Street 4|    4|
| Dan|   30|  50|400|   80|Street 4|    4|
| Dan|   60|  50|400|   40|Street 4|    4|
| Dan|   60|  50|400|   80|Street 4|    4|
+----+-----+----+---+-----+--------+-----+
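
The 16 rows above follow from the row arithmetic: 4 `person` rows times 2 values of `c_age` times 2 values of `d_age`. As a rough cross-check, the sketch below reproduces that count with plain Scala collections. It is an illustration of the expected semantics only, not Flint or Spark code, and the `Person` case class and object name are hypothetical.

```scala
object LateralViewRowCount {
  // Hypothetical, trimmed-down version of the `person` table from the tests.
  final case class Person(id: Int, name: String)

  val people: Seq[Person] =
    Seq(Person(100, "John"), Person(200, "Mary"), Person(300, "Mike"), Person(400, "Dan"))

  val cAges: Seq[Int] = Seq(30, 60) // EXPLODE(ARRAY(30, 60)) AS c_age
  val dAges: Seq[Int] = Seq(40, 80) // EXPLODE(ARRAY(40, 80)) AS d_age

  // Each LATERAL VIEW pairs every row produced so far with each element of its array.
  val rows: Seq[(String, Int, Int)] =
    for {
      p <- people
      c <- cAges
      d <- dAges
    } yield (p.name, c, d)

  def main(args: Array[String]): Unit = {
    println(rows.size) // 16
    rows.take(4).foreach(println) // the four John rows
  }
}
```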
