Skip to content

Commit

Permalink
feat: transfer 后端 kafka 支持 sasl 鉴权 --story=121214415 (#667)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenjiandongx authored Dec 13, 2024
1 parent 5b8e6ca commit 3dd8db6
Showing 1 changed file with 34 additions and 26 deletions.
60 changes: 34 additions & 26 deletions pkg/transfer/kafka/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,41 +81,16 @@ var NewProducer = func(cluster []string, conf *sarama.Config) (Producer, error)

// NewKafkaBackend:
func NewKafkaBackend(ctx context.Context, name string) (*Backend, error) {
var (
conf = config.FromContext(ctx)
err error
)
shipper := config.ShipperConfigFromContext(ctx)
auth := config.NewAuthInfo(shipper)
kafkaConfig := shipper.AsKafkaCluster()

topic := kafkaConfig.GetTopic()
// 该参数实际为partition数量,目前分区情况为随机
partition := kafkaConfig.GetPartition()

cluster := fmt.Sprintf("%s:%d", kafkaConfig.GetDomain(), kafkaConfig.GetPort())

logging.Debugf("prepare to push to cluster: %v, topic: %v", cluster, topic)

producerConfig, err := NewKafkaProducerConfig(conf)
if err != nil {
logging.Errorf("create producer failed:%v", err)
return nil, err
}
userName, err := auth.GetUserName()
if err != nil {
logging.Warnf("%v may not establish connection %v: username", name, define.ErrGetAuth)
}
passWord, err := auth.GetPassword()
if err != nil {
logging.Warnf("%v may not establish connection %v: password", name, define.ErrGetAuth)
}
if userName != "" || passWord != "" && err == nil {
producerConfig.Net.SASL.User = userName
producerConfig.Net.SASL.Password = passWord
producerConfig.Net.SASL.Enable = true
}

ctx, cancelFun := context.WithCancel(ctx)
pipeConfig := config.PipelineConfigFromContext(ctx)
return &Backend{
Expand All @@ -133,7 +108,7 @@ func NewKafkaBackend(ctx context.Context, name string) (*Backend, error) {
producer: nil,
Topic: topic,
Partition: int32(partition),
}, err
}, nil
}

func (b *Backend) init() error {
Expand All @@ -158,6 +133,39 @@ func (b *Backend) init() error {
return err
}

auth := config.NewAuthInfo(shipper)
username, err := auth.GetUserName()
if err != nil {
logging.Warnf("%v may not establish connection %v: username", b.Name, define.ErrGetAuth)
}
password, err := auth.GetPassword()
if err != nil {
logging.Warnf("%v may not establish connection %v: password", b.Name, define.ErrGetAuth)
}
mqConfig := config.MQConfigFromContext(b.ctx)
if username != "" || password != "" && err == nil {
producerConfig.Net.SASL.User = username
producerConfig.Net.SASL.Password = password
producerConfig.Net.SASL.Enable = true

// 目前仅支持 sha512/sha256
info := utils.NewMapHelper(mqConfig.AuthInfo)
if mechanisms, ok := info.GetString(optSaslMechanisms); ok {
switch mechanisms {
case "SCRAM-SHA-512":
producerConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
producerConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
}
case "SCRAM-SHA-256":
producerConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
producerConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
}
}
}
}

b.producer, err = NewProducer([]string{cluster}, producerConfig)
if err != nil {
logging.Errorf("create backend client, cluster=%v, topic=%v, err %s", cluster, b.Topic, err)
Expand Down

0 comments on commit 3dd8db6

Please sign in to comment.