Skip to content

Commit

Permalink
feat: implement subscription line adding on invoice creation (#2105)
Browse files Browse the repository at this point in the history
  • Loading branch information
turip authored Jan 17, 2025
1 parent 15dd0d9 commit cee460d
Show file tree
Hide file tree
Showing 19 changed files with 281 additions and 54 deletions.
3 changes: 3 additions & 0 deletions app/common/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/openmeterio/openmeter/openmeter/meter"
"github.com/openmeterio/openmeter/openmeter/productcatalog/feature"
"github.com/openmeterio/openmeter/openmeter/streaming"
"github.com/openmeterio/openmeter/openmeter/watermill/eventbus"
)

var Billing = wire.NewSet(
Expand Down Expand Up @@ -45,6 +46,7 @@ func BillingService(
featureConnector feature.FeatureConnector,
meterRepo meter.Repository,
streamingConnector streaming.Connector,
eventPublisher eventbus.Publisher,
) (billing.Service, error) {
if !billingConfig.Enabled {
return nil, nil
Expand All @@ -58,6 +60,7 @@ func BillingService(
Logger: logger,
MeterRepo: meterRepo,
StreamingConnector: streamingConnector,
Publisher: eventPublisher,
})
}

Expand Down
16 changes: 10 additions & 6 deletions app/common/openmeter_billingworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ var BillingWorker = wire.NewSet(
BillingWorkerProvisionTopics,
BillingWorkerSubscriber,

NewFeatureConnector,
Subscription,
ProductCatalog,
Entitlement,
BillingAdapter,
BillingService,

Expand Down Expand Up @@ -69,16 +71,18 @@ func NewBillingWorkerOptions(
eventBus eventbus.Publisher,
billingService billing.Service,
billingAdapter billing.Adapter,
subscriptionServices SubscriptionServiceWithWorkflow,
logger *slog.Logger,
) billingworker.WorkerOptions {
return billingworker.WorkerOptions{
SystemEventsTopic: eventConfig.SystemEvents.Topic,

Router: routerOptions,
EventBus: eventBus,
BillingService: billingService,
BillingAdapter: billingAdapter,
Logger: logger,
Router: routerOptions,
EventBus: eventBus,
BillingService: billingService,
BillingAdapter: billingAdapter,
SubscriptionService: subscriptionServices.Service,
Logger: logger,
}
}

Expand Down
1 change: 0 additions & 1 deletion app/common/openmeter_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ func NewIngestCollector(

func NewServerPublisher(
ctx context.Context,
conf config.EventsConfiguration,
options watermillkafka.PublisherOptions,
logger *slog.Logger,
) (message.Publisher, func(), error) {
Expand Down
37 changes: 35 additions & 2 deletions cmd/billing-worker/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

56 changes: 54 additions & 2 deletions cmd/jobs/billing/advance/advance.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log/slog"

"github.com/spf13/cobra"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"

"github.com/openmeterio/openmeter/app/common"
appconfig "github.com/openmeterio/openmeter/app/config"
Expand All @@ -14,6 +15,7 @@ import (
billingadapter "github.com/openmeterio/openmeter/openmeter/billing/adapter"
billingservice "github.com/openmeterio/openmeter/openmeter/billing/service"
billingworkerautoadvance "github.com/openmeterio/openmeter/openmeter/billing/worker/advance"
"github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka"
"github.com/openmeterio/openmeter/pkg/framework/entutils/entdriver"
"github.com/openmeterio/openmeter/pkg/framework/pgdriver"
)
Expand All @@ -33,7 +35,19 @@ func init() {
Cmd.PersistentFlags().StringVar(&namespace, "namespace", "", "namespace the operation should be performed")
}

func NewAutoAdvancer(ctx context.Context, conf appconfig.Configuration, logger *slog.Logger) (*billingworkerautoadvance.AutoAdvancer, error) {
type autoAdvancer struct {
*billingworkerautoadvance.AutoAdvancer

Shutdown func()
}

func NewAutoAdvancer(ctx context.Context, conf appconfig.Configuration, logger *slog.Logger) (*autoAdvancer, error) {
commonMetadata := common.NewMetadata(conf, "0.0.0", "billing-advancer")

// We use a noop meter provider as we don't want to monitor cronjobs (for now)
meterProvider := sdkmetric.NewMeterProvider()
meter := meterProvider.Meter("billing-advancer")

// Initialize Postgres driver
postgresDriver, err := pgdriver.NewPostgresDriver(ctx, conf.Postgres.URL)
if err != nil {
Expand Down Expand Up @@ -87,6 +101,34 @@ func NewAutoAdvancer(ctx context.Context, conf appconfig.Configuration, logger *
return nil, fmt.Errorf("failed to initialize streaming connection: %w", err)
}

brokerOptions := common.NewBrokerConfiguration(conf.Ingest.Kafka.KafkaConfiguration, conf.Telemetry.Log, commonMetadata, logger, meter)

adminClient, err := common.NewKafkaAdminClient(conf.Ingest.Kafka.KafkaConfiguration)
if err != nil {
return nil, fmt.Errorf("failed to initialize kafka admin client: %w", err)
}

kafkaTopicProvisionerConfig := common.NewKafkaTopicProvisionerConfig(adminClient, logger, meter, conf.Ingest.Kafka.TopicProvisionerConfig)

topicProvisioner, err := common.NewKafkaTopicProvisioner(kafkaTopicProvisionerConfig)
if err != nil {
return nil, fmt.Errorf("failed to initialize kafka topic provisioner: %w", err)
}

publisher, serverPublisherShutdown, err := common.NewServerPublisher(ctx, kafka.PublisherOptions{
Broker: brokerOptions,
ProvisionTopics: common.ServerProvisionTopics(conf.Events),
TopicProvisioner: topicProvisioner,
}, logger)
if err != nil {
return nil, fmt.Errorf("failed to initialize server publisher: %w", err)
}

ebPublisher, err := common.NewEventBusPublisher(publisher, conf.Events, logger)
if err != nil {
return nil, fmt.Errorf("failed to initialize event bus publisher: %w", err)
}

billingAdapter, err := billingadapter.New(billingadapter.Config{
Client: entPostgresDriver.Client(),
Logger: logger,
Expand All @@ -103,6 +145,7 @@ func NewAutoAdvancer(ctx context.Context, conf appconfig.Configuration, logger *
FeatureService: featureService,
MeterRepo: meterRepository,
StreamingConnector: streamingConnector,
Publisher: ebPublisher,
})
if err != nil {
return nil, fmt.Errorf("failed to initialize billing service: %w", err)
Expand All @@ -116,7 +159,10 @@ func NewAutoAdvancer(ctx context.Context, conf appconfig.Configuration, logger *
return nil, fmt.Errorf("failed to initialize billing auto-advancer: %w", err)
}

return a, nil
return &autoAdvancer{
AutoAdvancer: a,
Shutdown: serverPublisherShutdown,
}, nil
}

var ListCmd = func() *cobra.Command {
Expand All @@ -136,6 +182,8 @@ var ListCmd = func() *cobra.Command {
return err
}

defer a.Shutdown()

var ns []string
if namespace != "" {
ns = append(ns, namespace)
Expand Down Expand Up @@ -175,6 +223,8 @@ var InvoiceCmd = func() *cobra.Command {
return err
}

defer a.Shutdown()

if namespace == "" {
return fmt.Errorf("invoice namespace is required")
}
Expand Down Expand Up @@ -215,6 +265,8 @@ var AllCmd = func() *cobra.Command {
return err
}

defer a.Shutdown()

var ns []string
if namespace != "" {
ns = append(ns, namespace)
Expand Down
23 changes: 12 additions & 11 deletions cmd/server/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions openmeter/app/entity/base/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ const (
type AppBase struct {
models.ManagedResource

Type AppType
Status AppStatus
Default bool
Listing MarketplaceListing
Metadata map[string]string
Type AppType `json:"type"`
Status AppStatus `json:"status"`
Default bool `json:"default"`
Listing MarketplaceListing `json:"listing"`
Metadata map[string]string `json:"metadata,omitempty"`
}

func (a AppBase) GetAppBase() AppBase {
Expand Down
4 changes: 2 additions & 2 deletions openmeter/billing/adapter/validationissue.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ func (a *adapter) persistValidationIssues(ctx context.Context, invoice billing.I
type ValidationIssueWithDBMeta struct {
billing.ValidationIssue

ID string
DeletedAt *time.Time
ID string `json:"id"`
DeletedAt *time.Time `json:"deletedAt,omitempty"`
}

// IntropectValidationIssues returns the validation issues for the given invoice, this is not
Expand Down
36 changes: 36 additions & 0 deletions openmeter/billing/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package billing

import (
"github.com/openmeterio/openmeter/openmeter/event/metadata"
)

const (
EventSubsystem metadata.EventSubsystem = "billing"
)

type InvoiceCreatedEvent struct {
Invoice `json:",inline"`
}

func NewInvoiceCreatedEvent(invoice Invoice) InvoiceCreatedEvent {
return InvoiceCreatedEvent{invoice.RemoveCircularReferences()}
}

func (e InvoiceCreatedEvent) EventName() string {
return metadata.GetEventName(metadata.EventType{
Subsystem: EventSubsystem,
Name: "invoice.created",
Version: "v1",
})
}

func (e InvoiceCreatedEvent) EventMetadata() metadata.EventMetadata {
return metadata.EventMetadata{
Source: metadata.ComposeResourcePath(e.Namespace, metadata.EntityInvoice, e.ID),
Subject: metadata.ComposeResourcePath(e.Namespace, metadata.EntityCustomer, e.Customer.CustomerID),
}
}

func (e InvoiceCreatedEvent) Validate() error {
return e.Invoice.Validate()
}
4 changes: 2 additions & 2 deletions openmeter/billing/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type WorkflowConfig struct {

CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
DeletedAt *time.Time `json:"deletedAt"`
DeletedAt *time.Time `json:"deletedAt,omitempty"`

Collection CollectionConfig `json:"collection"`
Invoicing InvoicingConfig `json:"invoicing"`
Expand Down Expand Up @@ -134,7 +134,7 @@ func (r GranularityResolution) Values() []string {
}

type PaymentConfig struct {
CollectionMethod CollectionMethod
CollectionMethod CollectionMethod `json:"collectionMethod"`
}

func (c *PaymentConfig) Validate() error {
Expand Down
8 changes: 8 additions & 0 deletions openmeter/billing/service/invoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,14 @@ func (s *Service) InvoicePendingLines(ctx context.Context, input billing.Invoice

out = append(out, invoice)
}

for _, invoice := range out {
err := s.publisher.Publish(ctx, billing.NewInvoiceCreatedEvent(invoice))
if err != nil {
return nil, fmt.Errorf("publishing event: %w", err)
}
}

return out, nil
})
}
Expand Down
Loading

0 comments on commit cee460d

Please sign in to comment.