Skip to content

Commit

Permalink
Add leader election to Functions processing (#480)
Browse files Browse the repository at this point in the history
* Add "suspended" and "observedGeneration" to the funtions CRD

The suspended property enables an operator to suspend processing of a CRD during some investigative period.

The ObservedGeneration property enables us to remain stateless with respect to our processing model. So instead of having to maintain a cache so that we can determine when a CRD has been updated, we can remove the cache and know if a CRD requires reconciling by comparing its ObservedGeneration to the current Generation.

* Delegate Function CRD handling to tasks

There's no need to be reconciling Functions in more than one place. We already periodically reconcile functions in our task handler, so there's no need to do so in the uploader, too.

* Combine the functionality of storage and crd

There is an extra bit of unecessary abstraction that complicates our Function handling where our storage package is acting as a glue between the crd and adx packages. By combining the storage and crd package it's far easier to understand the code's flow. Furthermore, the code that lives in pkg is generally meant to be shared by more than one other package, so until that's necessary, having a crd package doesn't fit the intent of pkg.

* Integrate new function store to task management

Several unit tests were removed for this commit due to the shift of responsibility with respect to ObservedGeneration and LastReconcileTime, which was moved from task management to the function store layer. The reason for moving the responsibility for these two fields is that it's a bit of an implementation leak for the kusto layer to have to understand how and why the ObservedGeneration and LastReconciledTime are utilized.

* Integrate the new function store

* Remove unused CRD tool

* Drop KQL Functions upon CRD deletion
  • Loading branch information
jessejlt authored Dec 17, 2024
1 parent 621658c commit 2f25b09
Show file tree
Hide file tree
Showing 19 changed files with 956 additions and 700 deletions.
28 changes: 10 additions & 18 deletions api/v1/function_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package v1

import (
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand All @@ -30,20 +28,15 @@ type FunctionSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file

// To protect against specifying arbitrary KQL, we'll only allow
// specification of the Function's body, we'll generate the scaffold
// for `.create-or-alter function` statement
// This flag tells the controller to suspend subsequent executions, it does
// not apply to already started executions. Defaults to false.
// +optional
Suspend *bool `json:"suspend,omitempty"`

// Database is the name of the database in which the function will be created
Database string `json:"database"`
// Body is the KQL body of the function
Body string `json:"body"`

// TODO we need to know if a function is a view and we also need to know the function's name.
// We'll accomplish both by writing a parser and later a validator for the body of the function.
// For now, we'll just assume that the function name is the same as the name of the Function resource
// and if that name is the same as the table's name, then we'll assume it's a view. This is only a
// temporary measure until we can write a parser and validator for the body of the function.
}

// FunctionStatusEnum defines the possible status values for FunctionStatus
Expand All @@ -57,19 +50,18 @@ const (

// FunctionStatus defines the observed state of Function
type FunctionStatus struct {
// ObservedGeneration is the most recent generation observed for this Function
ObservedGeneration int64 `json:"observedGeneration"`
// LastTimeReconciled is the last time the Function was reconciled
LastTimeReconciled metav1.Time `json:"lastTimeReconciled"`
// Message is a human-readable message indicating details about the Function
Message string `json:"message,omitempty"`
// Error is a string that communicates any error message if one exists
Error string `json:"error,omitempty"`
// Reason is a string that communicates the reason for a transition
Reason string `json:"reason,omitempty"`
// Status is an enum that represents the status of the Function
Status FunctionStatusEnum `json:"status"`
}

func (fs FunctionStatus) String() string {
return fmt.Sprintf("LastTimeReconciled: %s, Message: %s, Error: %s, Status: %s",
fs.LastTimeReconciled, fs.Message, fs.Error, fs.Status)
// Error is a string that communicates any error message if one exists
Error string `json:"error,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down
23 changes: 4 additions & 19 deletions cmd/ingestor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ import (
v1 "github.com/Azure/adx-mon/api/v1"
"github.com/Azure/adx-mon/ingestor"
"github.com/Azure/adx-mon/ingestor/adx"
"github.com/Azure/adx-mon/ingestor/storage"
"github.com/Azure/adx-mon/metrics"
"github.com/Azure/adx-mon/pkg/crd"
"github.com/Azure/adx-mon/pkg/limiter"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/adx-mon/pkg/tls"
Expand Down Expand Up @@ -272,18 +270,6 @@ func realMain(ctx *cli.Context) error {
dropMetrics = append(dropMetrics, metricRegex)
}

kqlFunctionsStore := storage.NewFunctions(ctrlCli)
kqlCRDOpts := crd.Options{
CtrlCli: ctrlCli,
List: &v1.FunctionList{},
Store: kqlFunctionsStore,
}
kqlFnOperator := crd.New(kqlCRDOpts)
if err := kqlFnOperator.Open(svcCtx); err != nil {
logger.Fatalf("Failed to start kql function operator: %s", err)
}
defer kqlFnOperator.Close()

var (
allowedDatabases []string
metricsUploaders, logsUploaders []adx.Uploader
Expand All @@ -294,7 +280,7 @@ func realMain(ctx *cli.Context) error {
if len(metricsKusto) > 0 {
metricsUploaders, metricsDatabases, err = newUploaders(
metricsKusto, storageDir, concurrentUploads,
defaultMapping, adx.PromMetrics, kqlFunctionsStore)
defaultMapping, adx.PromMetrics)
if err != nil {
logger.Fatalf("Failed to create uploader: %s", err)
}
Expand All @@ -309,7 +295,7 @@ func realMain(ctx *cli.Context) error {
if len(logsKusto) > 0 {
logsUploaders, logsDatabases, err = newUploaders(
logsKusto, storageDir, concurrentUploads,
schema.DefaultLogsMapping, adx.OTLPLogs, kqlFunctionsStore)
schema.DefaultLogsMapping, adx.OTLPLogs)
if err != nil {
logger.Fatalf("Failed to create uploaders for OTLP logs: %s", err)
}
Expand Down Expand Up @@ -341,6 +327,7 @@ func realMain(ctx *cli.Context) error {

svc, err := ingestor.NewService(ingestor.ServiceOpts{
K8sCli: k8scli,
K8sCtrlCli: ctrlCli,
LogsKustoCli: logsKustoCli,
MetricsKustoCli: metricsKustoCli,
MetricsDatabases: metricsDatabases,
Expand All @@ -363,7 +350,6 @@ func realMain(ctx *cli.Context) error {
LiftedColumns: sortedLiftedLabels,
DropLabels: dropLabels,
DropMetrics: dropMetrics,
FunctionStore: kqlFunctionsStore,
})
if err != nil {
logger.Fatalf("Failed to create service: %s", err)
Expand Down Expand Up @@ -520,7 +506,7 @@ func parseKustoEndpoint(kustoEndpoint string) (string, string, error) {
}

func newUploaders(endpoints []string, storageDir string, concurrentUploads int,
defaultMapping schema.SchemaMapping, sampleType adx.SampleType, fnStore *storage.Functions) ([]adx.Uploader, []string, error) {
defaultMapping schema.SchemaMapping, sampleType adx.SampleType) ([]adx.Uploader, []string, error) {

var uploaders []adx.Uploader
var uploadDatabaseNames []string
Expand All @@ -540,7 +526,6 @@ func newUploaders(endpoints []string, storageDir string, concurrentUploads int,
ConcurrentUploads: concurrentUploads,
DefaultMapping: defaultMapping,
SampleType: sampleType,
FnStore: fnStore,
}))

uploadDatabaseNames = append(uploadDatabaseNames, database)
Expand Down
58 changes: 1 addition & 57 deletions ingestor/adx/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,9 @@ import (
"sync"
"time"

v1 "github.com/Azure/adx-mon/api/v1"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/adx-mon/schema"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/data/errors"
"github.com/Azure/azure-kusto-go/kusto/kql"
"github.com/Azure/azure-kusto-go/kusto/unsafe"
"github.com/cespare/xxhash"
)
Expand All @@ -37,7 +34,6 @@ type mgmt interface {

type Syncer struct {
KustoCli mgmt
fnStore FunctionStore

database string

Expand All @@ -46,7 +42,6 @@ type Syncer struct {
st SampleType

tables map[string]struct{}
views map[string]*v1.Function

defaultMapping schema.SchemaMapping
cancelFn context.CancelFunc
Expand All @@ -65,17 +60,15 @@ type Table struct {
TableName string `kusto:"TableName"`
}

func NewSyncer(kustoCli mgmt, database string, defaultMapping schema.SchemaMapping, st SampleType, fns FunctionStore) *Syncer {
func NewSyncer(kustoCli mgmt, database string, defaultMapping schema.SchemaMapping, st SampleType) *Syncer {
return &Syncer{
KustoCli: kustoCli,
fnStore: fns,

database: database,
defaultMapping: defaultMapping,
mappings: make(map[string]schema.SchemaMapping),
st: st,
tables: make(map[string]struct{}),
views: make(map[string]*v1.Function),
}
}

Expand Down Expand Up @@ -272,55 +265,6 @@ func (s *Syncer) EnsureMapping(table string, mapping schema.SchemaMapping) (stri
return name, nil
}

// EnsureView will create or update a KQL View for the specified Table if one exists.
func (s *Syncer) EnsureView(ctx context.Context, table string) error {
view, ok := s.fnStore.View(s.database, table)
if !ok {
if logger.IsDebug() {
logger.Debugf("No view found for %s.%s", s.database, table)
}
return nil
}

s.mu.Lock()
defer s.mu.Unlock()

cached, ok := s.views[s.database+table]
if ok {
// Is our cached version out of date?
if cached.ResourceVersion != view.ResourceVersion {
// Invalidate cache
delete(s.views, s.database+table)
} else {
// Cache is valid, nothing to do
if logger.IsDebug() {
logger.Debugf("View %s.%s is up to date", s.database, view.Name)
}
return nil
}
}

stmt := kql.New("").AddUnsafe(view.Spec.Body)
if _, err := s.KustoCli.Mgmt(ctx, s.database, stmt); err != nil {
if !errors.Retry(err) {
updateKQLFunctionStatus(ctx, s.fnStore, view, v1.PermanentFailure, err)
logger.Errorf("Permanent failure to create view %s.%s: %v", s.database, table, err)
// We want to fall through here so that we can cache this object, there's no need
// to retry creating it. If it's updated, we'll detect the change in the cached
// object and try again after invalidating the cache.
} else {
updateKQLFunctionStatus(ctx, s.fnStore, view, v1.Failed, err)
logger.Warnf("Transient failure to create view %s.%s: %v", s.database, table, err)
return nil
}
}

updateKQLFunctionStatus(ctx, s.fnStore, view, v1.Success, nil)
s.views[s.database+table] = view

return nil
}

func (s *Syncer) ensureFunctions(ctx context.Context) error {
switch s.st {
case PromMetrics:
Expand Down
5 changes: 2 additions & 3 deletions ingestor/adx/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"
"testing"

"github.com/Azure/adx-mon/ingestor/storage"
"github.com/Azure/adx-mon/schema"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/stretchr/testify/require"
Expand All @@ -13,7 +12,7 @@ import (
func TestSyncer_EnsureMapping(t *testing.T) {
kcli := kusto.NewMockClient()

s := NewSyncer(kcli, "db", schema.SchemaMapping{}, PromMetrics, &storage.Functions{})
s := NewSyncer(kcli, "db", schema.SchemaMapping{}, PromMetrics)
name, err := s.EnsureDefaultMapping("Test")
require.NoError(t, err)
require.Equal(t, "Test_15745692345339290292", name)
Expand All @@ -24,7 +23,7 @@ func TestSyncer_EnsureTable(t *testing.T) {
expectedQuery: ".create-merge table ['Test'] ()",
}

s := NewSyncer(kcli, "db", schema.SchemaMapping{}, PromMetrics, &storage.Functions{})
s := NewSyncer(kcli, "db", schema.SchemaMapping{}, PromMetrics)
require.NoError(t, s.EnsureDefaultTable("Test"))
kcli.Verify(t)
}
Expand Down
59 changes: 23 additions & 36 deletions ingestor/adx/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ import (
"fmt"
"io"
"sync"
"time"

v1 "github.com/Azure/adx-mon/api/v1"
"github.com/Azure/adx-mon/ingestor/storage"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/data/errors"
"github.com/Azure/azure-kusto-go/kusto/kql"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type TableDetail struct {
Expand Down Expand Up @@ -103,81 +102,69 @@ func (t *DropUnusedTablesTask) loadTableDetails(ctx context.Context) ([]TableDet
}
}

type FunctionStore interface {
Functions() []*v1.Function
View(database, table string) (*v1.Function, bool)
UpdateStatus(ctx context.Context, fn *v1.Function) error
}

type SyncFunctionsTask struct {
cache map[string]*v1.Function
mu sync.RWMutex

store FunctionStore
store storage.Functions
kustoCli StatementExecutor
}

func NewSyncFunctionsTask(store FunctionStore, kustoCli StatementExecutor) *SyncFunctionsTask {
func NewSyncFunctionsTask(store storage.Functions, kustoCli StatementExecutor) *SyncFunctionsTask {
return &SyncFunctionsTask{
cache: make(map[string]*v1.Function),
store: store,
kustoCli: kustoCli,
}
}

func (t *SyncFunctionsTask) Run(ctx context.Context) error {
t.mu.Lock()
defer t.mu.Unlock()

functions := t.store.Functions()
functions, err := t.store.List(ctx)
if err != nil {
return fmt.Errorf("failed to list functions: %w", err)
}
for _, function := range functions {

if function.Spec.Database != t.kustoCli.Database() {
continue
}

cacheKey := function.Namespace + function.Name
if fn, ok := t.cache[cacheKey]; ok {
if function.Generation != fn.Generation {
// invalidate our cache
delete(t.cache, cacheKey)
} else {
// function is up to date
continue
if !function.DeletionTimestamp.IsZero() {
// Until we can parse KQL we don't actually know the function's
// name as described in this CRD; however, we'll make the assumption
// that the CRD name is the same as the function name in Kusto and
// attempt a delete.
stmt := kql.New(".drop function ").AddUnsafe(function.Name).AddLiteral(" ifexists")
if _, err := t.kustoCli.Mgmt(ctx, stmt); err != nil {
logger.Errorf("Failed to delete function %s.%s: %v", function.Spec.Database, function.Name, err)
// Deletion is best-effort, especially while we still can't parse KQL
}
t.updateKQLFunctionStatus(ctx, function, v1.Success, nil)
return nil
}

stmt := kql.New("").AddUnsafe(function.Spec.Body)
if _, err := t.kustoCli.Mgmt(ctx, stmt); err != nil {
if !errors.Retry(err) {
logger.Errorf("Permanent failure to create function %s.%s: %v", function.Spec.Database, function.Name, err)
if err = updateKQLFunctionStatus(ctx, t.store, function, v1.PermanentFailure, err); err != nil {
if err = t.updateKQLFunctionStatus(ctx, function, v1.PermanentFailure, err); err != nil {
logger.Errorf("Failed to update permanent failure status: %v", err)
} else {
// Cache this permanent failure. Only retry if the generation changes.
t.cache[cacheKey] = function
}
continue
} else {
updateKQLFunctionStatus(ctx, t.store, function, v1.Failed, err)
t.updateKQLFunctionStatus(ctx, function, v1.Failed, err)
logger.Warnf("Transient failure to create function %s.%s: %v", function.Spec.Database, function.Name, err)
continue
}
}

logger.Infof("Successfully created function %s.%s", function.Spec.Database, function.Name)
if err := updateKQLFunctionStatus(ctx, t.store, function, v1.Success, nil); err != nil {
if err := t.updateKQLFunctionStatus(ctx, function, v1.Success, nil); err != nil {
logger.Errorf("Failed to update success status: %v", err)
} else {
t.cache[cacheKey] = function
}
}

return nil
}

func updateKQLFunctionStatus(ctx context.Context, store FunctionStore, fn *v1.Function, status v1.FunctionStatusEnum, err error) error {
fn.Status.LastTimeReconciled = metav1.Time{Time: time.Now()}
func (t *SyncFunctionsTask) updateKQLFunctionStatus(ctx context.Context, fn *v1.Function, status v1.FunctionStatusEnum, err error) error {
fn.Status.Status = status
if err != nil {
errMsg := err.Error()
Expand All @@ -197,7 +184,7 @@ func updateKQLFunctionStatus(ctx context.Context, store FunctionStore, fn *v1.Fu
}
fn.Status.Error = errMsg
}
if err := store.UpdateStatus(ctx, fn); err != nil {
if err := t.store.UpdateStatus(ctx, fn); err != nil {
return fmt.Errorf("Failed to update status for function %s.%s: %w", fn.Spec.Database, fn.Name, err)
}
return nil
Expand Down
Loading

0 comments on commit 2f25b09

Please sign in to comment.