From cd68dcab6e4697c8175f74a42e572aeb2fb0bb50 Mon Sep 17 00:00:00 2001 From: wisdom <153594697@qq.com> Date: Mon, 11 Apr 2022 09:44:22 +0800 Subject: [PATCH 1/2] feat:avoid blocking --- pkg/controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller.go b/pkg/controller.go index a3e6b2d..cdedfec 100644 --- a/pkg/controller.go +++ b/pkg/controller.go @@ -62,7 +62,7 @@ type Controller struct { func NewController(consulAddress string) *Controller { controller := &Controller{ consulAddress: consulAddress, - pushChannel: make(chan *ChangeEvent), + pushChannel: make(chan *ChangeEvent, 1), } return controller } From 322010ce938443bff25409c88ef4469d21c8397f Mon Sep 17 00:00:00 2001 From: wisdom <153594697@qq.com> Date: Mon, 11 Apr 2022 10:43:06 +0800 Subject: [PATCH 2/2] feat: the protocol value:get from meta first, otherwise get from tags; --- pkg/controller.go | 2 +- pkg/serviceregistry/consul/controller.go | 2 +- pkg/serviceregistry/consul/conversion.go | 31 ++++++++++++++++++++++-- pkg/serviceregistry/consul/monitor.go | 6 ++++- 4 files changed, 36 insertions(+), 5 deletions(-) diff --git a/pkg/controller.go b/pkg/controller.go index cdedfec..ddefb60 100644 --- a/pkg/controller.go +++ b/pkg/controller.go @@ -129,7 +129,7 @@ func (s *Controller) mainLoop(stop <-chan struct{}) { if err != nil { log.Errorf("Failed to synchronize consul services to Istio: %v", err) // Retry if failed - s.pushChannel <- &ChangeEvent{} + // s.pushChannel <- &ChangeEvent{} } debouncedEvents = 0 } diff --git a/pkg/serviceregistry/consul/controller.go b/pkg/serviceregistry/consul/controller.go index 29b9074..ac63f71 100644 --- a/pkg/serviceregistry/consul/controller.go +++ b/pkg/serviceregistry/consul/controller.go @@ -85,7 +85,7 @@ func (c *Controller) initCache() error { if err != nil { return err } - + c.servicesList = nil for serviceName := range consulServices { // get endpoints of a service from consul endpoints, err := c.getCatalogService(serviceName, nil) diff --git a/pkg/serviceregistry/consul/conversion.go b/pkg/serviceregistry/consul/conversion.go index 8e4a30e..5142cfd 100644 --- a/pkg/serviceregistry/consul/conversion.go +++ b/pkg/serviceregistry/consul/conversion.go @@ -42,7 +42,8 @@ func convertServiceEntry(service string, endpoints []*api.CatalogService) *istio for _, endpoint := range endpoints { name = endpoint.ServiceName - port := convertPort(endpoint.ServicePort, endpoint.ServiceMeta[protocolTagName]) + protoName := getProtoValFromServiceTagsOrMeta(endpoint.ServiceTags, endpoint.ServiceMeta[protocolTagName]) + port := convertPort(endpoint.ServicePort, protoName) if svcPort, exists := ports[port.Number]; exists && svcPort.Protocol != port.Protocol { log.Warnf("Service %v has two instances on same port %v but different protocols (%v, %v)", @@ -83,7 +84,9 @@ func convertWorkloadEntry(endpoint *api.CatalogService) *istio.WorkloadEntry { if addr == "" { addr = endpoint.Address } - port := convertPort(endpoint.ServicePort, endpoint.ServiceMeta[protocolTagName]) + + protoName := getProtoValFromServiceTagsOrMeta(endpoint.ServiceTags, endpoint.ServiceMeta[protocolTagName]) + port := convertPort(endpoint.ServicePort, protoName) return &istio.WorkloadEntry{ Address: addr, @@ -135,3 +138,27 @@ func convertProtocol(name string) string { } return string(p) } + +//get the protocol value:get from meta first, otherwise get from tags +func getProtoValFromServiceTagsOrMeta(serviceTags []string, serviceMetaProtocol string) string { + if len(serviceMetaProtocol) > 0 { + log.Debugf("getProtoValFromServiceTagsOrMeta,serviceMetaProtocol:%v", serviceMetaProtocol) + return serviceMetaProtocol + } + defaultProto := "http" + if len(serviceTags) == 0 { + return defaultProto + } + for _, v := range serviceTags { + isExist := strings.Contains(v, "proto") + if isExist { + strs := strings.Split(v, "=") + if len(strs) == 2 { + return strs[1] + } else { + log.Warnf("service tags proto is invalid") + } + } + } + return defaultProto +} diff --git a/pkg/serviceregistry/consul/monitor.go b/pkg/serviceregistry/consul/monitor.go index ba06982..5836c09 100644 --- a/pkg/serviceregistry/consul/monitor.go +++ b/pkg/serviceregistry/consul/monitor.go @@ -37,7 +37,10 @@ type consulMonitor struct { ServiceChangeHandlers []ServiceChangeHandler } -const blockQueryWaitTime time.Duration = 10 * time.Minute +const ( + blockQueryWaitTime time.Duration = 10 * time.Minute + updateServiceRecordWaitTime time.Duration = 10 * time.Second +) // NewConsulMonitor watches for changes in Consul services and CatalogServices func NewConsulMonitor(client *api.Client) Monitor { @@ -72,6 +75,7 @@ func (m *consulMonitor) watchConsul(stop <-chan struct{}) { } else if consulWaitIndex != queryMeta.LastIndex { consulWaitIndex = queryMeta.LastIndex m.updateServiceRecord() + time.Sleep(updateServiceRecordWaitTime) } } }