From cee460dfa7729df1b888c7ca8e4a48e2a22f64d9 Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Fri, 17 Jan 2025 17:10:43 +0100 Subject: [PATCH] feat: implement subscription line adding on invoice creation (#2105) --- app/common/billing.go | 3 + app/common/openmeter_billingworker.go | 16 +++-- app/common/openmeter_server.go | 1 - cmd/billing-worker/wire_gen.go | 37 +++++++++- cmd/jobs/billing/advance/advance.go | 56 ++++++++++++++- cmd/server/wire_gen.go | 23 ++++--- openmeter/app/entity/base/app.go | 10 +-- openmeter/billing/adapter/validationissue.go | 4 +- openmeter/billing/events.go | 36 ++++++++++ openmeter/billing/profile.go | 4 +- openmeter/billing/service/invoice.go | 8 +++ openmeter/billing/service/invoiceline.go | 32 +++++++-- openmeter/billing/service/service.go | 8 +++ openmeter/billing/worker/subscription/sync.go | 69 +++++++++++++++---- .../billing/worker/subscription/sync_test.go | 7 +- openmeter/billing/worker/worker.go | 16 ++++- openmeter/customer/entity/customer.go | 2 +- openmeter/event/metadata/resourcepath.go | 1 + test/billing/suite.go | 2 + 19 files changed, 281 insertions(+), 54 deletions(-) create mode 100644 openmeter/billing/events.go diff --git a/app/common/billing.go b/app/common/billing.go index 85be84910..728882322 100644 --- a/app/common/billing.go +++ b/app/common/billing.go @@ -17,6 +17,7 @@ import ( "github.com/openmeterio/openmeter/openmeter/meter" "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" "github.com/openmeterio/openmeter/openmeter/streaming" + "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" ) var Billing = wire.NewSet( @@ -45,6 +46,7 @@ func BillingService( featureConnector feature.FeatureConnector, meterRepo meter.Repository, streamingConnector streaming.Connector, + eventPublisher eventbus.Publisher, ) (billing.Service, error) { if !billingConfig.Enabled { return nil, nil @@ -58,6 +60,7 @@ func BillingService( Logger: logger, MeterRepo: meterRepo, StreamingConnector: streamingConnector, + Publisher: eventPublisher, }) } diff --git a/app/common/openmeter_billingworker.go b/app/common/openmeter_billingworker.go index b9f073d92..4f61b91de 100644 --- a/app/common/openmeter_billingworker.go +++ b/app/common/openmeter_billingworker.go @@ -27,7 +27,9 @@ var BillingWorker = wire.NewSet( BillingWorkerProvisionTopics, BillingWorkerSubscriber, - NewFeatureConnector, + Subscription, + ProductCatalog, + Entitlement, BillingAdapter, BillingService, @@ -69,16 +71,18 @@ func NewBillingWorkerOptions( eventBus eventbus.Publisher, billingService billing.Service, billingAdapter billing.Adapter, + subscriptionServices SubscriptionServiceWithWorkflow, logger *slog.Logger, ) billingworker.WorkerOptions { return billingworker.WorkerOptions{ SystemEventsTopic: eventConfig.SystemEvents.Topic, - Router: routerOptions, - EventBus: eventBus, - BillingService: billingService, - BillingAdapter: billingAdapter, - Logger: logger, + Router: routerOptions, + EventBus: eventBus, + BillingService: billingService, + BillingAdapter: billingAdapter, + SubscriptionService: subscriptionServices.Service, + Logger: logger, } } diff --git a/app/common/openmeter_server.go b/app/common/openmeter_server.go index 48b127007..8d4344490 100644 --- a/app/common/openmeter_server.go +++ b/app/common/openmeter_server.go @@ -79,7 +79,6 @@ func NewIngestCollector( func NewServerPublisher( ctx context.Context, - conf config.EventsConfiguration, options watermillkafka.PublisherOptions, logger *slog.Logger, ) (message.Publisher, func(), error) { diff --git a/cmd/billing-worker/wire_gen.go b/cmd/billing-worker/wire_gen.go index bf2d59ffd..06f18911e 100644 --- a/cmd/billing-worker/wire_gen.go +++ b/cmd/billing-worker/wire_gen.go @@ -213,7 +213,7 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - billingService, err := common.BillingService(logger, client, service, appstripeService, adapter, billingConfiguration, customerService, featureConnector, inMemoryRepository, connector) + billingService, err := common.BillingService(logger, client, service, appstripeService, adapter, billingConfiguration, customerService, featureConnector, inMemoryRepository, connector, eventbusPublisher) if err != nil { cleanup6() cleanup5() @@ -223,7 +223,40 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - workerOptions := common.NewBillingWorkerOptions(eventsConfiguration, options, eventbusPublisher, billingService, adapter, logger) + productCatalogConfiguration := conf.ProductCatalog + entitlementsConfiguration := conf.Entitlements + entitlement := common.NewEntitlementRegistry(logger, client, entitlementsConfiguration, connector, inMemoryRepository, eventbusPublisher) + planService, err := common.NewPlanService(logger, client, productCatalogConfiguration, featureConnector) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + validator, err := common.BillingSubscriptionValidator(billingService, billingConfiguration) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + subscriptionServiceWithWorkflow, err := common.NewSubscriptionServices(logger, client, productCatalogConfiguration, entitlementsConfiguration, featureConnector, entitlement, customerService, planService, eventbusPublisher, validator) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + workerOptions := common.NewBillingWorkerOptions(eventsConfiguration, options, eventbusPublisher, billingService, adapter, subscriptionServiceWithWorkflow, logger) worker, err := common.NewBillingWorker(workerOptions) if err != nil { cleanup6() diff --git a/cmd/jobs/billing/advance/advance.go b/cmd/jobs/billing/advance/advance.go index f612a41fc..44cfd4346 100644 --- a/cmd/jobs/billing/advance/advance.go +++ b/cmd/jobs/billing/advance/advance.go @@ -6,6 +6,7 @@ import ( "log/slog" "github.com/spf13/cobra" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" "github.com/openmeterio/openmeter/app/common" appconfig "github.com/openmeterio/openmeter/app/config" @@ -14,6 +15,7 @@ import ( billingadapter "github.com/openmeterio/openmeter/openmeter/billing/adapter" billingservice "github.com/openmeterio/openmeter/openmeter/billing/service" billingworkerautoadvance "github.com/openmeterio/openmeter/openmeter/billing/worker/advance" + "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka" "github.com/openmeterio/openmeter/pkg/framework/entutils/entdriver" "github.com/openmeterio/openmeter/pkg/framework/pgdriver" ) @@ -33,7 +35,19 @@ func init() { Cmd.PersistentFlags().StringVar(&namespace, "namespace", "", "namespace the operation should be performed") } -func NewAutoAdvancer(ctx context.Context, conf appconfig.Configuration, logger *slog.Logger) (*billingworkerautoadvance.AutoAdvancer, error) { +type autoAdvancer struct { + *billingworkerautoadvance.AutoAdvancer + + Shutdown func() +} + +func NewAutoAdvancer(ctx context.Context, conf appconfig.Configuration, logger *slog.Logger) (*autoAdvancer, error) { + commonMetadata := common.NewMetadata(conf, "0.0.0", "billing-advancer") + + // We use a noop meter provider as we don't want to monitor cronjobs (for now) + meterProvider := sdkmetric.NewMeterProvider() + meter := meterProvider.Meter("billing-advancer") + // Initialize Postgres driver postgresDriver, err := pgdriver.NewPostgresDriver(ctx, conf.Postgres.URL) if err != nil { @@ -87,6 +101,34 @@ func NewAutoAdvancer(ctx context.Context, conf appconfig.Configuration, logger * return nil, fmt.Errorf("failed to initialize streaming connection: %w", err) } + brokerOptions := common.NewBrokerConfiguration(conf.Ingest.Kafka.KafkaConfiguration, conf.Telemetry.Log, commonMetadata, logger, meter) + + adminClient, err := common.NewKafkaAdminClient(conf.Ingest.Kafka.KafkaConfiguration) + if err != nil { + return nil, fmt.Errorf("failed to initialize kafka admin client: %w", err) + } + + kafkaTopicProvisionerConfig := common.NewKafkaTopicProvisionerConfig(adminClient, logger, meter, conf.Ingest.Kafka.TopicProvisionerConfig) + + topicProvisioner, err := common.NewKafkaTopicProvisioner(kafkaTopicProvisionerConfig) + if err != nil { + return nil, fmt.Errorf("failed to initialize kafka topic provisioner: %w", err) + } + + publisher, serverPublisherShutdown, err := common.NewServerPublisher(ctx, kafka.PublisherOptions{ + Broker: brokerOptions, + ProvisionTopics: common.ServerProvisionTopics(conf.Events), + TopicProvisioner: topicProvisioner, + }, logger) + if err != nil { + return nil, fmt.Errorf("failed to initialize server publisher: %w", err) + } + + ebPublisher, err := common.NewEventBusPublisher(publisher, conf.Events, logger) + if err != nil { + return nil, fmt.Errorf("failed to initialize event bus publisher: %w", err) + } + billingAdapter, err := billingadapter.New(billingadapter.Config{ Client: entPostgresDriver.Client(), Logger: logger, @@ -103,6 +145,7 @@ func NewAutoAdvancer(ctx context.Context, conf appconfig.Configuration, logger * FeatureService: featureService, MeterRepo: meterRepository, StreamingConnector: streamingConnector, + Publisher: ebPublisher, }) if err != nil { return nil, fmt.Errorf("failed to initialize billing service: %w", err) @@ -116,7 +159,10 @@ func NewAutoAdvancer(ctx context.Context, conf appconfig.Configuration, logger * return nil, fmt.Errorf("failed to initialize billing auto-advancer: %w", err) } - return a, nil + return &autoAdvancer{ + AutoAdvancer: a, + Shutdown: serverPublisherShutdown, + }, nil } var ListCmd = func() *cobra.Command { @@ -136,6 +182,8 @@ var ListCmd = func() *cobra.Command { return err } + defer a.Shutdown() + var ns []string if namespace != "" { ns = append(ns, namespace) @@ -175,6 +223,8 @@ var InvoiceCmd = func() *cobra.Command { return err } + defer a.Shutdown() + if namespace == "" { return fmt.Errorf("invoice namespace is required") } @@ -215,6 +265,8 @@ var AllCmd = func() *cobra.Command { return err } + defer a.Shutdown() + var ns []string if namespace != "" { ns = append(ns, namespace) diff --git a/cmd/server/wire_gen.go b/cmd/server/wire_gen.go index 80c897cde..8c3af20b4 100644 --- a/cmd/server/wire_gen.go +++ b/cmd/server/wire_gen.go @@ -213,24 +213,15 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl } billingConfiguration := conf.Billing featureConnector := common.NewFeatureConnector(logger, client, inMemoryRepository) - billingService, err := common.BillingService(logger, client, service, appstripeService, adapter, billingConfiguration, customerService, featureConnector, inMemoryRepository, connector) - if err != nil { - cleanup5() - cleanup4() - cleanup3() - cleanup2() - cleanup() - return Application{}, nil, err - } - eventsConfiguration := conf.Events brokerOptions := common.NewBrokerConfiguration(kafkaConfiguration, logTelemetryConfig, commonMetadata, logger, meter) + eventsConfiguration := conf.Events v4 := common.ServerProvisionTopics(eventsConfiguration) publisherOptions := kafka.PublisherOptions{ Broker: brokerOptions, ProvisionTopics: v4, TopicProvisioner: topicProvisioner, } - publisher, cleanup6, err := common.NewServerPublisher(ctx, eventsConfiguration, publisherOptions, logger) + publisher, cleanup6, err := common.NewServerPublisher(ctx, publisherOptions, logger) if err != nil { cleanup5() cleanup4() @@ -249,6 +240,16 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } + billingService, err := common.BillingService(logger, client, service, appstripeService, adapter, billingConfiguration, customerService, featureConnector, inMemoryRepository, connector, eventbusPublisher) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } entitlementsConfiguration := conf.Entitlements entitlement := common.NewEntitlementRegistry(logger, client, entitlementsConfiguration, connector, inMemoryRepository, eventbusPublisher) producer, err := common.NewKafkaProducer(kafkaIngestConfiguration, logger) diff --git a/openmeter/app/entity/base/app.go b/openmeter/app/entity/base/app.go index 3f29686ce..0b5b1dbcc 100644 --- a/openmeter/app/entity/base/app.go +++ b/openmeter/app/entity/base/app.go @@ -37,11 +37,11 @@ const ( type AppBase struct { models.ManagedResource - Type AppType - Status AppStatus - Default bool - Listing MarketplaceListing - Metadata map[string]string + Type AppType `json:"type"` + Status AppStatus `json:"status"` + Default bool `json:"default"` + Listing MarketplaceListing `json:"listing"` + Metadata map[string]string `json:"metadata,omitempty"` } func (a AppBase) GetAppBase() AppBase { diff --git a/openmeter/billing/adapter/validationissue.go b/openmeter/billing/adapter/validationissue.go index a1b5616e8..4d565a67d 100644 --- a/openmeter/billing/adapter/validationissue.go +++ b/openmeter/billing/adapter/validationissue.go @@ -89,8 +89,8 @@ func (a *adapter) persistValidationIssues(ctx context.Context, invoice billing.I type ValidationIssueWithDBMeta struct { billing.ValidationIssue - ID string - DeletedAt *time.Time + ID string `json:"id"` + DeletedAt *time.Time `json:"deletedAt,omitempty"` } // IntropectValidationIssues returns the validation issues for the given invoice, this is not diff --git a/openmeter/billing/events.go b/openmeter/billing/events.go new file mode 100644 index 000000000..71f86f701 --- /dev/null +++ b/openmeter/billing/events.go @@ -0,0 +1,36 @@ +package billing + +import ( + "github.com/openmeterio/openmeter/openmeter/event/metadata" +) + +const ( + EventSubsystem metadata.EventSubsystem = "billing" +) + +type InvoiceCreatedEvent struct { + Invoice `json:",inline"` +} + +func NewInvoiceCreatedEvent(invoice Invoice) InvoiceCreatedEvent { + return InvoiceCreatedEvent{invoice.RemoveCircularReferences()} +} + +func (e InvoiceCreatedEvent) EventName() string { + return metadata.GetEventName(metadata.EventType{ + Subsystem: EventSubsystem, + Name: "invoice.created", + Version: "v1", + }) +} + +func (e InvoiceCreatedEvent) EventMetadata() metadata.EventMetadata { + return metadata.EventMetadata{ + Source: metadata.ComposeResourcePath(e.Namespace, metadata.EntityInvoice, e.ID), + Subject: metadata.ComposeResourcePath(e.Namespace, metadata.EntityCustomer, e.Customer.CustomerID), + } +} + +func (e InvoiceCreatedEvent) Validate() error { + return e.Invoice.Validate() +} diff --git a/openmeter/billing/profile.go b/openmeter/billing/profile.go index 72d1f3510..4333989a8 100644 --- a/openmeter/billing/profile.go +++ b/openmeter/billing/profile.go @@ -39,7 +39,7 @@ type WorkflowConfig struct { CreatedAt time.Time `json:"createdAt"` UpdatedAt time.Time `json:"updatedAt"` - DeletedAt *time.Time `json:"deletedAt"` + DeletedAt *time.Time `json:"deletedAt,omitempty"` Collection CollectionConfig `json:"collection"` Invoicing InvoicingConfig `json:"invoicing"` @@ -134,7 +134,7 @@ func (r GranularityResolution) Values() []string { } type PaymentConfig struct { - CollectionMethod CollectionMethod + CollectionMethod CollectionMethod `json:"collectionMethod"` } func (c *PaymentConfig) Validate() error { diff --git a/openmeter/billing/service/invoice.go b/openmeter/billing/service/invoice.go index 2506ec5ab..a90f3f9d6 100644 --- a/openmeter/billing/service/invoice.go +++ b/openmeter/billing/service/invoice.go @@ -302,6 +302,14 @@ func (s *Service) InvoicePendingLines(ctx context.Context, input billing.Invoice out = append(out, invoice) } + + for _, invoice := range out { + err := s.publisher.Publish(ctx, billing.NewInvoiceCreatedEvent(invoice)) + if err != nil { + return nil, fmt.Errorf("publishing event: %w", err) + } + } + return out, nil }) } diff --git a/openmeter/billing/service/invoiceline.go b/openmeter/billing/service/invoiceline.go index ab51fcc0c..6cca0060f 100644 --- a/openmeter/billing/service/invoiceline.go +++ b/openmeter/billing/service/invoiceline.go @@ -47,6 +47,7 @@ func (s *Service) CreatePendingInvoiceLines(ctx context.Context, input billing.C return transaction.Run(ctx, s.adapter, func(ctx context.Context) ([]*billing.Line, error) { out := make([]*billing.Line, 0, len(input.Lines)) + newInvoiceIDs := []string{} for customerID, lineByCustomer := range createByCustomerID { if err := s.validateCustomerForUpdate(ctx, customerentity.CustomerID{ @@ -82,6 +83,10 @@ func (s *Service) CreatePendingInvoiceLines(ctx context.Context, input billing.C return nil, fmt.Errorf("upserting line[%d]: %w", i, err) } + if updateResult.IsInvoiceNew { + newInvoiceIDs = append(newInvoiceIDs, updateResult.Invoice.ID) + } + lineService, err := s.lineService.FromEntity(&updateResult.Line) if err != nil { return nil, fmt.Errorf("creating line service[%d]: %w", i, err) @@ -117,13 +122,31 @@ func (s *Service) CreatePendingInvoiceLines(ctx context.Context, input billing.C out = append(out, createdLines...) } + for _, invoiceID := range newInvoiceIDs { + invoice, err := s.GetInvoiceByID(ctx, billing.GetInvoiceByIdInput{ + Invoice: billing.InvoiceID{ + Namespace: input.Namespace, + ID: invoiceID, + }, + Expand: billing.InvoiceExpandAll, + }) + if err != nil { + return nil, fmt.Errorf("fetching invoice[%s]: %w", invoiceID, err) + } + + if err := s.publisher.Publish(ctx, billing.NewInvoiceCreatedEvent(invoice)); err != nil { + return nil, fmt.Errorf("publishing invoice[%s] created event: %w", invoiceID, err) + } + } + return out, nil }) } type upsertLineInvoiceResponse struct { - Line billing.Line - Invoice *billing.Invoice + Line billing.Line + Invoice *billing.Invoice + IsInvoiceNew bool } func (s *Service) upsertLineInvoice(ctx context.Context, line billing.Line, input billing.CreateInvoiceLinesInput, customerProfile *billing.ProfileWithCustomerDetails) (*upsertLineInvoiceResponse, error) { @@ -161,8 +184,9 @@ func (s *Service) upsertLineInvoice(ctx context.Context, line billing.Line, inpu line.InvoiceID = invoice.ID return &upsertLineInvoiceResponse{ - Line: line, - Invoice: &invoice, + Line: line, + Invoice: &invoice, + IsInvoiceNew: true, }, nil } diff --git a/openmeter/billing/service/service.go b/openmeter/billing/service/service.go index 07f7fe447..52d0c2bfe 100644 --- a/openmeter/billing/service/service.go +++ b/openmeter/billing/service/service.go @@ -15,6 +15,7 @@ import ( "github.com/openmeterio/openmeter/openmeter/meter" "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" "github.com/openmeterio/openmeter/openmeter/streaming" + "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" "github.com/openmeterio/openmeter/pkg/framework/transaction" ) @@ -31,6 +32,7 @@ type Service struct { streamingConnector streaming.Connector lineService *lineservice.Service + publisher eventbus.Publisher } type Config struct { @@ -41,6 +43,7 @@ type Config struct { FeatureService feature.FeatureConnector MeterRepo meter.Repository StreamingConnector streaming.Connector + Publisher eventbus.Publisher } func (c Config) Validate() error { @@ -72,6 +75,10 @@ func (c Config) Validate() error { return errors.New("streaming connector cannot be null") } + if c.Publisher == nil { + return errors.New("publisher cannot be null") + } + return nil } @@ -88,6 +95,7 @@ func New(config Config) (*Service, error) { featureService: config.FeatureService, meterRepo: config.MeterRepo, streamingConnector: config.StreamingConnector, + publisher: config.Publisher, } lineSvc, err := lineservice.New(lineservice.Config{ diff --git a/openmeter/billing/worker/subscription/sync.go b/openmeter/billing/worker/subscription/sync.go index 365fb300d..8e98ba02b 100644 --- a/openmeter/billing/worker/subscription/sync.go +++ b/openmeter/billing/worker/subscription/sync.go @@ -16,6 +16,7 @@ import ( "github.com/openmeterio/openmeter/pkg/clock" "github.com/openmeterio/openmeter/pkg/currencyx" "github.com/openmeterio/openmeter/pkg/framework/transaction" + "github.com/openmeterio/openmeter/pkg/models" "github.com/openmeterio/openmeter/pkg/slicesx" "github.com/openmeterio/openmeter/pkg/timex" ) @@ -30,10 +31,11 @@ type FeatureFlags struct { } type Config struct { - BillingService billing.Service - TxCreator transaction.Creator - Logger *slog.Logger - FeatureFlags FeatureFlags + BillingService billing.Service + SubscriptionService subscription.Service + TxCreator transaction.Creator + Logger *slog.Logger + FeatureFlags FeatureFlags } func (c Config) Validate() error { @@ -41,6 +43,10 @@ func (c Config) Validate() error { return fmt.Errorf("billing service is required") } + if c.SubscriptionService == nil { + return fmt.Errorf("subscription service is required") + } + if c.TxCreator == nil { return fmt.Errorf("transaction creator is required") } @@ -53,10 +59,11 @@ func (c Config) Validate() error { } type Handler struct { - billingService billing.Service - txCreator transaction.Creator - logger *slog.Logger - featureFlags FeatureFlags + billingService billing.Service + subscriptionService subscription.Service + txCreator transaction.Creator + logger *slog.Logger + featureFlags FeatureFlags } func New(config Config) (*Handler, error) { @@ -64,10 +71,11 @@ func New(config Config) (*Handler, error) { return nil, err } return &Handler{ - billingService: config.BillingService, - txCreator: config.TxCreator, - logger: config.Logger, - featureFlags: config.FeatureFlags, + billingService: config.BillingService, + txCreator: config.TxCreator, + logger: config.Logger, + featureFlags: config.FeatureFlags, + subscriptionService: config.SubscriptionService, }, nil } @@ -766,3 +774,40 @@ func (h *Handler) cloneLineForUpsert(line *billing.Line) *billing.Line { clone.ParentLine = line.ParentLine return clone } + +// HandleInvoiceCreation is a handler for the invoice creation event, it will make sure that +// we are backfilling the items consumed by invoice creation into the gathering invoice. +func (h *Handler) HandleInvoiceCreation(ctx context.Context, invoice billing.Invoice) error { + if invoice.Status == billing.InvoiceStatusGathering { + return nil + } + + affectedSubscriptions := lo.Uniq( + lo.Map( + lo.Filter(invoice.Lines.OrEmpty(), func(line *billing.Line, _ int) bool { + return line.Status == billing.InvoiceLineStatusValid && + line.Subscription != nil + }), + func(line *billing.Line, _ int) string { + return line.Subscription.SubscriptionID + }), + ) + + for _, subscriptionID := range affectedSubscriptions { + subsView, err := h.subscriptionService.GetView(ctx, models.NamespacedID{ + Namespace: invoice.Namespace, + ID: subscriptionID, + }) + if err != nil { + return fmt.Errorf("getting subscription view[%s]: %w", subscriptionID, err) + } + + // We use the current time as reference point instead of the invoice, as if we are delayed + // we might want to provision more lines + if err := h.SyncronizeSubscription(ctx, subsView, clock.Now()); err != nil { + return fmt.Errorf("syncing subscription[%s]: %w", subscriptionID, err) + } + } + + return nil +} diff --git a/openmeter/billing/worker/subscription/sync_test.go b/openmeter/billing/worker/subscription/sync_test.go index 78c0db905..c92549fbb 100644 --- a/openmeter/billing/worker/subscription/sync_test.go +++ b/openmeter/billing/worker/subscription/sync_test.go @@ -114,9 +114,10 @@ func (s *SubscriptionHandlerTestSuite) SetupSuite() { }) handler, err := New(Config{ - BillingService: s.BillingService, - Logger: slog.Default(), - TxCreator: s.BillingAdapter, + BillingService: s.BillingService, + Logger: slog.Default(), + TxCreator: s.BillingAdapter, + SubscriptionService: s.SubscriptionService, }) s.NoError(err) diff --git a/openmeter/billing/worker/worker.go b/openmeter/billing/worker/worker.go index 0eaf2f331..6e0b981a1 100644 --- a/openmeter/billing/worker/worker.go +++ b/openmeter/billing/worker/worker.go @@ -27,6 +27,8 @@ type WorkerOptions struct { BillingAdapter billing.Adapter BillingService billing.Service // External connectors + + SubscriptionService subscription.Service } func (w WorkerOptions) Validate() error { @@ -54,6 +56,10 @@ func (w WorkerOptions) Validate() error { return fmt.Errorf("billing adapter is required") } + if w.SubscriptionService == nil { + return fmt.Errorf("subscription service is required") + } + return nil } @@ -70,9 +76,10 @@ func New(opts WorkerOptions) (*Worker, error) { } handler, err := billingworkersubscription.New(billingworkersubscription.Config{ - BillingService: opts.BillingService, - Logger: opts.Logger, - TxCreator: opts.BillingAdapter, + BillingService: opts.BillingService, + Logger: opts.Logger, + TxCreator: opts.BillingAdapter, + SubscriptionService: opts.SubscriptionService, }) if err != nil { return nil, err @@ -122,6 +129,9 @@ func (w *Worker) eventHandler(opts WorkerOptions) (message.NoPublishHandlerFunc, grouphandler.NewGroupEventHandler(func(ctx context.Context, event *subscription.UpdatedEvent) error { return w.subscriptionHandler.SyncronizeSubscription(ctx, event.UpdatedView, time.Now()) }), + grouphandler.NewGroupEventHandler(func(ctx context.Context, event *billing.InvoiceCreatedEvent) error { + return w.subscriptionHandler.HandleInvoiceCreation(ctx, event.Invoice) + }), ) if err != nil { return nil, fmt.Errorf("failed to create event handler: %w", err) diff --git a/openmeter/customer/entity/customer.go b/openmeter/customer/entity/customer.go index 84ab14e0a..9af89bc07 100644 --- a/openmeter/customer/entity/customer.go +++ b/openmeter/customer/entity/customer.go @@ -89,7 +89,7 @@ func (i CustomerID) Validate() error { // CustomerUsageAttribution represents the usage attribution for a customer type CustomerUsageAttribution struct { - SubjectKeys []string + SubjectKeys []string `json:"subjectKeys"` } // ListCustomersInput represents the input for the ListCustomers method diff --git a/openmeter/event/metadata/resourcepath.go b/openmeter/event/metadata/resourcepath.go index d112e8e03..5b9821e2e 100644 --- a/openmeter/event/metadata/resourcepath.go +++ b/openmeter/event/metadata/resourcepath.go @@ -9,6 +9,7 @@ import ( const ( EntityEntitlement = "entitlement" EntitySubscription = "subscription" + EntityInvoice = "invoice" EntityCustomer = "customer" EntitySubjectKey = "subjectKey" EntityGrant = "grant" diff --git a/test/billing/suite.go b/test/billing/suite.go index d961fce91..5967f972c 100644 --- a/test/billing/suite.go +++ b/test/billing/suite.go @@ -33,6 +33,7 @@ import ( "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" streamingtestutils "github.com/openmeterio/openmeter/openmeter/streaming/testutils" "github.com/openmeterio/openmeter/openmeter/testutils" + "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" "github.com/openmeterio/openmeter/pkg/currencyx" "github.com/openmeterio/openmeter/pkg/models" "github.com/openmeterio/openmeter/tools/migrate" @@ -145,6 +146,7 @@ func (s *BaseSuite) SetupSuite() { FeatureService: s.FeatureService, MeterRepo: s.MeterRepo, StreamingConnector: s.MockStreamingConnector, + Publisher: eventbus.NewMock(s.T()), }) require.NoError(t, err)