diff --git a/pkg/transfer/kafka/backend.go b/pkg/transfer/kafka/backend.go index 5845b8b6e..dd1be5462 100644 --- a/pkg/transfer/kafka/backend.go +++ b/pkg/transfer/kafka/backend.go @@ -81,12 +81,7 @@ var NewProducer = func(cluster []string, conf *sarama.Config) (Producer, error) // NewKafkaBackend: func NewKafkaBackend(ctx context.Context, name string) (*Backend, error) { - var ( - conf = config.FromContext(ctx) - err error - ) shipper := config.ShipperConfigFromContext(ctx) - auth := config.NewAuthInfo(shipper) kafkaConfig := shipper.AsKafkaCluster() topic := kafkaConfig.GetTopic() @@ -94,28 +89,8 @@ func NewKafkaBackend(ctx context.Context, name string) (*Backend, error) { partition := kafkaConfig.GetPartition() cluster := fmt.Sprintf("%s:%d", kafkaConfig.GetDomain(), kafkaConfig.GetPort()) - logging.Debugf("prepare to push to cluster: %v, topic: %v", cluster, topic) - producerConfig, err := NewKafkaProducerConfig(conf) - if err != nil { - logging.Errorf("create producer failed:%v", err) - return nil, err - } - userName, err := auth.GetUserName() - if err != nil { - logging.Warnf("%v may not establish connection %v: username", name, define.ErrGetAuth) - } - passWord, err := auth.GetPassword() - if err != nil { - logging.Warnf("%v may not establish connection %v: password", name, define.ErrGetAuth) - } - if userName != "" || passWord != "" && err == nil { - producerConfig.Net.SASL.User = userName - producerConfig.Net.SASL.Password = passWord - producerConfig.Net.SASL.Enable = true - } - ctx, cancelFun := context.WithCancel(ctx) pipeConfig := config.PipelineConfigFromContext(ctx) return &Backend{ @@ -133,7 +108,7 @@ func NewKafkaBackend(ctx context.Context, name string) (*Backend, error) { producer: nil, Topic: topic, Partition: int32(partition), - }, err + }, nil } func (b *Backend) init() error { @@ -158,6 +133,39 @@ func (b *Backend) init() error { return err } + auth := config.NewAuthInfo(shipper) + username, err := auth.GetUserName() + if err != nil { + logging.Warnf("%v may not establish connection %v: username", b.Name, define.ErrGetAuth) + } + password, err := auth.GetPassword() + if err != nil { + logging.Warnf("%v may not establish connection %v: password", b.Name, define.ErrGetAuth) + } + mqConfig := config.MQConfigFromContext(b.ctx) + if username != "" || password != "" && err == nil { + producerConfig.Net.SASL.User = username + producerConfig.Net.SASL.Password = password + producerConfig.Net.SASL.Enable = true + + // 目前仅支持 sha512/sha256 + info := utils.NewMapHelper(mqConfig.AuthInfo) + if mechanisms, ok := info.GetString(optSaslMechanisms); ok { + switch mechanisms { + case "SCRAM-SHA-512": + producerConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 + producerConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &XDGSCRAMClient{HashGeneratorFcn: SHA512} + } + case "SCRAM-SHA-256": + producerConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256 + producerConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &XDGSCRAMClient{HashGeneratorFcn: SHA256} + } + } + } + } + b.producer, err = NewProducer([]string{cluster}, producerConfig) if err != nil { logging.Errorf("create backend client, cluster=%v, topic=%v, err %s", cluster, b.Topic, err)