Skip to content

Commit

Permalink
Lookout API: Take empty Kubernetes (Pod) IDs into account when proces…
Browse files Browse the repository at this point in the history
…sing failed events (#499)
  • Loading branch information
carlocamurri authored Jan 25, 2021
1 parent 76175ed commit faa876f
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 11 deletions.
4 changes: 0 additions & 4 deletions internal/lookout/repository/job_sets.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package repository
import (
"context"
"database/sql"
"fmt"
"sort"

"github.com/doug-martin/goqu/v9"
Expand Down Expand Up @@ -101,9 +100,6 @@ func (r *SQLJobRepository) createJobSetsDataset(opts *lookout.GetJobSetsRequest)
goqu.I("finished_counts.jobs_finished"),
goqu.I("finished_counts.jobs_succeeded"))

qq, _, _ := ds.ToSQL()
fmt.Println(qq)

return ds
}

Expand Down
22 changes: 15 additions & 7 deletions internal/lookout/repository/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/doug-martin/goqu/v9"
_ "github.com/lib/pq"

"github.com/G-Research/armada/internal/common/util"
"github.com/G-Research/armada/pkg/api"
)

Expand Down Expand Up @@ -54,7 +55,7 @@ func (r *SQLJobStore) RecordJobPriorityChange(event *api.JobReprioritizedEvent)
func (r *SQLJobStore) RecordJobPending(event *api.JobPendingEvent) error {
fields := []string{"created"}
values := []interface{}{event.Created}
return r.updateJobRun(event, fields, values)
return r.updateJobRun(event, event.KubernetesId, fields, values)
}

func (r *SQLJobStore) RecordJobRunning(event *api.JobRunningEvent) error {
Expand All @@ -64,7 +65,7 @@ func (r *SQLJobStore) RecordJobRunning(event *api.JobRunningEvent) error {
fields = append(fields, "node")
values = append(values, event.NodeName)
}
return r.updateJobRun(event, fields, values)
return r.updateJobRun(event, event.KubernetesId, fields, values)
}

func (r *SQLJobStore) RecordJobSucceeded(event *api.JobSucceededEvent) error {
Expand All @@ -74,12 +75,19 @@ func (r *SQLJobStore) RecordJobSucceeded(event *api.JobSucceededEvent) error {
fields = append(fields, "node")
values = append(values, event.NodeName)
}
return r.updateJobRun(event, fields, values)
return r.updateJobRun(event, event.KubernetesId, fields, values)
}

func (r *SQLJobStore) RecordJobFailed(event *api.JobFailedEvent) error {
fields := []string{"finished", "succeeded", "error"}
values := []interface{}{event.Created, false, fmt.Sprintf("%.2048s", event.Reason)}

// If job fails before a pod is created, we generate a new ULID
k8sId := event.KubernetesId
if k8sId == "" {
k8sId = util.NewULID() + "-nopod"
}

if event.NodeName != "" {
fields = append(fields, "node")
values = append(values, event.NodeName)
Expand All @@ -89,18 +97,18 @@ func (r *SQLJobStore) RecordJobFailed(event *api.JobFailedEvent) error {
for name, code := range event.ExitCodes {
_, err := upsertCombinedKey(r.db, "job_run_container",
[]string{"run_id", "container_name"}, []string{"exit_code"},
[]interface{}{event.KubernetesId, name, code})
[]interface{}{k8sId, name, code})
if err != nil {
return err
}
}

return r.updateJobRun(event, fields, values)
return r.updateJobRun(event, k8sId, fields, values)
}

func (r *SQLJobStore) updateJobRun(event api.KubernetesEvent, fields []string, values []interface{}) error {
func (r *SQLJobStore) updateJobRun(event api.KubernetesEvent, k8sId string, fields []string, values []interface{}) error {
_, err := upsert(r.db, "job_run",
"run_id", append([]string{"job_id", "cluster"}, fields...),
append([]interface{}{event.GetKubernetesId(), event.GetJobId(), event.GetClusterId()}, values...))
append([]interface{}{k8sId, event.GetJobId(), event.GetClusterId()}, values...))
return err
}
71 changes: 71 additions & 0 deletions internal/lookout/repository/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,77 @@ func Test_RecordLongError(t *testing.T) {
})
}

func Test_EmptyRunId(t *testing.T) {
withDatabase(t, func(db *goqu.Database) {
jobStore := NewSQLJobStore(db)
jobRepo := NewSQLJobRepository(db, &DefaultClock{})

job_1 := &api.Job{
Id: util.NewULID(),
JobSetId: "job-set",
Queue: "queue",
Namespace: "nameSpace",
Labels: nil,
Annotations: nil,
Owner: "user",
Priority: 0,
PodSpec: &v1.PodSpec{},
Created: time.Now(),
}
err := jobStore.RecordJob(job_1)
assert.NoError(t, err)

err = jobStore.RecordJobFailed(&api.JobFailedEvent{
JobId: job_1.Id,
JobSetId: job_1.JobSetId,
Queue: job_1.Queue,
Created: time.Now(),
ClusterId: cluster,
Reason: "error",
ExitCodes: nil,
KubernetesId: "",
NodeName: "node",
})
assert.NoError(t, err)

job_2 := &api.Job{
Id: util.NewULID(),
JobSetId: "job-set-2",
Queue: "queue-2",
Namespace: "nameSpace-2",
Labels: nil,
Annotations: nil,
Owner: "user",
Priority: 0,
PodSpec: &v1.PodSpec{},
Created: time.Now(),
}
err = jobStore.RecordJob(job_2)
assert.NoError(t, err)

err = jobStore.RecordJobFailed(&api.JobFailedEvent{
JobId: job_2.Id,
JobSetId: job_2.JobSetId,
Queue: job_2.Queue,
Created: time.Now(),
ClusterId: cluster,
Reason: "other error",
ExitCodes: nil,
KubernetesId: "",
NodeName: "node-2",
})
assert.NoError(t, err)

receivedJob1, err := jobRepo.GetJob(ctx, job_1.Id)
assert.NoError(t, err)
assert.Equal(t, JobStates.Failed, receivedJob1.JobState)

receivedJob2, err := jobRepo.GetJob(ctx, job_2.Id)
assert.NoError(t, err)
assert.Equal(t, JobStates.Failed, receivedJob2.JobState)
})
}

func count(t *testing.T, db *goqu.Database, query string) int {
r, err := db.Query(query)
assert.NoError(t, err)
Expand Down

0 comments on commit faa876f

Please sign in to comment.