diff --git a/CHANGELOG.md b/CHANGELOG.md index ce8977932..dd920d141 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Added context to some internal methods for better tracing +* Added `trace.FunctionID` helper and `FunctionID` field to trace start info's * Replaced lazy initialization of ydb clients (table, topic, etc.) to explicit initialization on `ydb.Open` step ## v3.54.1 diff --git a/driver.go b/driver.go index d87c35b10..f35ee074f 100644 --- a/driver.go +++ b/driver.go @@ -26,6 +26,7 @@ import ( schemeConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/scheme/config" internalScripting "github.com/ydb-platform/ydb-go-sdk/v3/internal/scripting" scriptingConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/scripting/config" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" internalTable "github.com/ydb-platform/ydb-go-sdk/v3/internal/table" tableConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicclientinternal" @@ -93,7 +94,12 @@ type Driver struct { //nolint:maligned } // Close closes Driver and clear resources -func (d *Driver) Close(ctx context.Context) error { +func (d *Driver) Close(ctx context.Context) (finalErr error) { + onDone := trace.DriverOnClose(d.config.Trace(), &ctx, stack.FunctionID(0)) + defer func() { + onDone(finalErr) + }() + d.mtx.Lock() defer d.mtx.Unlock() @@ -103,16 +109,16 @@ func (d *Driver) Close(ctx context.Context) error { } }() - closers := make([]func(context.Context) error, 0) + closes := make([]func(context.Context) error, 0) d.childrenMtx.WithLock(func() { for _, child := range d.children { - closers = append(closers, child.Close) + closes = append(closes, child.Close) } d.children = nil }) - closers = append( - closers, + closes = append( + closes, d.ratelimiter.Close, d.coordination.Close, d.scheme.Close, @@ -124,8 +130,8 @@ func (d *Driver) Close(ctx context.Context) error { ) var issues []error - for _, closer := range closers { - if err := closer(ctx); err != nil { + for _, f := range closes { + if err := f(ctx); err != nil { issues = append(issues, err) } } @@ -292,11 +298,7 @@ func connect(ctx context.Context, d *Driver) error { } onDone := trace.DriverOnInit( - d.config.Trace(), - &ctx, - d.config.Endpoint(), - d.config.Database(), - d.config.Secure(), + d.config.Trace(), &ctx, stack.FunctionID(2), d.config.Endpoint(), d.config.Database(), d.config.Secure(), ) defer func() { onDone(err) @@ -313,7 +315,7 @@ func connect(ctx context.Context, d *Driver) error { } if d.pool == nil { - d.pool = conn.NewPool(d.config) + d.pool = conn.NewPool(ctx, d.config) } d.balancer, err = balancer.New(ctx, d.config, d.pool, d.discoveryOptions...) diff --git a/internal/balancer/balancer.go b/internal/balancer/balancer.go index b0e66885e..cbc68a450 100644 --- a/internal/balancer/balancer.go +++ b/internal/balancer/balancer.go @@ -15,6 +15,7 @@ import ( discoveryConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/discovery/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint" "github.com/ydb-platform/ydb-go-sdk/v3/internal/repeater" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync" @@ -91,7 +92,7 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) { var ( address = "ydb:///" + b.driverConfig.Endpoint() onDone = trace.DriverOnBalancerClusterDiscoveryAttempt( - b.driverConfig.Trace(), &ctx, address, + b.driverConfig.Trace(), &ctx, stack.FunctionID(0), address, ) endpoints []endpoint.Endpoint localDC string @@ -127,9 +128,7 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) { func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []endpoint.Endpoint, localDC string) { onDone := trace.DriverOnBalancerUpdate( - b.driverConfig.Trace(), - &ctx, - b.balancerConfig.DetectlocalDC, + b.driverConfig.Trace(), &ctx, stack.FunctionID(0), b.balancerConfig.DetectlocalDC, ) defer func() { nodes := make([]trace.EndpointInfo, 0, len(endpoints)) @@ -163,8 +162,7 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []end func (b *Balancer) Close(ctx context.Context) (err error) { onDone := trace.DriverOnBalancerClose( - b.driverConfig.Trace(), - &ctx, + b.driverConfig.Trace(), &ctx, stack.FunctionID(0), ) defer func() { onDone(err) @@ -189,8 +187,7 @@ func New( ) (b *Balancer, finalErr error) { var ( onDone = trace.DriverOnBalancerInit( - driverConfig.Trace(), - &ctx, + driverConfig.Trace(), &ctx, stack.FunctionID(0), ) discoveryConfig = discoveryConfig.New(append(opts, discoveryConfig.With(driverConfig.Common), @@ -235,7 +232,8 @@ func New( } // run background discovering if d := discoveryConfig.Interval(); d > 0 { - b.discoveryRepeater = repeater.New(d, b.clusterDiscoveryAttempt, + b.discoveryRepeater = repeater.New(xcontext.WithoutDeadline(ctx), + d, b.clusterDiscoveryAttempt, repeater.WithName("discovery"), repeater.WithTrace(b.driverConfig.Trace()), ) @@ -320,8 +318,7 @@ func (b *Balancer) connections() *connectionsState { func (b *Balancer) getConn(ctx context.Context) (c conn.Conn, err error) { onDone := trace.DriverOnBalancerChooseEndpoint( - b.driverConfig.Trace(), - &ctx, + b.driverConfig.Trace(), &ctx, stack.FunctionID(0), ) defer func() { if err == nil { diff --git a/internal/balancer/local_dc_test.go b/internal/balancer/local_dc_test.go index 678b55c3f..26eaf38a6 100644 --- a/internal/balancer/local_dc_test.go +++ b/internal/balancer/local_dc_test.go @@ -136,7 +136,7 @@ func TestLocalDCDiscovery(t *testing.T) { r := &Balancer{ driverConfig: cfg, balancerConfig: *cfg.Balancer(), - pool: conn.NewPool(cfg), + pool: conn.NewPool(context.Background(), cfg), discoveryClient: discoveryMock{endpoints: []endpoint.Endpoint{ &mock.Endpoint{AddrField: "a:123", LocationField: "a"}, &mock.Endpoint{AddrField: "b:234", LocationField: "b"}, diff --git a/internal/conn/conn.go b/internal/conn/conn.go index 1b074a83e..3a2e14248 100644 --- a/internal/conn/conn.go +++ b/internal/conn/conn.go @@ -15,6 +15,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint" "github.com/ydb-platform/ydb-go-sdk/v3/internal/meta" "github.com/ydb-platform/ydb-go-sdk/v3/internal/response" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xatomic" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" @@ -42,8 +43,8 @@ type Conn interface { Ping(ctx context.Context) error IsState(states ...State) bool GetState() State - SetState(State) State - Unban() State + SetState(context.Context, State) State + Unban(context.Context) State } type conn struct { @@ -93,9 +94,7 @@ func (c *conn) IsState(states ...State) bool { func (c *conn) park(ctx context.Context) (err error) { onDone := trace.DriverOnConnPark( - c.config.Trace(), - &ctx, - c.Endpoint(), + c.config.Trace(), &ctx, stack.FunctionID(0), c.Endpoint(), ) defer func() { onDone(err) @@ -112,7 +111,7 @@ func (c *conn) park(ctx context.Context) (err error) { return nil } - err = c.close() + err = c.close(ctx) if err != nil { return c.wrapError(err) @@ -135,20 +134,20 @@ func (c *conn) Endpoint() endpoint.Endpoint { return nil } -func (c *conn) SetState(s State) State { - return c.setState(s) +func (c *conn) SetState(ctx context.Context, s State) State { + return c.setState(ctx, s) } -func (c *conn) setState(s State) State { +func (c *conn) setState(ctx context.Context, s State) State { if state := State(c.state.Swap(uint32(s))); state != s { trace.DriverOnConnStateChange( - c.config.Trace(), c.endpoint.Copy(), state, + c.config.Trace(), &ctx, stack.FunctionID(0), c.endpoint.Copy(), state, )(s) } return s } -func (c *conn) Unban() State { +func (c *conn) Unban(ctx context.Context) State { var newState State c.mtx.RLock() cc := c.cc @@ -159,7 +158,7 @@ func (c *conn) Unban() State { newState = Offline } - c.setState(newState) + c.setState(ctx, newState) return newState } @@ -186,9 +185,7 @@ func (c *conn) realConn(ctx context.Context) (cc *grpc.ClientConn, err error) { } onDone := trace.DriverOnConnDial( - c.config.Trace(), - &ctx, - c.endpoint.Copy(), + c.config.Trace(), &ctx, stack.FunctionID(0), c.endpoint.Copy(), ) defer func() { @@ -221,7 +218,7 @@ func (c *conn) realConn(ctx context.Context) (cc *grpc.ClientConn, err error) { } c.cc = cc - c.setState(Online) + c.setState(ctx, Online) return c.cc, nil } @@ -243,13 +240,13 @@ func isAvailable(raw *grpc.ClientConn) bool { } // conn must be locked -func (c *conn) close() (err error) { +func (c *conn) close(ctx context.Context) (err error) { if c.cc == nil { return nil } err = c.cc.Close() c.cc = nil - c.setState(Offline) + c.setState(ctx, Offline) return c.wrapError(err) } @@ -268,9 +265,7 @@ func (c *conn) Close(ctx context.Context) (err error) { } onDone := trace.DriverOnConnClose( - c.config.Trace(), - &ctx, - c.Endpoint(), + c.config.Trace(), &ctx, stack.FunctionID(0), c.Endpoint(), ) defer func() { onDone(err) @@ -278,9 +273,9 @@ func (c *conn) Close(ctx context.Context) (err error) { c.closed = true - err = c.close() + err = c.close(ctx) - c.setState(Destroyed) + c.setState(ctx, Destroyed) for _, onClose := range c.onClose { onClose(c) @@ -301,10 +296,7 @@ func (c *conn) Invoke( issues []trace.Issue useWrapping = UseWrapping(ctx) onDone = trace.DriverOnConnInvoke( - c.config.Trace(), - &ctx, - c.endpoint, - trace.Method(method), + c.config.Trace(), &ctx, stack.FunctionID(0), c.endpoint, trace.Method(method), ) cc *grpc.ClientConn md = metadata.MD{} @@ -386,10 +378,7 @@ func (c *conn) NewStream( ) (_ grpc.ClientStream, err error) { var ( streamRecv = trace.DriverOnConnNewStream( - c.config.Trace(), - &ctx, - c.endpoint.Copy(), - trace.Method(method), + c.config.Trace(), &ctx, stack.FunctionID(0), c.endpoint.Copy(), trace.Method(method), ) useWrapping = UseWrapping(ctx) cc *grpc.ClientConn diff --git a/internal/conn/pool.go b/internal/conn/pool.go index bd896da2a..cb43de1cc 100644 --- a/internal/conn/pool.go +++ b/internal/conn/pool.go @@ -10,6 +10,8 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/closer" "github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync" "github.com/ydb-platform/ydb-go-sdk/v3/trace" @@ -88,12 +90,8 @@ func (p *Pool) Ban(ctx context.Context, cc Conn, cause error) { } trace.DriverOnConnBan( - p.config.Trace(), - &ctx, - e, - cc.GetState(), - cause, - )(cc.SetState(Banned)) + p.config.Trace(), &ctx, stack.FunctionID(0), e, cc.GetState(), cause, + )(cc.SetState(ctx, Banned)) } func (p *Pool) Allow(ctx context.Context, cc Conn) { @@ -111,11 +109,9 @@ func (p *Pool) Allow(ctx context.Context, cc Conn) { return } - trace.DriverOnConnAllow(p.config.Trace(), - &ctx, - e, - cc.GetState(), - )(cc.Unban()) + trace.DriverOnConnAllow( + p.config.Trace(), &ctx, stack.FunctionID(0), e, cc.GetState(), + )(cc.Unban(ctx)) } func (p *Pool) Take(context.Context) error { @@ -123,7 +119,12 @@ func (p *Pool) Take(context.Context) error { return nil } -func (p *Pool) Release(ctx context.Context) error { +func (p *Pool) Release(ctx context.Context) (finalErr error) { + onDone := trace.DriverOnPoolRelease(p.config.Trace(), &ctx, stack.FunctionID(0)) + defer func() { + onDone(finalErr) + }() + if atomic.AddInt64(&p.usages, -1) > 0 { return nil } @@ -167,7 +168,7 @@ func (p *Pool) Release(ctx context.Context) error { return nil } -func (p *Pool) connParker(ttl, interval time.Duration) { +func (p *Pool) connParker(ctx context.Context, ttl, interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() for { @@ -179,7 +180,7 @@ func (p *Pool) connParker(ttl, interval time.Duration) { if time.Since(c.LastUsage()) > ttl { switch c.GetState() { case Online, Banned: - _ = c.park(context.Background()) + _ = c.park(ctx) default: // nop } @@ -199,7 +200,10 @@ func (p *Pool) collectConns() []*conn { return conns } -func NewPool(config Config) *Pool { +func NewPool(ctx context.Context, config Config) *Pool { + onDone := trace.DriverOnPoolNew(config.Trace(), &ctx, stack.FunctionID(0)) + defer onDone() + p := &Pool{ usages: 1, config: config, @@ -208,7 +212,7 @@ func NewPool(config Config) *Pool { done: make(chan struct{}), } if ttl := config.ConnectionTTL(); ttl > 0 { - go p.connParker(ttl, ttl/2) + go p.connParker(xcontext.WithoutDeadline(ctx), ttl, ttl/2) } return p } diff --git a/internal/discovery/discovery.go b/internal/discovery/discovery.go index 864508846..e6b00a88e 100644 --- a/internal/discovery/discovery.go +++ b/internal/discovery/discovery.go @@ -14,6 +14,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/discovery" "github.com/ydb-platform/ydb-go-sdk/v3/internal/discovery/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) @@ -37,7 +38,9 @@ type Client struct { // Discover cluster endpoints func (c *Client) Discover(ctx context.Context) (endpoints []endpoint.Endpoint, err error) { var ( - onDone = trace.DiscoveryOnDiscover(c.config.Trace(), &ctx, c.config.Endpoint(), c.config.Database()) + onDone = trace.DiscoveryOnDiscover( + c.config.Trace(), &ctx, stack.FunctionID(0), c.config.Endpoint(), c.config.Database(), + ) request = Ydb_Discovery.ListEndpointsRequest{ Database: c.config.Database(), } @@ -97,7 +100,7 @@ func (c *Client) Discover(ctx context.Context) (endpoints []endpoint.Endpoint, e func (c *Client) WhoAmI(ctx context.Context) (whoAmI *discovery.WhoAmI, err error) { var ( - onDone = trace.DiscoveryOnWhoAmI(c.config.Trace(), &ctx) + onDone = trace.DiscoveryOnWhoAmI(c.config.Trace(), &ctx, stack.FunctionID(0)) request = Ydb_Discovery.WhoAmIRequest{} response *Ydb_Discovery.WhoAmIResponse whoAmIResultResult Ydb_Discovery.WhoAmIResult diff --git a/internal/meta/meta.go b/internal/meta/meta.go index 4a8f253b7..cf426c3ca 100644 --- a/internal/meta/meta.go +++ b/internal/meta/meta.go @@ -7,6 +7,7 @@ import ( "google.golang.org/grpc/metadata" "github.com/ydb-platform/ydb-go-sdk/v3/internal/credentials" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/version" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/trace" @@ -107,7 +108,7 @@ func (m *Meta) meta(ctx context.Context) (_ metadata.MD, err error) { var token string - done := trace.DriverOnGetCredentials(m.trace, &ctx) + done := trace.DriverOnGetCredentials(m.trace, &ctx, stack.FunctionID(0)) defer func() { done(token, err) }() diff --git a/internal/mock/conn.go b/internal/mock/conn.go index 0a8a076f8..1b9a66523 100644 --- a/internal/mock/conn.go +++ b/internal/mock/conn.go @@ -65,13 +65,13 @@ func (c *Conn) GetState() conn.State { return c.State } -func (c *Conn) SetState(state conn.State) conn.State { +func (c *Conn) SetState(ctx context.Context, state conn.State) conn.State { c.State = state return c.State } -func (c *Conn) Unban() conn.State { - c.SetState(conn.Online) +func (c *Conn) Unban(ctx context.Context) conn.State { + c.SetState(ctx, conn.Online) return conn.Online } diff --git a/internal/repeater/repeater.go b/internal/repeater/repeater.go index ca7481625..74f7b1c1c 100644 --- a/internal/repeater/repeater.go +++ b/internal/repeater/repeater.go @@ -7,6 +7,7 @@ import ( "github.com/jonboulle/clockwork" "github.com/ydb-platform/ydb-go-sdk/v3/internal/backoff" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) @@ -89,11 +90,12 @@ func WithEvent(ctx context.Context, event Event) context.Context { // New creates and begins to execute task periodically. func New( + ctx context.Context, interval time.Duration, task func(ctx context.Context) (err error), opts ...option, ) *repeater { - ctx, cancel := xcontext.WithCancel(context.Background()) + ctx, cancel := xcontext.WithCancel(ctx) r := &repeater{ interval: interval, @@ -143,7 +145,7 @@ func (r *repeater) wakeUp(ctx context.Context, e Event) (err error) { ctx = WithEvent(ctx, e) - onDone := trace.DriverOnRepeaterWakeUp(r.trace, &ctx, r.name, e) + onDone := trace.DriverOnRepeaterWakeUp(r.trace, &ctx, stack.FunctionID(0), r.name, e) defer func() { onDone(err) diff --git a/internal/repeater/repeater_test.go b/internal/repeater/repeater_test.go index 72647a064..c6af53a58 100644 --- a/internal/repeater/repeater_test.go +++ b/internal/repeater/repeater_test.go @@ -17,7 +17,7 @@ func TestRepeaterNoWakeUpsAfterStop(t *testing.T) { wakeUpDone = make(chan struct{}) ) fakeClock := clockwork.NewFakeClock() - r := New(interval, func(ctx context.Context) (err error) { + r := New(context.Background(), interval, func(ctx context.Context) (err error) { wakeUpStart <- struct{}{} <-wakeUpDone return nil @@ -71,7 +71,7 @@ func TestRepeaterForceLogBackoff(t *testing.T) { ) repeaterDone := make(chan struct{}) - r := New(10*time.Minute, func(ctx context.Context) (err error) { + r := New(context.Background(), 10*time.Minute, func(ctx context.Context) (err error) { defer func() { repeaterDone <- struct{}{} }() diff --git a/internal/scheme/client.go b/internal/scheme/client.go index 487c1ee4d..e6de0904d 100644 --- a/internal/scheme/client.go +++ b/internal/scheme/client.go @@ -10,9 +10,11 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/operation" "github.com/ydb-platform/ydb-go-sdk/v3/internal/scheme/config" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/retry" "github.com/ydb-platform/ydb-go-sdk/v3/scheme" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) //nolint:gofumpt @@ -42,10 +44,11 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, config config.Config) }, nil } -func (c *Client) MakeDirectory(ctx context.Context, path string) (err error) { - if c == nil { - return xerrors.WithStackTrace(errNilClient) - } +func (c *Client) MakeDirectory(ctx context.Context, path string) (finalErr error) { + onDone := trace.SchemeOnMakeDirectory(c.config.Trace(), &ctx, stack.FunctionID(0), path) + defer func() { + onDone(finalErr) + }() call := func(ctx context.Context) error { return xerrors.WithStackTrace(c.makeDirectory(ctx, path)) } @@ -75,10 +78,11 @@ func (c *Client) makeDirectory(ctx context.Context, path string) (err error) { return xerrors.WithStackTrace(err) } -func (c *Client) RemoveDirectory(ctx context.Context, path string) (err error) { - if c == nil { - return xerrors.WithStackTrace(errNilClient) - } +func (c *Client) RemoveDirectory(ctx context.Context, path string) (finalErr error) { + onDone := trace.SchemeOnRemoveDirectory(c.config.Trace(), &ctx, stack.FunctionID(0), path) + defer func() { + onDone(finalErr) + }() call := func(ctx context.Context) error { return xerrors.WithStackTrace(c.removeDirectory(ctx, path)) } @@ -108,10 +112,11 @@ func (c *Client) removeDirectory(ctx context.Context, path string) (err error) { return xerrors.WithStackTrace(err) } -func (c *Client) ListDirectory(ctx context.Context, path string) (d scheme.Directory, _ error) { - if c == nil { - return d, xerrors.WithStackTrace(errNilClient) - } +func (c *Client) ListDirectory(ctx context.Context, path string) (d scheme.Directory, finalErr error) { + onDone := trace.SchemeOnListDirectory(c.config.Trace(), &ctx, stack.FunctionID(0)) + defer func() { + onDone(finalErr) + }() call := func(ctx context.Context) (err error) { d, err = c.listDirectory(ctx, path) return xerrors.WithStackTrace(err) @@ -160,11 +165,12 @@ func (c *Client) listDirectory(ctx context.Context, path string) (scheme.Directo return d, nil } -func (c *Client) DescribePath(ctx context.Context, path string) (e scheme.Entry, err error) { - if c == nil { - return e, xerrors.WithStackTrace(errNilClient) - } - call := func(ctx context.Context) error { +func (c *Client) DescribePath(ctx context.Context, path string) (e scheme.Entry, finalErr error) { + onDone := trace.SchemeOnDescribePath(c.config.Trace(), &ctx, stack.FunctionID(0), path) + defer func() { + onDone(e.Type.String(), finalErr) + }() + call := func(ctx context.Context) (err error) { e, err = c.describePath(ctx, path) if err != nil { return xerrors.WithStackTrace(err) @@ -172,10 +178,10 @@ func (c *Client) DescribePath(ctx context.Context, path string) (e scheme.Entry, return nil } if !c.config.AutoRetry() { - err = call(ctx) - return + err := call(ctx) + return e, err } - err = retry.Retry(ctx, call, + err := retry.Retry(ctx, call, retry.WithIdempotent(true), retry.WithStackTrace(), retry.WithTrace(c.config.TraceRetry()), @@ -211,12 +217,21 @@ func (c *Client) describePath(ctx context.Context, path string) (e scheme.Entry, return e, nil } -func (c *Client) ModifyPermissions(ctx context.Context, path string, opts ...scheme.PermissionsOption) (err error) { - if c == nil { - return xerrors.WithStackTrace(errNilClient) +func (c *Client) ModifyPermissions( + ctx context.Context, path string, opts ...scheme.PermissionsOption, +) (finalErr error) { + onDone := trace.SchemeOnModifyPermissions(c.config.Trace(), &ctx, stack.FunctionID(0), path) + defer func() { + onDone(finalErr) + }() + var desc permissionsDesc + for _, o := range opts { + if o != nil { + o(&desc) + } } call := func(ctx context.Context) error { - return xerrors.WithStackTrace(c.modifyPermissions(ctx, path, opts...)) + return xerrors.WithStackTrace(c.modifyPermissions(ctx, path, desc)) } if !c.config.AutoRetry() { return call(ctx) @@ -228,13 +243,7 @@ func (c *Client) ModifyPermissions(ctx context.Context, path string, opts ...sch ) } -func (c *Client) modifyPermissions(ctx context.Context, path string, opts ...scheme.PermissionsOption) (err error) { - var desc permissionsDesc - for _, o := range opts { - if o != nil { - o(&desc) - } - } +func (c *Client) modifyPermissions(ctx context.Context, path string, desc permissionsDesc) (err error) { _, err = c.service.ModifyPermissions( ctx, &Ydb_Scheme.ModifyPermissionsRequest{ diff --git a/internal/scripting/client.go b/internal/scripting/client.go index ed7ee2c10..ade7d9143 100644 --- a/internal/scripting/client.go +++ b/internal/scripting/client.go @@ -13,6 +13,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" "github.com/ydb-platform/ydb-go-sdk/v3/internal/operation" "github.com/ydb-platform/ydb-go-sdk/v3/internal/scripting/config" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/scanner" "github.com/ydb-platform/ydb-go-sdk/v3/internal/value" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" @@ -65,7 +66,7 @@ func (c *Client) execute( params *table.QueryParameters, ) (r result.Result, err error) { var ( - onDone = trace.ScriptingOnExecute(c.config.Trace(), &ctx, query, params) + onDone = trace.ScriptingOnExecute(c.config.Trace(), &ctx, stack.FunctionID(0), query, params) a = allocator.New() request = &Ydb_Scripting.ExecuteYqlRequest{ Script: query, @@ -137,7 +138,7 @@ func (c *Client) explain( mode scripting.ExplainMode, ) (e table.ScriptingYQLExplanation, err error) { var ( - onDone = trace.ScriptingOnExplain(c.config.Trace(), &ctx, query) + onDone = trace.ScriptingOnExplain(c.config.Trace(), &ctx, stack.FunctionID(0), query) request = &Ydb_Scripting.ExplainYqlRequest{ Script: query, Mode: mode2mode(mode), @@ -204,7 +205,7 @@ func (c *Client) streamExecute( params *table.QueryParameters, ) (r result.StreamResult, err error) { var ( - onIntermediate = trace.ScriptingOnStreamExecute(c.config.Trace(), &ctx, query, params) + onIntermediate = trace.ScriptingOnStreamExecute(c.config.Trace(), &ctx, stack.FunctionID(0), query, params) a = allocator.New() request = &Ydb_Scripting.ExecuteYqlRequest{ Script: query, @@ -266,7 +267,7 @@ func (c *Client) Close(ctx context.Context) (err error) { if c == nil { return xerrors.WithStackTrace(errNilClient) } - onDone := trace.ScriptingOnClose(c.config.Trace(), &ctx) + onDone := trace.ScriptingOnClose(c.config.Trace(), &ctx, stack.FunctionID(0)) defer func() { onDone(err) }() diff --git a/internal/stack/function_id.go b/internal/stack/function_id.go new file mode 100644 index 000000000..ef1dce4b9 --- /dev/null +++ b/internal/stack/function_id.go @@ -0,0 +1,5 @@ +package stack + +func FunctionID(depth int) call { + return Call(depth + 1) +} diff --git a/internal/stack/record.go b/internal/stack/record.go index 0cc531fa4..8e4bc21b6 100644 --- a/internal/stack/record.go +++ b/internal/stack/record.go @@ -62,16 +62,18 @@ func PackagePath(b bool) recordOption { } } -type call func() (function uintptr, file string, line int) +type call struct { + function uintptr + file string + line int +} -func Call(depth int) call { - return func() (function uintptr, file string, line int) { - function, file, line, _ = runtime.Caller(depth + 2) - return function, file, line - } +func Call(depth int) (c call) { + c.function, c.file, c.line, _ = runtime.Caller(depth + 1) + return c } -func (call call) Record(opts ...recordOption) string { +func (c call) Record(opts ...recordOption) string { optionsHolder := recordOptions{ packagePath: true, packageName: true, @@ -84,13 +86,13 @@ func (call call) Record(opts ...recordOption) string { for _, opt := range opts { opt(&optionsHolder) } - function, file, line := call() - name := runtime.FuncForPC(function).Name() + name := runtime.FuncForPC(c.function).Name() var ( pkgPath string pkgName string structName string funcName string + file = c.file ) if i := strings.LastIndex(file, "/"); i > -1 { file = file[i+1:] @@ -156,7 +158,7 @@ func (call call) Record(opts ...recordOption) string { buffer.WriteString(file) if optionsHolder.line { buffer.WriteByte(':') - fmt.Fprintf(buffer, "%d", line) + fmt.Fprintf(buffer, "%d", c.line) } if closeBrace { buffer.WriteByte(')') @@ -165,8 +167,8 @@ func (call call) Record(opts ...recordOption) string { return buffer.String() } -func (call call) FunctionID() string { - return call.Record(Lambda(false), FileName(false)) +func (c call) FunctionID() string { + return c.Record(Lambda(false), FileName(false)) } func Record(depth int, opts ...recordOption) string { diff --git a/internal/table/client.go b/internal/table/client.go index cd9645a32..28edfc235 100644 --- a/internal/table/client.go +++ b/internal/table/client.go @@ -11,6 +11,7 @@ import ( "google.golang.org/grpc" metaHeaders "github.com/ydb-platform/ydb-go-sdk/v3/internal/meta" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" @@ -45,7 +46,7 @@ func newClient( builder sessionBuilder, config *config.Config, ) (c *Client, finalErr error) { - onDone := trace.TableOnInit(config.Trace(), &ctx) + onDone := trace.TableOnInit(config.Trace(), &ctx, stack.FunctionID(0)) defer func() { onDone(config.SizeLimit(), finalErr) }() @@ -255,7 +256,7 @@ func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ tab retry.WithIdempotent(true), retry.WithTrace(&trace.Retry{ OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) { - onIntermediate := trace.TableOnCreateSession(c.config.Trace(), info.Context) + onIntermediate := trace.TableOnCreateSession(c.config.Trace(), info.Context, stack.FunctionID(0)) return func(info trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) { onDone := onIntermediate(info.Error) return func(info trace.RetryLoopDoneInfo) { @@ -371,7 +372,7 @@ func (c *Client) internalPoolGet(ctx context.Context, opts ...getOption) (s *ses } } - onDone := trace.TableOnPoolGet(o.t, &ctx) + onDone := trace.TableOnPoolGet(o.t, &ctx, stack.FunctionID(0)) defer func() { onDone(s, i, err) }() @@ -464,7 +465,7 @@ func (c *Client) internalPoolWaitFromCh(ctx context.Context, t *trace.Table) (s el = c.waitQ.PushBack(ch) }) - waitDone := trace.TableOnPoolWait(t, &ctx) + waitDone := trace.TableOnPoolWait(t, &ctx, stack.FunctionID(0)) defer func() { waitDone(s, err) @@ -521,7 +522,7 @@ func (c *Client) internalPoolWaitFromCh(ctx context.Context, t *trace.Table) (s // Get() or Take() calls. In other way it will produce unexpected behavior or // panic. func (c *Client) Put(ctx context.Context, s *session) (err error) { - onDone := trace.TableOnPoolPut(c.config.Trace(), &ctx, s) + onDone := trace.TableOnPoolPut(c.config.Trace(), &ctx, stack.FunctionID(0), s) defer func() { onDone(err) }() @@ -578,7 +579,7 @@ func (c *Client) Close(ctx context.Context) (err error) { default: close(c.done) - onDone := trace.TableOnClose(c.config.Trace(), &ctx) + onDone := trace.TableOnClose(c.config.Trace(), &ctx, stack.FunctionID(0)) defer func() { onDone(err) }() diff --git a/internal/table/retry.go b/internal/table/retry.go index 778a863ed..90ccc951c 100644 --- a/internal/table/retry.go +++ b/internal/table/retry.go @@ -3,6 +3,7 @@ package table import ( "context" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" @@ -32,7 +33,7 @@ func doTx( if opts.Trace == nil { opts.Trace = &trace.Table{} } - attempts, onIntermediate := 0, trace.TableOnDoTx(opts.Trace, &ctx, + attempts, onIntermediate := 0, trace.TableOnDoTx(opts.Trace, &ctx, stack.FunctionID(1), opts.Label, opts.Label, opts.Idempotent, xcontext.IsNestedCall(ctx), ) defer func() { @@ -101,7 +102,7 @@ func do( if opts.Trace == nil { opts.Trace = &trace.Table{} } - attempts, onIntermediate := 0, trace.TableOnDo(opts.Trace, &ctx, + attempts, onIntermediate := 0, trace.TableOnDo(opts.Trace, &ctx, stack.FunctionID(1), opts.Label, opts.Label, opts.Idempotent, xcontext.IsNestedCall(ctx), ) defer func() { diff --git a/internal/table/session.go b/internal/table/session.go index 894f9eb67..9e960d5f8 100644 --- a/internal/table/session.go +++ b/internal/table/session.go @@ -21,6 +21,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/feature" "github.com/ydb-platform/ydb-go-sdk/v3/internal/meta" "github.com/ydb-platform/ydb-go-sdk/v3/internal/operation" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/scanner" "github.com/ydb-platform/ydb-go-sdk/v3/internal/value" @@ -113,7 +114,7 @@ func (s *session) isClosing() bool { func newSession(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config) ( s *session, err error, ) { - onDone := trace.TableOnSessionNew(config.Trace(), &ctx) + onDone := trace.TableOnSessionNew(config.Trace(), &ctx, stack.FunctionID(0)) defer func() { onDone(s, err) }() @@ -178,7 +179,7 @@ func (s *session) Close(ctx context.Context) (err error) { s.SetStatus(table.SessionClosed) }() - onDone := trace.TableOnSessionDelete(s.config.Trace(), &ctx, s) + onDone := trace.TableOnSessionDelete(s.config.Trace(), &ctx, stack.FunctionID(0), s) if time.Since(s.LastUsage()) < s.config.IdleThreshold() { _, err = s.tableService.DeleteSession(ctx, @@ -225,9 +226,7 @@ func (s *session) KeepAlive(ctx context.Context) (err error) { var ( result Ydb_Table.KeepAliveResult onDone = trace.TableOnSessionKeepAlive( - s.config.Trace(), - &ctx, - s, + s.config.Trace(), &ctx, stack.FunctionID(0), s, ) ) defer func() { @@ -246,12 +245,14 @@ func (s *session) KeepAlive(ctx context.Context) (err error) { }, ) if err != nil { - return + return xerrors.WithStackTrace(err) } + err = resp.GetOperation().GetResult().UnmarshalTo(&result) if err != nil { - return + return xerrors.WithStackTrace(err) } + switch result.SessionStatus { case Ydb_Table.KeepAliveResult_SESSION_STATUS_READY: s.SetStatus(table.SessionReady) @@ -598,10 +599,7 @@ func (s *session) Explain( result Ydb_Table.ExplainQueryResult response *Ydb_Table.ExplainDataQueryResponse onDone = trace.TableOnSessionQueryExplain( - s.config.Trace(), - &ctx, - s, - query, + s.config.Trace(), &ctx, stack.FunctionID(0), s, query, ) ) @@ -626,12 +624,14 @@ func (s *session) Explain( }, ) if err != nil { - return + return exp, xerrors.WithStackTrace(err) } + err = response.GetOperation().GetResult().UnmarshalTo(&result) if err != nil { - return + return exp, xerrors.WithStackTrace(err) } + return table.DataQueryExplanation{ Explanation: table.Explanation{ Plan: result.GetQueryPlan(), @@ -647,10 +647,7 @@ func (s *session) Prepare(ctx context.Context, queryText string) (_ table.Statem response *Ydb_Table.PrepareDataQueryResponse result Ydb_Table.PrepareQueryResult onDone = trace.TableOnSessionQueryPrepare( - s.config.Trace(), - &ctx, - s, - queryText, + s.config.Trace(), &ctx, stack.FunctionID(0), s, queryText, ) ) defer func() { @@ -674,12 +671,12 @@ func (s *session) Prepare(ctx context.Context, queryText string) (_ table.Statem }, ) if err != nil { - return + return nil, xerrors.WithStackTrace(err) } err = response.GetOperation().GetResult().UnmarshalTo(&result) if err != nil { - return + return nil, xerrors.WithStackTrace(err) } stmt = &statement{ @@ -731,7 +728,7 @@ func (s *session) Execute( } onDone := trace.TableOnSessionQueryExecute( - s.config.Trace(), &ctx, s, q, params, + s.config.Trace(), &ctx, stack.FunctionID(0), s, q, params, request.QueryCachePolicy.GetKeepInCache(), ) defer func() { @@ -842,12 +839,12 @@ func (s *session) DescribeTableOptions(ctx context.Context) ( } response, err = s.tableService.DescribeTableOptions(ctx, &request) if err != nil { - return + return desc, xerrors.WithStackTrace(err) } err = response.GetOperation().GetResult().UnmarshalTo(&result) if err != nil { - return + return desc, xerrors.WithStackTrace(err) } { @@ -952,6 +949,7 @@ func (s *session) DescribeTableOptions(ctx context.Context) ( } desc.CachingPolicyPresets = xs } + return desc, nil } @@ -966,7 +964,7 @@ func (s *session) StreamReadTable( opts ...options.ReadTableOption, ) (_ result.StreamResult, err error) { var ( - onIntermediate = trace.TableOnSessionQueryStreamRead(s.config.Trace(), &ctx, s) + onIntermediate = trace.TableOnSessionQueryStreamRead(s.config.Trace(), &ctx, stack.FunctionID(0), s) request = Ydb_Table.ReadTableRequest{ SessionId: s.id, Path: path, @@ -1086,11 +1084,7 @@ func (s *session) StreamExecuteScanQuery( a = allocator.New() q = queryFromText(query) onIntermediate = trace.TableOnSessionQueryStreamExecute( - s.config.Trace(), - &ctx, - s, - q, - params, + s.config.Trace(), &ctx, stack.FunctionID(0), s, q, params, ) request = Ydb_Table.ExecuteScanQueryRequest{ Query: q.toYDB(a), @@ -1160,8 +1154,14 @@ func (s *session) BulkUpsert(ctx context.Context, table string, rows types.Value var ( a = allocator.New() callOptions []grpc.CallOption + onDone = trace.TableOnSessionBulkUpsert( + s.config.Trace(), &ctx, stack.FunctionID(0), s, + ) ) - defer a.Free() + defer func() { + defer a.Free() + onDone(err) + }() for _, opt := range opts { callOptions = append(callOptions, opt.ApplyBulkUpsertOption()...) @@ -1197,9 +1197,7 @@ func (s *session) BeginTransaction( result Ydb_Table.BeginTransactionResult response *Ydb_Table.BeginTransactionResponse onDone = trace.TableOnSessionTransactionBegin( - s.config.Trace(), - &ctx, - s, + s.config.Trace(), &ctx, stack.FunctionID(0), s, ) ) defer func() { @@ -1223,7 +1221,7 @@ func (s *session) BeginTransaction( } err = response.GetOperation().GetResult().UnmarshalTo(&result) if err != nil { - return + return nil, xerrors.WithStackTrace(err) } tx := &transaction{ id: result.GetTxMeta().GetId(), diff --git a/internal/table/statement.go b/internal/table/statement.go index 1d21736fd..b0e2c68cb 100644 --- a/internal/table/statement.go +++ b/internal/table/statement.go @@ -9,6 +9,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" "github.com/ydb-platform/ydb-go-sdk/v3/internal/operation" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/table/options" @@ -59,7 +60,7 @@ func (s *statement) Execute( } onDone := trace.TableOnSessionQueryExecute( - s.session.config.Trace(), &ctx, s.session, s.query, params, + s.session.config.Trace(), &ctx, stack.FunctionID(0), s.session, s.query, params, request.QueryCachePolicy.GetKeepInCache(), ) defer func() { diff --git a/internal/table/transaction.go b/internal/table/transaction.go index d16912bbc..c3bc19337 100644 --- a/internal/table/transaction.go +++ b/internal/table/transaction.go @@ -8,6 +8,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" "github.com/ydb-platform/ydb-go-sdk/v3/internal/operation" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/scanner" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xatomic" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" @@ -59,7 +60,9 @@ func (tx *transaction) Execute( query string, params *table.QueryParameters, opts ...options.ExecuteDataQueryOption, ) (r result.Result, err error) { - onDone := trace.TableOnSessionTransactionExecute(tx.s.config.Trace(), &ctx, tx.s, tx, queryFromText(query), params) + onDone := trace.TableOnSessionTransactionExecute( + tx.s.config.Trace(), &ctx, stack.FunctionID(0), tx.s, tx, queryFromText(query), params, + ) defer func() { onDone(r, err) }() @@ -96,12 +99,7 @@ func (tx *transaction) ExecuteStatement( defer a.Free() onDone := trace.TableOnSessionTransactionExecuteStatement( - tx.s.config.Trace(), - &ctx, - tx.s, - tx, - stmt.(*statement).query, - params, + tx.s.config.Trace(), &ctx, stack.FunctionID(0), tx.s, tx, stmt.(*statement).query, params, ) defer func() { onDone(r, err) @@ -132,10 +130,7 @@ func (tx *transaction) CommitTx( opts ...options.CommitTransactionOption, ) (r result.Result, err error) { onDone := trace.TableOnSessionTransactionCommit( - tx.s.config.Trace(), - &ctx, - tx.s, - tx, + tx.s.config.Trace(), &ctx, stack.FunctionID(0), tx.s, tx, ) defer func() { onDone(err) @@ -191,10 +186,7 @@ func (tx *transaction) CommitTx( // Rollback performs a rollback of the specified active transaction. func (tx *transaction) Rollback(ctx context.Context) (err error) { onDone := trace.TableOnSessionTransactionRollback( - tx.s.config.Trace(), - &ctx, - tx.s, - tx, + tx.s.config.Trace(), &ctx, stack.FunctionID(0), tx.s, tx, ) defer func() { onDone(err) diff --git a/internal/xresolver/xresolver.go b/internal/xresolver/xresolver.go index f2176a729..647a27613 100644 --- a/internal/xresolver/xresolver.go +++ b/internal/xresolver/xresolver.go @@ -5,6 +5,7 @@ import ( "google.golang.org/grpc/resolver" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) @@ -30,7 +31,7 @@ func (c *clientConn) Endpoint() string { } func (c *clientConn) UpdateState(state resolver.State) (err error) { - onDone := trace.DriverOnResolve(c.trace, c.Endpoint(), func() (addrs []string) { + onDone := trace.DriverOnResolve(c.trace, stack.FunctionID(0), c.Endpoint(), func() (addrs []string) { for i := range state.Addresses { addrs = append(addrs, state.Addresses[i].Addr) } diff --git a/internal/xsql/conn.go b/internal/xsql/conn.go index fe78d2f73..2811bbf67 100644 --- a/internal/xsql/conn.go +++ b/internal/xsql/conn.go @@ -11,6 +11,7 @@ import ( "time" "github.com/ydb-platform/ydb-go-sdk/v3/internal/scheme/helpers" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xatomic" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" @@ -66,6 +67,8 @@ func withTrace(t *trace.DatabaseSQL) connOption { type beginTxFunc func(ctx context.Context, txOptions driver.TxOptions) (currentTx, error) type conn struct { + openConnCtx context.Context + connector *Connector trace *trace.DatabaseSQL session table.ClosableSession // Immutable and r/o usage. @@ -101,6 +104,7 @@ type currentTx interface { driver.Tx driver.ExecerContext driver.QueryerContext + driver.ConnPrepareContext table.TransactionIdentifier } @@ -122,10 +126,11 @@ var ( _ driver.Result = resultNoRows{} ) -func newConn(c *Connector, s table.ClosableSession, opts ...connOption) *conn { +func newConn(ctx context.Context, c *Connector, s table.ClosableSession, opts ...connOption) *conn { cc := &conn{ - connector: c, - session: s, + openConnCtx: ctx, + connector: c, + session: s, } cc.beginTxFuncs = map[QueryMode]beginTxFunc{ DataQueryMode: cc.beginTx, @@ -144,6 +149,9 @@ func (c *conn) isReady() bool { } func (c *conn) PrepareContext(ctx context.Context, query string) (_ driver.Stmt, finalErr error) { + if c.currentTx != nil { + return c.currentTx.PrepareContext(ctx, query) + } onDone := trace.DatabaseSQLOnConnPrepare(c.trace, &ctx, query) defer func() { onDone(finalErr) @@ -152,9 +160,11 @@ func (c *conn) PrepareContext(ctx context.Context, query string) (_ driver.Stmt, return nil, badconn.Map(xerrors.WithStackTrace(errNotReadyConn)) } return &stmt{ - conn: c, - query: query, - trace: c.trace, + conn: c, + processor: c, + prepareCtx: ctx, + query: query, + trace: c.trace, }, nil } @@ -364,7 +374,7 @@ func (c *conn) Close() (finalErr error) { if c.currentTx != nil { _ = c.currentTx.Rollback() } - err := c.session.Close(context.Background()) + err := c.session.Close(xcontext.WithoutDeadline(c.openConnCtx)) if err != nil { return badconn.Map(xerrors.WithStackTrace(err)) } @@ -441,8 +451,12 @@ func (c *conn) Version(_ context.Context) (_ string, _ error) { return version, nil } -func (c *conn) IsTableExists(ctx context.Context, tableName string) (tableExists bool, _ error) { +func (c *conn) IsTableExists(ctx context.Context, tableName string) (tableExists bool, finalErr error) { tableName = c.normalizePath(tableName) + onDone := trace.DatabaseSQLOnConnIsTableExists(c.trace, &ctx, stack.FunctionID(0), tableName) + defer func() { + onDone(tableExists, finalErr) + }() tableExists, err := helpers.IsEntryExists(ctx, c.connector.parent.Scheme(), tableName, scheme.EntryTable, scheme.EntryColumnTable, diff --git a/internal/xsql/connector.go b/internal/xsql/connector.go index b81bcf8bd..10437e011 100644 --- a/internal/xsql/connector.go +++ b/internal/xsql/connector.go @@ -309,7 +309,7 @@ func (c *Connector) Connect(ctx context.Context) (_ driver.Conn, err error) { return nil, xerrors.WithStackTrace(err) } - return newConn(c, session, withDefaultTxControl(c.defaultTxControl), + return newConn(ctx, c, session, withDefaultTxControl(c.defaultTxControl), withDefaultQueryMode(c.defaultQueryMode), withDataOpts(c.defaultDataQueryOpts...), withScanOpts(c.defaultScanQueryOpts...), diff --git a/internal/xsql/stmt.go b/internal/xsql/stmt.go index 516b4c197..e3c5d10ee 100644 --- a/internal/xsql/stmt.go +++ b/internal/xsql/stmt.go @@ -11,8 +11,13 @@ import ( ) type stmt struct { - conn *conn - query string + conn *conn + processor interface { + driver.ExecerContext + driver.QueryerContext + } + query string + prepareCtx context.Context trace *trace.DatabaseSQL } @@ -24,7 +29,7 @@ var ( ) func (s *stmt) QueryContext(ctx context.Context, args []driver.NamedValue) (_ driver.Rows, finalErr error) { - onDone := trace.DatabaseSQLOnStmtQuery(s.trace, &ctx, s.query) + onDone := trace.DatabaseSQLOnStmtQuery(s.trace, &ctx, &s.prepareCtx, s.query) defer func() { onDone(finalErr) }() @@ -33,14 +38,14 @@ func (s *stmt) QueryContext(ctx context.Context, args []driver.NamedValue) (_ dr } switch m := queryModeFromContext(ctx, s.conn.defaultQueryMode); m { case DataQueryMode: - return s.conn.QueryContext(s.conn.withKeepInCache(ctx), s.query, args) + return s.processor.QueryContext(s.conn.withKeepInCache(ctx), s.query, args) default: return nil, fmt.Errorf("unsupported query mode '%s' for execute query on prepared statement", m) } } func (s *stmt) ExecContext(ctx context.Context, args []driver.NamedValue) (_ driver.Result, finalErr error) { - onDone := trace.DatabaseSQLOnStmtExec(s.trace, &ctx, s.query) + onDone := trace.DatabaseSQLOnStmtExec(s.trace, &ctx, &s.prepareCtx, s.query) defer func() { onDone(finalErr) }() @@ -49,7 +54,7 @@ func (s *stmt) ExecContext(ctx context.Context, args []driver.NamedValue) (_ dri } switch m := queryModeFromContext(ctx, s.conn.defaultQueryMode); m { case DataQueryMode: - return s.conn.ExecContext(s.conn.withKeepInCache(ctx), s.query, args) + return s.processor.ExecContext(s.conn.withKeepInCache(ctx), s.query, args) default: return nil, fmt.Errorf("unsupported query mode '%s' for execute query on prepared statement", m) } @@ -60,7 +65,7 @@ func (s *stmt) NumInput() int { } func (s *stmt) Close() (finalErr error) { - onDone := trace.DatabaseSQLOnStmtClose(s.trace) + onDone := trace.DatabaseSQLOnStmtClose(s.trace, &s.prepareCtx) defer func() { onDone(finalErr) }() diff --git a/internal/xsql/tx.go b/internal/xsql/tx.go index 48a065b02..bb4430201 100644 --- a/internal/xsql/tx.go +++ b/internal/xsql/tx.go @@ -5,7 +5,6 @@ import ( "database/sql/driver" "fmt" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql/badconn" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql/isolation" @@ -14,9 +13,9 @@ import ( ) type tx struct { - conn *conn - ctx context.Context - tx table.Transaction + conn *conn + beginCtx context.Context + tx table.Transaction } var ( @@ -45,9 +44,9 @@ func (c *conn) beginTx(ctx context.Context, txOptions driver.TxOptions) (current return nil, badconn.Map(xerrors.WithStackTrace(err)) } c.currentTx = &tx{ - conn: c, - ctx: ctx, - tx: transaction, + conn: c, + beginCtx: ctx, + tx: transaction, } return c.currentTx, nil } @@ -71,7 +70,7 @@ func (tx *tx) checkTxState() error { } func (tx *tx) Commit() (finalErr error) { - onDone := trace.DatabaseSQLOnTxCommit(tx.conn.trace, &tx.ctx, tx) + onDone := trace.DatabaseSQLOnTxCommit(tx.conn.trace, &tx.beginCtx, tx) defer func() { onDone(finalErr) }() @@ -81,7 +80,7 @@ func (tx *tx) Commit() (finalErr error) { defer func() { tx.conn.currentTx = nil }() - _, err := tx.tx.CommitTx(tx.ctx) + _, err := tx.tx.CommitTx(tx.beginCtx) if err != nil { return badconn.Map(xerrors.WithStackTrace(err)) } @@ -89,7 +88,7 @@ func (tx *tx) Commit() (finalErr error) { } func (tx *tx) Rollback() (finalErr error) { - onDone := trace.DatabaseSQLOnTxRollback(tx.conn.trace, &tx.ctx, tx) + onDone := trace.DatabaseSQLOnTxRollback(tx.conn.trace, &tx.beginCtx, tx) defer func() { onDone(finalErr) }() @@ -99,7 +98,7 @@ func (tx *tx) Rollback() (finalErr error) { defer func() { tx.conn.currentTx = nil }() - err := tx.tx.Rollback(tx.ctx) + err := tx.tx.Rollback(tx.beginCtx) if err != nil { return badconn.Map(xerrors.WithStackTrace(err)) } @@ -109,7 +108,7 @@ func (tx *tx) Rollback() (finalErr error) { func (tx *tx) QueryContext(ctx context.Context, query string, args []driver.NamedValue) ( _ driver.Rows, finalErr error, ) { - onDone := trace.DatabaseSQLOnTxQuery(tx.conn.trace, &ctx, tx.ctx, tx, query, xcontext.IsIdempotent(ctx)) + onDone := trace.DatabaseSQLOnTxQuery(tx.conn.trace, &ctx, &tx.beginCtx, tx, query) defer func() { onDone(finalErr) }() @@ -147,7 +146,7 @@ func (tx *tx) QueryContext(ctx context.Context, query string, args []driver.Name func (tx *tx) ExecContext(ctx context.Context, query string, args []driver.NamedValue) ( _ driver.Result, finalErr error, ) { - onDone := trace.DatabaseSQLOnTxExec(tx.conn.trace, &ctx, tx.ctx, tx, query, xcontext.IsIdempotent(ctx)) + onDone := trace.DatabaseSQLOnTxExec(tx.conn.trace, &ctx, &tx.beginCtx, tx, query) defer func() { onDone(finalErr) }() @@ -175,3 +174,20 @@ func (tx *tx) ExecContext(ctx context.Context, query string, args []driver.Named } return resultNoRows{}, nil } + +func (tx *tx) PrepareContext(ctx context.Context, query string) (_ driver.Stmt, finalErr error) { + onDone := trace.DatabaseSQLOnTxPrepare(tx.conn.trace, &ctx, &tx.beginCtx, tx, query) + defer func() { + onDone(finalErr) + }() + if !tx.conn.isReady() { + return nil, badconn.Map(xerrors.WithStackTrace(errNotReadyConn)) + } + return &stmt{ + conn: tx.conn, + processor: tx, + prepareCtx: ctx, + query: query, + trace: tx.conn.trace, + }, nil +} diff --git a/internal/xsql/tx_fake.go b/internal/xsql/tx_fake.go index 3be4896dc..f84be9d76 100644 --- a/internal/xsql/tx_fake.go +++ b/internal/xsql/tx_fake.go @@ -4,7 +4,6 @@ import ( "context" "database/sql/driver" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql/badconn" "github.com/ydb-platform/ydb-go-sdk/v3/table" @@ -12,8 +11,26 @@ import ( ) type txFake struct { - conn *conn - ctx context.Context + beginCtx context.Context + conn *conn + ctx context.Context +} + +func (tx *txFake) PrepareContext(ctx context.Context, query string) (_ driver.Stmt, finalErr error) { + onDone := trace.DatabaseSQLOnTxPrepare(tx.conn.trace, &ctx, &tx.beginCtx, tx, query) + defer func() { + onDone(finalErr) + }() + if !tx.conn.isReady() { + return nil, badconn.Map(xerrors.WithStackTrace(errNotReadyConn)) + } + return &stmt{ + conn: tx.conn, + processor: tx, + prepareCtx: ctx, + query: query, + trace: tx.conn.trace, + }, nil } var ( @@ -65,7 +82,7 @@ func (tx *txFake) Rollback() (err error) { func (tx *txFake) QueryContext(ctx context.Context, query string, args []driver.NamedValue) ( rows driver.Rows, err error, ) { - onDone := trace.DatabaseSQLOnTxQuery(tx.conn.trace, &ctx, tx.ctx, tx, query, xcontext.IsIdempotent(ctx)) + onDone := trace.DatabaseSQLOnTxQuery(tx.conn.trace, &ctx, &tx.ctx, tx, query) defer func() { onDone(err) }() @@ -79,7 +96,7 @@ func (tx *txFake) QueryContext(ctx context.Context, query string, args []driver. func (tx *txFake) ExecContext(ctx context.Context, query string, args []driver.NamedValue) ( result driver.Result, err error, ) { - onDone := trace.DatabaseSQLOnTxExec(tx.conn.trace, &ctx, tx.ctx, tx, query, xcontext.IsIdempotent(ctx)) + onDone := trace.DatabaseSQLOnTxExec(tx.conn.trace, &ctx, &tx.ctx, tx, query) defer func() { onDone(err) }() diff --git a/log/sql.go b/log/sql.go index a89994b37..4004f6545 100644 --- a/log/sql.go +++ b/log/sql.go @@ -206,7 +206,7 @@ func internalDatabaseSQL(l *wrapper, d trace.Detailer) (t trace.DatabaseSQL) { if d.Details()&trace.DatabaseSQLTxEvents == 0 { return nil } - ctx := with(*info.Context, TRACE, "ydb", "database", "sql", "tx", "commit") + ctx := with(*info.TxContext, TRACE, "ydb", "database", "sql", "tx", "commit") l.Log(ctx, "start") start := time.Now() return func(info trace.DatabaseSQLTxCommitDoneInfo) { @@ -227,7 +227,7 @@ func internalDatabaseSQL(l *wrapper, d trace.Detailer) (t trace.DatabaseSQL) { if d.Details()&trace.DatabaseSQLTxEvents == 0 { return nil } - ctx := with(*info.Context, TRACE, "ydb", "database", "sql", "tx", "rollback") + ctx := with(*info.TxContext, TRACE, "ydb", "database", "sql", "tx", "rollback") l.Log(ctx, "start") start := time.Now() return func(info trace.DatabaseSQLTxRollbackDoneInfo) { diff --git a/retry/retry.go b/retry/retry.go index ec3a32150..4db2b3a8f 100644 --- a/retry/retry.go +++ b/retry/retry.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/ydb-platform/ydb-go-sdk/v3/internal/backoff" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/wait" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" @@ -18,6 +19,7 @@ type retryOperation func(context.Context) (err error) type retryOptions struct { label string + call call trace *trace.Retry idempotent bool stackTrace bool @@ -52,6 +54,32 @@ func WithLabel(label string) labelOption { return labelOption(label) } +var _ Option = (*callOption)(nil) + +type callOption struct { + call +} + +func (call callOption) ApplyDoOption(opts *doOptions) { + opts.retryOptions = append(opts.retryOptions, withCaller(call)) +} + +func (call callOption) ApplyDoTxOption(opts *doTxOptions) { + opts.retryOptions = append(opts.retryOptions, withCaller(call)) +} + +func (call callOption) ApplyRetryOption(opts *retryOptions) { + opts.call = call +} + +type call interface { + FunctionID() string +} + +func withCaller(call call) callOption { + return callOption{call} +} + var _ Option = stackTraceOption{} type stackTraceOption struct{} @@ -204,6 +232,7 @@ func WithPanicCallback(panicCallback func(e interface{})) panicCallbackOption { // If you need to retry your op func on some logic errors - you must return RetryableError() from retryOperation func Retry(ctx context.Context, op retryOperation, opts ...Option) (finalErr error) { options := &retryOptions{ + call: stack.FunctionID(0), trace: &trace.Retry{}, fastBackoff: backoff.Fast, slowBackoff: backoff.Slow, @@ -229,7 +258,7 @@ func Retry(ctx context.Context, op retryOperation, opts ...Option) (finalErr err code = int64(0) onIntermediate = trace.RetryOnRetry(options.trace, &ctx, - options.label, options.label, options.idempotent, xcontext.IsNestedCall(ctx), + options.label, options.call, options.label, options.idempotent, xcontext.IsNestedCall(ctx), ) ) defer func() { diff --git a/retry/sql.go b/retry/sql.go index b18472846..e82e5ffdc 100644 --- a/retry/sql.go +++ b/retry/sql.go @@ -5,6 +5,7 @@ import ( "database/sql" "fmt" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/trace" @@ -39,7 +40,11 @@ func WithDoRetryOptions(opts ...Option) doRetryOptionsOption { // Do is a retryer of database/sql Conn with fallbacks on errors func Do(ctx context.Context, db *sql.DB, op func(ctx context.Context, cc *sql.Conn) error, opts ...doOption) error { var ( - options = doOptions{} + options = doOptions{ + retryOptions: []Option{ + withCaller(stack.FunctionID(0)), + }, + } attempts = 0 ) if tracer, has := db.Driver().(interface { @@ -121,7 +126,9 @@ func WithTxOptions(txOptions *sql.TxOptions) txOptionsOption { func DoTx(ctx context.Context, db *sql.DB, op func(context.Context, *sql.Tx) error, opts ...doTxOption) error { var ( options = doTxOptions{ - retryOptions: []Option{}, + retryOptions: []Option{ + withCaller(stack.FunctionID(0)), + }, txOptions: &sql.TxOptions{ Isolation: sql.LevelDefault, ReadOnly: false, diff --git a/trace/call.go b/trace/call.go new file mode 100644 index 000000000..eabd93084 --- /dev/null +++ b/trace/call.go @@ -0,0 +1,5 @@ +package trace + +type call interface { + FunctionID() string +} diff --git a/trace/discovery.go b/trace/discovery.go index cf9bcdfa5..5b0496eef 100644 --- a/trace/discovery.go +++ b/trace/discovery.go @@ -19,6 +19,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Address string Database string } @@ -33,6 +34,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call } DiscoveryWhoAmIDoneInfo struct { User string diff --git a/trace/discovery_gtrace.go b/trace/discovery_gtrace.go index 2d6a6b9e5..362d5c0e5 100644 --- a/trace/discovery_gtrace.go +++ b/trace/discovery_gtrace.go @@ -132,9 +132,10 @@ func (t *Discovery) onWhoAmI(d DiscoveryWhoAmIStartInfo) func(DiscoveryWhoAmIDon } return res } -func DiscoveryOnDiscover(t *Discovery, c *context.Context, address string, database string) func(location string, endpoints []EndpointInfo, _ error) { +func DiscoveryOnDiscover(t *Discovery, c *context.Context, call call, address string, database string) func(location string, endpoints []EndpointInfo, _ error) { var p DiscoveryDiscoverStartInfo p.Context = c + p.Call = call p.Address = address p.Database = database res := t.onDiscover(p) @@ -146,9 +147,10 @@ func DiscoveryOnDiscover(t *Discovery, c *context.Context, address string, datab res(p) } } -func DiscoveryOnWhoAmI(t *Discovery, c *context.Context) func(user string, groups []string, _ error) { +func DiscoveryOnWhoAmI(t *Discovery, c *context.Context, call call) func(user string, groups []string, _ error) { var p DiscoveryWhoAmIStartInfo p.Context = c + p.Call = call res := t.onWhoAmI(p) return func(user string, groups []string, e error) { var p DiscoveryWhoAmIDoneInfo diff --git a/trace/driver.go b/trace/driver.go index 3acf53c3f..bb21321f9 100644 --- a/trace/driver.go +++ b/trace/driver.go @@ -19,6 +19,10 @@ type ( OnInit func(DriverInitStartInfo) func(DriverInitDoneInfo) OnClose func(DriverCloseStartInfo) func(DriverCloseDoneInfo) + // Pool of connections + OnPoolNew func(DriverConnPoolNewStartInfo) func(DriverConnPoolNewDoneInfo) + OnPoolRelease func(DriverConnPoolReleaseStartInfo) func(DriverConnPoolReleaseDoneInfo) + // Deprecated: driver not notificate about this event OnNetRead func(DriverNetReadStartInfo) func(DriverNetReadDoneInfo) // Deprecated: driver not notificate about this event @@ -130,6 +134,12 @@ type EndpointInfo interface { type ( DriverConnStateChangeStartInfo struct { + // Context make available context in trace callback function. + // Pointer to context provide replacement of context in trace callback function. + // Warning: concurrent access to pointer on client side must be excluded. + // Safe replacement of context are provided only inside callback function + Context *context.Context + Call call Endpoint EndpointInfo State ConnState } @@ -137,6 +147,7 @@ type ( State ConnState } DriverResolveStartInfo struct { + Call call Target string Resolved []string } @@ -149,6 +160,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call NeedLocalDC bool } DriverBalancerUpdateDoneInfo struct { @@ -163,12 +175,14 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Address string } DriverBalancerClusterDiscoveryAttemptDoneInfo struct { Error error } DriverNetReadStartInfo struct { + Call call Address string Buffer int } @@ -177,6 +191,7 @@ type ( Error error } DriverNetWriteStartInfo struct { + Call call Address string Bytes int } @@ -190,12 +205,14 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Address string } DriverNetDialDoneInfo struct { Error error } DriverNetCloseStartInfo struct { + Call call Address string } DriverNetCloseDoneInfo struct { @@ -207,6 +224,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Endpoint EndpointInfo } DriverConnTakeDoneInfo struct { @@ -218,6 +236,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Endpoint EndpointInfo } DriverConnDialDoneInfo struct { @@ -229,6 +248,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Endpoint EndpointInfo } DriverConnParkDoneInfo struct { @@ -240,6 +260,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Endpoint EndpointInfo } DriverConnCloseDoneInfo struct { @@ -251,6 +272,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Endpoint EndpointInfo State ConnState Cause error @@ -264,6 +286,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Endpoint EndpointInfo State ConnState } @@ -276,6 +299,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Endpoint EndpointInfo Method Method } @@ -292,6 +316,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Endpoint EndpointInfo Method Method } @@ -309,6 +334,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call } DriverBalancerInitDoneInfo struct { Error error @@ -319,6 +345,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Address string } DriverBalancerDialEntrypointDoneInfo struct { @@ -330,6 +357,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call } DriverBalancerCloseDoneInfo struct { Error error @@ -340,6 +368,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call } DriverBalancerChooseEndpointDoneInfo struct { Endpoint EndpointInfo @@ -351,6 +380,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Name string Event string } @@ -363,6 +393,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call } DriverGetCredentialsDoneInfo struct { Token string @@ -374,6 +405,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Endpoint string Database string Secure bool @@ -381,12 +413,33 @@ type ( DriverInitDoneInfo struct { Error error } + DriverConnPoolNewStartInfo struct { + // Context make available context in trace callback function. + // Pointer to context provide replacement of context in trace callback function. + // Warning: concurrent access to pointer on client side must be excluded. + // Safe replacement of context are provided only inside callback function + Context *context.Context + Call call + } + DriverConnPoolNewDoneInfo struct{} + DriverConnPoolReleaseStartInfo struct { + // Context make available context in trace callback function. + // Pointer to context provide replacement of context in trace callback function. + // Warning: concurrent access to pointer on client side must be excluded. + // Safe replacement of context are provided only inside callback function + Context *context.Context + Call call + } + DriverConnPoolReleaseDoneInfo struct { + Error error + } DriverCloseStartInfo struct { // Context make available context in trace callback function. // Pointer to context provide replacement of context in trace callback function. // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call } DriverCloseDoneInfo struct { Error error diff --git a/trace/driver_gtrace.go b/trace/driver_gtrace.go index 6701eea4e..926541943 100644 --- a/trace/driver_gtrace.go +++ b/trace/driver_gtrace.go @@ -100,6 +100,76 @@ func (t *Driver) Compose(x *Driver, opts ...DriverComposeOption) *Driver { } } } + { + h1 := t.OnPoolNew + h2 := x.OnPoolNew + ret.OnPoolNew = func(d DriverConnPoolNewStartInfo) func(DriverConnPoolNewDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + var r, r1 func(DriverConnPoolNewDoneInfo) + if h1 != nil { + r = h1(d) + } + if h2 != nil { + r1 = h2(d) + } + return func(d DriverConnPoolNewDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if r != nil { + r(d) + } + if r1 != nil { + r1(d) + } + } + } + } + { + h1 := t.OnPoolRelease + h2 := x.OnPoolRelease + ret.OnPoolRelease = func(d DriverConnPoolReleaseStartInfo) func(DriverConnPoolReleaseDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + var r, r1 func(DriverConnPoolReleaseDoneInfo) + if h1 != nil { + r = h1(d) + } + if h2 != nil { + r1 = h2(d) + } + return func(d DriverConnPoolReleaseDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if r != nil { + r(d) + } + if r1 != nil { + r1(d) + } + } + } + } { h1 := t.OnNetRead h2 := x.OnNetRead @@ -918,6 +988,36 @@ func (t *Driver) onClose(d DriverCloseStartInfo) func(DriverCloseDoneInfo) { } return res } +func (t *Driver) onPoolNew(d DriverConnPoolNewStartInfo) func(DriverConnPoolNewDoneInfo) { + fn := t.OnPoolNew + if fn == nil { + return func(DriverConnPoolNewDoneInfo) { + return + } + } + res := fn(d) + if res == nil { + return func(DriverConnPoolNewDoneInfo) { + return + } + } + return res +} +func (t *Driver) onPoolRelease(d DriverConnPoolReleaseStartInfo) func(DriverConnPoolReleaseDoneInfo) { + fn := t.OnPoolRelease + if fn == nil { + return func(DriverConnPoolReleaseDoneInfo) { + return + } + } + res := fn(d) + if res == nil { + return func(DriverConnPoolReleaseDoneInfo) { + return + } + } + return res +} func (t *Driver) onNetRead(d DriverNetReadStartInfo) func(DriverNetReadDoneInfo) { fn := t.OnNetRead if fn == nil { @@ -1260,9 +1360,10 @@ func (t *Driver) onGetCredentials(d DriverGetCredentialsStartInfo) func(DriverGe } return res } -func DriverOnInit(t *Driver, c *context.Context, endpoint string, database string, secure bool) func(error) { +func DriverOnInit(t *Driver, c *context.Context, call call, endpoint string, database string, secure bool) func(error) { var p DriverInitStartInfo p.Context = c + p.Call = call p.Endpoint = endpoint p.Database = database p.Secure = secure @@ -1273,9 +1374,10 @@ func DriverOnInit(t *Driver, c *context.Context, endpoint string, database strin res(p) } } -func DriverOnClose(t *Driver, c *context.Context) func(error) { +func DriverOnClose(t *Driver, c *context.Context, call call) func(error) { var p DriverCloseStartInfo p.Context = c + p.Call = call res := t.onClose(p) return func(e error) { var p DriverCloseDoneInfo @@ -1283,8 +1385,30 @@ func DriverOnClose(t *Driver, c *context.Context) func(error) { res(p) } } -func DriverOnNetRead(t *Driver, address string, buffer int) func(received int, _ error) { +func DriverOnPoolNew(t *Driver, c *context.Context, call call) func() { + var p DriverConnPoolNewStartInfo + p.Context = c + p.Call = call + res := t.onPoolNew(p) + return func() { + var p DriverConnPoolNewDoneInfo + res(p) + } +} +func DriverOnPoolRelease(t *Driver, c *context.Context, call call) func(error) { + var p DriverConnPoolReleaseStartInfo + p.Context = c + p.Call = call + res := t.onPoolRelease(p) + return func(e error) { + var p DriverConnPoolReleaseDoneInfo + p.Error = e + res(p) + } +} +func DriverOnNetRead(t *Driver, call call, address string, buffer int) func(received int, _ error) { var p DriverNetReadStartInfo + p.Call = call p.Address = address p.Buffer = buffer res := t.onNetRead(p) @@ -1295,8 +1419,9 @@ func DriverOnNetRead(t *Driver, address string, buffer int) func(received int, _ res(p) } } -func DriverOnNetWrite(t *Driver, address string, bytes int) func(sent int, _ error) { +func DriverOnNetWrite(t *Driver, call call, address string, bytes int) func(sent int, _ error) { var p DriverNetWriteStartInfo + p.Call = call p.Address = address p.Bytes = bytes res := t.onNetWrite(p) @@ -1307,9 +1432,10 @@ func DriverOnNetWrite(t *Driver, address string, bytes int) func(sent int, _ err res(p) } } -func DriverOnNetDial(t *Driver, c *context.Context, address string) func(error) { +func DriverOnNetDial(t *Driver, c *context.Context, call call, address string) func(error) { var p DriverNetDialStartInfo p.Context = c + p.Call = call p.Address = address res := t.onNetDial(p) return func(e error) { @@ -1318,8 +1444,9 @@ func DriverOnNetDial(t *Driver, c *context.Context, address string) func(error) res(p) } } -func DriverOnNetClose(t *Driver, address string) func(error) { +func DriverOnNetClose(t *Driver, call call, address string) func(error) { var p DriverNetCloseStartInfo + p.Call = call p.Address = address res := t.onNetClose(p) return func(e error) { @@ -1328,8 +1455,9 @@ func DriverOnNetClose(t *Driver, address string) func(error) { res(p) } } -func DriverOnResolve(t *Driver, target string, resolved []string) func(error) { +func DriverOnResolve(t *Driver, call call, target string, resolved []string) func(error) { var p DriverResolveStartInfo + p.Call = call p.Target = target p.Resolved = resolved res := t.onResolve(p) @@ -1339,8 +1467,10 @@ func DriverOnResolve(t *Driver, target string, resolved []string) func(error) { res(p) } } -func DriverOnConnStateChange(t *Driver, endpoint EndpointInfo, state ConnState) func(state ConnState) { +func DriverOnConnStateChange(t *Driver, c *context.Context, call call, endpoint EndpointInfo, state ConnState) func(state ConnState) { var p DriverConnStateChangeStartInfo + p.Context = c + p.Call = call p.Endpoint = endpoint p.State = state res := t.onConnStateChange(p) @@ -1350,9 +1480,10 @@ func DriverOnConnStateChange(t *Driver, endpoint EndpointInfo, state ConnState) res(p) } } -func DriverOnConnInvoke(t *Driver, c *context.Context, endpoint EndpointInfo, m Method) func(_ error, issues []Issue, opID string, state ConnState, metadata map[string][]string) { +func DriverOnConnInvoke(t *Driver, c *context.Context, call call, endpoint EndpointInfo, m Method) func(_ error, issues []Issue, opID string, state ConnState, metadata map[string][]string) { var p DriverConnInvokeStartInfo p.Context = c + p.Call = call p.Endpoint = endpoint p.Method = m res := t.onConnInvoke(p) @@ -1366,9 +1497,10 @@ func DriverOnConnInvoke(t *Driver, c *context.Context, endpoint EndpointInfo, m res(p) } } -func DriverOnConnNewStream(t *Driver, c *context.Context, endpoint EndpointInfo, m Method) func(error) func(_ error, state ConnState, metadata map[string][]string) { +func DriverOnConnNewStream(t *Driver, c *context.Context, call call, endpoint EndpointInfo, m Method) func(error) func(_ error, state ConnState, metadata map[string][]string) { var p DriverConnNewStreamStartInfo p.Context = c + p.Call = call p.Endpoint = endpoint p.Method = m res := t.onConnNewStream(p) @@ -1385,9 +1517,10 @@ func DriverOnConnNewStream(t *Driver, c *context.Context, endpoint EndpointInfo, } } } -func DriverOnConnTake(t *Driver, c *context.Context, endpoint EndpointInfo) func(error) { +func DriverOnConnTake(t *Driver, c *context.Context, call call, endpoint EndpointInfo) func(error) { var p DriverConnTakeStartInfo p.Context = c + p.Call = call p.Endpoint = endpoint res := t.onConnTake(p) return func(e error) { @@ -1396,9 +1529,10 @@ func DriverOnConnTake(t *Driver, c *context.Context, endpoint EndpointInfo) func res(p) } } -func DriverOnConnDial(t *Driver, c *context.Context, endpoint EndpointInfo) func(error) { +func DriverOnConnDial(t *Driver, c *context.Context, call call, endpoint EndpointInfo) func(error) { var p DriverConnDialStartInfo p.Context = c + p.Call = call p.Endpoint = endpoint res := t.onConnDial(p) return func(e error) { @@ -1407,9 +1541,10 @@ func DriverOnConnDial(t *Driver, c *context.Context, endpoint EndpointInfo) func res(p) } } -func DriverOnConnPark(t *Driver, c *context.Context, endpoint EndpointInfo) func(error) { +func DriverOnConnPark(t *Driver, c *context.Context, call call, endpoint EndpointInfo) func(error) { var p DriverConnParkStartInfo p.Context = c + p.Call = call p.Endpoint = endpoint res := t.onConnPark(p) return func(e error) { @@ -1418,9 +1553,10 @@ func DriverOnConnPark(t *Driver, c *context.Context, endpoint EndpointInfo) func res(p) } } -func DriverOnConnBan(t *Driver, c *context.Context, endpoint EndpointInfo, state ConnState, cause error) func(state ConnState) { +func DriverOnConnBan(t *Driver, c *context.Context, call call, endpoint EndpointInfo, state ConnState, cause error) func(state ConnState) { var p DriverConnBanStartInfo p.Context = c + p.Call = call p.Endpoint = endpoint p.State = state p.Cause = cause @@ -1431,9 +1567,10 @@ func DriverOnConnBan(t *Driver, c *context.Context, endpoint EndpointInfo, state res(p) } } -func DriverOnConnAllow(t *Driver, c *context.Context, endpoint EndpointInfo, state ConnState) func(state ConnState) { +func DriverOnConnAllow(t *Driver, c *context.Context, call call, endpoint EndpointInfo, state ConnState) func(state ConnState) { var p DriverConnAllowStartInfo p.Context = c + p.Call = call p.Endpoint = endpoint p.State = state res := t.onConnAllow(p) @@ -1443,9 +1580,10 @@ func DriverOnConnAllow(t *Driver, c *context.Context, endpoint EndpointInfo, sta res(p) } } -func DriverOnConnClose(t *Driver, c *context.Context, endpoint EndpointInfo) func(error) { +func DriverOnConnClose(t *Driver, c *context.Context, call call, endpoint EndpointInfo) func(error) { var p DriverConnCloseStartInfo p.Context = c + p.Call = call p.Endpoint = endpoint res := t.onConnClose(p) return func(e error) { @@ -1454,9 +1592,10 @@ func DriverOnConnClose(t *Driver, c *context.Context, endpoint EndpointInfo) fun res(p) } } -func DriverOnRepeaterWakeUp(t *Driver, c *context.Context, name string, event string) func(error) { +func DriverOnRepeaterWakeUp(t *Driver, c *context.Context, call call, name string, event string) func(error) { var p DriverRepeaterWakeUpStartInfo p.Context = c + p.Call = call p.Name = name p.Event = event res := t.onRepeaterWakeUp(p) @@ -1466,9 +1605,10 @@ func DriverOnRepeaterWakeUp(t *Driver, c *context.Context, name string, event st res(p) } } -func DriverOnBalancerInit(t *Driver, c *context.Context) func(error) { +func DriverOnBalancerInit(t *Driver, c *context.Context, call call) func(error) { var p DriverBalancerInitStartInfo p.Context = c + p.Call = call res := t.onBalancerInit(p) return func(e error) { var p DriverBalancerInitDoneInfo @@ -1476,9 +1616,10 @@ func DriverOnBalancerInit(t *Driver, c *context.Context) func(error) { res(p) } } -func DriverOnBalancerDialEntrypoint(t *Driver, c *context.Context, address string) func(error) { +func DriverOnBalancerDialEntrypoint(t *Driver, c *context.Context, call call, address string) func(error) { var p DriverBalancerDialEntrypointStartInfo p.Context = c + p.Call = call p.Address = address res := t.onBalancerDialEntrypoint(p) return func(e error) { @@ -1487,9 +1628,10 @@ func DriverOnBalancerDialEntrypoint(t *Driver, c *context.Context, address strin res(p) } } -func DriverOnBalancerClose(t *Driver, c *context.Context) func(error) { +func DriverOnBalancerClose(t *Driver, c *context.Context, call call) func(error) { var p DriverBalancerCloseStartInfo p.Context = c + p.Call = call res := t.onBalancerClose(p) return func(e error) { var p DriverBalancerCloseDoneInfo @@ -1497,9 +1639,10 @@ func DriverOnBalancerClose(t *Driver, c *context.Context) func(error) { res(p) } } -func DriverOnBalancerChooseEndpoint(t *Driver, c *context.Context) func(endpoint EndpointInfo, _ error) { +func DriverOnBalancerChooseEndpoint(t *Driver, c *context.Context, call call) func(endpoint EndpointInfo, _ error) { var p DriverBalancerChooseEndpointStartInfo p.Context = c + p.Call = call res := t.onBalancerChooseEndpoint(p) return func(endpoint EndpointInfo, e error) { var p DriverBalancerChooseEndpointDoneInfo @@ -1508,9 +1651,10 @@ func DriverOnBalancerChooseEndpoint(t *Driver, c *context.Context) func(endpoint res(p) } } -func DriverOnBalancerClusterDiscoveryAttempt(t *Driver, c *context.Context, address string) func(error) { +func DriverOnBalancerClusterDiscoveryAttempt(t *Driver, c *context.Context, call call, address string) func(error) { var p DriverBalancerClusterDiscoveryAttemptStartInfo p.Context = c + p.Call = call p.Address = address res := t.onBalancerClusterDiscoveryAttempt(p) return func(e error) { @@ -1519,9 +1663,10 @@ func DriverOnBalancerClusterDiscoveryAttempt(t *Driver, c *context.Context, addr res(p) } } -func DriverOnBalancerUpdate(t *Driver, c *context.Context, needLocalDC bool) func(endpoints []EndpointInfo, localDC string, _ error) { +func DriverOnBalancerUpdate(t *Driver, c *context.Context, call call, needLocalDC bool) func(endpoints []EndpointInfo, localDC string, _ error) { var p DriverBalancerUpdateStartInfo p.Context = c + p.Call = call p.NeedLocalDC = needLocalDC res := t.onBalancerUpdate(p) return func(endpoints []EndpointInfo, localDC string, e error) { @@ -1532,9 +1677,10 @@ func DriverOnBalancerUpdate(t *Driver, c *context.Context, needLocalDC bool) fun res(p) } } -func DriverOnGetCredentials(t *Driver, c *context.Context) func(token string, _ error) { +func DriverOnGetCredentials(t *Driver, c *context.Context, call call) func(token string, _ error) { var p DriverGetCredentialsStartInfo p.Context = c + p.Call = call res := t.onGetCredentials(p) return func(token string, e error) { var p DriverGetCredentialsDoneInfo diff --git a/trace/retry.go b/trace/retry.go index 7de4bb5f8..0ac2c483b 100644 --- a/trace/retry.go +++ b/trace/retry.go @@ -24,6 +24,7 @@ type ( // Deprecated: use Label field instead ID string + Call call Label string Idempotent bool diff --git a/trace/retry_gtrace.go b/trace/retry_gtrace.go index b2fcb3719..ea80a385f 100644 --- a/trace/retry_gtrace.go +++ b/trace/retry_gtrace.go @@ -110,10 +110,11 @@ func (t *Retry) onRetry(r RetryLoopStartInfo) func(RetryLoopIntermediateInfo) fu return res } } -func RetryOnRetry(t *Retry, c *context.Context, iD string, label string, idempotent bool, nestedCall bool) func(error) func(attempts int, _ error) { +func RetryOnRetry(t *Retry, c *context.Context, iD string, call call, label string, idempotent bool, nestedCall bool) func(error) func(attempts int, _ error) { var p RetryLoopStartInfo p.Context = c p.ID = iD + p.Call = call p.Label = label p.Idempotent = idempotent p.NestedCall = nestedCall diff --git a/trace/scheme.go b/trace/scheme.go index c3aec867e..2f4c6f48b 100644 --- a/trace/scheme.go +++ b/trace/scheme.go @@ -1,5 +1,9 @@ package trace +import ( + "context" +) + // tool gtrace used from ./internal/cmd/gtrace //go:generate gtrace @@ -7,5 +11,72 @@ package trace type ( // Scheme specified trace of scheme client activity. // gtrace:gen - Scheme struct{} + Scheme struct { + OnListDirectory func(SchemeListDirectoryStartInfo) func(SchemeListDirectoryDoneInfo) + OnDescribePath func(SchemeDescribePathStartInfo) func(SchemeDescribePathDoneInfo) + OnMakeDirectory func(SchemeMakeDirectoryStartInfo) func(SchemeMakeDirectoryDoneInfo) + OnRemoveDirectory func(SchemeRemoveDirectoryStartInfo) func(SchemeRemoveDirectoryDoneInfo) + OnModifyPermissions func(SchemeModifyPermissionsStartInfo) func(SchemeModifyPermissionsDoneInfo) + } + + SchemeListDirectoryStartInfo struct { + // Context make available context in trace callback function. + // Pointer to context provide replacement of context in trace callback function. + // Warning: concurrent access to pointer on client side must be excluded. + // Safe replacement of context are provided only inside callback function + Context *context.Context + Call call + } + SchemeListDirectoryDoneInfo struct { + Error error + } + SchemeDescribePathStartInfo struct { + // Context make available context in trace callback function. + // Pointer to context provide replacement of context in trace callback function. + // Warning: concurrent access to pointer on client side must be excluded. + // Safe replacement of context are provided only inside callback function + Context *context.Context + Call call + Path string + } + SchemeDescribePathDoneInfo struct { + EntryType string + Error error + } + SchemeMakeDirectoryStartInfo struct { + // Context make available context in trace callback function. + // Pointer to context provide replacement of context in trace callback function. + // Warning: concurrent access to pointer on client side must be excluded. + // Safe replacement of context are provided only inside callback function + Context *context.Context + Call call + Path string + } + SchemeMakeDirectoryDoneInfo struct { + Error error + } + SchemeRemoveDirectoryStartInfo struct { + // Context make available context in trace callback function. + // Pointer to context provide replacement of context in trace callback function. + // Warning: concurrent access to pointer on client side must be excluded. + // Safe replacement of context are provided only inside callback function + Context *context.Context + Call call + Path string + } + SchemeRemoveDirectoryDoneInfo struct { + Error error + } + SchemeModifyPermissionsStartInfo struct { + // Context make available context in trace callback function. + // Pointer to context provide replacement of context in trace callback function. + // Warning: concurrent access to pointer on client side must be excluded. + // Safe replacement of context are provided only inside callback function + Context *context.Context + Call call + Path string + } + SchemeModifyPermissionsDoneInfo struct { + Error error + } ) diff --git a/trace/scheme_gtrace.go b/trace/scheme_gtrace.go index 633b36fd5..0001d9e17 100644 --- a/trace/scheme_gtrace.go +++ b/trace/scheme_gtrace.go @@ -2,6 +2,10 @@ package trace +import ( + "context" +) + // schemeComposeOptions is a holder of options type schemeComposeOptions struct { panicCallback func(e interface{}) @@ -20,5 +24,321 @@ func WithSchemePanicCallback(cb func(e interface{})) SchemeComposeOption { // Compose returns a new Scheme which has functional fields composed both from t and x. func (t *Scheme) Compose(x *Scheme, opts ...SchemeComposeOption) *Scheme { var ret Scheme + options := schemeComposeOptions{} + for _, opt := range opts { + if opt != nil { + opt(&options) + } + } + { + h1 := t.OnListDirectory + h2 := x.OnListDirectory + ret.OnListDirectory = func(s SchemeListDirectoryStartInfo) func(SchemeListDirectoryDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + var r, r1 func(SchemeListDirectoryDoneInfo) + if h1 != nil { + r = h1(s) + } + if h2 != nil { + r1 = h2(s) + } + return func(s SchemeListDirectoryDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if r != nil { + r(s) + } + if r1 != nil { + r1(s) + } + } + } + } + { + h1 := t.OnDescribePath + h2 := x.OnDescribePath + ret.OnDescribePath = func(s SchemeDescribePathStartInfo) func(SchemeDescribePathDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + var r, r1 func(SchemeDescribePathDoneInfo) + if h1 != nil { + r = h1(s) + } + if h2 != nil { + r1 = h2(s) + } + return func(s SchemeDescribePathDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if r != nil { + r(s) + } + if r1 != nil { + r1(s) + } + } + } + } + { + h1 := t.OnMakeDirectory + h2 := x.OnMakeDirectory + ret.OnMakeDirectory = func(s SchemeMakeDirectoryStartInfo) func(SchemeMakeDirectoryDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + var r, r1 func(SchemeMakeDirectoryDoneInfo) + if h1 != nil { + r = h1(s) + } + if h2 != nil { + r1 = h2(s) + } + return func(s SchemeMakeDirectoryDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if r != nil { + r(s) + } + if r1 != nil { + r1(s) + } + } + } + } + { + h1 := t.OnRemoveDirectory + h2 := x.OnRemoveDirectory + ret.OnRemoveDirectory = func(s SchemeRemoveDirectoryStartInfo) func(SchemeRemoveDirectoryDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + var r, r1 func(SchemeRemoveDirectoryDoneInfo) + if h1 != nil { + r = h1(s) + } + if h2 != nil { + r1 = h2(s) + } + return func(s SchemeRemoveDirectoryDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if r != nil { + r(s) + } + if r1 != nil { + r1(s) + } + } + } + } + { + h1 := t.OnModifyPermissions + h2 := x.OnModifyPermissions + ret.OnModifyPermissions = func(s SchemeModifyPermissionsStartInfo) func(SchemeModifyPermissionsDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + var r, r1 func(SchemeModifyPermissionsDoneInfo) + if h1 != nil { + r = h1(s) + } + if h2 != nil { + r1 = h2(s) + } + return func(s SchemeModifyPermissionsDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if r != nil { + r(s) + } + if r1 != nil { + r1(s) + } + } + } + } return &ret } +func (t *Scheme) onListDirectory(s SchemeListDirectoryStartInfo) func(SchemeListDirectoryDoneInfo) { + fn := t.OnListDirectory + if fn == nil { + return func(SchemeListDirectoryDoneInfo) { + return + } + } + res := fn(s) + if res == nil { + return func(SchemeListDirectoryDoneInfo) { + return + } + } + return res +} +func (t *Scheme) onDescribePath(s SchemeDescribePathStartInfo) func(SchemeDescribePathDoneInfo) { + fn := t.OnDescribePath + if fn == nil { + return func(SchemeDescribePathDoneInfo) { + return + } + } + res := fn(s) + if res == nil { + return func(SchemeDescribePathDoneInfo) { + return + } + } + return res +} +func (t *Scheme) onMakeDirectory(s SchemeMakeDirectoryStartInfo) func(SchemeMakeDirectoryDoneInfo) { + fn := t.OnMakeDirectory + if fn == nil { + return func(SchemeMakeDirectoryDoneInfo) { + return + } + } + res := fn(s) + if res == nil { + return func(SchemeMakeDirectoryDoneInfo) { + return + } + } + return res +} +func (t *Scheme) onRemoveDirectory(s SchemeRemoveDirectoryStartInfo) func(SchemeRemoveDirectoryDoneInfo) { + fn := t.OnRemoveDirectory + if fn == nil { + return func(SchemeRemoveDirectoryDoneInfo) { + return + } + } + res := fn(s) + if res == nil { + return func(SchemeRemoveDirectoryDoneInfo) { + return + } + } + return res +} +func (t *Scheme) onModifyPermissions(s SchemeModifyPermissionsStartInfo) func(SchemeModifyPermissionsDoneInfo) { + fn := t.OnModifyPermissions + if fn == nil { + return func(SchemeModifyPermissionsDoneInfo) { + return + } + } + res := fn(s) + if res == nil { + return func(SchemeModifyPermissionsDoneInfo) { + return + } + } + return res +} +func SchemeOnListDirectory(t *Scheme, c *context.Context, call call) func(error) { + var p SchemeListDirectoryStartInfo + p.Context = c + p.Call = call + res := t.onListDirectory(p) + return func(e error) { + var p SchemeListDirectoryDoneInfo + p.Error = e + res(p) + } +} +func SchemeOnDescribePath(t *Scheme, c *context.Context, call call, path string) func(entryType string, _ error) { + var p SchemeDescribePathStartInfo + p.Context = c + p.Call = call + p.Path = path + res := t.onDescribePath(p) + return func(entryType string, e error) { + var p SchemeDescribePathDoneInfo + p.EntryType = entryType + p.Error = e + res(p) + } +} +func SchemeOnMakeDirectory(t *Scheme, c *context.Context, call call, path string) func(error) { + var p SchemeMakeDirectoryStartInfo + p.Context = c + p.Call = call + p.Path = path + res := t.onMakeDirectory(p) + return func(e error) { + var p SchemeMakeDirectoryDoneInfo + p.Error = e + res(p) + } +} +func SchemeOnRemoveDirectory(t *Scheme, c *context.Context, call call, path string) func(error) { + var p SchemeRemoveDirectoryStartInfo + p.Context = c + p.Call = call + p.Path = path + res := t.onRemoveDirectory(p) + return func(e error) { + var p SchemeRemoveDirectoryDoneInfo + p.Error = e + res(p) + } +} +func SchemeOnModifyPermissions(t *Scheme, c *context.Context, call call, path string) func(error) { + var p SchemeModifyPermissionsStartInfo + p.Context = c + p.Call = call + p.Path = path + res := t.onModifyPermissions(p) + return func(e error) { + var p SchemeModifyPermissionsDoneInfo + p.Error = e + res(p) + } +} diff --git a/trace/scripting.go b/trace/scripting.go index f6942acf3..4e018e07d 100644 --- a/trace/scripting.go +++ b/trace/scripting.go @@ -39,6 +39,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Query string Parameters scriptingQueryParameters } @@ -52,6 +53,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Query string Parameters scriptingQueryParameters } @@ -67,6 +69,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Query string } ScriptingExplainDoneInfo struct { @@ -79,6 +82,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call } ScriptingCloseDoneInfo struct { Error error diff --git a/trace/scripting_gtrace.go b/trace/scripting_gtrace.go index 39dc01585..8af5fd07e 100644 --- a/trace/scripting_gtrace.go +++ b/trace/scripting_gtrace.go @@ -260,9 +260,10 @@ func (t *Scripting) onClose(s ScriptingCloseStartInfo) func(ScriptingCloseDoneIn } return res } -func ScriptingOnExecute(t *Scripting, c *context.Context, query string, parameters scriptingQueryParameters) func(result scriptingResult, _ error) { +func ScriptingOnExecute(t *Scripting, c *context.Context, call call, query string, parameters scriptingQueryParameters) func(result scriptingResult, _ error) { var p ScriptingExecuteStartInfo p.Context = c + p.Call = call p.Query = query p.Parameters = parameters res := t.onExecute(p) @@ -273,9 +274,10 @@ func ScriptingOnExecute(t *Scripting, c *context.Context, query string, paramete res(p) } } -func ScriptingOnStreamExecute(t *Scripting, c *context.Context, query string, parameters scriptingQueryParameters) func(error) func(error) { +func ScriptingOnStreamExecute(t *Scripting, c *context.Context, call call, query string, parameters scriptingQueryParameters) func(error) func(error) { var p ScriptingStreamExecuteStartInfo p.Context = c + p.Call = call p.Query = query p.Parameters = parameters res := t.onStreamExecute(p) @@ -290,9 +292,10 @@ func ScriptingOnStreamExecute(t *Scripting, c *context.Context, query string, pa } } } -func ScriptingOnExplain(t *Scripting, c *context.Context, query string) func(plan string, _ error) { +func ScriptingOnExplain(t *Scripting, c *context.Context, call call, query string) func(plan string, _ error) { var p ScriptingExplainStartInfo p.Context = c + p.Call = call p.Query = query res := t.onExplain(p) return func(plan string, e error) { @@ -302,9 +305,10 @@ func ScriptingOnExplain(t *Scripting, c *context.Context, query string) func(pla res(p) } } -func ScriptingOnClose(t *Scripting, c *context.Context) func(error) { +func ScriptingOnClose(t *Scripting, c *context.Context, call call) func(error) { var p ScriptingCloseStartInfo p.Context = c + p.Call = call res := t.onClose(p) return func(e error) { var p ScriptingCloseDoneInfo diff --git a/trace/sql.go b/trace/sql.go index af591d1d1..2c21fad26 100644 --- a/trace/sql.go +++ b/trace/sql.go @@ -13,15 +13,17 @@ type ( DatabaseSQL struct { OnConnectorConnect func(DatabaseSQLConnectorConnectStartInfo) func(DatabaseSQLConnectorConnectDoneInfo) - OnConnPing func(DatabaseSQLConnPingStartInfo) func(DatabaseSQLConnPingDoneInfo) - OnConnPrepare func(DatabaseSQLConnPrepareStartInfo) func(DatabaseSQLConnPrepareDoneInfo) - OnConnClose func(DatabaseSQLConnCloseStartInfo) func(DatabaseSQLConnCloseDoneInfo) - OnConnBegin func(DatabaseSQLConnBeginStartInfo) func(DatabaseSQLConnBeginDoneInfo) - OnConnQuery func(DatabaseSQLConnQueryStartInfo) func(DatabaseSQLConnQueryDoneInfo) - OnConnExec func(DatabaseSQLConnExecStartInfo) func(DatabaseSQLConnExecDoneInfo) + OnConnPing func(DatabaseSQLConnPingStartInfo) func(DatabaseSQLConnPingDoneInfo) + OnConnPrepare func(DatabaseSQLConnPrepareStartInfo) func(DatabaseSQLConnPrepareDoneInfo) + OnConnClose func(DatabaseSQLConnCloseStartInfo) func(DatabaseSQLConnCloseDoneInfo) + OnConnBegin func(DatabaseSQLConnBeginStartInfo) func(DatabaseSQLConnBeginDoneInfo) + OnConnQuery func(DatabaseSQLConnQueryStartInfo) func(DatabaseSQLConnQueryDoneInfo) + OnConnExec func(DatabaseSQLConnExecStartInfo) func(DatabaseSQLConnExecDoneInfo) + OnConnIsTableExists func(DatabaseSQLConnIsTableExistsStartInfo) func(DatabaseSQLConnIsTableExistsDoneInfo) OnTxQuery func(DatabaseSQLTxQueryStartInfo) func(DatabaseSQLTxQueryDoneInfo) OnTxExec func(DatabaseSQLTxExecStartInfo) func(DatabaseSQLTxExecDoneInfo) + OnTxPrepare func(DatabaseSQLTxPrepareStartInfo) func(DatabaseSQLTxPrepareDoneInfo) OnTxCommit func(DatabaseSQLTxCommitStartInfo) func(DatabaseSQLTxCommitDoneInfo) OnTxRollback func(DatabaseSQLTxRollbackStartInfo) func(DatabaseSQLTxRollbackDoneInfo) @@ -64,6 +66,19 @@ type ( DatabaseSQLConnPrepareDoneInfo struct { Error error } + DatabaseSQLTxPrepareStartInfo struct { + // Context make available context in trace callback function. + // Pointer to context provide replacement of context in trace callback function. + // Warning: concurrent access to pointer on client side must be excluded. + // Safe replacement of context are provided only inside callback function + Context *context.Context + TxContext *context.Context + Tx tableTransactionInfo + Query string + } + DatabaseSQLTxPrepareDoneInfo struct { + Error error + } DatabaseSQLConnCloseStartInfo struct{} DatabaseSQLConnCloseDoneInfo struct { Error error @@ -107,16 +122,28 @@ type ( DatabaseSQLConnExecDoneInfo struct { Error error } + DatabaseSQLConnIsTableExistsStartInfo struct { + // Context make available context in trace callback function. + // Pointer to context provide replacement of context in trace callback function. + // Warning: concurrent access to pointer on client side must be excluded. + // Safe replacement of context are provided only inside callback function + Context *context.Context + Call call + TableName string + } + DatabaseSQLConnIsTableExistsDoneInfo struct { + Exists bool + Error error + } DatabaseSQLTxQueryStartInfo struct { // Context make available context in trace callback function. // Pointer to context provide replacement of context in trace callback function. // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function - Context *context.Context - TxContext context.Context - Tx tableTransactionInfo - Query string - Idempotent bool + Context *context.Context + TxContext *context.Context + Tx tableTransactionInfo + Query string } DatabaseSQLTxQueryDoneInfo struct { Error error @@ -126,39 +153,40 @@ type ( // Pointer to context provide replacement of context in trace callback function. // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function - Context *context.Context - TxContext context.Context - Tx tableTransactionInfo - Query string - Idempotent bool + Context *context.Context + TxContext *context.Context + Tx tableTransactionInfo + Query string } DatabaseSQLTxExecDoneInfo struct { Error error } DatabaseSQLTxCommitStartInfo struct { - // Context make available context in trace callback function. + // TxContext make available context in trace callback function. // Pointer to context provide replacement of context in trace callback function. // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function - Context *context.Context - Tx tableTransactionInfo + TxContext *context.Context + Tx tableTransactionInfo } DatabaseSQLTxCommitDoneInfo struct { Error error } DatabaseSQLTxRollbackStartInfo struct { - // Context make available context in trace callback function. + // TxContext make available context in trace callback function. // Pointer to context provide replacement of context in trace callback function. // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function - Context *context.Context - Tx tableTransactionInfo + TxContext *context.Context + Tx tableTransactionInfo } DatabaseSQLTxRollbackDoneInfo struct { Error error } - DatabaseSQLStmtCloseStartInfo struct{} - DatabaseSQLStmtCloseDoneInfo struct { + DatabaseSQLStmtCloseStartInfo struct { + StmtContext *context.Context + } + DatabaseSQLStmtCloseDoneInfo struct { Error error } DatabaseSQLStmtQueryStartInfo struct { @@ -166,8 +194,9 @@ type ( // Pointer to context provide replacement of context in trace callback function. // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function - Context *context.Context - Query string + Context *context.Context + StmtContext *context.Context + Query string } DatabaseSQLStmtQueryDoneInfo struct { Error error @@ -177,8 +206,9 @@ type ( // Pointer to context provide replacement of context in trace callback function. // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function - Context *context.Context - Query string + Context *context.Context + StmtContext *context.Context + Query string } DatabaseSQLStmtExecDoneInfo struct { Error error diff --git a/trace/sql_gtrace.go b/trace/sql_gtrace.go index 2e05d6e2f..26b707414 100644 --- a/trace/sql_gtrace.go +++ b/trace/sql_gtrace.go @@ -276,6 +276,41 @@ func (t *DatabaseSQL) Compose(x *DatabaseSQL, opts ...DatabaseSQLComposeOption) } } } + { + h1 := t.OnConnIsTableExists + h2 := x.OnConnIsTableExists + ret.OnConnIsTableExists = func(d DatabaseSQLConnIsTableExistsStartInfo) func(DatabaseSQLConnIsTableExistsDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + var r, r1 func(DatabaseSQLConnIsTableExistsDoneInfo) + if h1 != nil { + r = h1(d) + } + if h2 != nil { + r1 = h2(d) + } + return func(d DatabaseSQLConnIsTableExistsDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if r != nil { + r(d) + } + if r1 != nil { + r1(d) + } + } + } + } { h1 := t.OnTxQuery h2 := x.OnTxQuery @@ -346,6 +381,41 @@ func (t *DatabaseSQL) Compose(x *DatabaseSQL, opts ...DatabaseSQLComposeOption) } } } + { + h1 := t.OnTxPrepare + h2 := x.OnTxPrepare + ret.OnTxPrepare = func(d DatabaseSQLTxPrepareStartInfo) func(DatabaseSQLTxPrepareDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + var r, r1 func(DatabaseSQLTxPrepareDoneInfo) + if h1 != nil { + r = h1(d) + } + if h2 != nil { + r1 = h2(d) + } + return func(d DatabaseSQLTxPrepareDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if r != nil { + r(d) + } + if r1 != nil { + r1(d) + } + } + } + } { h1 := t.OnTxCommit h2 := x.OnTxCommit @@ -679,6 +749,21 @@ func (t *DatabaseSQL) onConnExec(d DatabaseSQLConnExecStartInfo) func(DatabaseSQ } return res } +func (t *DatabaseSQL) onConnIsTableExists(d DatabaseSQLConnIsTableExistsStartInfo) func(DatabaseSQLConnIsTableExistsDoneInfo) { + fn := t.OnConnIsTableExists + if fn == nil { + return func(DatabaseSQLConnIsTableExistsDoneInfo) { + return + } + } + res := fn(d) + if res == nil { + return func(DatabaseSQLConnIsTableExistsDoneInfo) { + return + } + } + return res +} func (t *DatabaseSQL) onTxQuery(d DatabaseSQLTxQueryStartInfo) func(DatabaseSQLTxQueryDoneInfo) { fn := t.OnTxQuery if fn == nil { @@ -709,6 +794,21 @@ func (t *DatabaseSQL) onTxExec(d DatabaseSQLTxExecStartInfo) func(DatabaseSQLTxE } return res } +func (t *DatabaseSQL) onTxPrepare(d DatabaseSQLTxPrepareStartInfo) func(DatabaseSQLTxPrepareDoneInfo) { + fn := t.OnTxPrepare + if fn == nil { + return func(DatabaseSQLTxPrepareDoneInfo) { + return + } + } + res := fn(d) + if res == nil { + return func(DatabaseSQLTxPrepareDoneInfo) { + return + } + } + return res +} func (t *DatabaseSQL) onTxCommit(d DatabaseSQLTxCommitStartInfo) func(DatabaseSQLTxCommitDoneInfo) { fn := t.OnTxCommit if fn == nil { @@ -891,13 +991,25 @@ func DatabaseSQLOnConnExec(t *DatabaseSQL, c *context.Context, query string, mod res(p) } } -func DatabaseSQLOnTxQuery(t *DatabaseSQL, c *context.Context, txContext context.Context, tx tableTransactionInfo, query string, idempotent bool) func(error) { +func DatabaseSQLOnConnIsTableExists(t *DatabaseSQL, c *context.Context, call call, tableName string) func(exists bool, _ error) { + var p DatabaseSQLConnIsTableExistsStartInfo + p.Context = c + p.Call = call + p.TableName = tableName + res := t.onConnIsTableExists(p) + return func(exists bool, e error) { + var p DatabaseSQLConnIsTableExistsDoneInfo + p.Exists = exists + p.Error = e + res(p) + } +} +func DatabaseSQLOnTxQuery(t *DatabaseSQL, c *context.Context, txContext *context.Context, tx tableTransactionInfo, query string) func(error) { var p DatabaseSQLTxQueryStartInfo p.Context = c p.TxContext = txContext p.Tx = tx p.Query = query - p.Idempotent = idempotent res := t.onTxQuery(p) return func(e error) { var p DatabaseSQLTxQueryDoneInfo @@ -905,13 +1017,12 @@ func DatabaseSQLOnTxQuery(t *DatabaseSQL, c *context.Context, txContext context. res(p) } } -func DatabaseSQLOnTxExec(t *DatabaseSQL, c *context.Context, txContext context.Context, tx tableTransactionInfo, query string, idempotent bool) func(error) { +func DatabaseSQLOnTxExec(t *DatabaseSQL, c *context.Context, txContext *context.Context, tx tableTransactionInfo, query string) func(error) { var p DatabaseSQLTxExecStartInfo p.Context = c p.TxContext = txContext p.Tx = tx p.Query = query - p.Idempotent = idempotent res := t.onTxExec(p) return func(e error) { var p DatabaseSQLTxExecDoneInfo @@ -919,9 +1030,22 @@ func DatabaseSQLOnTxExec(t *DatabaseSQL, c *context.Context, txContext context.C res(p) } } -func DatabaseSQLOnTxCommit(t *DatabaseSQL, c *context.Context, tx tableTransactionInfo) func(error) { - var p DatabaseSQLTxCommitStartInfo +func DatabaseSQLOnTxPrepare(t *DatabaseSQL, c *context.Context, txContext *context.Context, tx tableTransactionInfo, query string) func(error) { + var p DatabaseSQLTxPrepareStartInfo p.Context = c + p.TxContext = txContext + p.Tx = tx + p.Query = query + res := t.onTxPrepare(p) + return func(e error) { + var p DatabaseSQLTxPrepareDoneInfo + p.Error = e + res(p) + } +} +func DatabaseSQLOnTxCommit(t *DatabaseSQL, txContext *context.Context, tx tableTransactionInfo) func(error) { + var p DatabaseSQLTxCommitStartInfo + p.TxContext = txContext p.Tx = tx res := t.onTxCommit(p) return func(e error) { @@ -930,9 +1054,9 @@ func DatabaseSQLOnTxCommit(t *DatabaseSQL, c *context.Context, tx tableTransacti res(p) } } -func DatabaseSQLOnTxRollback(t *DatabaseSQL, c *context.Context, tx tableTransactionInfo) func(error) { +func DatabaseSQLOnTxRollback(t *DatabaseSQL, txContext *context.Context, tx tableTransactionInfo) func(error) { var p DatabaseSQLTxRollbackStartInfo - p.Context = c + p.TxContext = txContext p.Tx = tx res := t.onTxRollback(p) return func(e error) { @@ -941,9 +1065,10 @@ func DatabaseSQLOnTxRollback(t *DatabaseSQL, c *context.Context, tx tableTransac res(p) } } -func DatabaseSQLOnStmtQuery(t *DatabaseSQL, c *context.Context, query string) func(error) { +func DatabaseSQLOnStmtQuery(t *DatabaseSQL, c *context.Context, stmtContext *context.Context, query string) func(error) { var p DatabaseSQLStmtQueryStartInfo p.Context = c + p.StmtContext = stmtContext p.Query = query res := t.onStmtQuery(p) return func(e error) { @@ -952,9 +1077,10 @@ func DatabaseSQLOnStmtQuery(t *DatabaseSQL, c *context.Context, query string) fu res(p) } } -func DatabaseSQLOnStmtExec(t *DatabaseSQL, c *context.Context, query string) func(error) { +func DatabaseSQLOnStmtExec(t *DatabaseSQL, c *context.Context, stmtContext *context.Context, query string) func(error) { var p DatabaseSQLStmtExecStartInfo p.Context = c + p.StmtContext = stmtContext p.Query = query res := t.onStmtExec(p) return func(e error) { @@ -963,8 +1089,9 @@ func DatabaseSQLOnStmtExec(t *DatabaseSQL, c *context.Context, query string) fun res(p) } } -func DatabaseSQLOnStmtClose(t *DatabaseSQL) func(error) { +func DatabaseSQLOnStmtClose(t *DatabaseSQL, stmtContext *context.Context) func(error) { var p DatabaseSQLStmtCloseStartInfo + p.StmtContext = stmtContext res := t.onStmtClose(p) return func(e error) { var p DatabaseSQLStmtCloseDoneInfo diff --git a/trace/table.go b/trace/table.go index e4a70546e..81e298324 100644 --- a/trace/table.go +++ b/trace/table.go @@ -30,6 +30,7 @@ type ( OnSessionDelete func(TableSessionDeleteStartInfo) func(TableSessionDeleteDoneInfo) OnSessionKeepAlive func(TableKeepAliveStartInfo) func(TableKeepAliveDoneInfo) // Query events + OnSessionBulkUpsert func(TableBulkUpsertStartInfo) func(TableBulkUpsertDoneInfo) OnSessionQueryPrepare func(TablePrepareDataQueryStartInfo) func(TablePrepareDataQueryDoneInfo) OnSessionQueryExecute func(TableExecuteDataQueryStartInfo) func(TableExecuteDataQueryDoneInfo) OnSessionQueryExplain func(TableExplainQueryStartInfo) func(TableExplainQueryDoneInfo) @@ -119,6 +120,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call } TableSessionNewDoneInfo struct { Session tableSessionInfo @@ -130,17 +132,31 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Session tableSessionInfo } TableKeepAliveDoneInfo struct { Error error } + TableBulkUpsertStartInfo struct { + // Context make available context in trace callback function. + // Pointer to context provide replacement of context in trace callback function. + // Warning: concurrent access to pointer on client side must be excluded. + // Safe replacement of context are provided only inside callback function + Context *context.Context + Call call + Session tableSessionInfo + } + TableBulkUpsertDoneInfo struct { + Error error + } TableSessionDeleteStartInfo struct { // Context make available context in trace callback function. // Pointer to context provide replacement of context in trace callback function. // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Session tableSessionInfo } TableSessionDeleteDoneInfo struct { @@ -152,6 +168,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Session tableSessionInfo Query string } @@ -165,6 +182,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Session tableSessionInfo Query tableDataQuery Parameters tableQueryParameters @@ -176,6 +194,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Session tableSessionInfo Tx tableTransactionInfo Query tableDataQuery @@ -187,6 +206,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Session tableSessionInfo Tx tableTransactionInfo StatementQuery tableDataQuery @@ -198,6 +218,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Session tableSessionInfo Query string } @@ -226,6 +247,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Session tableSessionInfo } TableSessionQueryStreamReadIntermediateInfo struct { @@ -240,6 +262,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Session tableSessionInfo Query tableDataQuery Parameters tableQueryParameters @@ -256,6 +279,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Session tableSessionInfo } TableSessionTransactionBeginDoneInfo struct { @@ -268,6 +292,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Session tableSessionInfo Tx tableTransactionInfo } @@ -280,6 +305,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Session tableSessionInfo Tx tableTransactionInfo } @@ -292,6 +318,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call } TableInitDoneInfo struct { Limit int @@ -307,6 +334,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call } TablePoolSessionNewDoneInfo struct { Session tableSessionInfo @@ -318,6 +346,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call } TablePoolGetDoneInfo struct { Session tableSessionInfo @@ -330,6 +359,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call } // TablePoolWaitDoneInfo means a wait iteration inside Get call is done // Warning: Session and Error may be nil at the same time. This means @@ -344,6 +374,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Session tableSessionInfo } TablePoolPutDoneInfo struct { @@ -355,6 +386,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call Session tableSessionInfo } TablePoolSessionCloseDoneInfo struct{} @@ -370,6 +402,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call } TableCloseDoneInfo struct { Error error @@ -380,6 +413,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call // Deprecated: use Label field instead ID string @@ -401,6 +435,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call // Deprecated: use Label field instead ID string @@ -422,6 +457,7 @@ type ( // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function Context *context.Context + Call call } TableCreateSessionIntermediateInfo struct { Error error diff --git a/trace/table_gtrace.go b/trace/table_gtrace.go index 9abbd0ce7..95630c8f8 100644 --- a/trace/table_gtrace.go +++ b/trace/table_gtrace.go @@ -358,6 +358,41 @@ func (t *Table) Compose(x *Table, opts ...TableComposeOption) *Table { } } } + { + h1 := t.OnSessionBulkUpsert + h2 := x.OnSessionBulkUpsert + ret.OnSessionBulkUpsert = func(t TableBulkUpsertStartInfo) func(TableBulkUpsertDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + var r, r1 func(TableBulkUpsertDoneInfo) + if h1 != nil { + r = h1(t) + } + if h2 != nil { + r1 = h2(t) + } + return func(t TableBulkUpsertDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if r != nil { + r(t) + } + if r1 != nil { + r1(t) + } + } + } + } { h1 := t.OnSessionQueryPrepare h2 := x.OnSessionQueryPrepare @@ -1130,6 +1165,21 @@ func (t *Table) onSessionKeepAlive(t1 TableKeepAliveStartInfo) func(TableKeepAli } return res } +func (t *Table) onSessionBulkUpsert(t1 TableBulkUpsertStartInfo) func(TableBulkUpsertDoneInfo) { + fn := t.OnSessionBulkUpsert + if fn == nil { + return func(TableBulkUpsertDoneInfo) { + return + } + } + res := fn(t1) + if res == nil { + return func(TableBulkUpsertDoneInfo) { + return + } + } + return res +} func (t *Table) onSessionQueryPrepare(t1 TablePrepareDataQueryStartInfo) func(TablePrepareDataQueryDoneInfo) { fn := t.OnSessionQueryPrepare if fn == nil { @@ -1400,9 +1450,10 @@ func (t *Table) onPoolWait(t1 TablePoolWaitStartInfo) func(TablePoolWaitDoneInfo } return res } -func TableOnInit(t *Table, c *context.Context) func(limit int, _ error) { +func TableOnInit(t *Table, c *context.Context, call call) func(limit int, _ error) { var p TableInitStartInfo p.Context = c + p.Call = call res := t.onInit(p) return func(limit int, e error) { var p TableInitDoneInfo @@ -1411,9 +1462,10 @@ func TableOnInit(t *Table, c *context.Context) func(limit int, _ error) { res(p) } } -func TableOnClose(t *Table, c *context.Context) func(error) { +func TableOnClose(t *Table, c *context.Context, call call) func(error) { var p TableCloseStartInfo p.Context = c + p.Call = call res := t.onClose(p) return func(e error) { var p TableCloseDoneInfo @@ -1421,9 +1473,10 @@ func TableOnClose(t *Table, c *context.Context) func(error) { res(p) } } -func TableOnDo(t *Table, c *context.Context, iD string, label string, idempotent bool, nestedCall bool) func(error) func(attempts int, _ error) { +func TableOnDo(t *Table, c *context.Context, call call, iD string, label string, idempotent bool, nestedCall bool) func(error) func(attempts int, _ error) { var p TableDoStartInfo p.Context = c + p.Call = call p.ID = iD p.Label = label p.Idempotent = idempotent @@ -1441,9 +1494,10 @@ func TableOnDo(t *Table, c *context.Context, iD string, label string, idempotent } } } -func TableOnDoTx(t *Table, c *context.Context, iD string, label string, idempotent bool, nestedCall bool) func(error) func(attempts int, _ error) { +func TableOnDoTx(t *Table, c *context.Context, call call, iD string, label string, idempotent bool, nestedCall bool) func(error) func(attempts int, _ error) { var p TableDoTxStartInfo p.Context = c + p.Call = call p.ID = iD p.Label = label p.Idempotent = idempotent @@ -1461,9 +1515,10 @@ func TableOnDoTx(t *Table, c *context.Context, iD string, label string, idempote } } } -func TableOnCreateSession(t *Table, c *context.Context) func(error) func(session tableSessionInfo, attempts int, _ error) { +func TableOnCreateSession(t *Table, c *context.Context, call call) func(error) func(session tableSessionInfo, attempts int, _ error) { var p TableCreateSessionStartInfo p.Context = c + p.Call = call res := t.onCreateSession(p) return func(e error) func(tableSessionInfo, int, error) { var p TableCreateSessionIntermediateInfo @@ -1478,9 +1533,10 @@ func TableOnCreateSession(t *Table, c *context.Context) func(error) func(session } } } -func TableOnSessionNew(t *Table, c *context.Context) func(session tableSessionInfo, _ error) { +func TableOnSessionNew(t *Table, c *context.Context, call call) func(session tableSessionInfo, _ error) { var p TableSessionNewStartInfo p.Context = c + p.Call = call res := t.onSessionNew(p) return func(session tableSessionInfo, e error) { var p TableSessionNewDoneInfo @@ -1489,9 +1545,10 @@ func TableOnSessionNew(t *Table, c *context.Context) func(session tableSessionIn res(p) } } -func TableOnSessionDelete(t *Table, c *context.Context, session tableSessionInfo) func(error) { +func TableOnSessionDelete(t *Table, c *context.Context, call call, session tableSessionInfo) func(error) { var p TableSessionDeleteStartInfo p.Context = c + p.Call = call p.Session = session res := t.onSessionDelete(p) return func(e error) { @@ -1500,9 +1557,10 @@ func TableOnSessionDelete(t *Table, c *context.Context, session tableSessionInfo res(p) } } -func TableOnSessionKeepAlive(t *Table, c *context.Context, session tableSessionInfo) func(error) { +func TableOnSessionKeepAlive(t *Table, c *context.Context, call call, session tableSessionInfo) func(error) { var p TableKeepAliveStartInfo p.Context = c + p.Call = call p.Session = session res := t.onSessionKeepAlive(p) return func(e error) { @@ -1511,9 +1569,22 @@ func TableOnSessionKeepAlive(t *Table, c *context.Context, session tableSessionI res(p) } } -func TableOnSessionQueryPrepare(t *Table, c *context.Context, session tableSessionInfo, query string) func(result tableDataQuery, _ error) { +func TableOnSessionBulkUpsert(t *Table, c *context.Context, call call, session tableSessionInfo) func(error) { + var p TableBulkUpsertStartInfo + p.Context = c + p.Call = call + p.Session = session + res := t.onSessionBulkUpsert(p) + return func(e error) { + var p TableBulkUpsertDoneInfo + p.Error = e + res(p) + } +} +func TableOnSessionQueryPrepare(t *Table, c *context.Context, call call, session tableSessionInfo, query string) func(result tableDataQuery, _ error) { var p TablePrepareDataQueryStartInfo p.Context = c + p.Call = call p.Session = session p.Query = query res := t.onSessionQueryPrepare(p) @@ -1524,9 +1595,10 @@ func TableOnSessionQueryPrepare(t *Table, c *context.Context, session tableSessi res(p) } } -func TableOnSessionQueryExecute(t *Table, c *context.Context, session tableSessionInfo, query tableDataQuery, parameters tableQueryParameters, keepInCache bool) func(tx tableTransactionInfo, prepared bool, result tableResult, _ error) { +func TableOnSessionQueryExecute(t *Table, c *context.Context, call call, session tableSessionInfo, query tableDataQuery, parameters tableQueryParameters, keepInCache bool) func(tx tableTransactionInfo, prepared bool, result tableResult, _ error) { var p TableExecuteDataQueryStartInfo p.Context = c + p.Call = call p.Session = session p.Query = query p.Parameters = parameters @@ -1541,9 +1613,10 @@ func TableOnSessionQueryExecute(t *Table, c *context.Context, session tableSessi res(p) } } -func TableOnSessionQueryExplain(t *Table, c *context.Context, session tableSessionInfo, query string) func(aST string, plan string, _ error) { +func TableOnSessionQueryExplain(t *Table, c *context.Context, call call, session tableSessionInfo, query string) func(aST string, plan string, _ error) { var p TableExplainQueryStartInfo p.Context = c + p.Call = call p.Session = session p.Query = query res := t.onSessionQueryExplain(p) @@ -1555,9 +1628,10 @@ func TableOnSessionQueryExplain(t *Table, c *context.Context, session tableSessi res(p) } } -func TableOnSessionQueryStreamExecute(t *Table, c *context.Context, session tableSessionInfo, query tableDataQuery, parameters tableQueryParameters) func(error) func(error) { +func TableOnSessionQueryStreamExecute(t *Table, c *context.Context, call call, session tableSessionInfo, query tableDataQuery, parameters tableQueryParameters) func(error) func(error) { var p TableSessionQueryStreamExecuteStartInfo p.Context = c + p.Call = call p.Session = session p.Query = query p.Parameters = parameters @@ -1573,9 +1647,10 @@ func TableOnSessionQueryStreamExecute(t *Table, c *context.Context, session tabl } } } -func TableOnSessionQueryStreamRead(t *Table, c *context.Context, session tableSessionInfo) func(error) func(error) { +func TableOnSessionQueryStreamRead(t *Table, c *context.Context, call call, session tableSessionInfo) func(error) func(error) { var p TableSessionQueryStreamReadStartInfo p.Context = c + p.Call = call p.Session = session res := t.onSessionQueryStreamRead(p) return func(e error) func(error) { @@ -1589,9 +1664,10 @@ func TableOnSessionQueryStreamRead(t *Table, c *context.Context, session tableSe } } } -func TableOnSessionTransactionBegin(t *Table, c *context.Context, session tableSessionInfo) func(tx tableTransactionInfo, _ error) { +func TableOnSessionTransactionBegin(t *Table, c *context.Context, call call, session tableSessionInfo) func(tx tableTransactionInfo, _ error) { var p TableSessionTransactionBeginStartInfo p.Context = c + p.Call = call p.Session = session res := t.onSessionTransactionBegin(p) return func(tx tableTransactionInfo, e error) { @@ -1601,9 +1677,10 @@ func TableOnSessionTransactionBegin(t *Table, c *context.Context, session tableS res(p) } } -func TableOnSessionTransactionExecute(t *Table, c *context.Context, session tableSessionInfo, tx tableTransactionInfo, query tableDataQuery, parameters tableQueryParameters) func(result tableResult, _ error) { +func TableOnSessionTransactionExecute(t *Table, c *context.Context, call call, session tableSessionInfo, tx tableTransactionInfo, query tableDataQuery, parameters tableQueryParameters) func(result tableResult, _ error) { var p TableTransactionExecuteStartInfo p.Context = c + p.Call = call p.Session = session p.Tx = tx p.Query = query @@ -1616,9 +1693,10 @@ func TableOnSessionTransactionExecute(t *Table, c *context.Context, session tabl res(p) } } -func TableOnSessionTransactionExecuteStatement(t *Table, c *context.Context, session tableSessionInfo, tx tableTransactionInfo, statementQuery tableDataQuery, parameters tableQueryParameters) func(result tableResult, _ error) { +func TableOnSessionTransactionExecuteStatement(t *Table, c *context.Context, call call, session tableSessionInfo, tx tableTransactionInfo, statementQuery tableDataQuery, parameters tableQueryParameters) func(result tableResult, _ error) { var p TableTransactionExecuteStatementStartInfo p.Context = c + p.Call = call p.Session = session p.Tx = tx p.StatementQuery = statementQuery @@ -1631,9 +1709,10 @@ func TableOnSessionTransactionExecuteStatement(t *Table, c *context.Context, ses res(p) } } -func TableOnSessionTransactionCommit(t *Table, c *context.Context, session tableSessionInfo, tx tableTransactionInfo) func(error) { +func TableOnSessionTransactionCommit(t *Table, c *context.Context, call call, session tableSessionInfo, tx tableTransactionInfo) func(error) { var p TableSessionTransactionCommitStartInfo p.Context = c + p.Call = call p.Session = session p.Tx = tx res := t.onSessionTransactionCommit(p) @@ -1643,9 +1722,10 @@ func TableOnSessionTransactionCommit(t *Table, c *context.Context, session table res(p) } } -func TableOnSessionTransactionRollback(t *Table, c *context.Context, session tableSessionInfo, tx tableTransactionInfo) func(error) { +func TableOnSessionTransactionRollback(t *Table, c *context.Context, call call, session tableSessionInfo, tx tableTransactionInfo) func(error) { var p TableSessionTransactionRollbackStartInfo p.Context = c + p.Call = call p.Session = session p.Tx = tx res := t.onSessionTransactionRollback(p) @@ -1671,9 +1751,10 @@ func TableOnPoolSessionRemove(t *Table, session tableSessionInfo) { p.Session = session t.onPoolSessionRemove(p) } -func TableOnPoolSessionNew(t *Table, c *context.Context) func(session tableSessionInfo, _ error) { +func TableOnPoolSessionNew(t *Table, c *context.Context, call call) func(session tableSessionInfo, _ error) { var p TablePoolSessionNewStartInfo p.Context = c + p.Call = call res := t.onPoolSessionNew(p) return func(session tableSessionInfo, e error) { var p TablePoolSessionNewDoneInfo @@ -1682,9 +1763,10 @@ func TableOnPoolSessionNew(t *Table, c *context.Context) func(session tableSessi res(p) } } -func TableOnPoolSessionClose(t *Table, c *context.Context, session tableSessionInfo) func() { +func TableOnPoolSessionClose(t *Table, c *context.Context, call call, session tableSessionInfo) func() { var p TablePoolSessionCloseStartInfo p.Context = c + p.Call = call p.Session = session res := t.onPoolSessionClose(p) return func() { @@ -1692,9 +1774,10 @@ func TableOnPoolSessionClose(t *Table, c *context.Context, session tableSessionI res(p) } } -func TableOnPoolPut(t *Table, c *context.Context, session tableSessionInfo) func(error) { +func TableOnPoolPut(t *Table, c *context.Context, call call, session tableSessionInfo) func(error) { var p TablePoolPutStartInfo p.Context = c + p.Call = call p.Session = session res := t.onPoolPut(p) return func(e error) { @@ -1703,9 +1786,10 @@ func TableOnPoolPut(t *Table, c *context.Context, session tableSessionInfo) func res(p) } } -func TableOnPoolGet(t *Table, c *context.Context) func(session tableSessionInfo, attempts int, _ error) { +func TableOnPoolGet(t *Table, c *context.Context, call call) func(session tableSessionInfo, attempts int, _ error) { var p TablePoolGetStartInfo p.Context = c + p.Call = call res := t.onPoolGet(p) return func(session tableSessionInfo, attempts int, e error) { var p TablePoolGetDoneInfo @@ -1715,9 +1799,10 @@ func TableOnPoolGet(t *Table, c *context.Context) func(session tableSessionInfo, res(p) } } -func TableOnPoolWait(t *Table, c *context.Context) func(session tableSessionInfo, _ error) { +func TableOnPoolWait(t *Table, c *context.Context, call call) func(session tableSessionInfo, _ error) { var p TablePoolWaitStartInfo p.Context = c + p.Call = call res := t.onPoolWait(p) return func(session tableSessionInfo, e error) { var p TablePoolWaitDoneInfo diff --git a/with_test.go b/with_test.go index d04594d76..5469a45ed 100644 --- a/with_test.go +++ b/with_test.go @@ -132,7 +132,7 @@ func TestWithCertificatesCached(t *testing.T) { db, err := newConnectionFromOptions(ctx, append( test.options, - withConnPool(conn.NewPool(config.New())), + withConnPool(conn.NewPool(context.Background(), config.New())), )..., ) require.NoError(t, err)