Skip to content

Commit

Permalink
fix: 修复 mqconfig 加载不生效的问题 (#101)
Browse files Browse the repository at this point in the history
* fix: 修复 mqconfig 加载不生效的问题

* chore: 代码优化

* chore: 代码优化

* chore: 修改 json tag
  • Loading branch information
chenjiandongx authored Dec 18, 2023
1 parent 8d355af commit 92f022c
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pkg/transfer/json/std_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.

//go:build jsonstd
//go:build !jsonsonic

package json

Expand Down
20 changes: 10 additions & 10 deletions pkg/transfer/pipeline/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ type BulkManager interface {
// Bulk defaults
var (
BulkDefaultBufferSize = 2000
BulkDefaultFlushInterval = 1 * time.Second
BulkDefaultFlushInterval = 2 * time.Second
BulkDefaultFlushRetries = 3
BulkDefaultConcurrency int64 = 25
BulkDefaultConcurrency int64 = 32
BulkDefaultMaxConcurrency int64 = 10000
)

Expand Down Expand Up @@ -159,22 +159,22 @@ type BulkBackendAdapter struct {
func getBufferSizeAndFlushInterval(ctx context.Context, name string) (int, time.Duration) {
bufferSize := BulkDefaultBufferSize
flushInterval := BulkDefaultFlushInterval
shipperConfig, ok := ctx.Value(define.ContextShipperKey).(*config.MetaClusterInfo)
if !ok {
logging.Warn("get shipper config failed,use default bufferSize and flushInterval")
mqConfig := config.MQConfigFromContext(ctx)
if mqConfig == nil {
return bufferSize, flushInterval
}
if shipperConfig.BatchSize != 0 {
bufferSize = shipperConfig.BatchSize

if mqConfig.BatchSize != 0 {
bufferSize = mqConfig.BatchSize
}
if shipperConfig.FlushInterval != "" {
interval, err := time.ParseDuration(shipperConfig.FlushInterval)
if mqConfig.FlushInterval != "" {
interval, err := time.ParseDuration(mqConfig.FlushInterval)
if err == nil {
flushInterval = interval
}
}
logging.Debugf("backend:%s use bufferSize:%d and flushInterval:%s", name, bufferSize, flushInterval)

logging.Debugf("backend:%s use bufferSize:%d and flushInterval:%s", name, bufferSize, flushInterval)
return bufferSize, flushInterval
}

Expand Down

0 comments on commit 92f022c

Please sign in to comment.