diff --git a/pkg/kine/prepared/db.go b/pkg/kine/prepared/db.go deleted file mode 100644 index fa5e99e5..00000000 --- a/pkg/kine/prepared/db.go +++ /dev/null @@ -1,135 +0,0 @@ -package prepared - -import ( - "context" - "database/sql" - "errors" - "fmt" - "sync" - - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" -) - -const otelName = "prepared" - -var otelTracer trace.Tracer - -func init() { - otelTracer = otel.Tracer(otelName) -} - -type DB struct { - underlying *sql.DB - mu sync.RWMutex - store map[string]*sql.Stmt -} - -func New(db *sql.DB) *DB { - return &DB{ - underlying: db, - store: make(map[string]*sql.Stmt), - } -} - -func (db *DB) Underlying() *sql.DB { return db.underlying } - -func (db *DB) ExecContext(ctx context.Context, query string, args ...any) (result sql.Result, err error) { - ctx, span := otelTracer.Start(ctx, "DB.ExecContext") - defer func() { - span.RecordError(err) - span.End() - }() - - stmt, err := db.prepare(ctx, query) - if err != nil { - return nil, err - } - return stmt.ExecContext(ctx, args...) -} - -func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (rows *sql.Rows, err error) { - ctx, span := otelTracer.Start(ctx, "DB.QueryContext") - defer func() { - span.RecordError(err) - span.End() - }() - - stmt, err := db.prepare(ctx, query) - if err != nil { - return nil, err - } - return stmt.QueryContext(ctx, args...) -} - -func (db *DB) Close() error { - db.mu.Lock() - defer db.mu.Unlock() - - errs := []error{} - for _, stmt := range db.store { - if err := stmt.Close(); err != nil { - errs = append(errs, err) - } - } - db.store = nil - - if err := db.underlying.Close(); err != nil { - errs = append(errs, err) - } - db.underlying = nil - - return errors.Join(errs...) -} - -func (db *DB) prepare(ctx context.Context, query string) (stmt *sql.Stmt, err error) { - ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.prepare", otelName)) - defer func() { - span.RecordError(err) - span.End() - }() - span.SetAttributes(attribute.String("query", query)) - - db.mu.RLock() - span.AddEvent("acquired read lock") - stmt = db.store[query] - db.mu.RUnlock() - if stmt != nil { - return stmt, nil - } - - db.mu.Lock() - span.AddEvent("acquired read-write lock") - defer db.mu.Unlock() - - if db.underlying == nil { - return nil, errors.New("database is closed") - } - - // Check again if the query was prepared during locking - stmt = db.store[query] - if stmt != nil { - return stmt, nil - } - - prepared, err := db.underlying.PrepareContext(ctx, query) - if err != nil { - return nil, err - } - - db.store[query] = prepared - return prepared, nil -} - -func (db *DB) BeginTx(ctx context.Context, opts *sql.TxOptions) (*Tx, error) { - tx, err := db.underlying.BeginTx(ctx, opts) - if err != nil { - return nil, err - } - - return &Tx{ - db: db, - tx: tx, - }, nil -} diff --git a/pkg/kine/prepared/tx.go b/pkg/kine/prepared/tx.go deleted file mode 100644 index 2d80f4cb..00000000 --- a/pkg/kine/prepared/tx.go +++ /dev/null @@ -1,32 +0,0 @@ -package prepared - -import ( - "context" - "database/sql" -) - -type Tx struct { - db *DB - tx *sql.Tx -} - -func (tx *Tx) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) { - stmt, err := tx.db.prepare(ctx, query) - if err != nil { - return nil, err - } - - return tx.tx.StmtContext(ctx, stmt).ExecContext(ctx, args...) -} - -func (tx *Tx) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) { - stmt, err := tx.db.prepare(ctx, query) - if err != nil { - return nil, err - } - - return tx.tx.StmtContext(ctx, stmt).QueryContext(ctx, args...) -} - -func (tx *Tx) Commit() error { return tx.tx.Commit() } -func (tx *Tx) Rollback() error { return tx.tx.Rollback() }