From 54313f2c7f57f27b70a4fda5266db549c89e8ca1 Mon Sep 17 00:00:00 2001 From: Xenia Nisskhen Date: Thu, 18 Jul 2024 23:37:38 +0500 Subject: [PATCH] feat(linter): WSL linter on cmd|docs|expression|image_store|templating|support --- .golangci.yml | 2 +- checker/metrics/conversion/trigger_metrics.go | 2 ++ cmd/checker/config.go | 4 +++ cmd/checker/main.go | 9 ++++++- cmd/cli/cluster.go | 4 +++ cmd/cli/extra.go | 6 +++++ cmd/cli/from_2.3_to_2.4.go | 5 ++++ cmd/cli/from_2.6_to_2.7.go | 4 +++ cmd/cli/from_2.7_to_2.8.go | 4 +++ cmd/cli/from_2.9_to_2.10.go | 4 +++ cmd/cli/main.go | 25 +++++++++++++++++++ cmd/cli/triggers.go | 7 ++++++ cmd/cli/user.go | 4 +++ cmd/config.go | 5 ++++ cmd/filter/main.go | 4 +++ cmd/image_store.go | 2 ++ cmd/notifier/config.go | 8 ++++++ cmd/notifier/main.go | 2 ++ cmd/source_provider.go | 4 +++ cmd/telemetry.go | 10 ++++++++ expression/expression.go | 9 +++++++ filter/matched_metrics/metrics.go | 4 +++ filter/metrics_parser.go | 3 +++ image_store/s3/init.go | 8 ++++-- image_store/s3/store.go | 3 +++ index/metrics.go | 2 ++ senders/read_image_store_config.go | 2 ++ support/trigger.go | 23 +++++++++++++++++ templating/templating.go | 2 ++ 29 files changed, 167 insertions(+), 4 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 860bebb3c..732a96cd2 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -153,7 +153,7 @@ issues: exclude-files: - notifier/registrator.go exclude-rules: - - path-except: 'api/*/*' + - path-except: 'api|clock|cmd|docs|expression|image_store|templating|support/*/*' linters: - wsl - path: _test\.go diff --git a/checker/metrics/conversion/trigger_metrics.go b/checker/metrics/conversion/trigger_metrics.go index 80a1eb450..f4b21c6a8 100644 --- a/checker/metrics/conversion/trigger_metrics.go +++ b/checker/metrics/conversion/trigger_metrics.go @@ -202,11 +202,13 @@ func (m TriggerMetrics) FindMissingMetrics(declaredAloneMetrics set[string]) map if declaredAloneMetrics.contains(targetName) { continue } + diff := metricsSet.diff(fullMetrics) if len(diff) > 0 { result[targetName] = diff } } + return result } diff --git a/cmd/checker/config.go b/cmd/checker/config.go index 53430a4a4..a99c2a610 100644 --- a/cmd/checker/config.go +++ b/cmd/checker/config.go @@ -64,6 +64,7 @@ func (config *config) getSettings(logger moira.Logger) *checker.Config { for _, v := range config.Checker.SetLogLevel.TriggersToLevel { logTriggersToLevel[v.ID] = v.Level } + logger.Info(). Int("number_of_triggers", len(logTriggersToLevel)). Msg("Found dynamic log rules in config for some triggers") @@ -81,6 +82,7 @@ func (config *config) getSettings(logger moira.Logger) *checker.Config { String("cluster_id", "default"). Msg("MaxParallelChecks is not configured, set it to the number of CPU") } + sourceCheckConfigs[moira.DefaultLocalCluster] = localCheckConfig for _, remote := range config.Remotes.Graphite { @@ -95,6 +97,7 @@ func (config *config) getSettings(logger moira.Logger) *checker.Config { String("cluster_id", remote.ClusterID.String()). Msg("MaxParallelChecks is not configured, set it to the number of CPU") } + sourceCheckConfigs[moira.MakeClusterKey(moira.GraphiteRemote, remote.ClusterID)] = checkConfig } @@ -110,6 +113,7 @@ func (config *config) getSettings(logger moira.Logger) *checker.Config { String("cluster_id", remote.ClusterID.String()). Msg("MaxParallelChecks is not configured, set it to the number of CPU") } + sourceCheckConfigs[moira.MakeClusterKey(moira.PrometheusRemote, remote.ClusterID)] = checkConfig } diff --git a/cmd/checker/main.go b/cmd/checker/main.go index bcdb88cf6..0c27e92cf 100644 --- a/cmd/checker/main.go +++ b/cmd/checker/main.go @@ -40,6 +40,7 @@ var ( func main() { flag.Parse() + if *printVersion { fmt.Println("Moira Checker") fmt.Println("Version:", MoiraVersion) @@ -107,12 +108,14 @@ func main() { LazyTriggersCache: cache.New(time.Minute*10, time.Minute*60), //nolint PatternCache: cache.New(cacheExpiration, time.Minute*60), //nolint } + err = checkerWorkerManager.StartWorkers() if err != nil { logger.Fatal(). Error(err). Msg("Failed to start worker check") } + defer stopChecker(checkerWorkerManager) logger.Info(). @@ -129,20 +132,23 @@ func main() { } func checkSingleTrigger(database moira.Database, metrics *metrics.CheckerMetrics, settings *checker.Config, sourceProvider *metricSource.SourceProvider) { - triggerChecker, err := checker.MakeTriggerChecker(*triggerID, database, logger, settings, sourceProvider, metrics) logger.String(moira.LogFieldNameTriggerID, *triggerID) + + triggerChecker, err := checker.MakeTriggerChecker(*triggerID, database, logger, settings, sourceProvider, metrics) if err != nil { logger.Error(). Error(err). Msg("Failed initialize trigger checker") os.Exit(1) } + if err = triggerChecker.Check(); err != nil { logger.Error(). Error(err). Msg("Failed check trigger") os.Exit(1) } + os.Exit(0) } @@ -159,5 +165,6 @@ func clusterKeyList(provider *metricSource.SourceProvider) []moira.ClusterKey { for ck := range provider.GetAllSources() { keys = append(keys, ck) } + return keys } diff --git a/cmd/cli/cluster.go b/cmd/cli/cluster.go index efabf399a..d3aeceda6 100644 --- a/cmd/cli/cluster.go +++ b/cmd/cli/cluster.go @@ -25,11 +25,13 @@ func renameKey(database moira.Database, oldValue, newValue string) error { case *redis.DbConnector: pipe := d.Client().TxPipeline() iter := d.Client().Scan(d.Context(), 0, oldValue, 0).Iterator() + for iter.Next(d.Context()) { oldKey := iter.Val() newKey := strings.Replace(iter.Val(), oldValue, newValue, 1) pipe.Rename(d.Context(), oldKey, newKey) } + _, err := pipe.Exec(d.Context()) if err != nil { return err @@ -46,11 +48,13 @@ func changeKeysPrefix(database moira.Database, oldPrefix string, newPrefix strin case *redis.DbConnector: pipe := d.Client().TxPipeline() iter := d.Client().Scan(d.Context(), 0, oldPrefix+"*", 0).Iterator() + for iter.Next(d.Context()) { oldKey := iter.Val() newKey := strings.Replace(iter.Val(), oldPrefix, newPrefix, 1) pipe.Rename(d.Context(), oldKey, newKey) } + _, err := pipe.Exec(d.Context()) if err != nil { return err diff --git a/cmd/cli/extra.go b/cmd/cli/extra.go index 3e64075ad..f8eea81f6 100644 --- a/cmd/cli/extra.go +++ b/cmd/cli/extra.go @@ -11,25 +11,31 @@ func enablePlottingInAllSubscriptions(logger moira.Logger, database moira.Databa if err != nil { return err } + allSubscriptions, err := database.GetTagsSubscriptions(allTags) if err != nil { return err } + for _, subscription := range allSubscriptions { if subscription == nil { continue } + subscription.Plotting = moira.PlottingData{ Enabled: true, Theme: "light", } + if err := database.SaveSubscription(subscription); err != nil { return err } + logger.Debug(). String("subscription_id", subscription.ID). String("contacts", strings.Join(subscription.Contacts, ", ")). Msg("Successfully enabled plotting") } + return nil } diff --git a/cmd/cli/from_2.3_to_2.4.go b/cmd/cli/from_2.3_to_2.4.go index 4377c97ff..759e1c17a 100644 --- a/cmd/cli/from_2.3_to_2.4.go +++ b/cmd/cli/from_2.3_to_2.4.go @@ -6,11 +6,13 @@ func updateFrom23(logger moira.Logger, dataBase moira.Database) error { logger.Info().Msg("Update 2.3 -> 2.4 start") logger.Info().Msg("Start marking unused triggers") + if err := resaveTriggers(dataBase); err != nil { return err } logger.Info().Msg("Update 2.3 -> 2.4 finish") + return nil } @@ -23,10 +25,12 @@ func resaveTriggers(database moira.Database) error { if err != nil { return err } + allTriggers, err := database.GetTriggers(allTriggerIDs) if err != nil { return err } + for _, trigger := range allTriggers { if trigger != nil { if err = database.SaveTrigger(trigger.ID, trigger); err != nil { @@ -34,5 +38,6 @@ func resaveTriggers(database moira.Database) error { } } } + return nil } diff --git a/cmd/cli/from_2.6_to_2.7.go b/cmd/cli/from_2.6_to_2.7.go index 93ab90ffb..f65864abb 100644 --- a/cmd/cli/from_2.6_to_2.7.go +++ b/cmd/cli/from_2.6_to_2.7.go @@ -6,11 +6,13 @@ func updateFrom26(logger moira.Logger, dataBase moira.Database) error { logger.Info().Msg("Update 2.6 -> 2.7 was started") logger.Info().Msg("Adding Redis Cluster support was started") + if err := addRedisClusterSupport(logger, dataBase); err != nil { return err } logger.Info().Msg("Update 2.6 -> 2.7 was finished") + return nil } @@ -18,10 +20,12 @@ func downgradeTo26(logger moira.Logger, dataBase moira.Database) error { logger.Info().Msg("Downgrade 2.7 -> 2.6 started") logger.Info().Msg("Removing Redis Cluster support was started") + if err := removeRedisClusterSupport(logger, dataBase); err != nil { return err } logger.Info().Msg("Downgrade 2.7 -> 2.6 was finished") + return nil } diff --git a/cmd/cli/from_2.7_to_2.8.go b/cmd/cli/from_2.7_to_2.8.go index 61cc02fdb..83161b1f5 100644 --- a/cmd/cli/from_2.7_to_2.8.go +++ b/cmd/cli/from_2.7_to_2.8.go @@ -10,6 +10,7 @@ func updateFrom27(logger moira.Logger, dataBase moira.Database) error { logger.Info().Msg("Update 2.7 -> 2.8 was started") logger.Info().Msg("Rename keys was started") + if err := updateSubscriptionKeyForAnonymous(logger, dataBase); err != nil { return fmt.Errorf("failed updateSubscriptionKeyForAnonymous, has error %w", err) } @@ -19,6 +20,7 @@ func updateFrom27(logger moira.Logger, dataBase moira.Database) error { } logger.Info().Msg("Update 2.7 -> 2.8 was finished") + return nil } @@ -26,6 +28,7 @@ func downgradeTo27(logger moira.Logger, dataBase moira.Database) error { logger.Info().Msg("Downgrade 2.8 -> 2.7 started") logger.Info().Msg("Rename keys was started") + if err := downgradeSubscriptionKeyForAnonymous(logger, dataBase); err != nil { return err } @@ -35,6 +38,7 @@ func downgradeTo27(logger moira.Logger, dataBase moira.Database) error { } logger.Info().Msg("Downgrade 2.8 -> 2.7 was finished") + return nil } diff --git a/cmd/cli/from_2.9_to_2.10.go b/cmd/cli/from_2.9_to_2.10.go index 5ea470070..b8fa84166 100644 --- a/cmd/cli/from_2.9_to_2.10.go +++ b/cmd/cli/from_2.9_to_2.10.go @@ -11,12 +11,14 @@ func updateFrom29(logger moira.Logger, database moira.Database) error { logger.Info().Msg("Update 2.9 -> 2.10 was started") ctx := context.Background() + err := createKeyForLocalTriggers(ctx, logger, database) if err != nil { return err } logger.Info().Msg("Update 2.9 -> 2.10 was finished") + return nil } @@ -24,12 +26,14 @@ func downgradeTo29(logger moira.Logger, database moira.Database) error { logger.Info().Msg("Downgrade 2.10 -> 2.9 started") ctx := context.Background() + err := revertCreateKeyForLocalTriggers(ctx, logger, database) if err != nil { return err } logger.Info().Msg("Downgrade 2.10 -> 2.9 was finished") + return nil } diff --git a/cmd/cli/main.go b/cmd/cli/main.go index 3ff7d92a1..ce658376f 100644 --- a/cmd/cli/main.go +++ b/cmd/cli/main.go @@ -196,6 +196,7 @@ func main() { //nolint Error(err). Msg("Failed to remove metrics by prefix") } + log.Info(). String("prefix", *removeMetricsByPrefix). Msg("Removing metrics by prefix finished") @@ -204,11 +205,13 @@ func main() { //nolint if *removeAllMetrics { log := logger.String(moira.LogFieldNameContext, "cleanup") log.Info().Msg("Removing all metrics started") + if err := handleRemoveAllMetrics(database); err != nil { log.Error(). Error(err). Msg("Failed to remove all metrics") } + log.Info().Msg("Removing all metrics finished") } @@ -233,6 +236,7 @@ func main() { //nolint if *removeUnusedTriggersWithTTL != "" { log := logger.String(moira.LogFieldNameContext, "remove-unused-triggers-with-ttl") ttl := int64(to.Duration(*removeUnusedTriggersWithTTL).Seconds()) + if err := handleRemoveUnusedTriggersWithTTL(logger, database, ttl); err != nil { log.Error(). Error(err). @@ -307,6 +311,7 @@ func main() { //nolint log := logger.String(moira.LogFieldNameContext, "cleanup-last-checks") log.Info().Msg("Cleanup abandoned triggers last checks started") + err := handleCleanUpAbandonedTriggerLastCheck(database) if err != nil { log.Error(). @@ -320,12 +325,14 @@ func main() { //nolint if *cleanupTags { log := logger.String(moira.LogFieldNameContext, "cleanup-tags") log.Info().Msg("Cleanup abandoned tags started") + count, err := handleCleanUpAbandonedTags(database) if err != nil { log.Error(). Error(err). Msg("Failed to cleanup abandoned tags") } + log.Info(). Int("abandoned_tags_deleted", count). Msg("Cleanup abandoned tags finished") @@ -335,26 +342,31 @@ func main() { //nolint log := logger.String(moira.LogFieldNameContext, "cleanup-retentions") log.Info().Msg("Cleanup of abandoned retentions started") + err := handleCleanUpAbandonedRetentions(database) if err != nil { log.Error(). Error(err). Msg("Failed to cleanup abandoned retentions") } + log.Info().Msg("Cleanup of abandoned retentions finished") } if *pushTriggerDump { logger.Info().Msg("Dump push started") + f, err := openFile(*triggerDumpFile, os.O_RDONLY) if err != nil { logger.Fatal(). Error(err). Msg("Failed to open triggerDumpFile") } + defer closeFile(f, logger) dump := &dto.TriggerDump{} + err = json.NewDecoder(f).Decode(dump) if err != nil { logger.Fatal(). @@ -363,16 +375,19 @@ func main() { //nolint } logger.Info().Msg(GetDumpBriefInfo(dump)) + if err := support.HandlePushTrigger(logger, database, &dump.Trigger); err != nil { logger.Fatal(). Error(err). Msg("Failed to handle push trigger") } + if err := support.HandlePushTriggerMetrics(logger, database, dump.Trigger.ID, dump.Metrics); err != nil { logger.Fatal(). Error(err). Msg("Failed to handle push trigger metrics") } + if err := support.HandlePushTriggerLastCheck( logger, database, @@ -384,11 +399,13 @@ func main() { //nolint Error(err). Msg("Failed to handle push trigger last check") } + logger.Info().Msg("Dump was pushed") } if *removeSubscriptions != "" { logger.Info().Msg("Start deletion of subscriptions") + subscriptionIDs := strings.Split(*removeSubscriptions, ";") deleted := 0 @@ -402,8 +419,10 @@ func main() { //nolint Error(err). String("subscription_id", subscriptionID). Msg("Failed to remove subscription") + continue } + deleted++ } @@ -424,6 +443,7 @@ func GetDumpBriefInfo(dump *dto.TriggerDump) string { func initApp() (cleanupConfig, moira.Logger, moira.Database) { flag.Parse() + if *printVersion { fmt.Println("Moira - alerting system based on graphite or prometheus data") fmt.Println("Version:", MoiraVersion) @@ -451,6 +471,7 @@ func initApp() (cleanupConfig, moira.Logger, moira.Database) { databaseSettings := config.Redis.GetSettings() dataBase := redis.NewDatabase(logger, databaseSettings, redis.NotificationHistoryConfig{}, redis.NotificationConfig{}, redis.Cli) + return config.Cleanup, logger, dataBase } @@ -467,6 +488,7 @@ func checkValidVersion(logger moira.Logger, updateFromVersion *string, isUpdate String("your_version", *updateFromVersion). Msg("You must set valid flag") } + return moira.UseString(updateFromVersion) } @@ -476,6 +498,7 @@ func contains(s []string, e string) bool { return true } } + return false } @@ -483,10 +506,12 @@ func openFile(filePath string, mode int) (*os.File, error) { if filePath == "" { return nil, fmt.Errorf("file is not specified") } + file, err := os.OpenFile(filePath, mode, 0o666) //nolint:gofumpt if err != nil { return nil, fmt.Errorf("cannot open file: %w", err) } + return file, nil } diff --git a/cmd/cli/triggers.go b/cmd/cli/triggers.go index 60efc730b..d2a1e4801 100644 --- a/cmd/cli/triggers.go +++ b/cmd/cli/triggers.go @@ -24,10 +24,12 @@ func handleRemoveUnusedTriggersStartWith(logger moira.Logger, database moira.Dat if err != nil { return fmt.Errorf("can't get trigger IDs start with prefix %s: %w", prefix, err) } + unusedTriggers, err := database.GetUnusedTriggerIDs() if err != nil { return fmt.Errorf("can't get unused trigger IDs; err: %w", err) } + unusedTriggersMap := map[string]struct{}{} for _, id := range unusedTriggers { @@ -35,6 +37,7 @@ func handleRemoveUnusedTriggersStartWith(logger moira.Logger, database moira.Dat } triggersToDelete := make([]string, 0) + for _, id := range triggers { if _, ok := unusedTriggersMap[id]; ok { triggersToDelete = append(triggersToDelete, id) @@ -52,6 +55,7 @@ func handleRemoveUnusedTriggersWithTTL(logger moira.Logger, database moira.Datab triggersToDelete := make([]string, 0) nowInSec := time.Now().Unix() + for _, id := range unusedTriggers { unusedTrigger, err := database.GetTrigger(id) if err != nil { @@ -92,13 +96,16 @@ func deleteTriggers(logger moira.Logger, triggers []string, database moira.Datab Msg("Removing triggers start with has started") deletedTriggersCount := 0 + for _, id := range triggers { err := database.RemoveTrigger(id) if err != nil { return fmt.Errorf("can't remove trigger with id %s: %w", id, err) } + deletedTriggersCount++ } + logger.Info(). Int("deleted_triggers_count", len(triggers)). Interface("deleted_triggers", triggers). diff --git a/cmd/cli/user.go b/cmd/cli/user.go index b08235c70..0693350b0 100644 --- a/cmd/cli/user.go +++ b/cmd/cli/user.go @@ -41,6 +41,7 @@ func transferUserSubscriptionsAndContacts(database moira.Database, from, to stri } subscriptions := make([]*moira.SubscriptionData, 0, len(subscriptionsTmp)) + for _, subscription := range subscriptionsTmp { if subscription != nil { subscriptions = append(subscriptions, subscription) @@ -98,10 +99,12 @@ func usersCleanup(logger moira.Logger, database moira.Database, users []string, } usersMapLength := len(users) + len(config.Whitelist) + const usersMapMaxLength = 100000 if usersMapLength > usersMapMaxLength { return errors.New("users count is too large") } + usersMap := make(map[string]bool, usersMapLength) for _, user := range append(users, config.Whitelist...) { @@ -146,6 +149,7 @@ func usersCleanup(logger moira.Logger, database moira.Database, users []string, if err = deleteUser(database, user); err != nil { return err } + logger.Debug(). String("user", user). Msg("User was deleted") diff --git a/cmd/config.go b/cmd/config.go index 4cc775261..4418dd02b 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -180,6 +180,7 @@ func (remotes *RemotesConfig) Validate() error { if len(errs) == 0 { return nil } + return errors.Join(errs...) } @@ -187,6 +188,7 @@ func validateRemotes[T remoteCommon](remotes []T) []error { errs := make([]error, 0) keys := make(map[moira.ClusterID]int) + for _, remote := range remotes { common := remote.getRemoteCommon() if common.ClusterID == moira.ClusterNotSet { @@ -195,6 +197,7 @@ func validateRemotes[T remoteCommon](remotes []T) []error { ) errs = append(errs, err) } + keys[common.ClusterID]++ } @@ -302,10 +305,12 @@ func ReadConfig(configFileName string, config any) error { if err != nil { return fmt.Errorf("can't read file [%s] [%s]", configFileName, err.Error()) } + err = yaml.Unmarshal(configYaml, config) if err != nil { return fmt.Errorf("can't parse config file [%s] [%s]", configFileName, err.Error()) } + return nil } diff --git a/cmd/filter/main.go b/cmd/filter/main.go index 93ed637de..6cb68b171 100644 --- a/cmd/filter/main.go +++ b/cmd/filter/main.go @@ -40,6 +40,7 @@ var ( func main() { flag.Parse() + if *printVersion { fmt.Println("Moira Filter") fmt.Println("Version:", MoiraVersion) @@ -106,6 +107,7 @@ func main() { } filterPatternStorageCfg := config.Filter.PatternStorageCfg.toFilterPatternStorageConfig() + patternStorage, err := filter.NewPatternStorage(filterPatternStorageCfg, database, filterMetrics, logger, compatibility) if err != nil { logger.Fatal(). @@ -137,6 +139,7 @@ func main() { Error(err). Msg("Failed to start listening") } + lineChan := listener.Listen() patternMatcher := patterns.NewMatcher(logger, filterMetrics, patternStorage, to.Duration(config.Filter.DropMetricsTTL)) @@ -147,6 +150,7 @@ func main() { batchForcedSaveTimeout := to.Duration(config.Filter.BatchForcedSaveTimeout) metricsMatcher := matchedmetrics.NewMetricsMatcher(filterMetrics, logger, database, cacheStorage, cacheCapacity, batchForcedSaveTimeout) metricsMatcher.Start(metricsChan) + defer metricsMatcher.Wait() // First stop listener defer stopListener(listener) // Then waiting for metrics matcher handle all received events diff --git a/cmd/image_store.go b/cmd/image_store.go index 49a46c21f..5afbf3914 100644 --- a/cmd/image_store.go +++ b/cmd/image_store.go @@ -13,6 +13,7 @@ const ( // InitImageStores initializes the image storage provider with settings from the yaml config. func InitImageStores(imageStores ImageStoreConfig, logger moira.Logger) map[string]moira.ImageStore { var err error + imageStoreMap := make(map[string]moira.ImageStore) imageStore := &s3.ImageStore{} @@ -27,6 +28,7 @@ func InitImageStores(imageStores ImageStoreConfig, logger moira.Logger) map[stri Msg("Image store initialized") } } + imageStoreMap[s3ImageStore] = imageStore return imageStoreMap diff --git a/cmd/notifier/config.go b/cmd/notifier/config.go index a056a6fbb..1303d6998 100644 --- a/cmd/notifier/config.go +++ b/cmd/notifier/config.go @@ -141,6 +141,7 @@ func (config *notifierConfig) getSettings(logger moira.Logger) notifier.Config { String("timezone", config.Timezone). Error(err). Msg("Timezone load failed. Use UTC.") + location, _ = time.LoadLocation("UTC") } else { logger.Info(). @@ -166,12 +167,14 @@ func (config *notifierConfig) getSettings(logger moira.Logger) notifier.Config { if config.ReadBatchSize > 0 { readBatchSize = int64(config.ReadBatchSize) } + if config.ReadBatchSize <= 0 && int64(config.ReadBatchSize) != notifier.NotificationsLimitUnlimited { logger.Warning(). Int("read_batch_size", config.ReadBatchSize). Int64("notification_limit_unlimited", notifier.NotificationsLimitUnlimited). Msg("Current config's read_batch_size is invalid, value ignored") } + logger.Info(). Int64("read_batch_size", readBatchSize). Msg("Current read_batch_size") @@ -180,10 +183,13 @@ func (config *notifierConfig) getSettings(logger moira.Logger) notifier.Config { for _, v := range config.SetLogLevel.Contacts { contacts[v.ID] = v.Level } + subscriptions := map[string]string{} + for _, v := range config.SetLogLevel.Subscriptions { subscriptions[v.ID] = v.Level } + logger.Info(). Int("contacts_count", len(contacts)). Int("subscriptions_count", len(subscriptions)). @@ -208,10 +214,12 @@ func (config *notifierConfig) getSettings(logger moira.Logger) notifier.Config { func checkDateTimeFormat(format string) error { fallbackTime := time.Date(0, 1, 1, 0, 0, 0, 0, time.UTC) + parsedTime, err := time.Parse(format, time.Now().Format(format)) if err != nil || parsedTime.Equal(fallbackTime) { return fmt.Errorf("could not parse date time format '%v', result: '%v', error: '%w'", format, parsedTime, err) } + return nil } diff --git a/cmd/notifier/main.go b/cmd/notifier/main.go index d03337430..5af866dcf 100644 --- a/cmd/notifier/main.go +++ b/cmd/notifier/main.go @@ -39,6 +39,7 @@ var ( func main() { flag.Parse() + if *printVersion { fmt.Println("Moira Notifier") fmt.Println("Version:", MoiraVersion) @@ -159,6 +160,7 @@ func main() { logger.Info(). String("moira_version", MoiraVersion). Msg("Moira Notifier Started") + ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) logger.Info().Msg(fmt.Sprint(<-ch)) diff --git a/cmd/source_provider.go b/cmd/source_provider.go index d5e8833ee..41d5b4c74 100644 --- a/cmd/source_provider.go +++ b/cmd/source_provider.go @@ -22,19 +22,23 @@ func InitMetricSources(remotes RemotesConfig, database moira.Database, logger mo for _, graphite := range remotes.Graphite { config := graphite.GetRemoteSourceSettings() + source, err := remote.Create(config) if err != nil { return nil, err } + provider.RegisterSource(moira.MakeClusterKey(moira.GraphiteRemote, graphite.ClusterID), source) } for _, prom := range remotes.Prometheus { config := prom.GetPrometheusSourceSettings() + source, err := prometheus.Create(config, logger) if err != nil { return nil, err } + provider.RegisterSource(moira.MakeClusterKey(moira.PrometheusRemote, prom.ClusterID), source) } diff --git a/cmd/telemetry.go b/cmd/telemetry.go index 072e6b277..3c3156073 100644 --- a/cmd/telemetry.go +++ b/cmd/telemetry.go @@ -28,12 +28,15 @@ func ConfigureTelemetry(logger moira.Logger, config TelemetryConfig, service str if err != nil { return nil, err } + prometheusRegistry := metrics.NewPrometheusRegistry() prometheusRegistryAdapter := metrics.NewPrometheusRegistryAdapter(prometheusRegistry, service) + stopServer, err := startTelemetryServer(logger, config.Listen, config.Pprof, prometheusRegistry) if err != nil { return nil, err } + return &Telemetry{Metrics: metrics.NewCompositeRegistry(graphiteRegistry, prometheusRegistryAdapter), stopFunc: stopServer}, nil } @@ -42,7 +45,9 @@ func startTelemetryServer(logger moira.Logger, listen string, pprofConfig Profil if err != nil { return nil, err } + serverMux := http.NewServeMux() + if pprofConfig.Enabled { serverMux.HandleFunc("/pprof/", pprof.Index) serverMux.HandleFunc("/pprof/cmdline", pprof.Cmdline) @@ -52,19 +57,24 @@ func startTelemetryServer(logger moira.Logger, listen string, pprofConfig Profil serverMux.HandleFunc("/pprof/heap", pprof.Handler("heap").ServeHTTP) serverMux.HandleFunc("/pprof/goroutine", pprof.Handler("goroutine").ServeHTTP) } + serverMux.Handle("/metrics", promhttp.InstrumentMetricHandler(prometheusRegistry, promhttp.HandlerFor(prometheusRegistry, promhttp.HandlerOpts{}))) server := &http.Server{Handler: serverMux} + go func() { server.Serve(listener) //nolint }() + stopServer := func() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) //nolint defer cancel() + if err := server.Shutdown(ctx); err != nil { logger.Error(). Error(err). Msg("Can't stop telemetry server correctly") } } + return stopServer, nil } diff --git a/expression/expression.go b/expression/expression.go index 57881ea92..f4cea637f 100644 --- a/expression/expression.go +++ b/expression/expression.go @@ -60,11 +60,13 @@ func (triggerExpression TriggerExpression) Get(name string) (any, error) { if triggerExpression.WarnValue == nil { return nil, fmt.Errorf("no value with name WARN_VALUE") } + return *triggerExpression.WarnValue, nil case "error_value": if triggerExpression.ErrorValue == nil { return nil, fmt.Errorf("no value with name ERROR_VALUE") } + return *triggerExpression.ErrorValue, nil case "t1": return triggerExpression.MainTargetValue, nil @@ -75,6 +77,7 @@ func (triggerExpression TriggerExpression) Get(name string) (any, error) { if !ok { return nil, fmt.Errorf("no value with name %s", name) } + return value, nil } } @@ -85,10 +88,12 @@ func (triggerExpression *TriggerExpression) Evaluate() (moira.State, error) { if err != nil { return "", ErrInvalidExpression{internalError: err} } + result, err := expr.Eval(triggerExpression) if err != nil { return "", ErrInvalidExpression{internalError: err} } + switch res := result.(type) { case moira.State: return res, nil @@ -103,6 +108,7 @@ func validateUserExpression(triggerExpression *TriggerExpression, userExpression return nil, fmt.Errorf("invalid variable value: %w", err) } } + return userExpression, nil } @@ -119,6 +125,7 @@ func getExpression(triggerExpression *TriggerExpression) (*govaluate.EvaluableEx return validateUserExpression(triggerExpression, userExpression) } + return getSimpleExpression(triggerExpression) } @@ -162,9 +169,11 @@ func getUserExpression(triggerExpression string) (*govaluate.EvaluableExpression if strings.Contains(err.Error(), "Undefined function") { return nil, fmt.Errorf("functions is forbidden") } + return nil, err } exprCache.Add(triggerExpression, expr, cache.NoExpiration) //nolint + return expr, nil } diff --git a/filter/matched_metrics/metrics.go b/filter/matched_metrics/metrics.go index 5de457ba6..d2d4cfc06 100644 --- a/filter/matched_metrics/metrics.go +++ b/filter/matched_metrics/metrics.go @@ -46,6 +46,7 @@ func NewMetricsMatcher( // Start process matched metrics from channel and save it in cache storage. func (matcher *MetricsMatcher) Start(matchedMetricsChan chan *moira.MatchedMetric) { matcher.waitGroup.Add(1) + go func() { defer matcher.waitGroup.Done() @@ -66,8 +67,11 @@ func (matcher *MetricsMatcher) receiveBatch(metrics <-chan *moira.MatchedMetric) go func() { defer close(batchedMetrics) + batchTimer := time.NewTimer(matcher.batchForcedSaveTimeout) + defer batchTimer.Stop() + for { batch := make(map[string]*moira.MatchedMetric, matcher.cacheCapacity) retry: diff --git a/filter/metrics_parser.go b/filter/metrics_parser.go index 0dc4b5e6b..cb7d4939c 100644 --- a/filter/metrics_parser.go +++ b/filter/metrics_parser.go @@ -120,16 +120,19 @@ func parseNameAndLabels(metricBytes []byte) (string, map[string]string, error) { name := moira.UnsafeBytesToString(nameBytes) labels := make(map[string]string) + for metricBytesScanner.HasNext() { labelBytes := metricBytesScanner.Next() labelBytesScanner := moira.NewBytesScanner(labelBytes, '=') var labelNameBytes, labelValueBytes []byte + if !labelBytesScanner.HasNext() { return "", nil, fmt.Errorf("too few equal-separated items: '%s'", labelBytes) } labelNameBytes = labelBytesScanner.Next() + if !labelBytesScanner.HasNext() { return "", nil, fmt.Errorf("too few equal-separated items: '%s'", labelBytes) } diff --git a/image_store/s3/init.go b/image_store/s3/init.go index bd8aeb777..67151a1df 100644 --- a/image_store/s3/init.go +++ b/image_store/s3/init.go @@ -25,14 +25,17 @@ func (imageStore *ImageStore) Init(config Config) error { if config.AccessKeyID == "" { return fmt.Errorf("access key id not found while configuring s3 image store") } + if config.AccessKey == "" { return fmt.Errorf("access key not found while configuring s3 image store") } + awsconfig.Credentials = credentials.NewStaticCredentials(config.AccessKeyID, config.AccessKey, "") if config.Region == "" { return fmt.Errorf("region not found while configuring s3 image store") } + awsconfig.Region = aws.String(config.Region) imageStore.bucket = config.Bucket @@ -41,13 +44,14 @@ func (imageStore *ImageStore) Init(config Config) error { } var err error - imageStore.sess, err = session.NewSession(awsconfig) - if err != nil { + if imageStore.sess, err = session.NewSession(awsconfig); err != nil { return fmt.Errorf("could not configure s3 session: %w", err) } + imageStore.uploader = s3manager.NewUploader(imageStore.sess) imageStore.enabled = true + return nil } diff --git a/image_store/s3/store.go b/image_store/s3/store.go index ab634f149..34f477c65 100644 --- a/image_store/s3/store.go +++ b/image_store/s3/store.go @@ -16,6 +16,7 @@ func (imageStore *ImageStore) StoreImage(image []byte) (string, error) { if err != nil { return "", fmt.Errorf("error while creating upload input: %w", err) } + result, err := imageStore.uploader.Upload(uploadInput) if err != nil { return "", fmt.Errorf("error while uploading to s3: %w", err) @@ -29,7 +30,9 @@ func (imageStore *ImageStore) buildUploadInput(image []byte) (*s3manager.UploadI if err != nil { return nil, fmt.Errorf("failed to generate plotUUID: %w", err) } + key := "moira-plots/" + plotUUID.String() + return &s3manager.UploadInput{ Bucket: aws.String(imageStore.bucket), Key: aws.String(key), diff --git a/index/metrics.go b/index/metrics.go index 89898625e..70b104c05 100644 --- a/index/metrics.go +++ b/index/metrics.go @@ -6,6 +6,7 @@ import ( func (index *Index) checkIndexedTriggersCount() error { checkTicker := time.NewTicker(time.Millisecond * 100) //nolint + for { select { case <-index.tomb.Dying(): @@ -20,6 +21,7 @@ func (index *Index) checkIndexedTriggersCount() error { func (index *Index) checkIndexActualizationLag() error { checkTicker := time.NewTicker(time.Millisecond * 100) //nolint + for { select { case <-index.tomb.Dying(): diff --git a/senders/read_image_store_config.go b/senders/read_image_store_config.go index 0f7cd331c..66ce33b1e 100644 --- a/senders/read_image_store_config.go +++ b/senders/read_image_store_config.go @@ -30,8 +30,10 @@ func ReadImageStoreConfig(senderSettings any, imageStores map[string]moira.Image if ok && imageStore.IsEnabled() { return imageStoreIDStr, imageStore, true } + logger.Warning(). String("image_store_id", imageStoreIDStr). Msg("Image store specified has not been configured") + return "", nil, false } diff --git a/support/trigger.go b/support/trigger.go index 0dba834f3..80cc44829 100644 --- a/support/trigger.go +++ b/support/trigger.go @@ -20,6 +20,7 @@ func HandlePullTrigger(logger moira.Logger, database moira.Database, triggerID s if err != nil { return nil, fmt.Errorf("cannot get trigger: %w", err) } + return &trigger, nil } @@ -34,55 +35,69 @@ func HandlePullTriggerMetrics(logger moira.Logger, database moira.Database, trig if err != nil { return nil, fmt.Errorf("cannot get trigger: %w", err) } + ttl := database.GetMetricsTTLSeconds() until := time.Now().Unix() from := until - ttl result := []dto.PatternMetrics{} + for _, target := range trigger.Targets { fetchResult, errFetch := source.Fetch(target, from, until, trigger.IsSimple()) if errFetch != nil { return nil, fmt.Errorf("cannot fetch metrics for target %s: %w", target, errFetch) } + patterns, errPatterns := fetchResult.GetPatterns() + if errPatterns != nil { return nil, fmt.Errorf("cannot get patterns for target %s: %w", target, errPatterns) } + for _, pattern := range patterns { patternResult := dto.PatternMetrics{ Pattern: pattern, Retentions: make(map[string]int64), } + metrics, errMetrics := database.GetPatternMetrics(pattern) if errMetrics != nil { return nil, fmt.Errorf("cannot get metrics for pattern %s, target %s: %w", pattern, target, errMetrics) } + for _, metric := range metrics { retention, errRetention := database.GetMetricRetention(metric) if errRetention != nil { return nil, fmt.Errorf("cannot get metric %s retention: %w", metric, errRetention) } + patternResult.Retentions[metric] = retention } + values, errValues := database.GetMetricsValues(metrics, from, until) if errValues != nil { return nil, fmt.Errorf("cannot get values for pattern %s metrics, target %s: %w", pattern, target, errValues) } + patternResult.Metrics = values result = append(result, patternResult) } } + return result, nil } func HandlePushTrigger(logger moira.Logger, database moira.Database, trigger *moira.Trigger) error { logger.Info().Msg("Save trigger") + err := database.SaveTrigger(trigger.ID, trigger) if err != nil { return fmt.Errorf("cannot save trigger: %w", err) } + logger.Info(). String("trigger_id", trigger.ID). Msg("Trigger was saved") + return nil } @@ -96,14 +111,17 @@ func HandlePushTriggerMetrics( buffer := make(map[string]*moira.MatchedMetric, len(patternsMetrics)) i := 0 + for _, patternMetrics := range patternsMetrics { for metricName, metricValues := range patternMetrics.Metrics { for _, metricValue := range metricValues { i++ + retention, ok := patternMetrics.Retentions[metricName] if !ok { retention = defaultRetention } + matchedMetric := moira.MatchedMetric{ Patterns: []string{ patternMetrics.Pattern, @@ -118,10 +136,12 @@ func HandlePushTriggerMetrics( } } } + err := database.SaveMetrics(buffer) if err != nil { return fmt.Errorf("cannot save trigger metrics: %w", err) } + logger.Info(). String("trigger_id", triggerID). Msg("Trigger metrics was saved") @@ -137,9 +157,12 @@ func HandlePushTriggerLastCheck( clusterKey moira.ClusterKey, ) error { logger.Info().Msg("Save trigger last check") + if err := database.SetTriggerLastCheck(triggerID, lastCheck, clusterKey); err != nil { return fmt.Errorf("cannot set trigger last check: %w", err) } + logger.Info().Msg("Trigger last check was saved") + return nil } diff --git a/templating/templating.go b/templating/templating.go index efbe866c6..b5b6d24fe 100644 --- a/templating/templating.go +++ b/templating/templating.go @@ -22,11 +22,13 @@ func formatDate(unixTime int64, format string) string { func filterKeys(source template.FuncMap, keys []string) template.FuncMap { result := template.FuncMap{} + for _, key := range keys { if value, ok := source[key]; ok { result[key] = value } } + return result }