Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Opt #8

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type Controller struct {
func NewController(consulAddress string) *Controller {
controller := &Controller{
consulAddress: consulAddress,
pushChannel: make(chan *ChangeEvent),
pushChannel: make(chan *ChangeEvent, 1),
}
return controller
}
Expand Down Expand Up @@ -129,7 +129,7 @@ func (s *Controller) mainLoop(stop <-chan struct{}) {
if err != nil {
log.Errorf("Failed to synchronize consul services to Istio: %v", err)
// Retry if failed
s.pushChannel <- &ChangeEvent{}
// s.pushChannel <- &ChangeEvent{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

重试是否应该保留?

}
debouncedEvents = 0
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/serviceregistry/consul/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (c *Controller) initCache() error {
if err != nil {
return err
}

c.servicesList = nil
for serviceName := range consulServices {
// get endpoints of a service from consul
endpoints, err := c.getCatalogService(serviceName, nil)
Expand Down
31 changes: 29 additions & 2 deletions pkg/serviceregistry/consul/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func convertServiceEntry(service string, endpoints []*api.CatalogService) *istio
for _, endpoint := range endpoints {
name = endpoint.ServiceName

port := convertPort(endpoint.ServicePort, endpoint.ServiceMeta[protocolTagName])
protoName := getProtoValFromServiceTagsOrMeta(endpoint.ServiceTags, endpoint.ServiceMeta[protocolTagName])
port := convertPort(endpoint.ServicePort, protoName)

if svcPort, exists := ports[port.Number]; exists && svcPort.Protocol != port.Protocol {
log.Warnf("Service %v has two instances on same port %v but different protocols (%v, %v)",
Expand Down Expand Up @@ -83,7 +84,9 @@ func convertWorkloadEntry(endpoint *api.CatalogService) *istio.WorkloadEntry {
if addr == "" {
addr = endpoint.Address
}
port := convertPort(endpoint.ServicePort, endpoint.ServiceMeta[protocolTagName])

protoName := getProtoValFromServiceTagsOrMeta(endpoint.ServiceTags, endpoint.ServiceMeta[protocolTagName])
port := convertPort(endpoint.ServicePort, protoName)

return &istio.WorkloadEntry{
Address: addr,
Expand Down Expand Up @@ -135,3 +138,27 @@ func convertProtocol(name string) string {
}
return string(p)
}

//get the protocol value:get from meta first, otherwise get from tags
func getProtoValFromServiceTagsOrMeta(serviceTags []string, serviceMetaProtocol string) string {
if len(serviceMetaProtocol) > 0 {
log.Debugf("getProtoValFromServiceTagsOrMeta,serviceMetaProtocol:%v", serviceMetaProtocol)
return serviceMetaProtocol
}
defaultProto := "http"
if len(serviceTags) == 0 {
return defaultProto
}
for _, v := range serviceTags {
isExist := strings.Contains(v, "proto")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

建议使用 protocol, proto 是 “原型” 的意思

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

if isExist {
strs := strings.Split(v, "=")
if len(strs) == 2 {
return strs[1]
} else {
log.Warnf("service tags proto is invalid")
}
}
}
return defaultProto
}
6 changes: 5 additions & 1 deletion pkg/serviceregistry/consul/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ type consulMonitor struct {
ServiceChangeHandlers []ServiceChangeHandler
}

const blockQueryWaitTime time.Duration = 10 * time.Minute
const (
blockQueryWaitTime time.Duration = 10 * time.Minute
updateServiceRecordWaitTime time.Duration = 10 * time.Second
)

// NewConsulMonitor watches for changes in Consul services and CatalogServices
func NewConsulMonitor(client *api.Client) Monitor {
Expand Down Expand Up @@ -72,6 +75,7 @@ func (m *consulMonitor) watchConsul(stop <-chan struct{}) {
} else if consulWaitIndex != queryMeta.LastIndex {
consulWaitIndex = queryMeta.LastIndex
m.updateServiceRecord()
time.Sleep(updateServiceRecordWaitTime)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个应该是在 consul 端修复吧? 对于正常的服务变化来说,每次变化后等待 10 分钟再同步会不会延迟太大了?
我建议缺省值设置小一点,例如 1 分钟。但可以提供一个环境变量参数,在 Consul 的故障没有修复之前,你们可以把这个环境变量参数设置大一些,例如 10 分钟。

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是的

}
}
}
Expand Down