diff --git a/README.md b/README.md index 3509f6f4cdb..c7ace1edb35 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,7 @@ Armada adheres to the CNCF [Code of Conduct](https://github.com/cncf/foundation/ For an overview of the architecture and design of Armada, and instructions for submitting jobs, see: + - [Components overview](./docs/design/relationships_diagram.md) - [Scheduler](./docs/design/scheduler.md) - [Architecture](./docs/design/architecture.md) diff --git a/build/queryapi/Dockerfile b/build/queryapi/Dockerfile new file mode 100644 index 00000000000..36c5cc75ad2 --- /dev/null +++ b/build/queryapi/Dockerfile @@ -0,0 +1,13 @@ +FROM alpine:3.18.3 + +RUN addgroup -S -g 2000 armada && adduser -S -u 1000 armada -G armada + +USER armada + +COPY ./queryapi /app/ + +COPY /config/ /app/config/queyapi + +WORKDIR /app + +ENTRYPOINT ["./queryapi"] diff --git a/build_goreleaser/queryapi/Dockerfile b/build_goreleaser/queryapi/Dockerfile new file mode 100644 index 00000000000..1af6f375af2 --- /dev/null +++ b/build_goreleaser/queryapi/Dockerfile @@ -0,0 +1,15 @@ +ARG BASE_IMAGE=alpine:3.18.3 +FROM ${BASE_IMAGE} +LABEL org.opencontainers.image.title=queryapi +LABEL org.opencontainers.image.description="queryapi" +LABEL org.opencontainers.image.url=https://hub.docker.com/r/gresearchdev/queryapi + +RUN addgroup -S -g 2000 armada && adduser -S -u 1000 armada -G armada +USER armada + +COPY queryapi /app/ +COPY config/queriapi/config.yaml /app/config/queryapi/config.yaml + +WORKDIR /app + +ENTRYPOINT ["./queryapi"] diff --git a/cmd/queryapi/main.go b/cmd/queryapi/main.go new file mode 100644 index 00000000000..9dbaee9d0a0 --- /dev/null +++ b/cmd/queryapi/main.go @@ -0,0 +1,39 @@ +package main + +import ( + "fmt" + "os" + + "github.com/spf13/pflag" + "github.com/spf13/viper" + + "github.com/armadaproject/armada/internal/common" + "github.com/armadaproject/armada/internal/queryapi" +) + +const ( + CustomConfigLocation = "config" +) + +func init() { + pflag.StringSlice( + CustomConfigLocation, + []string{}, + "Fully qualified path to application configuration file (for multiple config files repeat this arg or separate paths with commas)", + ) + pflag.Parse() +} + +func main() { + common.ConfigureLogging() + common.BindCommandlineArguments() + + var config queryapi.Configuration + userSpecifiedConfigs := viper.GetStringSlice(CustomConfigLocation) + + common.LoadConfig(&config, "./config/queryapi", userSpecifiedConfigs) + if err := queryapi.Run(config); err != nil { + fmt.Println(err) + os.Exit(-1) + } +} diff --git a/config/queryapi/config.yaml b/config/queryapi/config.yaml new file mode 100644 index 00000000000..366549097b3 --- /dev/null +++ b/config/queryapi/config.yaml @@ -0,0 +1,21 @@ +http: + port: 8080 +grpc: + port: 50052 + keepaliveParams: + maxConnectionIdle: 5m + time: 120s + timeout: 20s + keepaliveEnforcementPolicy: + minTime: 10s + permitWithoutStream: true + tls: + enabled: false +postgres: + connection: + host: postgres + port: 5432 + user: postgres + password: psw + dbname: postgres + sslmode: disable diff --git a/deployment/queryapi/Chart.yaml b/deployment/queryapi/Chart.yaml new file mode 100644 index 00000000000..8e4754af4c0 --- /dev/null +++ b/deployment/queryapi/Chart.yaml @@ -0,0 +1,5 @@ +apiVersion: v1 +description: A helm chart for Armada Query API component +name: armada-query-api +version: 0.0.0-latest +appVersion: 0.0.0-latest diff --git a/deployment/queryapi/templates/_helpers.tpl b/deployment/queryapi/templates/_helpers.tpl new file mode 100644 index 00000000000..5d4cc9e565a --- /dev/null +++ b/deployment/queryapi/templates/_helpers.tpl @@ -0,0 +1,43 @@ + +{{- define "queryapi.name" -}} +{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" -}} +{{- end -}} + +{{- define "queryapi.config.name" -}} +{{- printf "%s-%s" ( include "queryapi.name" .) "config" -}} +{{- end }} + +{{- define "queryapi.config.filename" -}} +{{- printf "%s%s" ( include "queryapi.config.name" .) ".yaml" -}} +{{- end }} + +{{- define "queryapi.users.name" -}} +{{- printf "%s-%s" ( include "queryapi.name" .) "users" -}} +{{- end }} + +{{- define "queryapi.users.filename" -}} +{{- printf "%s%s" ( include "queryapi.users.name" .) ".yaml" -}} +{{- end }} + +{{/* +Create chart name and version as used by the chart label. +*/}} +{{- define "queryapi.chart" -}} +{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" -}} +{{- end -}} + +{{- define "queryapi.labels.identity" -}} +app: {{ include "queryapi.name" . }} +{{- end -}} + +{{/* +Common labels +*/}} +{{- define "queryapi.labels.all" -}} +{{ include "queryapi.labels.identity" . }} +chart: {{ include "queryapi.chart" . }} +release: {{ .Release.Name }} +{{- if .Values.additionalLabels }} +{{ toYaml .Values.additionalLabels }} +{{- end }} +{{- end -}} diff --git a/deployment/queryapi/templates/deployment.yaml b/deployment/queryapi/templates/deployment.yaml new file mode 100644 index 00000000000..87eb1f7fb5b --- /dev/null +++ b/deployment/queryapi/templates/deployment.yaml @@ -0,0 +1,109 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ include "queryapi.name" . }} + namespace: {{ .Release.Namespace }} + labels: + {{- include "queryapi.labels.all" . | nindent 4 }} +spec: + replicas: {{ .Values.replicas }} + selector: + matchLabels: + {{- include "queryapi.labels.identity" . | nindent 6 }} + {{- if .Values.strategy }} + strategy: + {{- toYaml .Values.strategy | nindent 4 }} + {{- end }} + template: + metadata: + name: {{ include "queryapi.name" . }} + annotations: + checksum/config: {{ include (print $.Template.BasePath "/secret.yaml") . | sha256sum }} + labels: + {{- include "queryapi.labels.all" . | nindent 8 }} + spec: + terminationGracePeriodSeconds: {{ .Values.terminationGracePeriodSeconds }} + serviceAccountName: {{ .Values.customServiceAccount | default (include "queryapi.name" .) }} + securityContext: + runAsUser: 1000 + runAsGroup: 2000 + {{- if .Values.tolerations }} + tolerations: + {{- toYaml .Values.tolerations | nindent 8 }} + {{- end }} + containers: + - name: queryapi + imagePullPolicy: IfNotPresent + image: {{ .Values.image.repository }}:{{ required "A value is required for .Values.image.tag" .Values.image.tag }} + args: + - --config + - /config/application_config.yaml + {{- if .Values.env }} + env: + {{- toYaml .Values.env | nindent 12 -}} + {{- end }} + resources: + {{- toYaml .Values.resources | nindent 12 }} + ports: + - containerPort: {{ .Values.applicationConfig.grpcPort }} + protocol: TCP + name: grpc + - containerPort: {{ .Values.applicationConfig.metricsPort }} + protocol: TCP + name: metrics + - containerPort: {{ .Values.applicationConfig.httpPort }} + protocol: TCP + name: web + volumeMounts: + - name: user-config + mountPath: /config/application_config.yaml + subPath: {{ include "queryapi.config.filename" . }} + readOnly: true + {{- if .Values.applicationConfig.grpc.tls.enabled }} + - name: tls-certs + mountPath: /certs + readOnly: true + {{- end }} + {{- if .Values.additionalVolumeMounts }} + {{- toYaml .Values.additionalVolumeMounts | nindent 12 -}} + {{- end }} + securityContext: + allowPrivilegeEscalation: false + readinessProbe: + httpGet: + path: /health + port: web + initialDelaySeconds: 5 + timeoutSeconds: 5 + failureThreshold: 2 + livenessProbe: + httpGet: + path: /health + port: web + initialDelaySeconds: 10 + timeoutSeconds: 10 + failureThreshold: 3 + affinity: + podAntiAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 100 + podAffinityTerm: + labelSelector: + matchExpressions: + - key: app + operator: In + values: + - {{ include "queryapi.name" . }} + topologyKey: kubernetes.io/hostname + volumes: + - name: user-config + secret: + secretName: {{ include "queryapi.config.name" . }} + {{- if .Values.applicationConfig.grpc.tls.enabled }} + - name: tls-certs + secret: + secretName: queryapi-service-tls + {{- end }} + {{- if .Values.additionalVolumes }} + {{- toYaml .Values.additionalVolumes | nindent 8 }} + {{- end }} diff --git a/deployment/queryapi/templates/ingress.yaml b/deployment/queryapi/templates/ingress.yaml new file mode 100644 index 00000000000..6ce3d4edf2c --- /dev/null +++ b/deployment/queryapi/templates/ingress.yaml @@ -0,0 +1,43 @@ +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: {{ include "queryapi.name" . }} + namespace: {{ .Release.Namespace }} + annotations: + kubernetes.io/ingress.class: {{ required "A value is required for .Values.ingressClass" .Values.ingressClass }} + nginx.ingress.kubernetes.io/ssl-redirect: "true" + {{- if .Values.applicationConfig.grpc.tls.enabled }} + nginx.ingress.kubernetes.io/backend-protocol: "GRPCS" + nginx.ingress.kubernetes.io/ssl-passthrough: "true" + {{- else }} + nginx.ingress.kubernetes.io/backend-protocol: "GRPC" + {{- end }} + certmanager.k8s.io/cluster-issuer: {{ required "A value is required for .Values.clusterIssuer" .Values.clusterIssuer }} + cert-manager.io/cluster-issuer: {{ required "A value is required for .Values.clusterIssuer" .Values.clusterIssuer }} + {{- if .Values.ingress.annotations }} + {{- toYaml .Values.ingress.annotations | nindent 4 }} + {{- end }} + labels: + {{- include "queryapi.labels.all" . | nindent 4 }} + {{- if .Values.ingress.labels }} + {{- toYaml .Values.ingress.labels | nindent 4 }} + {{- end }} +spec: + rules: + {{- $root := . -}} + {{ range required "A value is required for .Values.hostnames" .Values.hostnames }} + - host: {{ . }} + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: {{ include "queryapi.name" $root }} + port: + number: {{ $root.Values.applicationConfig.grpcPort }} + {{ end }} + tls: + - hosts: + {{- toYaml .Values.hostnames | nindent 8 }} + secretName: queryapi-service-tls diff --git a/deployment/queryapi/templates/secret.yaml b/deployment/queryapi/templates/secret.yaml new file mode 100644 index 00000000000..0f6b536da0e --- /dev/null +++ b/deployment/queryapi/templates/secret.yaml @@ -0,0 +1,13 @@ +apiVersion: v1 +kind: Secret +metadata: + name: {{ include "queryapi.config.name" . }} + namespace: {{ .Release.Namespace }} + labels: + {{- include "queryapi.labels.all" . | nindent 4 }} +type: Opaque +data: + {{ include "queryapi.config.filename" . }}: | +{{- if .Values.applicationConfig }} +{{ toYaml .Values.applicationConfig | b64enc | indent 4 }} +{{- end }} diff --git a/deployment/queryapi/templates/service.yaml b/deployment/queryapi/templates/service.yaml new file mode 100644 index 00000000000..2d9c12cd01f --- /dev/null +++ b/deployment/queryapi/templates/service.yaml @@ -0,0 +1,29 @@ +apiVersion: v1 +kind: Service +metadata: + name: {{ include "queryapi.name" . }} + namespace: {{ .Release.Namespace }} + labels: + {{- include "queryapi.labels.all" . | nindent 4 }} +spec: + {{- if .Values.nodePort }} + type: NodePort + {{- end }} + selector: + {{- include "queryapi.labels.identity" . | nindent 4 }} + ports: + - name: grpc + protocol: TCP + port: {{ .Values.applicationConfig.grpcPort }} + {{- if .Values.nodePort }} + nodePort: {{ .Values.nodePort }} + {{- end }} + - name: web + protocol: TCP + port: {{ .Values.applicationConfig.httpPort }} + {{- if .Values.httpNodePort }} + nodePort: {{ .Values.httpNodePort }} + {{- end }} + - name: metrics + protocol: TCP + port: {{ .Values.applicationConfig.metricsPort }} diff --git a/deployment/queryapi/templates/serviceaccount.yaml b/deployment/queryapi/templates/serviceaccount.yaml new file mode 100644 index 00000000000..fbdf847aeca --- /dev/null +++ b/deployment/queryapi/templates/serviceaccount.yaml @@ -0,0 +1,12 @@ +{{ if not .Values.customServiceAccount }} +apiVersion: v1 +kind: ServiceAccount +metadata: + name: {{ include "queryapi.name" . }} + namespace: {{ .Release.Namespace }} + labels: + {{- include "queryapi.labels.all" . | nindent 4 }} +{{ if .Values.serviceAccount }} +{{ toYaml .Values.serviceAccount }} +{{ end }} +{{ end }} diff --git a/deployment/queryapi/values.yaml b/deployment/queryapi/values.yaml new file mode 100644 index 00000000000..00d6885f1e3 --- /dev/null +++ b/deployment/queryapi/values.yaml @@ -0,0 +1,42 @@ +image: + repository: gresearchdev/armada-query-api + tag: 0.0.0-latest +resources: + limits: + memory: 1Gi + cpu: 300m + requests: + memory: 512Mi + cpu: 200m +# -- Tolerations +tolerations: [] +additionalLabels: {} +additionalClusterRoleBindings: [] +additionalVolumeMounts: [] +additionalVolumes: [] +terminationGracePeriodSeconds: 5 +replicas: 1 +strategy: + rollingUpdate: + maxUnavailable: 1 + type: RollingUpdate +ingress: + annotations: {} + labels: {} +prometheus: + enabled: false + labels: {} + scrapeInterval: 10s + scrapeTimeout: 10s +customServiceAccount: null +serviceAccount: null + +applicationConfig: + grpcPort: 50051 + grpc: + tls: + enabled: false + certPath: /certs/tls.crt + keyPath: /certs/tls.key + httpPort: 8080 + metricsPort: 9000 diff --git a/internal/queryapi/application.go b/internal/queryapi/application.go new file mode 100644 index 00000000000..3d408c1d10c --- /dev/null +++ b/internal/queryapi/application.go @@ -0,0 +1,30 @@ +package queryapi + +import ( + "github.com/pkg/errors" + + "github.com/armadaproject/armada/internal/common/app" + "github.com/armadaproject/armada/internal/common/armadacontext" + "github.com/armadaproject/armada/internal/common/auth" + "github.com/armadaproject/armada/internal/common/database" + grpcCommon "github.com/armadaproject/armada/internal/common/grpc" + "github.com/armadaproject/armada/internal/queryapi/server" + "github.com/armadaproject/armada/pkg/queryapi" +) + +func Run(config Configuration) error { + g, _ := armadacontext.ErrGroup(app.CreateContextWithShutdown()) + authServices, err := auth.ConfigureAuth(config.Auth) + if err != nil { + return errors.WithMessage(err, "error creating auth services") + } + + db, err := database.OpenPgxPool(config.Postgres) + if err != nil { + return errors.WithMessage(err, "error creating postgres pool") + } + grpcServer := grpcCommon.CreateGrpcServer(config.Grpc.KeepaliveParams, config.Grpc.KeepaliveEnforcementPolicy, authServices, config.Grpc.Tls) + defer grpcServer.GracefulStop() + queryapi.RegisterQueryApiServer(grpcServer, server.New(db)) + return g.Wait() +} diff --git a/internal/queryapi/config.go b/internal/queryapi/config.go new file mode 100644 index 00000000000..27b499451fa --- /dev/null +++ b/internal/queryapi/config.go @@ -0,0 +1,13 @@ +package queryapi + +import ( + postgresconfig "github.com/armadaproject/armada/internal/armada/configuration" + authconfig "github.com/armadaproject/armada/internal/common/auth/configuration" + grpcconfig "github.com/armadaproject/armada/internal/common/grpc/configuration" +) + +type Configuration struct { + Grpc grpcconfig.GrpcConfig + Auth authconfig.AuthConfig + Postgres postgresconfig.PostgresConfig +} diff --git a/internal/queryapi/database/db.go b/internal/queryapi/database/db.go new file mode 100644 index 00000000000..5b03f7b80d9 --- /dev/null +++ b/internal/queryapi/database/db.go @@ -0,0 +1,32 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.22.0 + +package database + +import ( + "context" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" +) + +type DBTX interface { + Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error) + Query(context.Context, string, ...interface{}) (pgx.Rows, error) + QueryRow(context.Context, string, ...interface{}) pgx.Row +} + +func New(db DBTX) *Queries { + return &Queries{db: db} +} + +type Queries struct { + db DBTX +} + +func (q *Queries) WithTx(tx pgx.Tx) *Queries { + return &Queries{ + db: tx, + } +} diff --git a/internal/queryapi/database/models.go b/internal/queryapi/database/models.go new file mode 100644 index 00000000000..204e55d749f --- /dev/null +++ b/internal/queryapi/database/models.go @@ -0,0 +1,54 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.22.0 + +package database + +import ( + "github.com/jackc/pgx/v5/pgtype" +) + +type Job struct { + JobID string `db:"job_id"` + Queue string `db:"queue"` + Owner string `db:"owner"` + Jobset string `db:"jobset"` + Cpu int64 `db:"cpu"` + Memory int64 `db:"memory"` + EphemeralStorage int64 `db:"ephemeral_storage"` + Gpu int64 `db:"gpu"` + Priority int64 `db:"priority"` + Submitted pgtype.Timestamp `db:"submitted"` + Cancelled pgtype.Timestamp `db:"cancelled"` + State int16 `db:"state"` + LastTransitionTime pgtype.Timestamp `db:"last_transition_time"` + LastTransitionTimeSeconds int64 `db:"last_transition_time_seconds"` + JobSpec []byte `db:"job_spec"` + Duplicate bool `db:"duplicate"` + PriorityClass *string `db:"priority_class"` + LatestRunID *string `db:"latest_run_id"` + CancelReason *string `db:"cancel_reason"` + Namespace *string `db:"namespace"` +} + +type JobRun struct { + RunID string `db:"run_id"` + JobID string `db:"job_id"` + Cluster string `db:"cluster"` + Node *string `db:"node"` + Pending pgtype.Timestamp `db:"pending"` + Started pgtype.Timestamp `db:"started"` + Finished pgtype.Timestamp `db:"finished"` + JobRunState int16 `db:"job_run_state"` + Error []byte `db:"error"` + ExitCode *int32 `db:"exit_code"` + Leased pgtype.Timestamp `db:"leased"` +} + +type UserAnnotationLookup struct { + JobID string `db:"job_id"` + Key string `db:"key"` + Value string `db:"value"` + Queue string `db:"queue"` + Jobset string `db:"jobset"` +} diff --git a/internal/queryapi/database/query.sql b/internal/queryapi/database/query.sql new file mode 100644 index 00000000000..7b733f66d59 --- /dev/null +++ b/internal/queryapi/database/query.sql @@ -0,0 +1,2 @@ +-- name: GetJobState :many +SELECT state FROM job WHERE job_id = $1; diff --git a/internal/queryapi/database/query.sql.go b/internal/queryapi/database/query.sql.go new file mode 100644 index 00000000000..6b1e63d1036 --- /dev/null +++ b/internal/queryapi/database/query.sql.go @@ -0,0 +1,34 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.22.0 +// source: query.sql + +package database + +import ( + "context" +) + +const getJobState = `-- name: GetJobState :many +SELECT state FROM job WHERE job_id = $1 +` + +func (q *Queries) GetJobState(ctx context.Context, jobID string) ([]int16, error) { + rows, err := q.db.Query(ctx, getJobState, jobID) + if err != nil { + return nil, err + } + defer rows.Close() + var items []int16 + for rows.Next() { + var state int16 + if err := rows.Scan(&state); err != nil { + return nil, err + } + items = append(items, state) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} diff --git a/internal/queryapi/database/sql.yaml b/internal/queryapi/database/sql.yaml new file mode 100644 index 00000000000..4bfb92a1da2 --- /dev/null +++ b/internal/queryapi/database/sql.yaml @@ -0,0 +1,21 @@ +# Compile with "sqlc generate -f internal/queryapi/database/sql.yaml" from the project root directory. +version: 2 +sql: + - schema: "../../lookoutv2/schema/migrations" + queries: "query.sql" + engine: "postgresql" + gen: + go: + out: "." + package: "database" + sql_package: "pgx/v5" + emit_prepared_queries: true + emit_db_tags: true + emit_interface: false + emit_pointers_for_null_types: true + overrides: + - db_type: "timestamptz" + go_type: + type: "time.Time" + pointer: true + nullable: true diff --git a/internal/queryapi/server/query_api.go b/internal/queryapi/server/query_api.go new file mode 100644 index 00000000000..4291605ec78 --- /dev/null +++ b/internal/queryapi/server/query_api.go @@ -0,0 +1,51 @@ +package server + +import ( + "context" + + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/armadaproject/armada/internal/common/database/lookout" + "github.com/armadaproject/armada/internal/queryapi/database" + "github.com/armadaproject/armada/pkg/queryapi" +) + +// JobStateMap is a mapping between database state and api states +var JobStateMap = map[int16]queryapi.JobStatus{ + lookout.JobLeasedOrdinal: queryapi.JobStatus_LEASED, + lookout.JobQueuedOrdinal: queryapi.JobStatus_QUEUED, + lookout.JobPendingOrdinal: queryapi.JobStatus_PENDING, + lookout.JobRunningOrdinal: queryapi.JobStatus_RUNNING, + lookout.JobSucceededOrdinal: queryapi.JobStatus_SUCCEEDED, + lookout.JobFailedOrdinal: queryapi.JobStatus_FAILED, + lookout.JobCancelledOrdinal: queryapi.JobStatus_CANCELLED, + lookout.JobPreemptedOrdinal: queryapi.JobStatus_PREEMPTED, +} + +type QueryApi struct { + db *pgxpool.Pool +} + +func New(db *pgxpool.Pool) *QueryApi { + return &QueryApi{db: db} +} + +func (q *QueryApi) GetJobStatus(ctx context.Context, req *queryapi.JobStatusRequest) (*queryapi.JobStatusResponse, error) { + queries := database.New(q.db) + queryResult, err := queries.GetJobState(ctx, req.JobId) + if err != nil { + return nil, err + } + status := int16(-1) + if len(queryResult) > 0 { + status = queryResult[0] + } + apiStatus, ok := JobStateMap[status] + if !ok { + apiStatus = queryapi.JobStatus_UNKNOWN + } + return &queryapi.JobStatusResponse{ + JobId: req.JobId, + JobStatus: apiStatus, + }, nil +} diff --git a/internal/queryapi/server/query_api_test.go b/internal/queryapi/server/query_api_test.go new file mode 100644 index 00000000000..743e2d35c83 --- /dev/null +++ b/internal/queryapi/server/query_api_test.go @@ -0,0 +1,116 @@ +package server + +import ( + "context" + "testing" + "time" + + "github.com/jackc/pgx/v5/pgxpool" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/armadaproject/armada/internal/common/database/lookout" + "github.com/armadaproject/armada/pkg/queryapi" +) + +func TestGetJobStatus(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // setup job db + tests := map[string]struct { + jobId string + expectedResponse *queryapi.JobStatusResponse + }{ + "leased job": { + jobId: "leasedJob", + expectedResponse: &queryapi.JobStatusResponse{JobId: "leasedJob", JobStatus: queryapi.JobStatus_LEASED}, + }, + "running job": { + jobId: "runningJob", + expectedResponse: &queryapi.JobStatusResponse{JobId: "runningJob", JobStatus: queryapi.JobStatus_RUNNING}, + }, + "completed job": { + jobId: "completedJob", + expectedResponse: &queryapi.JobStatusResponse{JobId: "completedJob", JobStatus: queryapi.JobStatus_SUCCEEDED}, + }, + "missing job": { + jobId: "missingJob", + expectedResponse: &queryapi.JobStatusResponse{JobId: "missingJob", JobStatus: queryapi.JobStatus_UNKNOWN}, + }, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + err := lookout.WithLookoutDb(func(db *pgxpool.Pool) error { + queryApi := New(db) + err := insertTestData(ctx, db) + require.NoError(t, err) + resp, err := queryApi.GetJobStatus(context.Background(), &queryapi.JobStatusRequest{JobId: tc.jobId}) + require.NoError(t, err) + assert.Equal(t, resp, tc.expectedResponse) + return nil + }) + assert.NoError(t, err) + }) + } +} + +func insertTestData(ctx context.Context, db *pgxpool.Pool) error { + err := insertJob(ctx, db, "leasedJob", lookout.JobLeasedOrdinal) + if err != nil { + return err + } + err = insertJob(ctx, db, "runningJob", lookout.JobRunningOrdinal) + if err != nil { + return err + } + err = insertJob(ctx, db, "completedJob", lookout.JobSucceededOrdinal) + if err != nil { + return err + } + return nil +} + +func insertJob(ctx context.Context, db *pgxpool.Pool, jobId string, state int16) error { + // Prepare the SQL query + sql := `INSERT INTO job(job_id, queue, owner, jobset, cpu, memory, ephemeral_storage, gpu, priority, submitted, state, last_transition_time, last_transition_time_seconds, job_spec, duplicate) + VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)` + + // Prepare dummy values + queue := " " + owner := " " + jobset := " " + cpu := int64(0) + memory := int64(0) + ephemeralStorage := int64(0) + gpu := int64(0) + priority := int64(0) + submitted := time.Now() + lastTransitionTime := time.Now() + lastTransitionTimeSeconds := int64(0) + jobSpec := []byte{} + duplicate := false + + // Execute the query with the prepared dummy values + _, err := db.Exec( + ctx, + sql, + jobId, + queue, + owner, + jobset, + cpu, + memory, + ephemeralStorage, + gpu, + priority, + submitted, + state, + lastTransitionTime, + lastTransitionTimeSeconds, + jobSpec, + duplicate) + + // Return any error that might have occurred during the execution + return err +} diff --git a/magefiles/main.go b/magefiles/main.go index 662f20cd7d9..f86d539b586 100644 --- a/magefiles/main.go +++ b/magefiles/main.go @@ -325,7 +325,7 @@ func Generate() error { // CI Image to build func BuildCI() error { - ciImage := []string{"bundle", "lookout-bundle", "server", "executor", "armadactl", "testsuite", "lookoutv2", "lookoutingesterv2", "eventingester", "scheduler", "scheduleringester", "binoculars", "jobservice"} + ciImage := []string{"bundle", "lookout-bundle", "server", "executor", "armadactl", "testsuite", "lookoutv2", "lookoutingesterv2", "eventingester", "scheduler", "scheduleringester", "binoculars", "jobservice", "queryapi"} err := goreleaserMinimalRelease(ciImage...) if err != nil { return err diff --git a/magefiles/proto.go b/magefiles/proto.go index c9848a09d26..164432dd9dc 100644 --- a/magefiles/proto.go +++ b/magefiles/proto.go @@ -100,6 +100,7 @@ func protoGenerate() error { "pkg/api/binoculars/*.proto", "pkg/api/jobservice/*.proto", "pkg/executorapi/*.proto", + "pkg/queryapi/*.proto", } for _, pattern := range patterns { matches, err := filepath.Glob(pattern) diff --git a/pkg/queryapi/queryapi.pb.go b/pkg/queryapi/queryapi.pb.go new file mode 100644 index 00000000000..87e49c455a2 --- /dev/null +++ b/pkg/queryapi/queryapi.pb.go @@ -0,0 +1,699 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: pkg/queryapi/queryapi.proto + +package queryapi + +import ( + context "context" + fmt "fmt" + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + io "io" + math "math" + math_bits "math/bits" + reflect "reflect" + strings "strings" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type JobStatus int32 + +const ( + JobStatus_UNKNOWN JobStatus = 0 + JobStatus_SUBMITTED JobStatus = 1 + JobStatus_QUEUED JobStatus = 2 + JobStatus_LEASED JobStatus = 3 + JobStatus_PENDING JobStatus = 4 + JobStatus_RUNNING JobStatus = 5 + JobStatus_SUCCEEDED JobStatus = 6 + JobStatus_FAILED JobStatus = 7 + JobStatus_PREEMPTED JobStatus = 8 + JobStatus_CANCELLED JobStatus = 9 +) + +var JobStatus_name = map[int32]string{ + 0: "UNKNOWN", + 1: "SUBMITTED", + 2: "QUEUED", + 3: "LEASED", + 4: "PENDING", + 5: "RUNNING", + 6: "SUCCEEDED", + 7: "FAILED", + 8: "PREEMPTED", + 9: "CANCELLED", +} + +var JobStatus_value = map[string]int32{ + "UNKNOWN": 0, + "SUBMITTED": 1, + "QUEUED": 2, + "LEASED": 3, + "PENDING": 4, + "RUNNING": 5, + "SUCCEEDED": 6, + "FAILED": 7, + "PREEMPTED": 8, + "CANCELLED": 9, +} + +func (x JobStatus) String() string { + return proto.EnumName(JobStatus_name, int32(x)) +} + +func (JobStatus) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_91f9781d786f55de, []int{0} +} + +type JobStatusRequest struct { + JobId string `protobuf:"bytes,1,opt,name=job_id,json=jobId,proto3" json:"jobId,omitempty"` +} + +func (m *JobStatusRequest) Reset() { *m = JobStatusRequest{} } +func (*JobStatusRequest) ProtoMessage() {} +func (*JobStatusRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_91f9781d786f55de, []int{0} +} +func (m *JobStatusRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *JobStatusRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_JobStatusRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *JobStatusRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_JobStatusRequest.Merge(m, src) +} +func (m *JobStatusRequest) XXX_Size() int { + return m.Size() +} +func (m *JobStatusRequest) XXX_DiscardUnknown() { + xxx_messageInfo_JobStatusRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_JobStatusRequest proto.InternalMessageInfo + +func (m *JobStatusRequest) GetJobId() string { + if m != nil { + return m.JobId + } + return "" +} + +type JobStatusResponse struct { + JobId string `protobuf:"bytes,1,opt,name=job_id,json=jobId,proto3" json:"jobId,omitempty"` + JobStatus JobStatus `protobuf:"varint,2,opt,name=job_status,json=jobStatus,proto3,enum=queryapi.JobStatus" json:"jobStatus,omitempty"` +} + +func (m *JobStatusResponse) Reset() { *m = JobStatusResponse{} } +func (*JobStatusResponse) ProtoMessage() {} +func (*JobStatusResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_91f9781d786f55de, []int{1} +} +func (m *JobStatusResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *JobStatusResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_JobStatusResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *JobStatusResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_JobStatusResponse.Merge(m, src) +} +func (m *JobStatusResponse) XXX_Size() int { + return m.Size() +} +func (m *JobStatusResponse) XXX_DiscardUnknown() { + xxx_messageInfo_JobStatusResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_JobStatusResponse proto.InternalMessageInfo + +func (m *JobStatusResponse) GetJobId() string { + if m != nil { + return m.JobId + } + return "" +} + +func (m *JobStatusResponse) GetJobStatus() JobStatus { + if m != nil { + return m.JobStatus + } + return JobStatus_UNKNOWN +} + +func init() { + proto.RegisterEnum("queryapi.JobStatus", JobStatus_name, JobStatus_value) + proto.RegisterType((*JobStatusRequest)(nil), "queryapi.JobStatusRequest") + proto.RegisterType((*JobStatusResponse)(nil), "queryapi.JobStatusResponse") +} + +func init() { proto.RegisterFile("pkg/queryapi/queryapi.proto", fileDescriptor_91f9781d786f55de) } + +var fileDescriptor_91f9781d786f55de = []byte{ + // 407 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x92, 0x3d, 0x6f, 0xd3, 0x40, + 0x18, 0xc7, 0x7d, 0x85, 0xba, 0xf1, 0xf1, 0x76, 0x38, 0x03, 0x55, 0x2a, 0x5d, 0xab, 0x4e, 0x55, + 0x55, 0x62, 0xa9, 0xec, 0x48, 0x7e, 0x39, 0x22, 0xd3, 0xf4, 0x48, 0xed, 0x5a, 0x48, 0x2c, 0xc8, + 0x4e, 0x8c, 0xb1, 0x91, 0x39, 0xc7, 0x3e, 0x0f, 0xd9, 0xf8, 0x00, 0x0c, 0x88, 0x4f, 0xc5, 0x98, + 0x31, 0x53, 0x04, 0xce, 0x96, 0x4f, 0x81, 0xce, 0x49, 0x8c, 0x87, 0x2c, 0x6c, 0xcf, 0xef, 0x9e, + 0xfb, 0xff, 0xe4, 0xc7, 0xf7, 0xc0, 0x93, 0xec, 0x4b, 0xa4, 0x4d, 0xcb, 0x30, 0x9f, 0xf9, 0x59, + 0xdc, 0x14, 0xfd, 0x2c, 0x67, 0x9c, 0xa9, 0x9d, 0x1d, 0xf7, 0x5e, 0x46, 0x31, 0xff, 0x5c, 0x06, + 0xfd, 0x31, 0x4b, 0xb5, 0x88, 0x45, 0x4c, 0xab, 0x2f, 0x04, 0xe5, 0xa7, 0x9a, 0x6a, 0xa8, 0xab, + 0x4d, 0xf0, 0xfc, 0x35, 0x44, 0x6f, 0x59, 0xe0, 0x72, 0x9f, 0x97, 0x85, 0x13, 0x4e, 0xcb, 0xb0, + 0xe0, 0xea, 0x25, 0x94, 0x13, 0x16, 0x7c, 0x8c, 0x27, 0xc7, 0xe0, 0x0c, 0x5c, 0x28, 0x46, 0x77, + 0xbd, 0x3c, 0x7d, 0x96, 0xb0, 0xc0, 0x9e, 0x5c, 0xb1, 0x34, 0xe6, 0x61, 0x9a, 0xf1, 0x99, 0x73, + 0x58, 0x1f, 0x9c, 0x7f, 0x07, 0xf0, 0x79, 0x4b, 0x50, 0x64, 0xec, 0x6b, 0x11, 0xfe, 0x8f, 0x41, + 0xbd, 0x81, 0x50, 0xdc, 0x2d, 0x6a, 0xc3, 0xf1, 0xc1, 0x19, 0xb8, 0x78, 0x7a, 0xdd, 0xed, 0x37, + 0xf3, 0x35, 0x72, 0xe3, 0xc5, 0x7a, 0x79, 0xda, 0x4d, 0x76, 0xd8, 0x12, 0x29, 0xcd, 0xe1, 0xe5, + 0x4f, 0x00, 0x95, 0x26, 0xa1, 0x3e, 0x82, 0x47, 0x1e, 0xbd, 0xa1, 0xef, 0xde, 0x53, 0x24, 0xa9, + 0x4f, 0xa0, 0xe2, 0x7a, 0xc6, 0xad, 0x7d, 0x7f, 0x4f, 0x2c, 0x04, 0x54, 0x08, 0xe5, 0x3b, 0x8f, + 0x78, 0xc4, 0x42, 0x07, 0xa2, 0x1e, 0x12, 0xdd, 0x25, 0x16, 0x7a, 0x20, 0x32, 0x23, 0x42, 0x2d, + 0x9b, 0x0e, 0xd0, 0x43, 0x01, 0x8e, 0x47, 0xa9, 0x80, 0xc3, 0x8d, 0xc0, 0x34, 0x09, 0xb1, 0x88, + 0x85, 0x64, 0x11, 0x7a, 0xa3, 0xdb, 0x43, 0x62, 0xa1, 0x23, 0xd1, 0x1a, 0x39, 0x84, 0xdc, 0x8e, + 0x84, 0xbb, 0x23, 0xd0, 0xd4, 0xa9, 0x49, 0x86, 0xa2, 0xab, 0x5c, 0xbb, 0xb0, 0x73, 0x27, 0xc6, + 0xd1, 0xb3, 0x58, 0x1d, 0xc0, 0xc7, 0x83, 0x90, 0xff, 0xfb, 0xc4, 0xde, 0x9e, 0x49, 0xb7, 0xef, + 0xd0, 0x3b, 0xd9, 0xdb, 0xdb, 0xfc, 0x62, 0x83, 0x2e, 0xfe, 0x60, 0xe9, 0x5b, 0x85, 0xc1, 0xaf, + 0x0a, 0x83, 0x79, 0x85, 0xc1, 0xef, 0x0a, 0x83, 0x1f, 0x2b, 0x2c, 0xcd, 0x57, 0x58, 0x5a, 0xac, + 0xb0, 0xf4, 0xe1, 0xaa, 0xb5, 0x09, 0x7e, 0x9e, 0xfa, 0x13, 0x3f, 0xcb, 0x59, 0x12, 0x8e, 0xf9, + 0x96, 0xb4, 0xf6, 0x42, 0x05, 0x72, 0xbd, 0x0f, 0xaf, 0xfe, 0x06, 0x00, 0x00, 0xff, 0xff, 0xb8, + 0xad, 0x27, 0x62, 0x67, 0x02, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// QueryApiClient is the client API for QueryApi service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type QueryApiClient interface { + GetJobStatus(ctx context.Context, in *JobStatusRequest, opts ...grpc.CallOption) (*JobStatusResponse, error) +} + +type queryApiClient struct { + cc *grpc.ClientConn +} + +func NewQueryApiClient(cc *grpc.ClientConn) QueryApiClient { + return &queryApiClient{cc} +} + +func (c *queryApiClient) GetJobStatus(ctx context.Context, in *JobStatusRequest, opts ...grpc.CallOption) (*JobStatusResponse, error) { + out := new(JobStatusResponse) + err := c.cc.Invoke(ctx, "/queryapi.QueryApi/GetJobStatus", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// QueryApiServer is the server API for QueryApi service. +type QueryApiServer interface { + GetJobStatus(context.Context, *JobStatusRequest) (*JobStatusResponse, error) +} + +// UnimplementedQueryApiServer can be embedded to have forward compatible implementations. +type UnimplementedQueryApiServer struct { +} + +func (*UnimplementedQueryApiServer) GetJobStatus(ctx context.Context, req *JobStatusRequest) (*JobStatusResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetJobStatus not implemented") +} + +func RegisterQueryApiServer(s *grpc.Server, srv QueryApiServer) { + s.RegisterService(&_QueryApi_serviceDesc, srv) +} + +func _QueryApi_GetJobStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(JobStatusRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(QueryApiServer).GetJobStatus(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/queryapi.QueryApi/GetJobStatus", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(QueryApiServer).GetJobStatus(ctx, req.(*JobStatusRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _QueryApi_serviceDesc = grpc.ServiceDesc{ + ServiceName: "queryapi.QueryApi", + HandlerType: (*QueryApiServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "GetJobStatus", + Handler: _QueryApi_GetJobStatus_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "pkg/queryapi/queryapi.proto", +} + +func (m *JobStatusRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *JobStatusRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *JobStatusRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.JobId) > 0 { + i -= len(m.JobId) + copy(dAtA[i:], m.JobId) + i = encodeVarintQueryapi(dAtA, i, uint64(len(m.JobId))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *JobStatusResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *JobStatusResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *JobStatusResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.JobStatus != 0 { + i = encodeVarintQueryapi(dAtA, i, uint64(m.JobStatus)) + i-- + dAtA[i] = 0x10 + } + if len(m.JobId) > 0 { + i -= len(m.JobId) + copy(dAtA[i:], m.JobId) + i = encodeVarintQueryapi(dAtA, i, uint64(len(m.JobId))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func encodeVarintQueryapi(dAtA []byte, offset int, v uint64) int { + offset -= sovQueryapi(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *JobStatusRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.JobId) + if l > 0 { + n += 1 + l + sovQueryapi(uint64(l)) + } + return n +} + +func (m *JobStatusResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.JobId) + if l > 0 { + n += 1 + l + sovQueryapi(uint64(l)) + } + if m.JobStatus != 0 { + n += 1 + sovQueryapi(uint64(m.JobStatus)) + } + return n +} + +func sovQueryapi(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozQueryapi(x uint64) (n int) { + return sovQueryapi(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (this *JobStatusRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&JobStatusRequest{`, + `JobId:` + fmt.Sprintf("%v", this.JobId) + `,`, + `}`, + }, "") + return s +} +func (this *JobStatusResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&JobStatusResponse{`, + `JobId:` + fmt.Sprintf("%v", this.JobId) + `,`, + `JobStatus:` + fmt.Sprintf("%v", this.JobStatus) + `,`, + `}`, + }, "") + return s +} +func valueToStringQueryapi(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *JobStatusRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueryapi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: JobStatusRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: JobStatusRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field JobId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueryapi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthQueryapi + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthQueryapi + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.JobId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipQueryapi(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthQueryapi + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *JobStatusResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueryapi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: JobStatusResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: JobStatusResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field JobId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueryapi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthQueryapi + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthQueryapi + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.JobId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field JobStatus", wireType) + } + m.JobStatus = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueryapi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.JobStatus |= JobStatus(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipQueryapi(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthQueryapi + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipQueryapi(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowQueryapi + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowQueryapi + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowQueryapi + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthQueryapi + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupQueryapi + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthQueryapi + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthQueryapi = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowQueryapi = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupQueryapi = fmt.Errorf("proto: unexpected end of group") +) diff --git a/pkg/queryapi/queryapi.proto b/pkg/queryapi/queryapi.proto new file mode 100644 index 00000000000..7a9b7b76bad --- /dev/null +++ b/pkg/queryapi/queryapi.proto @@ -0,0 +1,34 @@ +syntax = 'proto3'; + +package queryapi; +option go_package = "github.com/armadaproject/armada/pkg/queryapi"; + +import "github.com/gogo/protobuf/gogoproto/gogo.proto"; +option (gogoproto.goproto_stringer_all) = false; +option (gogoproto.stringer_all) = true; + +enum JobStatus { + UNKNOWN = 0; + SUBMITTED = 1; + QUEUED = 2; + LEASED = 3; + PENDING = 4; + RUNNING = 5; + SUCCEEDED = 6; + FAILED = 7; + PREEMPTED = 8; + CANCELLED = 9; +} + +message JobStatusRequest{ + string job_id = 1; +} + +message JobStatusResponse{ + string job_id = 1; + JobStatus job_status = 2; +} + +service QueryApi { + rpc GetJobStatus (JobStatusRequest) returns (JobStatusResponse); +} diff --git a/scripts/build-python-client.sh b/scripts/build-python-client.sh index dad4be2079e..95440ee7254 100755 --- a/scripts/build-python-client.sh +++ b/scripts/build-python-client.sh @@ -3,7 +3,7 @@ # make the python package armada.client, not pkg.api mkdir -p proto/armada -cp pkg/api/event.proto pkg/api/queue.proto pkg/api/submit.proto pkg/api/usage.proto pkg/api/health.proto proto/armada +cp pkg/api/event.proto pkg/api/queue.proto pkg/api/submit.proto pkg/api/usage.proto pkg/api/health.proto pkg/queryapi/queryapi.proto proto/armada sed -i 's/\([^\/]\)pkg\/api/\1armada/g' proto/armada/*.proto # generate python stubs @@ -11,7 +11,7 @@ cd proto python3 -m grpc_tools.protoc -I. --plugin=protoc-gen-mypy=$(which protoc-gen-mypy) --python_out=../client/python/armada_client --grpc_python_out=../client/python/armada_client --mypy_out=../client/python/armada_client \ google/api/annotations.proto \ google/api/http.proto \ - armada/event.proto armada/queue.proto armada/submit.proto armada/usage.proto armada/health.proto \ + armada/event.proto armada/queue.proto armada/submit.proto armada/usage.proto armada/health.proto armada/queryapi.proto \ github.com/gogo/protobuf/gogoproto/gogo.proto \ k8s.io/api/core/v1/generated.proto \ k8s.io/apimachinery/pkg/api/resource/generated.proto \ diff --git a/scripts/common.sh b/scripts/common.sh index 120003c094f..d7cafd3c1ad 100755 --- a/scripts/common.sh +++ b/scripts/common.sh @@ -16,6 +16,7 @@ export image_names=( "armada-scheduler" "armada-scheduler-ingester" "armada-binoculars" + "armada-queryapi" "armada-jobservice" "armadactl" )