Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ovh): cache refresh and duplicates processing #4932

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 49 additions & 23 deletions provider/ovh/ovh.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,32 @@ func (p *OVHProvider) Records(ctx context.Context) ([]*endpoint.Endpoint, error)
}

// ApplyChanges applies a given set of changes in a given zone.
func (p *OVHProvider) ApplyChanges(ctx context.Context, changes *plan.Changes) error {
func (p *OVHProvider) ApplyChanges(ctx context.Context, changes *plan.Changes) (err error) {
zones, records, err := p.zonesRecords(ctx)
zonesChangeUniques := map[string]bool{}
if err != nil {
return err
return provider.NewSoftError(err)
}

allChanges := make([]ovhChange, 0, countTargets(changes.Create, changes.UpdateNew, changes.UpdateOld, changes.Delete))
zonesChangeUniques := map[string]bool{}

// Always refresh zones even in case of errors.
defer func() {
log.Debugf("OVH: %d zones will be refreshed", len(zonesChangeUniques))

eg, _ := errgroup.WithContext(ctx)
for zone := range zonesChangeUniques {
zone := zone
eg.Go(func() error { return p.refresh(zone) })
}

if e := eg.Wait(); e != nil && err == nil { // return the error only if there is no error during the changes
err = provider.NewSoftError(e)
}
}()

allChanges := make([]ovhChange, 0, countTargets(changes.Create, changes.UpdateNew, changes.UpdateOld, changes.Delete))
allChanges = append(allChanges, newOvhChange(ovhCreate, changes.Create, zones, records)...)
allChanges = append(allChanges, newOvhChange(ovhCreate, changes.UpdateNew, zones, records)...)

allChanges = append(allChanges, newOvhChange(ovhDelete, changes.UpdateOld, zones, records)...)
allChanges = append(allChanges, newOvhChange(ovhDelete, changes.Delete, zones, records)...)

Expand All @@ -161,27 +175,24 @@ func (p *OVHProvider) ApplyChanges(ctx context.Context, changes *plan.Changes) e
eg.Go(func() error { return p.change(change) })
}
if err := eg.Wait(); err != nil {
return err
return provider.NewSoftError(err)
}

log.Infof("OVH: %d zones will be refreshed", len(zonesChangeUniques))

eg, _ = errgroup.WithContext(ctx)
for zone := range zonesChangeUniques {
zone := zone
eg.Go(func() error { return p.refresh(zone) })
}
if err := eg.Wait(); err != nil {
return err
}
return nil
}

func (p *OVHProvider) refresh(zone string) error {
log.Debugf("OVH: Refresh %s zone", zone)

// Zone has been altered so we invalidate the cache
// so that the next run will reload it.
p.invalidateCache(zone)

p.apiRateLimiter.Take()
return p.client.Post(fmt.Sprintf("/domain/zone/%s/refresh", zone), nil, nil)
if err := p.client.Post(fmt.Sprintf("/domain/zone/%s/refresh", zone), nil, nil); err != nil {
return provider.NewSoftError(err)
}
return nil
}

func (p *OVHProvider) change(change ovhChange) error {
Expand All @@ -201,11 +212,15 @@ func (p *OVHProvider) change(change ovhChange) error {
return nil
}

func (p *OVHProvider) invalidateCache(zone string) {
p.cacheInstance.Delete(zone + "#soa")
}

func (p *OVHProvider) zonesRecords(ctx context.Context) ([]string, []ovhRecord, error) {
var allRecords []ovhRecord
zones, err := p.zones()
if err != nil {
return nil, nil, err
return nil, nil, provider.NewSoftError(err)
}

chRecords := make(chan []ovhRecord, len(zones))
Expand All @@ -215,7 +230,7 @@ func (p *OVHProvider) zonesRecords(ctx context.Context) ([]string, []ovhRecord,
eg.Go(func() error { return p.records(&ctx, &zone, chRecords) })
}
if err := eg.Wait(); err != nil {
return nil, nil, err
return nil, nil, provider.NewSoftError(err)
}
close(chRecords)
for records := range chRecords {
Expand Down Expand Up @@ -270,7 +285,7 @@ func (p *OVHProvider) records(ctx *context.Context, zone *string, records chan<-
}
}

p.cacheInstance.Delete(*zone + "#soa")
p.invalidateCache(*zone)
}
}

Expand Down Expand Up @@ -359,6 +374,10 @@ func ovhGroupByNameAndType(records []ovhRecord) []*endpoint.Endpoint {
}

func newOvhChange(action int, endpoints []*endpoint.Endpoint, zones []string, records []ovhRecord) []ovhChange {
// Copy the records because we need to mutate the list.
newRecords := make([]ovhRecord, len(records))
copy(newRecords, records)

zoneNameIDMapper := provider.ZoneIDName{}
ovhChanges := make([]ovhChange, 0, countTargets(endpoints))
for _, zone := range zones {
Expand Down Expand Up @@ -390,9 +409,16 @@ func newOvhChange(action int, endpoints []*endpoint.Endpoint, zones []string, re
if e.RecordTTL.IsConfigured() {
change.TTL = int64(e.RecordTTL)
}
for _, record := range records {
if record.Zone == change.Zone && record.SubDomain == change.SubDomain && record.FieldType == change.FieldType && record.Target == change.Target {
change.ID = record.ID

// The Zone might have multiple records with the same target. In order to avoid applying the action to the
// same OVH record, we remove a record from the list when a match is found.
for i := 0; i < len(newRecords); i++ {
rec := newRecords[i]
if rec.Zone == change.Zone && rec.SubDomain == change.SubDomain && rec.FieldType == change.FieldType && rec.Target == change.Target {
change.ID = rec.ID
// Deleting this record from the list to avoid retargetting it later if a change with a similar target exists.
newRecords = append(newRecords[:i], newRecords[i+1:]...)
break
}
}
ovhChanges = append(ovhChanges, change)
Expand Down
7 changes: 6 additions & 1 deletion provider/ovh/ovh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,14 +324,18 @@ func TestOvhNewChange(t *testing.T) {

// Delete change
endpoints = []*endpoint.Endpoint{
{DNSName: "ovh.example.net", RecordType: "A", Targets: []string{"203.0.113.42"}},
{DNSName: "ovh.example.net", RecordType: "A", Targets: []string{"203.0.113.42", "203.0.113.42", "203.0.113.42"}},
}
records := []ovhRecord{
{ID: 42, Zone: "example.net", ovhRecordFields: ovhRecordFields{FieldType: "A", SubDomain: "ovh", Target: "203.0.113.42"}},
{ID: 43, Zone: "example.net", ovhRecordFields: ovhRecordFields{FieldType: "A", SubDomain: "ovh", Target: "203.0.113.42"}},
{ID: 44, Zone: "example.net", ovhRecordFields: ovhRecordFields{FieldType: "A", SubDomain: "ovh", Target: "203.0.113.42"}},
}
changes = newOvhChange(ovhDelete, endpoints, []string{"example.net"}, records)
assert.ElementsMatch(changes, []ovhChange{
{Action: ovhDelete, ovhRecord: ovhRecord{ID: 42, Zone: "example.net", ovhRecordFields: ovhRecordFields{SubDomain: "ovh", FieldType: "A", TTL: ovhDefaultTTL, Target: "203.0.113.42"}}},
{Action: ovhDelete, ovhRecord: ovhRecord{ID: 43, Zone: "example.net", ovhRecordFields: ovhRecordFields{SubDomain: "ovh", FieldType: "A", TTL: ovhDefaultTTL, Target: "203.0.113.42"}}},
{Action: ovhDelete, ovhRecord: ovhRecord{ID: 44, Zone: "example.net", ovhRecordFields: ovhRecordFields{SubDomain: "ovh", FieldType: "A", TTL: ovhDefaultTTL, Target: "203.0.113.42"}}},
})
}

Expand Down Expand Up @@ -368,6 +372,7 @@ func TestOvhApplyChanges(t *testing.T) {
client.On("Get", "/domain/zone").Return([]string{"example.net"}, nil).Once()
client.On("Get", "/domain/zone/example.net/record").Return([]uint64{}, nil).Once()
client.On("Post", "/domain/zone/example.net/record", ovhRecordFields{SubDomain: "", FieldType: "A", TTL: 10, Target: "203.0.113.42"}).Return(nil, ovh.ErrAPIDown).Once()
client.On("Post", "/domain/zone/example.net/refresh", nil).Return(nil, nil).Once()
assert.Error(provider.ApplyChanges(context.TODO(), &plan.Changes{
Create: []*endpoint.Endpoint{
{DNSName: ".example.net", RecordType: "A", RecordTTL: 10, Targets: []string{"203.0.113.42"}},
Expand Down
Loading