Skip to content

Commit

Permalink
Merge pull request #887 from ydb-platform/tracing
Browse files Browse the repository at this point in the history
Refactoring for better tracing
  • Loading branch information
asmyasnikov authored Nov 9, 2023
2 parents 3d4ec7a + 935a5ad commit 13c5744
Show file tree
Hide file tree
Showing 45 changed files with 1,307 additions and 321 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Added context to some internal methods for better tracing
* Added `trace.FunctionID` helper and `FunctionID` field to trace start info's
* Replaced lazy initialization of ydb clients (table, topic, etc.) to explicit initialization on `ydb.Open` step

## v3.54.1
Expand Down
28 changes: 15 additions & 13 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
schemeConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/scheme/config"
internalScripting "github.com/ydb-platform/ydb-go-sdk/v3/internal/scripting"
scriptingConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/scripting/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
internalTable "github.com/ydb-platform/ydb-go-sdk/v3/internal/table"
tableConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicclientinternal"
Expand Down Expand Up @@ -93,7 +94,12 @@ type Driver struct { //nolint:maligned
}

// Close closes Driver and clear resources
func (d *Driver) Close(ctx context.Context) error {
func (d *Driver) Close(ctx context.Context) (finalErr error) {
onDone := trace.DriverOnClose(d.config.Trace(), &ctx, stack.FunctionID(0))
defer func() {
onDone(finalErr)
}()

d.mtx.Lock()
defer d.mtx.Unlock()

Expand All @@ -103,16 +109,16 @@ func (d *Driver) Close(ctx context.Context) error {
}
}()

closers := make([]func(context.Context) error, 0)
closes := make([]func(context.Context) error, 0)
d.childrenMtx.WithLock(func() {
for _, child := range d.children {
closers = append(closers, child.Close)
closes = append(closes, child.Close)
}
d.children = nil
})

closers = append(
closers,
closes = append(
closes,
d.ratelimiter.Close,
d.coordination.Close,
d.scheme.Close,
Expand All @@ -124,8 +130,8 @@ func (d *Driver) Close(ctx context.Context) error {
)

var issues []error
for _, closer := range closers {
if err := closer(ctx); err != nil {
for _, f := range closes {
if err := f(ctx); err != nil {
issues = append(issues, err)
}
}
Expand Down Expand Up @@ -292,11 +298,7 @@ func connect(ctx context.Context, d *Driver) error {
}

onDone := trace.DriverOnInit(
d.config.Trace(),
&ctx,
d.config.Endpoint(),
d.config.Database(),
d.config.Secure(),
d.config.Trace(), &ctx, stack.FunctionID(2), d.config.Endpoint(), d.config.Database(), d.config.Secure(),
)
defer func() {
onDone(err)
Expand All @@ -313,7 +315,7 @@ func connect(ctx context.Context, d *Driver) error {
}

if d.pool == nil {
d.pool = conn.NewPool(d.config)
d.pool = conn.NewPool(ctx, d.config)
}

d.balancer, err = balancer.New(ctx, d.config, d.pool, d.discoveryOptions...)
Expand Down
19 changes: 8 additions & 11 deletions internal/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
discoveryConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/discovery/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/repeater"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
Expand Down Expand Up @@ -91,7 +92,7 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) {
var (
address = "ydb:///" + b.driverConfig.Endpoint()
onDone = trace.DriverOnBalancerClusterDiscoveryAttempt(
b.driverConfig.Trace(), &ctx, address,
b.driverConfig.Trace(), &ctx, stack.FunctionID(0), address,
)
endpoints []endpoint.Endpoint
localDC string
Expand Down Expand Up @@ -127,9 +128,7 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) {

func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []endpoint.Endpoint, localDC string) {
onDone := trace.DriverOnBalancerUpdate(
b.driverConfig.Trace(),
&ctx,
b.balancerConfig.DetectlocalDC,
b.driverConfig.Trace(), &ctx, stack.FunctionID(0), b.balancerConfig.DetectlocalDC,
)
defer func() {
nodes := make([]trace.EndpointInfo, 0, len(endpoints))
Expand Down Expand Up @@ -163,8 +162,7 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []end

func (b *Balancer) Close(ctx context.Context) (err error) {
onDone := trace.DriverOnBalancerClose(
b.driverConfig.Trace(),
&ctx,
b.driverConfig.Trace(), &ctx, stack.FunctionID(0),
)
defer func() {
onDone(err)
Expand All @@ -189,8 +187,7 @@ func New(
) (b *Balancer, finalErr error) {
var (
onDone = trace.DriverOnBalancerInit(
driverConfig.Trace(),
&ctx,
driverConfig.Trace(), &ctx, stack.FunctionID(0),
)
discoveryConfig = discoveryConfig.New(append(opts,
discoveryConfig.With(driverConfig.Common),
Expand Down Expand Up @@ -235,7 +232,8 @@ func New(
}
// run background discovering
if d := discoveryConfig.Interval(); d > 0 {
b.discoveryRepeater = repeater.New(d, b.clusterDiscoveryAttempt,
b.discoveryRepeater = repeater.New(xcontext.WithoutDeadline(ctx),
d, b.clusterDiscoveryAttempt,
repeater.WithName("discovery"),
repeater.WithTrace(b.driverConfig.Trace()),
)
Expand Down Expand Up @@ -320,8 +318,7 @@ func (b *Balancer) connections() *connectionsState {

func (b *Balancer) getConn(ctx context.Context) (c conn.Conn, err error) {
onDone := trace.DriverOnBalancerChooseEndpoint(
b.driverConfig.Trace(),
&ctx,
b.driverConfig.Trace(), &ctx, stack.FunctionID(0),
)
defer func() {
if err == nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/balancer/local_dc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestLocalDCDiscovery(t *testing.T) {
r := &Balancer{
driverConfig: cfg,
balancerConfig: *cfg.Balancer(),
pool: conn.NewPool(cfg),
pool: conn.NewPool(context.Background(), cfg),
discoveryClient: discoveryMock{endpoints: []endpoint.Endpoint{
&mock.Endpoint{AddrField: "a:123", LocationField: "a"},
&mock.Endpoint{AddrField: "b:234", LocationField: "b"},
Expand Down
51 changes: 20 additions & 31 deletions internal/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/response"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xatomic"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
Expand Down Expand Up @@ -42,8 +43,8 @@ type Conn interface {
Ping(ctx context.Context) error
IsState(states ...State) bool
GetState() State
SetState(State) State
Unban() State
SetState(context.Context, State) State
Unban(context.Context) State
}

type conn struct {
Expand Down Expand Up @@ -93,9 +94,7 @@ func (c *conn) IsState(states ...State) bool {

func (c *conn) park(ctx context.Context) (err error) {
onDone := trace.DriverOnConnPark(
c.config.Trace(),
&ctx,
c.Endpoint(),
c.config.Trace(), &ctx, stack.FunctionID(0), c.Endpoint(),
)
defer func() {
onDone(err)
Expand All @@ -112,7 +111,7 @@ func (c *conn) park(ctx context.Context) (err error) {
return nil
}

err = c.close()
err = c.close(ctx)

if err != nil {
return c.wrapError(err)
Expand All @@ -135,20 +134,20 @@ func (c *conn) Endpoint() endpoint.Endpoint {
return nil
}

func (c *conn) SetState(s State) State {
return c.setState(s)
func (c *conn) SetState(ctx context.Context, s State) State {
return c.setState(ctx, s)
}

func (c *conn) setState(s State) State {
func (c *conn) setState(ctx context.Context, s State) State {
if state := State(c.state.Swap(uint32(s))); state != s {
trace.DriverOnConnStateChange(
c.config.Trace(), c.endpoint.Copy(), state,
c.config.Trace(), &ctx, stack.FunctionID(0), c.endpoint.Copy(), state,
)(s)
}
return s
}

func (c *conn) Unban() State {
func (c *conn) Unban(ctx context.Context) State {
var newState State
c.mtx.RLock()
cc := c.cc
Expand All @@ -159,7 +158,7 @@ func (c *conn) Unban() State {
newState = Offline
}

c.setState(newState)
c.setState(ctx, newState)
return newState
}

Expand All @@ -186,9 +185,7 @@ func (c *conn) realConn(ctx context.Context) (cc *grpc.ClientConn, err error) {
}

onDone := trace.DriverOnConnDial(
c.config.Trace(),
&ctx,
c.endpoint.Copy(),
c.config.Trace(), &ctx, stack.FunctionID(0), c.endpoint.Copy(),
)

defer func() {
Expand Down Expand Up @@ -221,7 +218,7 @@ func (c *conn) realConn(ctx context.Context) (cc *grpc.ClientConn, err error) {
}

c.cc = cc
c.setState(Online)
c.setState(ctx, Online)

return c.cc, nil
}
Expand All @@ -243,13 +240,13 @@ func isAvailable(raw *grpc.ClientConn) bool {
}

// conn must be locked
func (c *conn) close() (err error) {
func (c *conn) close(ctx context.Context) (err error) {
if c.cc == nil {
return nil
}
err = c.cc.Close()
c.cc = nil
c.setState(Offline)
c.setState(ctx, Offline)
return c.wrapError(err)
}

Expand All @@ -268,19 +265,17 @@ func (c *conn) Close(ctx context.Context) (err error) {
}

onDone := trace.DriverOnConnClose(
c.config.Trace(),
&ctx,
c.Endpoint(),
c.config.Trace(), &ctx, stack.FunctionID(0), c.Endpoint(),
)
defer func() {
onDone(err)
}()

c.closed = true

err = c.close()
err = c.close(ctx)

c.setState(Destroyed)
c.setState(ctx, Destroyed)

for _, onClose := range c.onClose {
onClose(c)
Expand All @@ -301,10 +296,7 @@ func (c *conn) Invoke(
issues []trace.Issue
useWrapping = UseWrapping(ctx)
onDone = trace.DriverOnConnInvoke(
c.config.Trace(),
&ctx,
c.endpoint,
trace.Method(method),
c.config.Trace(), &ctx, stack.FunctionID(0), c.endpoint, trace.Method(method),
)
cc *grpc.ClientConn
md = metadata.MD{}
Expand Down Expand Up @@ -386,10 +378,7 @@ func (c *conn) NewStream(
) (_ grpc.ClientStream, err error) {
var (
streamRecv = trace.DriverOnConnNewStream(
c.config.Trace(),
&ctx,
c.endpoint.Copy(),
trace.Method(method),
c.config.Trace(), &ctx, stack.FunctionID(0), c.endpoint.Copy(), trace.Method(method),
)
useWrapping = UseWrapping(ctx)
cc *grpc.ClientConn
Expand Down
Loading

0 comments on commit 13c5744

Please sign in to comment.