Skip to content

Commit

Permalink
Merge pull request #33 from hasura/m-bilal/support-subtype-sorting-en…
Browse files Browse the repository at this point in the history
…t-124

support sorting by subtypes
  • Loading branch information
m-Bilal authored Nov 7, 2024
2 parents b50900b + 3a247ff commit f0a094d
Show file tree
Hide file tree
Showing 11 changed files with 900 additions and 117 deletions.
4 changes: 2 additions & 2 deletions connector/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func prepareAggregateQuery(ctx context.Context, aggregates schema.QueryAggregate
if ok {
aggregationColumn, path = joinFieldPath(state, fieldPath, aggregationColumn, collection)
}
validField := internal.ValidateFieldOperation(state, "aggregate", collection, aggregationColumn)
validField := internal.ValidateAggregateOperation(state.SupportedAggregateFields, collection, aggregationColumn)

if validField == "" {
return nil, schema.UnprocessableContentError("aggregation not supported on this field", map[string]any{
Expand Down Expand Up @@ -106,7 +106,7 @@ func prepareAggregateColumnCount(ctx context.Context, field string, path string,
// If the field is nested, it generates a nested query to perform the specified function on the field in the nested document.
func prepareAggregateSingleColumn(ctx context.Context, function, field string, path string, aggName string) (map[string]interface{}, error) {
// Validate the function
if !internal.Contains(validFunctions, function) {
if !internal.Contains(internal.ValidFunctions, function) {
return nil, schema.UnprocessableContentError("invalid aggregate function", map[string]any{
"value": function,
})
Expand Down
66 changes: 10 additions & 56 deletions connector/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strings"

"github.com/hasura/ndc-elasticsearch/types"
"github.com/hasura/ndc-elasticsearch/internal"
"github.com/hasura/ndc-sdk-go/schema"
)

Expand Down Expand Up @@ -32,7 +33,7 @@ func handleFieldTypeAggregateMetricDouble(fieldMap map[string]interface{}) {
metricFields[metricValue] = schema.ObjectField{Type: schema.NewNamedType("double").Encode()}
}
}
objectTypeMap[fieldType] = schema.ObjectType{
internal.ObjectTypeMap[fieldType] = schema.ObjectType{
Fields: metricFields,
}
}
Expand All @@ -48,15 +49,15 @@ func handleFieldTypeAggregateMetricDouble(fieldMap map[string]interface{}) {
// This compound scalar type supports a superset of comparison and aggregation operations of all its subtypes and the actualType
// This compund scalar type is added to the scalarTypeMap before being returned
func GetFieldType(fieldMap map[string]interface{}, state *types.State, indexName string, fieldName string) string {
fieldTypes := extractTypes(fieldMap)
fieldTypes := internal.ExtractTypes(fieldMap)
actualFieldType := fieldTypes[0] // actualFieldType is the type type of the field that the db has. It is the main type, not the subtype

if len(fieldTypes) > 1 {
// subtypes present
// we need to sort fields by priority
// because the fields that can represent the most format of data should be added at the end,
// so that their comparison operators are not overridden by the fields that can represent less formats and same for aggregate functions
sortTypesByPriority(fieldTypes)
internal.SortTypesByPriority(fieldTypes)
}

allSupportedComparisonOperations := make(map[string]schema.ComparisonOperatorDefinition)
Expand Down Expand Up @@ -118,31 +119,14 @@ func GetFieldType(fieldMap map[string]interface{}, state *types.State, indexName
return scalarType
}

// Given a fieldMap, this function extracts the type and all subtypes (if present)
func extractTypes(fieldMap map[string]interface{}) (fieldAndSubfields []string) {
if subFields, ok := hasSubfields(fieldMap); ok {
for _, subFieldData := range subFields {
fieldAndSubfields = append(fieldAndSubfields, extractTypes(subFieldData.(map[string]interface{}))...)
}
}

fieldType, _ := fieldIsScalar(fieldMap)
fieldAndSubfields = append([]string{fieldType}, fieldAndSubfields...)
return fieldAndSubfields
}

func appendCompoundTypeToStaticTypes(typeName string, sortOperations map[string]schema.ComparisonOperatorDefinition, aggegateOperations schema.ScalarTypeAggregateFunctions, actualFieldType string) {
scalarTypeMap[typeName] = schema.ScalarType{
internal.ScalarTypeMap[typeName] = schema.ScalarType{
AggregateFunctions: aggegateOperations,
ComparisonOperators: sortOperations,
Representation: scalarTypeMap[actualFieldType].Representation,
Representation: internal.ScalarTypeMap[actualFieldType].Representation,
}
}

func hasSubfields(fieldMap map[string]interface{}) (subFields map[string]interface{}, ok bool) {
subFields, ok = fieldMap["fields"].(map[string]interface{})
return subFields, ok
}

// This function takes a fieldType and checks whether it
// 1. supports comparison operations
Expand All @@ -161,11 +145,11 @@ func processFieldType(fieldMap map[string]interface{}, fieldType string) (suppor
fieldDataEnalbed = fieldData
}

if isSortSupported(fieldType, fieldDataEnalbed) {
supportedComparisionOperations = scalarTypeMap[fieldType].ComparisonOperators
if internal.IsSortSupported(fieldType, fieldDataEnalbed) {
supportedComparisionOperations = internal.ScalarTypeMap[fieldType].ComparisonOperators
}
if isAggregateSupported(fieldType, fieldDataEnalbed) {
supportedAggregationOperations = scalarTypeMap[fieldType].AggregateFunctions
if internal.IsAggregateSupported(fieldType, fieldDataEnalbed) {
supportedAggregationOperations = internal.ScalarTypeMap[fieldType].AggregateFunctions
}
if fieldType == "wildcard" {
unstructuredTextSupported = true
Expand All @@ -190,33 +174,3 @@ func appendAggregationOperations(supersetAggOps schema.ScalarTypeAggregateFuncti
}
return supersetAggOps
}

// isSortSupported checks if a field type is supported for sorting
// based on fielddata and unsupported sort data types.
func isSortSupported(fieldType string, fieldDataEnalbed bool) bool {
if fieldDataEnalbed {
return true
}
for _, unSupportedType := range unsupportedSortDataTypes {
if fieldType == unSupportedType {
return false
}
}
return true
}

// isAggregateSupported checks if a field type is supported for aggregation
// based on fielddata and unsupported aggregate data types.
func isAggregateSupported(fieldType string, fieldDataEnalbed bool) bool {
if fieldDataEnalbed {
return true
}

for _, unSupportedType := range unSupportedAggregateTypes {
if fieldType == unSupportedType {
return false
}
}

return true
}
6 changes: 3 additions & 3 deletions connector/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func executeQuery(ctx context.Context, configuration *types.Configuration, state
prepareContext, prepareSpan := state.Tracer.Start(ctx, "prepare_elasticsearch_query")
defer prepareSpan.End()

dslQuery, err := prepareElasticsearchQuery(prepareContext, request, state, index)
dslQuery, err := prepareElasticsearchQuery(prepareContext, request, state, index, configuration)
if err != nil {
prepareSpan.SetStatus(codes.Error, err.Error())
return nil, err
Expand Down Expand Up @@ -114,7 +114,7 @@ func executeQuery(ctx context.Context, configuration *types.Configuration, state
}

// prepareElasticsearchQuery prepares an Elasticsearch query based on the provided query request.
func prepareElasticsearchQuery(ctx context.Context, request *schema.QueryRequest, state *types.State, index string) (map[string]interface{}, error) {
func prepareElasticsearchQuery(ctx context.Context, request *schema.QueryRequest, state *types.State, index string, configuration *types.Configuration) (map[string]interface{}, error) {
// Set the user configured default result size in ctx
ctx = context.WithValue(ctx, elasticsearch.DEFAULT_RESULT_SIZE_KEY, elasticsearch.GetDefaultResultSize())

Expand Down Expand Up @@ -155,7 +155,7 @@ func prepareElasticsearchQuery(ctx context.Context, request *schema.QueryRequest
span.AddEvent("prepare_sort_query")
// Order by
if request.Query.OrderBy != nil && len(request.Query.OrderBy.Elements) != 0 {
sort, err := prepareSortQuery(request.Query.OrderBy, state, index)
sort, err := prepareSortQuery(request.Query.OrderBy, state, index, configuration)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit f0a094d

Please sign in to comment.