diff --git a/.gitignore b/.gitignore
index 73c5e2d5452..9bcaaea3a38 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,6 +14,10 @@ junit.xml
docker-compose.dev.yaml
delve
+# VS Code debugging
+__debug_bin
+**/__debug_bin*
+
# Build artifacts
dist
.goreleaser-minimal.yml
diff --git a/.goreleaser.yml b/.goreleaser.yml
index e50580f99e0..51a56c20e22 100644
--- a/.goreleaser.yml
+++ b/.goreleaser.yml
@@ -224,6 +224,7 @@ dockers:
- internal/lookout/ui
- pkg/api/api.swagger.json
- pkg/api/binoculars/api.swagger.json
+ - pkg/api/schedulerobjects/api.swagger.json
dockerfile: ./build/bundles/lookout/Dockerfile
- id: full-bundle
@@ -258,6 +259,7 @@ dockers:
- internal/lookout/ui
- pkg/api/api.swagger.json
- pkg/api/binoculars/api.swagger.json
+ - pkg/api/schedulerobjects/api.swagger.json
dockerfile: ./build/bundles/full/Dockerfile
- id: server
@@ -359,6 +361,7 @@ dockers:
- internal/lookout/ui
- pkg/api/api.swagger.json
- pkg/api/binoculars/api.swagger.json
+ - pkg/api/schedulerobjects/api.swagger.json
- config/lookoutv2/config.yaml
- config/lookoutingesterv2/config.yaml
- config/logging.yaml
diff --git a/.run/LookoutV2.run.xml b/.run/LookoutV2.run.xml
index ee4e291cb3c..a85ddf17726 100644
--- a/.run/LookoutV2.run.xml
+++ b/.run/LookoutV2.run.xml
@@ -7,6 +7,8 @@
+
+
diff --git a/build/bundles/full/Dockerfile b/build/bundles/full/Dockerfile
index 5199e21239b..a55303fbbb8 100644
--- a/build/bundles/full/Dockerfile
+++ b/build/bundles/full/Dockerfile
@@ -9,6 +9,7 @@ LABEL org.opencontainers.image.url=https://hub.docker.com/r/gresearch/armada-ful
COPY internal/lookout/ui /project/internal/lookout/ui
COPY pkg/api/*.swagger.json /project/pkg/api/
COPY pkg/api/binoculars/*.swagger.json /project/pkg/api/binoculars/
+COPY pkg/api/schedulerobjects/*.swagger.json /project/pkg/api/schedulerobjects/
RUN ./project/internal/lookout/ui/openapi.sh
FROM ${NODE_BUILD_IMAGE} AS NODE
diff --git a/build/lookoutv2/Dockerfile b/build/lookoutv2/Dockerfile
index a75154b7358..a8dde1600c9 100644
--- a/build/lookoutv2/Dockerfile
+++ b/build/lookoutv2/Dockerfile
@@ -7,6 +7,7 @@ FROM ${OPENAPI_BUILD_IMAGE} AS OPENAPI
COPY internal/lookout/ui /project/internal/lookout/ui
COPY pkg/api/*.swagger.json /project/pkg/api/
COPY pkg/api/binoculars/*.swagger.json /project/pkg/api/binoculars/
+COPY pkg/api/schedulerobjects/*.swagger.json /project/pkg/api/schedulerobjects/
RUN ./project/internal/lookout/ui/openapi.sh
FROM ${NODE_BUILD_IMAGE} AS NODE
diff --git a/cmd/server/main.go b/cmd/server/main.go
index df909f5649d..38286f393b8 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -21,6 +21,7 @@ import (
"github.com/armadaproject/armada/internal/server"
"github.com/armadaproject/armada/internal/server/configuration"
"github.com/armadaproject/armada/pkg/api"
+ "github.com/armadaproject/armada/pkg/api/schedulerobjects"
)
const CustomConfigLocation string = "config"
@@ -92,6 +93,7 @@ func main() {
api.RegisterSubmitHandler,
api.RegisterEventHandler,
api.RegisterJobsHandler,
+ schedulerobjects.RegisterSchedulerReportingHandler,
)
defer shutdownGateway()
diff --git a/docs/python_airflow_operator.md b/docs/python_airflow_operator.md
index 86b928230ab..be46d19238a 100644
--- a/docs/python_airflow_operator.md
+++ b/docs/python_airflow_operator.md
@@ -63,7 +63,7 @@ and handles job cancellation if the Airflow task is killed.
* **reattach_policy** (*Optional**[**str**] **| **Callable**[**[**JobState**, **str**]**, **bool**]*) –
- * **extra_links** (*Optional**[**Dict**[**str**, **str**]**]*) –
+ * **extra_links** (*Optional**[**Dict**[**str**, **Union**[**str**, **re.Pattern**]**]**]*) –
@@ -199,74 +199,10 @@ acknowledged by Armada.
:param reattach_policy: Operator reattach policy to use (defaults to: never)
:type reattach_policy: Optional[str] | Callable[[JobState, str], bool]
:param kwargs: Additional keyword arguments to pass to the BaseOperator.
-:param extra_links: Extra links to be shown in addition to Lookout URL.
-:type extra_links: Optional[Dict[str, str]]
+:param extra_links: Extra links to be shown in addition to Lookout URL. Regex patterns will be extracted from container logs (taking first match).
+:type extra_links: Optional[Dict[str, Union[str, re.Pattern]]]
:param kwargs: Additional keyword arguments to pass to the BaseOperator.
-
-### _class_ armada.operators.armada.DynamicLink(name)
-Bases: `BaseOperatorLink`, `LoggingMixin`
-
-
-* **Parameters**
-
- **name** (*str*) –
-
-
-
-#### get_link(operator, \*, ti_key)
-Link to external system.
-
-Note: The old signature of this function was `(self, operator, dttm: datetime)`. That is still
-supported at runtime but is deprecated.
-
-
-* **Parameters**
-
-
- * **operator** (*BaseOperator*) – The Airflow operator object this link is associated to.
-
-
- * **ti_key** (*TaskInstanceKey*) – TaskInstance ID to return link for.
-
-
-
-* **Returns**
-
- link to external system
-
-
-
-#### name(_: st_ )
-
-### _class_ armada.operators.armada.LookoutLink()
-Bases: `BaseOperatorLink`
-
-
-#### get_link(operator, \*, ti_key)
-Link to external system.
-
-Note: The old signature of this function was `(self, operator, dttm: datetime)`. That is still
-supported at runtime but is deprecated.
-
-
-* **Parameters**
-
-
- * **operator** (*BaseOperator*) – The Airflow operator object this link is associated to.
-
-
- * **ti_key** (*TaskInstanceKey*) – TaskInstance ID to return link for.
-
-
-
-* **Returns**
-
- link to external system
-
-
-
-#### name(_ = 'Lookout_ )
## armada.triggers.armada module
## armada.auth module
diff --git a/internal/armadactl/scheduling.go b/internal/armadactl/scheduling.go
index fa804cb31ec..fb4591a5a24 100644
--- a/internal/armadactl/scheduling.go
+++ b/internal/armadactl/scheduling.go
@@ -4,7 +4,7 @@ import (
"fmt"
"github.com/armadaproject/armada/internal/common"
- "github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
+ "github.com/armadaproject/armada/pkg/api/schedulerobjects"
"github.com/armadaproject/armada/pkg/client"
)
diff --git a/internal/common/logging/global.go b/internal/common/logging/global.go
index c34b8cf9529..a67ecaa507b 100644
--- a/internal/common/logging/global.go
+++ b/internal/common/logging/global.go
@@ -6,11 +6,11 @@ import (
"github.com/rs/zerolog"
)
-// The global Logger. Comes configured with some sensible defaults for e.g. unit tests, but applications should
-// generally configure their own logging config via ReplaceStdLogger
var (
StdSkipFrames = 4
- stdLogger = createDefaultLogger()
+ // The global Logger. Comes configured with some sensible defaults for e.g. unit tests, but applications should
+ // generally configure their own logging config via ReplaceStdLogger
+ stdLogger = createDefaultLogger()
)
// ReplaceStdLogger Replaces the global logger. This should be called once at app startup!
@@ -23,57 +23,57 @@ func StdLogger() *Logger {
return stdLogger
}
-// Debug logs a message at level Debug.
+// Debug logs a message at level Debug on the default logger.
func Debug(args ...any) {
stdLogger.Debug(args...)
}
-// Info logs a message at level Info.
+// Info logs a message at level Info on the default logger.
func Info(args ...any) {
stdLogger.Info(args...)
}
-// Warn logs a message at level Warn on the standard logger.
+// Warn logs a message at level Warn on the default logger.
func Warn(args ...any) {
stdLogger.Warn(args...)
}
-// Error logs a message at level Error on the standard logger.
+// Error logs a message at level Error on the default logger.
func Error(args ...any) {
stdLogger.Error(args...)
}
-// Panic logs a message at level Panic on the standard logger.
+// Panic logs a message at level Panic on the default logger.
func Panic(args ...any) {
stdLogger.Panic(args...)
}
-// Fatal logs a message at level Fatal on the standard logger then the process will exit with status set to 1.
+// Fatal logs a message at level Fatal on the default logger then the process will exit with status set to 1.
func Fatal(args ...any) {
stdLogger.Fatal(args...)
}
-// Debugf logs a message at level Debug on the standard logger.
+// Debugf logs a message at level Debug on the default logger.
func Debugf(format string, args ...any) {
stdLogger.Debugf(format, args...)
}
-// Infof logs a message at level Info on the standard logger.
+// Infof logs a message at level Info on the default logger.
func Infof(format string, args ...any) {
stdLogger.Infof(format, args...)
}
-// Warnf logs a message at level Warn on the standard logger.
+// Warnf logs a message at level Warn on the default logger.
func Warnf(format string, args ...any) {
stdLogger.Warnf(format, args...)
}
-// Errorf logs a message at level Error on the standard logger.
+// Errorf logs a message at level Error on the default logger.
func Errorf(format string, args ...any) {
stdLogger.Errorf(format, args...)
}
-// Fatalf logs a message at level Fatal on the standard logger then the process will exit with status set to 1.
+// Fatalf logs a message at level Fatal on the default logger then the process will exit with status set to 1.
func Fatalf(format string, args ...any) {
stdLogger.Fatalf(format, args...)
}
diff --git a/internal/common/resource/resource_test.go b/internal/common/resource/resource_test.go
index 184d1fa9067..80752ca18aa 100644
--- a/internal/common/resource/resource_test.go
+++ b/internal/common/resource/resource_test.go
@@ -142,7 +142,7 @@ func TestTotalResourceRequest_ShouldSumAllContainers(t *testing.T) {
func TestTotalResourceRequestShouldReportMaxInitContainerValues(t *testing.T) {
highCpuResource := makeContainerResource(1000, 5)
highRamResource := makeContainerResource(100, 500)
- // WithField init containers, it should take the max of each individual resource from all init containers
+ // With init containers, it should take the max of each individual resource from all init containers
expectedResult := makeContainerResource(1000, 500)
pod := makePodWithResource([]*v1.ResourceList{}, []*v1.ResourceList{&highCpuResource, &highRamResource})
diff --git a/internal/lookout/ui/openapi.sh b/internal/lookout/ui/openapi.sh
index 0c1f57dd464..dae1c1e6322 100755
--- a/internal/lookout/ui/openapi.sh
+++ b/internal/lookout/ui/openapi.sh
@@ -6,3 +6,7 @@
-g typescript-fetch \
-i /project/pkg/api/binoculars/api.swagger.json \
-o /project/internal/lookout/ui/src/openapi/binoculars
+/usr/local/bin/docker-entrypoint.sh generate \
+ -g typescript-fetch \
+ -i /project/pkg/api/schedulerobjects/api.swagger.json \
+ -o /project/internal/lookout/ui/src/openapi/schedulerobjects
diff --git a/internal/lookout/ui/src/components/CodeBlock.tsx b/internal/lookout/ui/src/components/CodeBlock.tsx
index 28bd2671f7b..dcae404ef16 100644
--- a/internal/lookout/ui/src/components/CodeBlock.tsx
+++ b/internal/lookout/ui/src/components/CodeBlock.tsx
@@ -1,4 +1,4 @@
-import { useCallback } from "react"
+import { ReactNode, useCallback } from "react"
import { Download } from "@mui/icons-material"
import { IconButton, Skeleton, styled, useColorScheme } from "@mui/material"
@@ -45,6 +45,7 @@ const StyledPre = styled("pre")(({ theme }) => ({
minHeight: 50,
display: "flex",
alignItems: "center",
+ margin: 0,
}))
const Code = styled("code")({
@@ -72,6 +73,12 @@ const CodeLineNumber = styled("span")({
},
})
+interface CodeBlockActionProps {
+ title: string
+ onClick: () => void
+ icon: ReactNode
+}
+
interface CodeBlockLoadingProps {
loading: true
code?: undefined | string
@@ -96,13 +103,14 @@ interface CodeBlockNonDownloadbaleProps {
downloadFileName?: undefined | string
}
-interface CodeBlockbaseProps {
+interface CodeBlockBaseProps {
showLineNumbers: boolean
loadingLines?: number
loadingLineLength?: number
+ additionalActions?: CodeBlockActionProps[]
}
-export type CodeBlockProps = CodeBlockbaseProps &
+export type CodeBlockProps = CodeBlockBaseProps &
(CodeBlockLoadedProps | CodeBlockLoadingProps) &
(CodeBlockDownloadbaleProps | CodeBlockNonDownloadbaleProps)
@@ -116,6 +124,7 @@ export const CodeBlock = ({
showLineNumbers,
loadingLines = DEFAULT_LOADING_LINES,
loadingLineLength = DEFAULT_LOADING_LINE_LENGTH,
+ additionalActions = [],
}: CodeBlockProps) => {
const { colorScheme } = useColorScheme()
@@ -177,6 +186,11 @@ export const CodeBlock = ({
)}
+ {additionalActions.map(({ title, onClick, icon }) => (
+
+ {icon}
+
+ ))}
({
+ display: "flex",
+ flexDirection: "column",
+ gap: theme.spacing(SPACING.xs),
+}))
+
+const RatingAndLabelContainer = styled("div")(({ theme }) => ({
+ display: "flex",
+ alignItems: "center",
+ gap: theme.spacing(SPACING.sm),
+}))
+
+const StyledRating = styled(Rating)(({ theme }) => ({
+ "& .MuiRating-iconFilled": {
+ color: theme.palette.secondary.light,
+ },
+ "& .MuiRating-iconHover": {
+ color: theme.palette.secondary.main,
+ },
+}))
+
+export interface VerbositySelectorProps {
+ name: string
+ legendLabel: string
+ max: number
+ verbosity: number
+ onChange: (verbosity: number) => void
+ disabled?: boolean
+}
+
+export const VerbositySelector = ({
+ name,
+ legendLabel,
+ max,
+ verbosity,
+ onChange,
+ disabled,
+}: VerbositySelectorProps) => {
+ const [hoverValue, setHoverValue] = useState(-1)
+ return (
+
+
+ {legendLabel}
+
+
+ {
+ setHoverValue(value)
+ }}
+ onChange={(_, value) => onChange(value ?? 0)}
+ max={max}
+ getLabelText={(value) => value.toString()}
+ icon={}
+ emptyIcon={}
+ disabled={disabled}
+ />
+ Level {hoverValue !== -1 ? hoverValue.toString() : verbosity.toString()}
+
+
+ )
+}
diff --git a/internal/lookout/ui/src/components/lookoutV2/sidebar/Sidebar.tsx b/internal/lookout/ui/src/components/lookoutV2/sidebar/Sidebar.tsx
index d74f777dac1..34834140305 100644
--- a/internal/lookout/ui/src/components/lookoutV2/sidebar/Sidebar.tsx
+++ b/internal/lookout/ui/src/components/lookoutV2/sidebar/Sidebar.tsx
@@ -10,6 +10,7 @@ import { SidebarTabJobDetails } from "./SidebarTabJobDetails"
import { SidebarTabJobLogs } from "./SidebarTabJobLogs"
import { SidebarTabJobResult } from "./SidebarTabJobResult"
import { SidebarTabJobYaml } from "./SidebarTabJobYaml"
+import { SidebarTabScheduling } from "./SidebarTabScheduling"
import { Job, JobState } from "../../../models/lookoutV2Models"
import { ICordonService } from "../../../services/lookoutV2/CordonService"
import { IGetJobInfoService } from "../../../services/lookoutV2/GetJobInfoService"
@@ -20,6 +21,7 @@ import { CommandSpec } from "../../../utils"
enum SidebarTab {
JobDetails = "JobDetails",
JobResult = "JobResult",
+ Scheduling = "Scheduling",
Yaml = "Yaml",
Logs = "Logs",
Commands = "Commands",
@@ -149,6 +151,11 @@ export const Sidebar = memo(
commandSpecs,
}: SidebarProps) => {
const [openTab, setOpenTab] = useState(SidebarTab.JobDetails)
+ useEffect(() => {
+ if (openTab === SidebarTab.Scheduling && job.state !== JobState.Queued) {
+ setOpenTab(SidebarTab.JobDetails)
+ }
+ }, [openTab, job.state])
const handleTabChange = useCallback((_: SyntheticEvent, newValue: SidebarTab) => {
setOpenTab(newValue)
@@ -254,6 +261,9 @@ export const Sidebar = memo(
+ {job.state === JobState.Queued && (
+
+ )}
+ {job.state === JobState.Queued && (
+
+
+
+ )}
+
diff --git a/internal/lookout/ui/src/components/lookoutV2/sidebar/SidebarTabScheduling.tsx b/internal/lookout/ui/src/components/lookoutV2/sidebar/SidebarTabScheduling.tsx
new file mode 100644
index 00000000000..efae2533185
--- /dev/null
+++ b/internal/lookout/ui/src/components/lookoutV2/sidebar/SidebarTabScheduling.tsx
@@ -0,0 +1,257 @@
+import { useState } from "react"
+
+import { Refresh } from "@mui/icons-material"
+import {
+ Accordion,
+ AccordionDetails,
+ AccordionSummary,
+ Alert,
+ AlertTitle,
+ Button,
+ Stack,
+ Typography,
+} from "@mui/material"
+
+import { KeyValuePairTable } from "./KeyValuePairTable"
+import { SidebarTabHeading, SidebarTabProminentValueCard, SidebarTabSubheading } from "./sidebarTabContentComponents"
+import { Job, JobState } from "../../../models/lookoutV2Models"
+import { useGetJobSchedulingReport } from "../../../services/lookoutV2/useGetJobSchedulingReport"
+import { useGetQueueSchedulingReport } from "../../../services/lookoutV2/useGetQueueSchedulingReport"
+import { useGetSchedulingReport } from "../../../services/lookoutV2/useGetSchedulingReport"
+import { SPACING } from "../../../styling/spacing"
+import { formatTimeSince } from "../../../utils/jobsTableFormatters"
+import { formatCpu, formatBytes } from "../../../utils/resourceUtils"
+import { CodeBlock } from "../../CodeBlock"
+import { VerbositySelector } from "../../VerbositySelector"
+
+export interface SidebarTabSchedulingProps {
+ job: Job
+}
+
+export const SidebarTabScheduling = ({ job }: SidebarTabSchedulingProps) => {
+ const [showAdvancedReports, setShowAdvancedReports] = useState(false)
+ const [queueReportVerbosity, setQueueReportVerbosity] = useState(0)
+ const [schedulingReportVerbosity, setSchedulingReportVerbosity] = useState(0)
+
+ const getJobSchedulingReportResult = useGetJobSchedulingReport(job.jobId, Boolean(job.jobId))
+ const getQueueSchedulingReportResult = useGetQueueSchedulingReport(
+ job.queue,
+ queueReportVerbosity,
+ showAdvancedReports && Boolean(job.queue),
+ )
+ const getSchedulingReportResult = useGetSchedulingReport(schedulingReportVerbosity, showAdvancedReports)
+
+ return (
+ <>
+
+ {job.state === JobState.Queued && (
+
+ )}
+
+ Scheduling report
+ This is the scheduling report for this job from the latest scheduling round.
+ {getJobSchedulingReportResult.status === "error" && (
+ getJobSchedulingReportResult.refetch()}>
+ Retry
+
+ }
+ >
+ Failed to get the scheduling report for this job.
+ {getJobSchedulingReportResult.error}
+
+ )}
+ {(getJobSchedulingReportResult.status === "pending" || getJobSchedulingReportResult.isFetching) && (
+
+ )}
+ {getJobSchedulingReportResult.status === "success" &&
+ !getJobSchedulingReportResult.isFetching &&
+ (getJobSchedulingReportResult.data.report ? (
+ getJobSchedulingReportResult.refetch(), icon: },
+ ]}
+ />
+ ) : (
+ getJobSchedulingReportResult.refetch()}>
+ Retry
+
+ }
+ >
+ No scheduling report is available for this job.
+
+ ))}
+
+
setShowAdvancedReports(expanded)}>
+
+ Advanced reports
+
+
+
+ Report for this job's queue ({job.queue})
+
+ setQueueReportVerbosity(verbosity)}
+ disabled={
+ getQueueSchedulingReportResult.status === "pending" || getQueueSchedulingReportResult.isFetching
+ }
+ />
+ {getQueueSchedulingReportResult.status === "error" && (
+ getQueueSchedulingReportResult.refetch()}>
+ Retry
+
+ }
+ >
+ Failed to get the scheduling report for this queue.
+ {getQueueSchedulingReportResult.error}
+
+ )}
+ {(getQueueSchedulingReportResult.status === "pending" || getQueueSchedulingReportResult.isFetching) && (
+
+ )}
+ {getQueueSchedulingReportResult.status === "success" &&
+ !getQueueSchedulingReportResult.isFetching &&
+ (getQueueSchedulingReportResult.data.report ? (
+ getQueueSchedulingReportResult.refetch(),
+ icon: ,
+ },
+ ]}
+ />
+ ) : (
+ getQueueSchedulingReportResult.refetch()}>
+ Retry
+
+ }
+ >
+ No scheduling report is available for this queue.
+
+ ))}
+
+ Overall scheduling report
+
+ setSchedulingReportVerbosity(verbosity)}
+ disabled={getSchedulingReportResult.status === "pending" || getSchedulingReportResult.isFetching}
+ />
+ {getSchedulingReportResult.status === "error" && (
+ getSchedulingReportResult.refetch()}>
+ Retry
+
+ }
+ >
+ Failed to get the overall scheduling report.
+ {getSchedulingReportResult.error}
+
+ )}
+ {(getSchedulingReportResult.status === "pending" || getSchedulingReportResult.isFetching) && (
+
+ )}
+ {getSchedulingReportResult.status === "success" &&
+ !getSchedulingReportResult.isFetching &&
+ (getSchedulingReportResult.data.report ? (
+ getSchedulingReportResult.refetch(), icon: },
+ ]}
+ />
+ ) : (
+ getSchedulingReportResult.refetch()}>
+ Retry
+
+ }
+ >
+ No scheduling report is available.
+
+ ))}
+
+
+
+
+
+ Scheduling parameters
+
+ These job details are used by Armada's scheduler when determining which jobs to execute on the cluster.
+
+
+
+
+ Resource requests
+
+ >
+ )
+}
diff --git a/internal/lookout/ui/src/components/lookoutV2/sidebar/sidebarTabContentComponents.tsx b/internal/lookout/ui/src/components/lookoutV2/sidebar/sidebarTabContentComponents.tsx
index 63928d7cd38..6f0c7174cf1 100644
--- a/internal/lookout/ui/src/components/lookoutV2/sidebar/sidebarTabContentComponents.tsx
+++ b/internal/lookout/ui/src/components/lookoutV2/sidebar/sidebarTabContentComponents.tsx
@@ -1,4 +1,6 @@
-import { styled, Typography, TypographyProps } from "@mui/material"
+import { Card, styled, Typography, TypographyProps } from "@mui/material"
+
+import { SPACING } from "../../../styling/spacing"
const StyledHeading = styled(Typography)(({ theme }) => ({
fontSize: theme.typography.h6.fontSize,
@@ -13,3 +15,41 @@ const StyledSubheading = styled(Typography)(({ theme }) => ({
}))
export const SidebarTabSubheading = (props: TypographyProps) =>
+
+const StyledCard = styled(Card)(({ theme }) => ({
+ display: "flex",
+ flexDirection: "column",
+ rowGap: theme.spacing(SPACING.sm),
+ alignItems: "flex-start",
+ padding: theme.spacing(SPACING.md),
+}))
+
+const CardLabel = styled(Typography)(({ theme }) => ({
+ fontSize: theme.typography.h6.fontSize,
+ color: theme.palette.text.secondary,
+ lineHeight: 1,
+}))
+
+const CardValue = styled(Typography)(({ theme }) => ({
+ fontSize: theme.typography.h4.fontSize,
+ fontWeight: theme.typography.fontWeightMedium,
+ lineHeight: 1,
+}))
+
+export interface SidebarTabProminentValueCardProps {
+ label: string
+ value: string
+}
+
+export const SidebarTabProminentValueCard = ({ label, value }: SidebarTabProminentValueCardProps) => (
+
+
+
+ {label}
+
+
+ {value}
+
+
+
+)
diff --git a/internal/lookout/ui/src/services/lookoutV2/useGetJobSchedulingReport.ts b/internal/lookout/ui/src/services/lookoutV2/useGetJobSchedulingReport.ts
new file mode 100644
index 00000000000..f1de0ff73d3
--- /dev/null
+++ b/internal/lookout/ui/src/services/lookoutV2/useGetJobSchedulingReport.ts
@@ -0,0 +1,27 @@
+import { useQuery } from "@tanstack/react-query"
+
+import { useGetUiConfig } from "./useGetUiConfig"
+import { SchedulerReportingApi, Configuration, SchedulerobjectsJobReport } from "../../openapi/schedulerobjects"
+import { getErrorMessage } from "../../utils"
+
+export const useGetJobSchedulingReport = (jobId: string, enabled = true) => {
+ const { data: uiConfig } = useGetUiConfig(enabled)
+ const armadaApiBaseUrl = uiConfig?.armadaApiBaseUrl
+
+ const schedulerReportingApiConfiguration: Configuration = new Configuration({ basePath: armadaApiBaseUrl })
+ const schedulerReportingApi = new SchedulerReportingApi(schedulerReportingApiConfiguration)
+
+ return useQuery({
+ queryKey: ["getJobSchedulingReport", jobId],
+ queryFn: async ({ signal }) => {
+ try {
+ return await schedulerReportingApi.getJobReport({ jobId }, { signal })
+ } catch (e) {
+ throw await getErrorMessage(e)
+ }
+ },
+ enabled: Boolean(enabled && armadaApiBaseUrl),
+ refetchOnMount: false,
+ staleTime: 30_000,
+ })
+}
diff --git a/internal/lookout/ui/src/services/lookoutV2/useGetQueueSchedulingReport.ts b/internal/lookout/ui/src/services/lookoutV2/useGetQueueSchedulingReport.ts
new file mode 100644
index 00000000000..3612c03189e
--- /dev/null
+++ b/internal/lookout/ui/src/services/lookoutV2/useGetQueueSchedulingReport.ts
@@ -0,0 +1,27 @@
+import { useQuery } from "@tanstack/react-query"
+
+import { useGetUiConfig } from "./useGetUiConfig"
+import { SchedulerReportingApi, Configuration, SchedulerobjectsQueueReport } from "../../openapi/schedulerobjects"
+import { getErrorMessage } from "../../utils"
+
+export const useGetQueueSchedulingReport = (queueName: string, verbosity: number, enabled = true) => {
+ const { data: uiConfig } = useGetUiConfig(enabled)
+ const armadaApiBaseUrl = uiConfig?.armadaApiBaseUrl
+
+ const schedulerReportingApiConfiguration: Configuration = new Configuration({ basePath: armadaApiBaseUrl })
+ const schedulerReportingApi = new SchedulerReportingApi(schedulerReportingApiConfiguration)
+
+ return useQuery({
+ queryKey: ["getQueueSchedulingReport", queueName, verbosity],
+ queryFn: async ({ signal }) => {
+ try {
+ return await schedulerReportingApi.getQueueReport({ queueName, verbosity }, { signal })
+ } catch (e) {
+ throw await getErrorMessage(e)
+ }
+ },
+ enabled: Boolean(enabled && armadaApiBaseUrl),
+ refetchOnMount: false,
+ staleTime: 30_000,
+ })
+}
diff --git a/internal/lookout/ui/src/services/lookoutV2/useGetSchedulingReport.ts b/internal/lookout/ui/src/services/lookoutV2/useGetSchedulingReport.ts
new file mode 100644
index 00000000000..c9c46caa868
--- /dev/null
+++ b/internal/lookout/ui/src/services/lookoutV2/useGetSchedulingReport.ts
@@ -0,0 +1,27 @@
+import { useQuery } from "@tanstack/react-query"
+
+import { useGetUiConfig } from "./useGetUiConfig"
+import { SchedulerReportingApi, Configuration, SchedulerobjectsSchedulingReport } from "../../openapi/schedulerobjects"
+import { getErrorMessage } from "../../utils"
+
+export const useGetSchedulingReport = (verbosity: number, enabled = true) => {
+ const { data: uiConfig } = useGetUiConfig(enabled)
+ const armadaApiBaseUrl = uiConfig?.armadaApiBaseUrl
+
+ const schedulerReportingApiConfiguration: Configuration = new Configuration({ basePath: armadaApiBaseUrl })
+ const schedulerReportingApi = new SchedulerReportingApi(schedulerReportingApiConfiguration)
+
+ return useQuery({
+ queryKey: ["getSchedulingReport", verbosity],
+ queryFn: async ({ signal }) => {
+ try {
+ return await schedulerReportingApi.getSchedulingReport({ verbosity }, { signal })
+ } catch (e) {
+ throw await getErrorMessage(e)
+ }
+ },
+ enabled: Boolean(enabled && armadaApiBaseUrl),
+ refetchOnMount: false,
+ staleTime: 30_000,
+ })
+}
diff --git a/internal/lookout/ui/src/services/lookoutV2/useGetUiConfig.ts b/internal/lookout/ui/src/services/lookoutV2/useGetUiConfig.ts
new file mode 100644
index 00000000000..683c41b3062
--- /dev/null
+++ b/internal/lookout/ui/src/services/lookoutV2/useGetUiConfig.ts
@@ -0,0 +1,81 @@
+import { useQuery } from "@tanstack/react-query"
+
+import { getErrorMessage, OidcConfig, UIConfig } from "../../utils"
+
+const DEFAULT_OIDC_CONFIG: OidcConfig = {
+ authority: "",
+ clientId: "",
+ scope: "",
+}
+
+const DEFAULT_UI_CONFIG: UIConfig = {
+ armadaApiBaseUrl: "",
+ userAnnotationPrefix: "",
+ binocularsBaseUrlPattern: "",
+ jobSetsAutoRefreshMs: undefined,
+ jobsAutoRefreshMs: undefined,
+ debugEnabled: false,
+ fakeDataEnabled: false,
+ customTitle: "",
+ oidcEnabled: false,
+ oidc: undefined,
+ commandSpecs: [],
+ backend: undefined,
+}
+
+export const useGetUiConfig = (enabled = true) => {
+ const searchParams = new URLSearchParams(window.location.search)
+
+ const config = {
+ ...DEFAULT_UI_CONFIG,
+ debugEnabled: searchParams.has("debug"),
+ fakeDataEnabled: searchParams.has("fakeData"),
+ }
+
+ return useQuery({
+ queryKey: ["useGetUiConfig"],
+ queryFn: async ({ signal }) => {
+ try {
+ const response = await fetch("/config", { signal })
+ const json = await response.json()
+
+ if (json.ArmadaApiBaseUrl) config.armadaApiBaseUrl = json.ArmadaApiBaseUrl
+ if (json.UserAnnotationPrefix) config.userAnnotationPrefix = json.UserAnnotationPrefix
+ if (json.BinocularsBaseUrlPattern) config.binocularsBaseUrlPattern = json.BinocularsBaseUrlPattern
+ if (json.JobSetsAutoRefreshMs) config.jobSetsAutoRefreshMs = json.JobSetsAutoRefreshMs
+ if (json.JobsAutoRefreshMs) config.jobsAutoRefreshMs = json.JobsAutoRefreshMs
+ if (json.CustomTitle) config.customTitle = json.CustomTitle
+ if (json.OidcEnabled) config.oidcEnabled = json.OidcEnabled
+
+ if (json.Oidc) {
+ config.oidc = DEFAULT_OIDC_CONFIG
+ if (json.Oidc.Authority) config.oidc.authority = json.Oidc.Authority
+ if (json.Oidc.ClientId) config.oidc.clientId = json.Oidc.ClientId
+ if (json.Oidc.Scope) config.oidc.scope = json.Oidc.Scope
+ }
+
+ if (json.CommandSpecs) {
+ config.commandSpecs = json.CommandSpecs.map(({ Name, Template }: { Name: string; Template: string }) => ({
+ name: Name,
+ template: Template,
+ }))
+ }
+
+ if (json.Backend) config.backend = json.Backend
+ } catch (e) {
+ throw await getErrorMessage(e)
+ }
+
+ config.oidcEnabled =
+ searchParams.get("oidcEnabled") === JSON.stringify(true) || window.location.pathname === "/oidc"
+
+ const backend = searchParams.get("backend")
+ if (backend) config.backend = backend
+
+ return config
+ },
+ enabled,
+ refetchOnMount: false,
+ staleTime: Infinity,
+ })
+}
diff --git a/internal/lookout/ui/src/utils.tsx b/internal/lookout/ui/src/utils.tsx
index eefeedae5d4..3c4caf410d7 100644
--- a/internal/lookout/ui/src/utils.tsx
+++ b/internal/lookout/ui/src/utils.tsx
@@ -14,7 +14,7 @@ export interface CommandSpec {
template: string
}
-interface UIConfig {
+export interface UIConfig {
armadaApiBaseUrl: string
userAnnotationPrefix: string
binocularsBaseUrlPattern: string
diff --git a/internal/lookoutv2/gen/models/filter.go b/internal/lookoutv2/gen/models/filter.go
index 79f30ae8841..92d01a29f86 100644
--- a/internal/lookoutv2/gen/models/filter.go
+++ b/internal/lookoutv2/gen/models/filter.go
@@ -30,7 +30,7 @@ type Filter struct {
// match
// Required: true
- // Enum: [exact anyOf startsWith contains greaterThan lessThan greaterThanOrEqualTo lessThanOrEqualTo exists]
+ // Enum: ["exact","anyOf","startsWith","contains","greaterThan","lessThan","greaterThanOrEqualTo","lessThanOrEqualTo","exists"]
Match string `json:"match"`
// value
diff --git a/internal/lookoutv2/gen/models/job.go b/internal/lookoutv2/gen/models/job.go
index 5a1ee2afbc6..203beaf9af5 100644
--- a/internal/lookoutv2/gen/models/job.go
+++ b/internal/lookoutv2/gen/models/job.go
@@ -111,7 +111,7 @@ type Job struct {
// state
// Required: true
- // Enum: [QUEUED PENDING RUNNING SUCCEEDED FAILED CANCELLED PREEMPTED LEASED REJECTED]
+ // Enum: ["QUEUED","PENDING","RUNNING","SUCCEEDED","FAILED","CANCELLED","PREEMPTED","LEASED","REJECTED"]
State string `json:"state"`
// submitted
@@ -492,6 +492,11 @@ func (m *Job) contextValidateRuns(ctx context.Context, formats strfmt.Registry)
for i := 0; i < len(m.Runs); i++ {
if m.Runs[i] != nil {
+
+ if swag.IsZero(m.Runs[i]) { // not required
+ return nil
+ }
+
if err := m.Runs[i].ContextValidate(ctx, formats); err != nil {
if ve, ok := err.(*errors.Validation); ok {
return ve.ValidateName("runs" + "." + strconv.Itoa(i))
diff --git a/internal/lookoutv2/gen/models/order.go b/internal/lookoutv2/gen/models/order.go
index 1be5c6c6084..a40eac27592 100644
--- a/internal/lookoutv2/gen/models/order.go
+++ b/internal/lookoutv2/gen/models/order.go
@@ -22,7 +22,7 @@ type Order struct {
// direction
// Required: true
- // Enum: [ASC DESC]
+ // Enum: ["ASC","DESC"]
Direction string `json:"direction"`
// field
diff --git a/internal/lookoutv2/gen/models/run.go b/internal/lookoutv2/gen/models/run.go
index c2e46a61319..59df7e4bb1e 100644
--- a/internal/lookoutv2/gen/models/run.go
+++ b/internal/lookoutv2/gen/models/run.go
@@ -34,7 +34,7 @@ type Run struct {
// job run state
// Required: true
- // Enum: [RUN_PENDING RUN_RUNNING RUN_SUCCEEDED RUN_FAILED RUN_TERMINATED RUN_PREEMPTED RUN_UNABLE_TO_SCHEDULE RUN_LEASE_RETURNED RUN_LEASE_EXPIRED RUN_MAX_RUNS_EXCEEDED RUN_LEASED RUN_CANCELLED]
+ // Enum: ["RUN_PENDING","RUN_RUNNING","RUN_SUCCEEDED","RUN_FAILED","RUN_TERMINATED","RUN_PREEMPTED","RUN_UNABLE_TO_SCHEDULE","RUN_LEASE_RETURNED","RUN_LEASE_EXPIRED","RUN_MAX_RUNS_EXCEEDED","RUN_LEASED","RUN_CANCELLED"]
JobRunState string `json:"jobRunState"`
// leased
diff --git a/internal/lookoutv2/gen/restapi/operations/get_job_error_parameters.go b/internal/lookoutv2/gen/restapi/operations/get_job_error_parameters.go
index 865ee37d4e2..54580325790 100644
--- a/internal/lookoutv2/gen/restapi/operations/get_job_error_parameters.go
+++ b/internal/lookoutv2/gen/restapi/operations/get_job_error_parameters.go
@@ -6,7 +6,6 @@ package operations
// Editing this file might prove futile when you re-run the swagger generate command
import (
- "context"
"io"
"net/http"
@@ -64,7 +63,7 @@ func (o *GetJobErrorParams) BindRequest(r *http.Request, route *middleware.Match
res = append(res, err)
}
- ctx := validate.WithOperationRequest(context.Background())
+ ctx := validate.WithOperationRequest(r.Context())
if err := body.ContextValidate(ctx, route.Formats); err != nil {
res = append(res, err)
}
diff --git a/internal/lookoutv2/gen/restapi/operations/get_job_run_debug_message_parameters.go b/internal/lookoutv2/gen/restapi/operations/get_job_run_debug_message_parameters.go
index 78281bd4bef..1c47c4e9ac4 100644
--- a/internal/lookoutv2/gen/restapi/operations/get_job_run_debug_message_parameters.go
+++ b/internal/lookoutv2/gen/restapi/operations/get_job_run_debug_message_parameters.go
@@ -6,7 +6,6 @@ package operations
// Editing this file might prove futile when you re-run the swagger generate command
import (
- "context"
"io"
"net/http"
@@ -64,7 +63,7 @@ func (o *GetJobRunDebugMessageParams) BindRequest(r *http.Request, route *middle
res = append(res, err)
}
- ctx := validate.WithOperationRequest(context.Background())
+ ctx := validate.WithOperationRequest(r.Context())
if err := body.ContextValidate(ctx, route.Formats); err != nil {
res = append(res, err)
}
diff --git a/internal/lookoutv2/gen/restapi/operations/get_job_run_error_parameters.go b/internal/lookoutv2/gen/restapi/operations/get_job_run_error_parameters.go
index 103623d3fcf..6ac94c9acae 100644
--- a/internal/lookoutv2/gen/restapi/operations/get_job_run_error_parameters.go
+++ b/internal/lookoutv2/gen/restapi/operations/get_job_run_error_parameters.go
@@ -6,7 +6,6 @@ package operations
// Editing this file might prove futile when you re-run the swagger generate command
import (
- "context"
"io"
"net/http"
@@ -64,7 +63,7 @@ func (o *GetJobRunErrorParams) BindRequest(r *http.Request, route *middleware.Ma
res = append(res, err)
}
- ctx := validate.WithOperationRequest(context.Background())
+ ctx := validate.WithOperationRequest(r.Context())
if err := body.ContextValidate(ctx, route.Formats); err != nil {
res = append(res, err)
}
diff --git a/internal/lookoutv2/gen/restapi/operations/get_job_spec_parameters.go b/internal/lookoutv2/gen/restapi/operations/get_job_spec_parameters.go
index e0defe162b8..7c0a2322f29 100644
--- a/internal/lookoutv2/gen/restapi/operations/get_job_spec_parameters.go
+++ b/internal/lookoutv2/gen/restapi/operations/get_job_spec_parameters.go
@@ -6,7 +6,6 @@ package operations
// Editing this file might prove futile when you re-run the swagger generate command
import (
- "context"
"io"
"net/http"
@@ -64,7 +63,7 @@ func (o *GetJobSpecParams) BindRequest(r *http.Request, route *middleware.Matche
res = append(res, err)
}
- ctx := validate.WithOperationRequest(context.Background())
+ ctx := validate.WithOperationRequest(r.Context())
if err := body.ContextValidate(ctx, route.Formats); err != nil {
res = append(res, err)
}
diff --git a/internal/lookoutv2/gen/restapi/operations/get_jobs.go b/internal/lookoutv2/gen/restapi/operations/get_jobs.go
index 308570bd5b4..1cb9fc27916 100644
--- a/internal/lookoutv2/gen/restapi/operations/get_jobs.go
+++ b/internal/lookoutv2/gen/restapi/operations/get_jobs.go
@@ -174,6 +174,11 @@ func (o *GetJobsBody) contextValidateFilters(ctx context.Context, formats strfmt
for i := 0; i < len(o.Filters); i++ {
if o.Filters[i] != nil {
+
+ if swag.IsZero(o.Filters[i]) { // not required
+ return nil
+ }
+
if err := o.Filters[i].ContextValidate(ctx, formats); err != nil {
if ve, ok := err.(*errors.Validation); ok {
return ve.ValidateName("getJobsRequest" + "." + "filters" + "." + strconv.Itoa(i))
@@ -192,6 +197,7 @@ func (o *GetJobsBody) contextValidateFilters(ctx context.Context, formats strfmt
func (o *GetJobsBody) contextValidateOrder(ctx context.Context, formats strfmt.Registry) error {
if o.Order != nil {
+
if err := o.Order.ContextValidate(ctx, formats); err != nil {
if ve, ok := err.(*errors.Validation); ok {
return ve.ValidateName("getJobsRequest" + "." + "order")
@@ -291,6 +297,11 @@ func (o *GetJobsOKBody) contextValidateJobs(ctx context.Context, formats strfmt.
for i := 0; i < len(o.Jobs); i++ {
if o.Jobs[i] != nil {
+
+ if swag.IsZero(o.Jobs[i]) { // not required
+ return nil
+ }
+
if err := o.Jobs[i].ContextValidate(ctx, formats); err != nil {
if ve, ok := err.(*errors.Validation); ok {
return ve.ValidateName("getJobsOK" + "." + "jobs" + "." + strconv.Itoa(i))
diff --git a/internal/lookoutv2/gen/restapi/operations/get_jobs_parameters.go b/internal/lookoutv2/gen/restapi/operations/get_jobs_parameters.go
index 3085016524a..501ac5de4c3 100644
--- a/internal/lookoutv2/gen/restapi/operations/get_jobs_parameters.go
+++ b/internal/lookoutv2/gen/restapi/operations/get_jobs_parameters.go
@@ -6,7 +6,6 @@ package operations
// Editing this file might prove futile when you re-run the swagger generate command
import (
- "context"
"io"
"net/http"
@@ -76,7 +75,7 @@ func (o *GetJobsParams) BindRequest(r *http.Request, route *middleware.MatchedRo
res = append(res, err)
}
- ctx := validate.WithOperationRequest(context.Background())
+ ctx := validate.WithOperationRequest(r.Context())
if err := body.ContextValidate(ctx, route.Formats); err != nil {
res = append(res, err)
}
diff --git a/internal/lookoutv2/gen/restapi/operations/group_jobs.go b/internal/lookoutv2/gen/restapi/operations/group_jobs.go
index 89f808f197d..1f1ff8cbc26 100644
--- a/internal/lookoutv2/gen/restapi/operations/group_jobs.go
+++ b/internal/lookoutv2/gen/restapi/operations/group_jobs.go
@@ -223,6 +223,11 @@ func (o *GroupJobsBody) contextValidateFilters(ctx context.Context, formats strf
for i := 0; i < len(o.Filters); i++ {
if o.Filters[i] != nil {
+
+ if swag.IsZero(o.Filters[i]) { // not required
+ return nil
+ }
+
if err := o.Filters[i].ContextValidate(ctx, formats); err != nil {
if ve, ok := err.(*errors.Validation); ok {
return ve.ValidateName("groupJobsRequest" + "." + "filters" + "." + strconv.Itoa(i))
@@ -241,6 +246,7 @@ func (o *GroupJobsBody) contextValidateFilters(ctx context.Context, formats strf
func (o *GroupJobsBody) contextValidateGroupedField(ctx context.Context, formats strfmt.Registry) error {
if o.GroupedField != nil {
+
if err := o.GroupedField.ContextValidate(ctx, formats); err != nil {
if ve, ok := err.(*errors.Validation); ok {
return ve.ValidateName("groupJobsRequest" + "." + "groupedField")
@@ -257,6 +263,7 @@ func (o *GroupJobsBody) contextValidateGroupedField(ctx context.Context, formats
func (o *GroupJobsBody) contextValidateOrder(ctx context.Context, formats strfmt.Registry) error {
if o.Order != nil {
+
if err := o.Order.ContextValidate(ctx, formats); err != nil {
if ve, ok := err.(*errors.Validation); ok {
return ve.ValidateName("groupJobsRequest" + "." + "order")
@@ -358,6 +365,11 @@ func (o *GroupJobsOKBody) contextValidateGroups(ctx context.Context, formats str
for i := 0; i < len(o.Groups); i++ {
if o.Groups[i] != nil {
+
+ if swag.IsZero(o.Groups[i]) { // not required
+ return nil
+ }
+
if err := o.Groups[i].ContextValidate(ctx, formats); err != nil {
if ve, ok := err.(*errors.Validation); ok {
return ve.ValidateName("groupJobsOK" + "." + "groups" + "." + strconv.Itoa(i))
diff --git a/internal/lookoutv2/gen/restapi/operations/group_jobs_parameters.go b/internal/lookoutv2/gen/restapi/operations/group_jobs_parameters.go
index 6df300eeabb..3a315cc5c84 100644
--- a/internal/lookoutv2/gen/restapi/operations/group_jobs_parameters.go
+++ b/internal/lookoutv2/gen/restapi/operations/group_jobs_parameters.go
@@ -6,7 +6,6 @@ package operations
// Editing this file might prove futile when you re-run the swagger generate command
import (
- "context"
"io"
"net/http"
@@ -76,7 +75,7 @@ func (o *GroupJobsParams) BindRequest(r *http.Request, route *middleware.Matched
res = append(res, err)
}
- ctx := validate.WithOperationRequest(context.Background())
+ ctx := validate.WithOperationRequest(r.Context())
if err := body.ContextValidate(ctx, route.Formats); err != nil {
res = append(res, err)
}
diff --git a/internal/lookoutv2/gen/restapi/operations/lookout_api.go b/internal/lookoutv2/gen/restapi/operations/lookout_api.go
index f4d99c99ee4..664c3777bd9 100644
--- a/internal/lookoutv2/gen/restapi/operations/lookout_api.go
+++ b/internal/lookoutv2/gen/restapi/operations/lookout_api.go
@@ -377,6 +377,6 @@ func (o *LookoutAPI) AddMiddlewareFor(method, path string, builder middleware.Bu
}
o.Init()
if h, ok := o.handlers[um][path]; ok {
- o.handlers[method][path] = builder(h)
+ o.handlers[um][path] = builder(h)
}
}
diff --git a/internal/lookoutv2/gen/restapi/server.go b/internal/lookoutv2/gen/restapi/server.go
index 4af6552d17e..97cf93cee70 100644
--- a/internal/lookoutv2/gen/restapi/server.go
+++ b/internal/lookoutv2/gen/restapi/server.go
@@ -8,7 +8,6 @@ import (
"crypto/x509"
"errors"
"fmt"
- "io/ioutil"
"log"
"net"
"net/http"
@@ -81,7 +80,7 @@ type Server struct {
ListenLimit int `long:"listen-limit" description:"limit the number of outstanding requests"`
KeepAlive time.Duration `long:"keep-alive" description:"sets the TCP keep-alive timeouts on accepted connections. It prunes dead TCP connections ( e.g. closing laptop mid-download)" default:"3m"`
ReadTimeout time.Duration `long:"read-timeout" description:"maximum duration before timing out read of the request" default:"30s"`
- WriteTimeout time.Duration `long:"write-timeout" description:"maximum duration before timing out write of the response" default:"60s"`
+ WriteTimeout time.Duration `long:"write-timeout" description:"maximum duration before timing out write of the response" default:"30s"`
httpServerL net.Listener
TLSHost string `long:"tls-host" description:"the IP to listen on for tls, when not specified it's the same as --host" env:"TLS_HOST"`
@@ -274,7 +273,7 @@ func (s *Server) Serve() (err error) {
if s.TLSCACertificate != "" {
// include specified CA certificate
- caCert, caCertErr := ioutil.ReadFile(string(s.TLSCACertificate))
+ caCert, caCertErr := os.ReadFile(string(s.TLSCACertificate))
if caCertErr != nil {
return caCertErr
}
diff --git a/internal/scheduler/nodedb/nodeiteration.go b/internal/scheduler/nodedb/nodeiteration.go
index fda04565657..d8260270fa2 100644
--- a/internal/scheduler/nodedb/nodeiteration.go
+++ b/internal/scheduler/nodedb/nodeiteration.go
@@ -219,7 +219,7 @@ type NodeTypeIterator struct {
priority int32
// Used to index into node.keys to assert that keys are always increasing.
// This to detect if the iterator gets stuck.
- // TODO(albin): WithField better testing we should be able to remove this.
+ // TODO(albin): With better testing we should be able to remove this.
keyIndex int
// Name of the memdb index used for node iteration.
// Should correspond to the priority set for this iterator.
diff --git a/internal/scheduler/reports/leader_proxying_reports_server.go b/internal/scheduler/reports/leader_proxying_reports_server.go
index 9275db3fef9..ebc795dd714 100644
--- a/internal/scheduler/reports/leader_proxying_reports_server.go
+++ b/internal/scheduler/reports/leader_proxying_reports_server.go
@@ -6,7 +6,7 @@ import (
"google.golang.org/grpc"
"github.com/armadaproject/armada/internal/scheduler/leader"
- "github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
+ "github.com/armadaproject/armada/pkg/api/schedulerobjects"
)
type LeaderProxyingSchedulingReportsServer struct {
diff --git a/internal/scheduler/reports/leader_proxying_reports_server_test.go b/internal/scheduler/reports/leader_proxying_reports_server_test.go
index 14adacf3812..ace96e7bb00 100644
--- a/internal/scheduler/reports/leader_proxying_reports_server_test.go
+++ b/internal/scheduler/reports/leader_proxying_reports_server_test.go
@@ -10,7 +10,7 @@ import (
"google.golang.org/grpc"
"github.com/armadaproject/armada/internal/common/armadacontext"
- "github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
+ "github.com/armadaproject/armada/pkg/api/schedulerobjects"
)
func TestLeaderProxyingSchedulingReportsServer_GetJobReports(t *testing.T) {
diff --git a/internal/scheduler/reports/proxying_reports_server.go b/internal/scheduler/reports/proxying_reports_server.go
index 12794f7a951..f52ffc3944b 100644
--- a/internal/scheduler/reports/proxying_reports_server.go
+++ b/internal/scheduler/reports/proxying_reports_server.go
@@ -4,7 +4,7 @@ import (
"context"
"time"
- "github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
+ "github.com/armadaproject/armada/pkg/api/schedulerobjects"
)
type ProxyingSchedulingReportsServer struct {
diff --git a/internal/scheduler/reports/proxying_reports_server_test.go b/internal/scheduler/reports/proxying_reports_server_test.go
index 896734e07fb..585735860c9 100644
--- a/internal/scheduler/reports/proxying_reports_server_test.go
+++ b/internal/scheduler/reports/proxying_reports_server_test.go
@@ -8,7 +8,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/armadaproject/armada/internal/common/armadacontext"
- "github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
+ "github.com/armadaproject/armada/pkg/api/schedulerobjects"
)
func TestProxyingSchedulingReportsServer_GetJobReports(t *testing.T) {
diff --git a/internal/scheduler/reports/server.go b/internal/scheduler/reports/server.go
index da8bcf49b47..6c9653ffdab 100644
--- a/internal/scheduler/reports/server.go
+++ b/internal/scheduler/reports/server.go
@@ -11,7 +11,7 @@ import (
"github.com/openconfig/goyang/pkg/indent"
"google.golang.org/grpc/codes"
- "github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
+ "github.com/armadaproject/armada/pkg/api/schedulerobjects"
)
type Server struct {
diff --git a/internal/scheduler/schedulerapp.go b/internal/scheduler/schedulerapp.go
index c45097a59df..6c21f07cf39 100644
--- a/internal/scheduler/schedulerapp.go
+++ b/internal/scheduler/schedulerapp.go
@@ -41,9 +41,9 @@ import (
"github.com/armadaproject/armada/internal/scheduler/metrics"
"github.com/armadaproject/armada/internal/scheduler/queue"
"github.com/armadaproject/armada/internal/scheduler/reports"
- "github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/internal/scheduler/scheduling"
"github.com/armadaproject/armada/pkg/api"
+ "github.com/armadaproject/armada/pkg/api/schedulerobjects"
"github.com/armadaproject/armada/pkg/armadaevents"
"github.com/armadaproject/armada/pkg/client"
"github.com/armadaproject/armada/pkg/executorapi"
@@ -225,8 +225,8 @@ func Run(config schedulerconfig.Configuration) error {
reportServer := reports.NewServer(schedulingContextRepository)
leaderClientConnectionProvider := leader.NewLeaderConnectionProvider(leaderController, config.Leader)
- schedulingReportServer := reports.NewLeaderProxyingSchedulingReportsServer(reportServer, leaderClientConnectionProvider)
- schedulerobjects.RegisterSchedulerReportingServer(grpcServer, schedulingReportServer)
+ schedulingSchedulerReportingServer := reports.NewLeaderProxyingSchedulingReportsServer(reportServer, leaderClientConnectionProvider)
+ schedulerobjects.RegisterSchedulerReportingServer(grpcServer, schedulingSchedulerReportingServer)
// ////////////////////////////////////////////////////////////////////////
// Scheduling
diff --git a/internal/scheduler/scheduling/scheduling_algo.go b/internal/scheduler/scheduling/scheduling_algo.go
index 47b5921389c..b24bb7b1ff9 100644
--- a/internal/scheduler/scheduling/scheduling_algo.go
+++ b/internal/scheduler/scheduling/scheduling_algo.go
@@ -130,7 +130,7 @@ func (l *FairSchedulingAlgo) Schedule(
start := time.Now()
schedulerResult, sctx, err := l.SchedulePool(ctx, fsctx, pool.Name, pool.MarketDriven)
- ctx.Infof("Scheduled on executor pool %s in %v with error %v", pool, time.Now().Sub(start), err)
+ ctx.Infof("Scheduled on executor pool %s in %v with error %v", pool.Name, time.Now().Sub(start), err)
if errors.Is(err, context.DeadlineExceeded) {
// We've reached the scheduling time limit;
diff --git a/internal/server/server.go b/internal/server/server.go
index e35e7974244..5a61af58acc 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -25,7 +25,6 @@ import (
controlplaneeventspulsarutils "github.com/armadaproject/armada/internal/common/pulsarutils/controlplaneevents"
"github.com/armadaproject/armada/internal/common/pulsarutils/jobsetevents"
"github.com/armadaproject/armada/internal/scheduler/reports"
- "github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/internal/server/configuration"
"github.com/armadaproject/armada/internal/server/event"
"github.com/armadaproject/armada/internal/server/executor"
@@ -33,6 +32,7 @@ import (
"github.com/armadaproject/armada/internal/server/queue"
"github.com/armadaproject/armada/internal/server/submit"
"github.com/armadaproject/armada/pkg/api"
+ "github.com/armadaproject/armada/pkg/api/schedulerobjects"
"github.com/armadaproject/armada/pkg/armadaevents"
"github.com/armadaproject/armada/pkg/client"
"github.com/armadaproject/armada/pkg/controlplaneevents"
diff --git a/magefiles/proto.go b/magefiles/proto.go
index 8e50be208d1..40e6ceb917d 100644
--- a/magefiles/proto.go
+++ b/magefiles/proto.go
@@ -99,6 +99,7 @@ func protoGenerate() error {
"internal/scheduler/schedulerobjects/*.proto",
"internal/scheduler/simulator/*.proto",
"pkg/api/binoculars/*.proto",
+ "pkg/api/schedulerobjects/*.proto",
"pkg/executorapi/*.proto",
}
for _, pattern := range patterns {
@@ -122,6 +123,11 @@ func protoGenerate() error {
return err
}
+ err = protoProtocRun(false, true, "./pkg/api/schedulerobjects/api", "pkg/api/schedulerobjects/scheduler_reporting.proto")
+ if err != nil {
+ return err
+ }
+
err = sh.Run(
"swagger", "generate", "spec",
"-m", "-o", "pkg/api/api.swagger.definitions.json",
@@ -145,6 +151,13 @@ func protoGenerate() error {
return err
}
}
+ if s, err := goOutput("run", "./scripts/merge_swagger/merge_swagger.go", "schedulerobjects/api.swagger.json"); err != nil {
+ return err
+ } else {
+ if err := os.WriteFile("pkg/api/schedulerobjects/api.swagger.json", []byte(s), 0o755); err != nil {
+ return err
+ }
+ }
if err := os.Remove("pkg/api/api.swagger.definitions.json"); err != nil {
return err
}
@@ -157,8 +170,12 @@ func protoGenerate() error {
if err != nil {
return err
}
+ err = sh.Run("templify", "-e", "-p=schedulerobjects", "-f=SwaggerJson", "pkg/api/schedulerobjects/api.swagger.json")
+ if err != nil {
+ return err
+ }
- err = sh.Run("goimports", "-w", "-local", "github.com/armadaproject/armada", "./pkg/api/", "./pkg/armadaevents/", "./pkg/controlplaneevents/", "./internal/scheduler/schedulerobjects/", "./pkg/executorapi/")
+ err = sh.Run("goimports", "-w", "-local", "github.com/armadaproject/armada", "./pkg/api/", "./pkg/armadaevents/", "./pkg/controlplaneevents/", "./internal/scheduler/schedulerobjects/", "./pkg/executorapi/", "./pkg/api/schedulerobjects/")
if err != nil {
return err
}
diff --git a/pkg/api/schedulerobjects/api.swagger.go b/pkg/api/schedulerobjects/api.swagger.go
new file mode 100644
index 00000000000..9f9dc8f7cfc
--- /dev/null
+++ b/pkg/api/schedulerobjects/api.swagger.go
@@ -0,0 +1,196 @@
+/*
+ * CODE GENERATED AUTOMATICALLY WITH
+ * github.com/wlbr/templify
+ * THIS FILE SHOULD NOT BE EDITED BY HAND
+ */
+
+package schedulerobjects
+
+// SwaggerJsonTemplate is a generated function returning the template as a string.
+// That string should be parsed by the functions of the golang's template package.
+func SwaggerJsonTemplate() string {
+ var tmpl = "{\n" +
+ " \"consumes\": [\n" +
+ " \"application/json\"\n" +
+ " ],\n" +
+ " \"produces\": [\n" +
+ " \"application/json\"\n" +
+ " ],\n" +
+ " \"swagger\": \"2.0\",\n" +
+ " \"info\": {\n" +
+ " \"title\": \"pkg/api/schedulerobjects/scheduler_reporting.proto\",\n" +
+ " \"version\": \"version not set\"\n" +
+ " },\n" +
+ " \"paths\": {\n" +
+ " \"/v1/job/{jobId}/scheduler-report\": {\n" +
+ " \"get\": {\n" +
+ " \"tags\": [\n" +
+ " \"SchedulerReporting\"\n" +
+ " ],\n" +
+ " \"summary\": \"Return the most recent scheduling report for each executor for the given job.\",\n" +
+ " \"operationId\": \"GetJobReport\",\n" +
+ " \"parameters\": [\n" +
+ " {\n" +
+ " \"type\": \"string\",\n" +
+ " \"name\": \"jobId\",\n" +
+ " \"in\": \"path\",\n" +
+ " \"required\": true\n" +
+ " }\n" +
+ " ],\n" +
+ " \"responses\": {\n" +
+ " \"200\": {\n" +
+ " \"description\": \"A successful response.\",\n" +
+ " \"schema\": {\n" +
+ " \"$ref\": \"#/definitions/schedulerobjectsJobReport\"\n" +
+ " }\n" +
+ " },\n" +
+ " \"default\": {\n" +
+ " \"description\": \"An unexpected error response.\",\n" +
+ " \"schema\": {\n" +
+ " \"$ref\": \"#/definitions/runtimeError\"\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ " },\n" +
+ " \"/v1/queue/{queueName}/scheduler-report\": {\n" +
+ " \"get\": {\n" +
+ " \"tags\": [\n" +
+ " \"SchedulerReporting\"\n" +
+ " ],\n" +
+ " \"summary\": \"Return the most recent scheduling report for each executor for the given queue.\",\n" +
+ " \"operationId\": \"GetQueueReport\",\n" +
+ " \"parameters\": [\n" +
+ " {\n" +
+ " \"type\": \"string\",\n" +
+ " \"name\": \"queueName\",\n" +
+ " \"in\": \"path\",\n" +
+ " \"required\": true\n" +
+ " },\n" +
+ " {\n" +
+ " \"type\": \"integer\",\n" +
+ " \"format\": \"int32\",\n" +
+ " \"name\": \"verbosity\",\n" +
+ " \"in\": \"query\"\n" +
+ " }\n" +
+ " ],\n" +
+ " \"responses\": {\n" +
+ " \"200\": {\n" +
+ " \"description\": \"A successful response.\",\n" +
+ " \"schema\": {\n" +
+ " \"$ref\": \"#/definitions/schedulerobjectsQueueReport\"\n" +
+ " }\n" +
+ " },\n" +
+ " \"default\": {\n" +
+ " \"description\": \"An unexpected error response.\",\n" +
+ " \"schema\": {\n" +
+ " \"$ref\": \"#/definitions/runtimeError\"\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ " },\n" +
+ " \"/v1/scheduling-report\": {\n" +
+ " \"get\": {\n" +
+ " \"tags\": [\n" +
+ " \"SchedulerReporting\"\n" +
+ " ],\n" +
+ " \"summary\": \"Return the most recent scheduling report for each executor.\",\n" +
+ " \"operationId\": \"GetSchedulingReport\",\n" +
+ " \"parameters\": [\n" +
+ " {\n" +
+ " \"type\": \"string\",\n" +
+ " \"name\": \"mostRecentForQueue.queueName\",\n" +
+ " \"in\": \"query\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"type\": \"string\",\n" +
+ " \"name\": \"mostRecentForJob.jobId\",\n" +
+ " \"in\": \"query\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"type\": \"integer\",\n" +
+ " \"format\": \"int32\",\n" +
+ " \"name\": \"verbosity\",\n" +
+ " \"in\": \"query\"\n" +
+ " }\n" +
+ " ],\n" +
+ " \"responses\": {\n" +
+ " \"200\": {\n" +
+ " \"description\": \"A successful response.\",\n" +
+ " \"schema\": {\n" +
+ " \"$ref\": \"#/definitions/schedulerobjectsSchedulingReport\"\n" +
+ " }\n" +
+ " },\n" +
+ " \"default\": {\n" +
+ " \"description\": \"An unexpected error response.\",\n" +
+ " \"schema\": {\n" +
+ " \"$ref\": \"#/definitions/runtimeError\"\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ " },\n" +
+ " \"definitions\": {\n" +
+ " \"protobufAny\": {\n" +
+ " \"type\": \"object\",\n" +
+ " \"properties\": {\n" +
+ " \"typeUrl\": {\n" +
+ " \"type\": \"string\"\n" +
+ " },\n" +
+ " \"value\": {\n" +
+ " \"type\": \"string\",\n" +
+ " \"format\": \"byte\"\n" +
+ " }\n" +
+ " }\n" +
+ " },\n" +
+ " \"runtimeError\": {\n" +
+ " \"type\": \"object\",\n" +
+ " \"properties\": {\n" +
+ " \"code\": {\n" +
+ " \"type\": \"integer\",\n" +
+ " \"format\": \"int32\"\n" +
+ " },\n" +
+ " \"details\": {\n" +
+ " \"type\": \"array\",\n" +
+ " \"items\": {\n" +
+ " \"$ref\": \"#/definitions/protobufAny\"\n" +
+ " }\n" +
+ " },\n" +
+ " \"error\": {\n" +
+ " \"type\": \"string\"\n" +
+ " },\n" +
+ " \"message\": {\n" +
+ " \"type\": \"string\"\n" +
+ " }\n" +
+ " }\n" +
+ " },\n" +
+ " \"schedulerobjectsJobReport\": {\n" +
+ " \"type\": \"object\",\n" +
+ " \"properties\": {\n" +
+ " \"report\": {\n" +
+ " \"type\": \"string\"\n" +
+ " }\n" +
+ " }\n" +
+ " },\n" +
+ " \"schedulerobjectsQueueReport\": {\n" +
+ " \"type\": \"object\",\n" +
+ " \"properties\": {\n" +
+ " \"report\": {\n" +
+ " \"type\": \"string\"\n" +
+ " }\n" +
+ " }\n" +
+ " },\n" +
+ " \"schedulerobjectsSchedulingReport\": {\n" +
+ " \"type\": \"object\",\n" +
+ " \"properties\": {\n" +
+ " \"report\": {\n" +
+ " \"type\": \"string\"\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ " }\n" +
+ "}"
+ return tmpl
+}
diff --git a/pkg/api/schedulerobjects/api.swagger.json b/pkg/api/schedulerobjects/api.swagger.json
new file mode 100644
index 00000000000..b305900d220
--- /dev/null
+++ b/pkg/api/schedulerobjects/api.swagger.json
@@ -0,0 +1,183 @@
+{
+ "consumes": [
+ "application/json"
+ ],
+ "produces": [
+ "application/json"
+ ],
+ "swagger": "2.0",
+ "info": {
+ "title": "pkg/api/schedulerobjects/scheduler_reporting.proto",
+ "version": "version not set"
+ },
+ "paths": {
+ "/v1/job/{jobId}/scheduler-report": {
+ "get": {
+ "tags": [
+ "SchedulerReporting"
+ ],
+ "summary": "Return the most recent scheduling report for each executor for the given job.",
+ "operationId": "GetJobReport",
+ "parameters": [
+ {
+ "type": "string",
+ "name": "jobId",
+ "in": "path",
+ "required": true
+ }
+ ],
+ "responses": {
+ "200": {
+ "description": "A successful response.",
+ "schema": {
+ "$ref": "#/definitions/schedulerobjectsJobReport"
+ }
+ },
+ "default": {
+ "description": "An unexpected error response.",
+ "schema": {
+ "$ref": "#/definitions/runtimeError"
+ }
+ }
+ }
+ }
+ },
+ "/v1/queue/{queueName}/scheduler-report": {
+ "get": {
+ "tags": [
+ "SchedulerReporting"
+ ],
+ "summary": "Return the most recent scheduling report for each executor for the given queue.",
+ "operationId": "GetQueueReport",
+ "parameters": [
+ {
+ "type": "string",
+ "name": "queueName",
+ "in": "path",
+ "required": true
+ },
+ {
+ "type": "integer",
+ "format": "int32",
+ "name": "verbosity",
+ "in": "query"
+ }
+ ],
+ "responses": {
+ "200": {
+ "description": "A successful response.",
+ "schema": {
+ "$ref": "#/definitions/schedulerobjectsQueueReport"
+ }
+ },
+ "default": {
+ "description": "An unexpected error response.",
+ "schema": {
+ "$ref": "#/definitions/runtimeError"
+ }
+ }
+ }
+ }
+ },
+ "/v1/scheduling-report": {
+ "get": {
+ "tags": [
+ "SchedulerReporting"
+ ],
+ "summary": "Return the most recent scheduling report for each executor.",
+ "operationId": "GetSchedulingReport",
+ "parameters": [
+ {
+ "type": "string",
+ "name": "mostRecentForQueue.queueName",
+ "in": "query"
+ },
+ {
+ "type": "string",
+ "name": "mostRecentForJob.jobId",
+ "in": "query"
+ },
+ {
+ "type": "integer",
+ "format": "int32",
+ "name": "verbosity",
+ "in": "query"
+ }
+ ],
+ "responses": {
+ "200": {
+ "description": "A successful response.",
+ "schema": {
+ "$ref": "#/definitions/schedulerobjectsSchedulingReport"
+ }
+ },
+ "default": {
+ "description": "An unexpected error response.",
+ "schema": {
+ "$ref": "#/definitions/runtimeError"
+ }
+ }
+ }
+ }
+ }
+ },
+ "definitions": {
+ "protobufAny": {
+ "type": "object",
+ "properties": {
+ "typeUrl": {
+ "type": "string"
+ },
+ "value": {
+ "type": "string",
+ "format": "byte"
+ }
+ }
+ },
+ "runtimeError": {
+ "type": "object",
+ "properties": {
+ "code": {
+ "type": "integer",
+ "format": "int32"
+ },
+ "details": {
+ "type": "array",
+ "items": {
+ "$ref": "#/definitions/protobufAny"
+ }
+ },
+ "error": {
+ "type": "string"
+ },
+ "message": {
+ "type": "string"
+ }
+ }
+ },
+ "schedulerobjectsJobReport": {
+ "type": "object",
+ "properties": {
+ "report": {
+ "type": "string"
+ }
+ }
+ },
+ "schedulerobjectsQueueReport": {
+ "type": "object",
+ "properties": {
+ "report": {
+ "type": "string"
+ }
+ }
+ },
+ "schedulerobjectsSchedulingReport": {
+ "type": "object",
+ "properties": {
+ "report": {
+ "type": "string"
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/internal/scheduler/schedulerobjects/reporting.pb.go b/pkg/api/schedulerobjects/scheduler_reporting.pb.go
similarity index 82%
rename from internal/scheduler/schedulerobjects/reporting.pb.go
rename to pkg/api/schedulerobjects/scheduler_reporting.pb.go
index 18239dcae8c..7b4c11a6bb1 100644
--- a/internal/scheduler/schedulerobjects/reporting.pb.go
+++ b/pkg/api/schedulerobjects/scheduler_reporting.pb.go
@@ -1,5 +1,5 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
-// source: internal/scheduler/schedulerobjects/reporting.proto
+// source: pkg/api/schedulerobjects/scheduler_reporting.proto
package schedulerobjects
@@ -11,6 +11,7 @@ import (
math_bits "math/bits"
proto "github.com/gogo/protobuf/proto"
+ _ "google.golang.org/genproto/googleapis/api/annotations"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
@@ -27,6 +28,7 @@ var _ = math.Inf
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
+// Deprecated. This will be removed in a future release. Please use GetQueueReport instead.
type MostRecentForQueue struct {
QueueName string `protobuf:"bytes,1,opt,name=queue_name,json=queueName,proto3" json:"queueName,omitempty"`
}
@@ -35,7 +37,7 @@ func (m *MostRecentForQueue) Reset() { *m = MostRecentForQueue{} }
func (m *MostRecentForQueue) String() string { return proto.CompactTextString(m) }
func (*MostRecentForQueue) ProtoMessage() {}
func (*MostRecentForQueue) Descriptor() ([]byte, []int) {
- return fileDescriptor_131a439a3ff6540b, []int{0}
+ return fileDescriptor_c6edb75717835892, []int{0}
}
func (m *MostRecentForQueue) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@@ -71,6 +73,7 @@ func (m *MostRecentForQueue) GetQueueName() string {
return ""
}
+// Deprecated. This will be removed in a future release. Please use GetJobReport instead.
type MostRecentForJob struct {
JobId string `protobuf:"bytes,1,opt,name=job_id,json=jobId,proto3" json:"jobId,omitempty"`
}
@@ -79,7 +82,7 @@ func (m *MostRecentForJob) Reset() { *m = MostRecentForJob{} }
func (m *MostRecentForJob) String() string { return proto.CompactTextString(m) }
func (*MostRecentForJob) ProtoMessage() {}
func (*MostRecentForJob) Descriptor() ([]byte, []int) {
- return fileDescriptor_131a439a3ff6540b, []int{1}
+ return fileDescriptor_c6edb75717835892, []int{1}
}
func (m *MostRecentForJob) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@@ -128,7 +131,7 @@ func (m *SchedulingReportRequest) Reset() { *m = SchedulingReportRequest
func (m *SchedulingReportRequest) String() string { return proto.CompactTextString(m) }
func (*SchedulingReportRequest) ProtoMessage() {}
func (*SchedulingReportRequest) Descriptor() ([]byte, []int) {
- return fileDescriptor_131a439a3ff6540b, []int{2}
+ return fileDescriptor_c6edb75717835892, []int{2}
}
func (m *SchedulingReportRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@@ -217,7 +220,7 @@ func (m *SchedulingReport) Reset() { *m = SchedulingReport{} }
func (m *SchedulingReport) String() string { return proto.CompactTextString(m) }
func (*SchedulingReport) ProtoMessage() {}
func (*SchedulingReport) Descriptor() ([]byte, []int) {
- return fileDescriptor_131a439a3ff6540b, []int{3}
+ return fileDescriptor_c6edb75717835892, []int{3}
}
func (m *SchedulingReport) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@@ -262,7 +265,7 @@ func (m *QueueReportRequest) Reset() { *m = QueueReportRequest{} }
func (m *QueueReportRequest) String() string { return proto.CompactTextString(m) }
func (*QueueReportRequest) ProtoMessage() {}
func (*QueueReportRequest) Descriptor() ([]byte, []int) {
- return fileDescriptor_131a439a3ff6540b, []int{4}
+ return fileDescriptor_c6edb75717835892, []int{4}
}
func (m *QueueReportRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@@ -313,7 +316,7 @@ func (m *QueueReport) Reset() { *m = QueueReport{} }
func (m *QueueReport) String() string { return proto.CompactTextString(m) }
func (*QueueReport) ProtoMessage() {}
func (*QueueReport) Descriptor() ([]byte, []int) {
- return fileDescriptor_131a439a3ff6540b, []int{5}
+ return fileDescriptor_c6edb75717835892, []int{5}
}
func (m *QueueReport) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@@ -357,7 +360,7 @@ func (m *JobReportRequest) Reset() { *m = JobReportRequest{} }
func (m *JobReportRequest) String() string { return proto.CompactTextString(m) }
func (*JobReportRequest) ProtoMessage() {}
func (*JobReportRequest) Descriptor() ([]byte, []int) {
- return fileDescriptor_131a439a3ff6540b, []int{6}
+ return fileDescriptor_c6edb75717835892, []int{6}
}
func (m *JobReportRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@@ -401,7 +404,7 @@ func (m *JobReport) Reset() { *m = JobReport{} }
func (m *JobReport) String() string { return proto.CompactTextString(m) }
func (*JobReport) ProtoMessage() {}
func (*JobReport) Descriptor() ([]byte, []int) {
- return fileDescriptor_131a439a3ff6540b, []int{7}
+ return fileDescriptor_c6edb75717835892, []int{7}
}
func (m *JobReport) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@@ -449,43 +452,48 @@ func init() {
}
func init() {
- proto.RegisterFile("internal/scheduler/schedulerobjects/reporting.proto", fileDescriptor_131a439a3ff6540b)
-}
-
-var fileDescriptor_131a439a3ff6540b = []byte{
- // 512 bytes of a gzipped FileDescriptorProto
- 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x54, 0x5f, 0x6b, 0xd3, 0x50,
- 0x14, 0x6f, 0x2a, 0x2b, 0xf6, 0x4c, 0x34, 0xdc, 0x2a, 0x1b, 0x55, 0x93, 0x12, 0x7c, 0x98, 0x32,
- 0x5a, 0xd8, 0x50, 0x10, 0x61, 0x48, 0x04, 0xab, 0xc5, 0x3f, 0x98, 0xe1, 0x8b, 0x20, 0x21, 0xb7,
- 0x39, 0xeb, 0x52, 0x9a, 0x9c, 0xee, 0xe6, 0x46, 0x18, 0x3e, 0xfa, 0x05, 0xfc, 0x0c, 0x7e, 0x1a,
- 0x1f, 0x7c, 0xd8, 0xa3, 0x4f, 0x41, 0xda, 0xb7, 0x7c, 0x0a, 0x59, 0xda, 0xb5, 0xf9, 0x33, 0xb7,
- 0xd5, 0xb7, 0xe4, 0x77, 0xcf, 0x3d, 0xbf, 0xdf, 0x39, 0xbf, 0x73, 0x2e, 0xec, 0x7a, 0x81, 0x44,
- 0x11, 0x38, 0xa3, 0x4e, 0xd8, 0x3f, 0x44, 0x37, 0x1a, 0xa1, 0x58, 0x7e, 0x11, 0x1f, 0x62, 0x5f,
- 0x86, 0x1d, 0x81, 0x63, 0x12, 0xd2, 0x0b, 0x06, 0xed, 0xb1, 0x20, 0x49, 0x4c, 0x2d, 0x46, 0x18,
- 0x6f, 0x80, 0xbd, 0xa5, 0x50, 0x5a, 0xd8, 0xc7, 0x40, 0xbe, 0x24, 0xf1, 0x21, 0xc2, 0x08, 0xd9,
- 0x13, 0x80, 0xa3, 0xd3, 0x0f, 0x3b, 0x70, 0x7c, 0xdc, 0x54, 0x5a, 0xca, 0x56, 0xdd, 0xdc, 0x48,
- 0x62, 0xbd, 0x91, 0xa2, 0xef, 0x1c, 0x1f, 0xb7, 0xc9, 0xf7, 0x24, 0xfa, 0x63, 0x79, 0x6c, 0xd5,
- 0x17, 0xa0, 0xb1, 0x07, 0x6a, 0x2e, 0x5b, 0x8f, 0x38, 0x7b, 0x04, 0xb5, 0x21, 0x71, 0xdb, 0x73,
- 0xe7, 0x79, 0x1a, 0x49, 0xac, 0xdf, 0x1a, 0x12, 0x7f, 0xed, 0x66, 0x72, 0xac, 0xa5, 0x80, 0xf1,
- 0xab, 0x0a, 0x1b, 0xfb, 0x33, 0x89, 0x5e, 0x30, 0xb0, 0x52, 0xf5, 0x16, 0x1e, 0x45, 0x18, 0x4a,
- 0xf6, 0x15, 0xee, 0xf8, 0x14, 0x4a, 0x5b, 0xa4, 0xc9, 0xed, 0x03, 0x12, 0x76, 0x4a, 0x9c, 0xa6,
- 0x5d, 0xdf, 0x79, 0xd0, 0x2e, 0xd6, 0xd6, 0x2e, 0x17, 0x66, 0xb6, 0x92, 0x58, 0xbf, 0xe7, 0x97,
- 0xf0, 0xa5, 0x92, 0x57, 0x15, 0x8b, 0x95, 0xcf, 0x59, 0x08, 0x8d, 0x22, 0xf9, 0x90, 0xf8, 0x66,
- 0x35, 0xa5, 0x36, 0x2e, 0xa1, 0xee, 0x11, 0x37, 0xb5, 0x24, 0xd6, 0x9b, 0x7e, 0x01, 0xcd, 0xd1,
- 0xaa, 0xc5, 0x53, 0xf6, 0x18, 0xea, 0x5f, 0x50, 0x70, 0x0a, 0x3d, 0x79, 0xbc, 0x79, 0xad, 0xa5,
- 0x6c, 0xad, 0xcd, 0x4c, 0x58, 0x80, 0x59, 0x13, 0x16, 0xa0, 0x79, 0x1d, 0x6a, 0x07, 0xde, 0x48,
- 0xa2, 0x30, 0x9e, 0x83, 0x5a, 0xec, 0x26, 0xdb, 0x86, 0xda, 0x6c, 0x2a, 0xe6, 0x76, 0xdc, 0x4e,
- 0x62, 0x5d, 0x9d, 0x21, 0x99, 0x74, 0xf3, 0x18, 0xe3, 0x9b, 0x02, 0x2c, 0xed, 0x40, 0xde, 0x8b,
- 0xff, 0x9c, 0x8f, 0x7c, 0x45, 0xd5, 0xab, 0x56, 0x64, 0x3c, 0x83, 0xf5, 0x8c, 0x88, 0x15, 0x4b,
- 0xd8, 0x03, 0xb5, 0x47, 0x3c, 0xaf, 0x7f, 0x95, 0x99, 0x7c, 0x0a, 0xf5, 0xc5, 0xfd, 0xd5, 0xa8,
- 0x77, 0x7e, 0x54, 0x81, 0xed, 0x9f, 0x8d, 0x86, 0x75, 0xb6, 0x8b, 0xcc, 0x85, 0x46, 0x17, 0x65,
- 0xc9, 0x99, 0x87, 0xe5, 0x31, 0xfa, 0xc7, 0x2e, 0x34, 0x8d, 0xcb, 0x43, 0xd9, 0x47, 0xb8, 0xd9,
- 0x45, 0x99, 0xed, 0xdb, 0x39, 0x2b, 0x52, 0xf6, 0xb6, 0x79, 0xff, 0xc2, 0x28, 0xf6, 0x1e, 0x6e,
- 0x74, 0x51, 0x2e, 0x3b, 0x72, 0x8e, 0x94, 0x62, 0xbb, 0x9b, 0x77, 0x2f, 0x88, 0x31, 0x3f, 0xff,
- 0x9c, 0x68, 0xca, 0xc9, 0x44, 0x53, 0xfe, 0x4c, 0x34, 0xe5, 0xfb, 0x54, 0xab, 0x9c, 0x4c, 0xb5,
- 0xca, 0xef, 0xa9, 0x56, 0xf9, 0xf4, 0x62, 0xe0, 0xc9, 0xc3, 0x88, 0xb7, 0xfb, 0xe4, 0x77, 0x1c,
- 0xe1, 0x3b, 0xae, 0x33, 0x16, 0x74, 0x7a, 0x7d, 0xfe, 0xd7, 0xb9, 0xc2, 0x13, 0xc8, 0x6b, 0xe9,
- 0xcb, 0xb7, 0xfb, 0x37, 0x00, 0x00, 0xff, 0xff, 0x09, 0x81, 0x96, 0x21, 0x30, 0x05, 0x00, 0x00,
+ proto.RegisterFile("pkg/api/schedulerobjects/scheduler_reporting.proto", fileDescriptor_c6edb75717835892)
+}
+
+var fileDescriptor_c6edb75717835892 = []byte{
+ // 589 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x54, 0xcf, 0x6b, 0x13, 0x41,
+ 0x14, 0xce, 0xa6, 0x34, 0x98, 0xa9, 0xe8, 0x32, 0xb1, 0x24, 0xc4, 0x76, 0x13, 0x47, 0xc1, 0x46,
+ 0x6a, 0x16, 0x23, 0x8a, 0x22, 0x14, 0xc9, 0xc1, 0x68, 0x50, 0xc1, 0xed, 0xcd, 0xcb, 0xb2, 0x93,
+ 0x4c, 0xb7, 0x1b, 0xb3, 0xfb, 0xb6, 0xb3, 0x93, 0x42, 0x0d, 0x5e, 0x2c, 0x78, 0x16, 0xfc, 0x03,
+ 0xfc, 0x77, 0x3c, 0x78, 0x28, 0x78, 0xf1, 0x14, 0x24, 0xf1, 0x94, 0xbf, 0x42, 0x32, 0x9b, 0x9f,
+ 0xbb, 0xfd, 0x61, 0x7a, 0x4b, 0xbe, 0x79, 0xfb, 0xbe, 0xef, 0x9b, 0xef, 0xcd, 0x43, 0x15, 0xff,
+ 0x83, 0xad, 0x5b, 0xbe, 0xa3, 0x07, 0x8d, 0x7d, 0xd6, 0xec, 0xb4, 0x19, 0x07, 0xda, 0x62, 0x0d,
+ 0x11, 0xcc, 0x00, 0x93, 0x33, 0x1f, 0xb8, 0x70, 0x3c, 0xbb, 0xec, 0x73, 0x10, 0x80, 0xd5, 0x68,
+ 0x6d, 0x7e, 0xc3, 0x06, 0xb0, 0xdb, 0x4c, 0x36, 0xb2, 0x3c, 0x0f, 0x84, 0x25, 0x1c, 0xf0, 0x82,
+ 0xb0, 0x9e, 0xbc, 0x46, 0xf8, 0x0d, 0x04, 0xc2, 0x60, 0x0d, 0xe6, 0x89, 0x17, 0xc0, 0xdf, 0x75,
+ 0x58, 0x87, 0xe1, 0xc7, 0x08, 0x1d, 0x8c, 0x7e, 0x98, 0x9e, 0xe5, 0xb2, 0x9c, 0x52, 0x54, 0xb6,
+ 0xd2, 0xd5, 0xec, 0xb0, 0x57, 0xc8, 0x48, 0xf4, 0xad, 0xe5, 0xb2, 0x6d, 0x70, 0x1d, 0xc1, 0x5c,
+ 0x5f, 0x1c, 0x19, 0xe9, 0x29, 0x48, 0x76, 0x90, 0xba, 0xd0, 0xad, 0x0e, 0x14, 0xdf, 0x43, 0xa9,
+ 0x16, 0x50, 0xd3, 0x69, 0x8e, 0xfb, 0x64, 0x86, 0xbd, 0xc2, 0xf5, 0x16, 0xd0, 0x57, 0xcd, 0xb9,
+ 0x1e, 0xab, 0x12, 0x20, 0x3f, 0x93, 0x28, 0xbb, 0x1b, 0x1a, 0x70, 0x3c, 0xdb, 0x90, 0xde, 0x0c,
+ 0x76, 0xd0, 0x61, 0x81, 0xc0, 0x5d, 0xb4, 0xee, 0x42, 0x20, 0x4c, 0x2e, 0x9b, 0x9b, 0x7b, 0xc0,
+ 0x4d, 0x49, 0x2c, 0xdb, 0xae, 0x55, 0xee, 0x94, 0xa3, 0xce, 0xcb, 0x71, 0x63, 0xd5, 0xe2, 0xb0,
+ 0x57, 0xd8, 0x70, 0x63, 0xf8, 0x4c, 0xc9, 0xcb, 0x84, 0x81, 0xe3, 0xe7, 0x38, 0x40, 0x99, 0x28,
+ 0x79, 0x0b, 0x68, 0x2e, 0x29, 0xa9, 0xc9, 0x05, 0xd4, 0x75, 0xa0, 0x55, 0x6d, 0xd8, 0x2b, 0xe4,
+ 0xdd, 0x08, 0xba, 0x40, 0xab, 0x46, 0x4f, 0xf1, 0x23, 0x94, 0x3e, 0x64, 0x9c, 0x42, 0xe0, 0x88,
+ 0xa3, 0xdc, 0x4a, 0x51, 0xd9, 0x5a, 0x0d, 0x43, 0x98, 0x82, 0xf3, 0x21, 0x4c, 0xc1, 0xea, 0x15,
+ 0x94, 0xda, 0x73, 0xda, 0x82, 0x71, 0xf2, 0x1c, 0xa9, 0xd1, 0xdb, 0xc4, 0xdb, 0x28, 0x15, 0xce,
+ 0xcc, 0x38, 0x8e, 0x1b, 0xc3, 0x5e, 0x41, 0x0d, 0x91, 0xb9, 0x76, 0xe3, 0x1a, 0x72, 0xac, 0x20,
+ 0x2c, 0x6f, 0x60, 0x31, 0x8b, 0x4b, 0xce, 0xc7, 0xa2, 0xa3, 0xe4, 0xff, 0x3a, 0x22, 0xcf, 0xd0,
+ 0xda, 0x9c, 0x88, 0x25, 0x2d, 0xec, 0x20, 0xb5, 0x0e, 0x74, 0x51, 0xff, 0x32, 0x33, 0xf9, 0x14,
+ 0xa5, 0xa7, 0xdf, 0x2f, 0x47, 0x5d, 0xf9, 0xbe, 0x82, 0xf0, 0xee, 0x64, 0x34, 0x8c, 0xc9, 0x4b,
+ 0xc5, 0xc7, 0x0a, 0xca, 0xd4, 0x98, 0x88, 0x45, 0x53, 0x8a, 0xcf, 0xd1, 0x19, 0x8f, 0x21, 0x4f,
+ 0x2e, 0x2e, 0x25, 0x9b, 0x9f, 0x7f, 0xfd, 0xfd, 0x96, 0xcc, 0xe2, 0x75, 0xfd, 0xf0, 0xc1, 0x64,
+ 0x63, 0x38, 0x9e, 0x7d, 0x3f, 0x14, 0x87, 0xbf, 0x28, 0xe8, 0x5a, 0x8d, 0x89, 0xf9, 0x8b, 0x3d,
+ 0xe5, 0x0d, 0xc5, 0xc3, 0xcf, 0x6f, 0x9e, 0x5b, 0x45, 0x74, 0x49, 0x5b, 0xc2, 0x77, 0x47, 0xb4,
+ 0x32, 0x7a, 0xbd, 0x3b, 0x1b, 0x96, 0x4f, 0xb3, 0xdd, 0x35, 0x11, 0xf2, 0x11, 0x5d, 0xad, 0x31,
+ 0x31, 0xbb, 0xe3, 0x53, 0xbc, 0x45, 0x03, 0xcc, 0xdf, 0x3c, 0xa7, 0x86, 0x94, 0xa4, 0x82, 0xdb,
+ 0xf8, 0xd6, 0x48, 0x41, 0x0b, 0xa8, 0xde, 0x0d, 0xc3, 0x8e, 0x73, 0x57, 0x8d, 0x1f, 0x7d, 0x4d,
+ 0x39, 0xe9, 0x6b, 0xca, 0x9f, 0xbe, 0xa6, 0x7c, 0x1d, 0x68, 0x89, 0x93, 0x81, 0x96, 0xf8, 0x3d,
+ 0xd0, 0x12, 0xef, 0x9f, 0xd8, 0x8e, 0xd8, 0xef, 0xd0, 0x72, 0x03, 0x5c, 0xdd, 0xe2, 0xae, 0xd5,
+ 0xb4, 0x7c, 0x0e, 0x23, 0xa6, 0xf1, 0x3f, 0xfd, 0xac, 0xe5, 0x4c, 0x53, 0x72, 0xb3, 0x3e, 0xfc,
+ 0x17, 0x00, 0x00, 0xff, 0xff, 0x2f, 0xcc, 0xde, 0x68, 0xbf, 0x05, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@@ -643,7 +651,7 @@ var _SchedulerReporting_serviceDesc = grpc.ServiceDesc{
},
},
Streams: []grpc.StreamDesc{},
- Metadata: "internal/scheduler/schedulerobjects/reporting.proto",
+ Metadata: "pkg/api/schedulerobjects/scheduler_reporting.proto",
}
func (m *MostRecentForQueue) Marshal() (dAtA []byte, err error) {
@@ -669,7 +677,7 @@ func (m *MostRecentForQueue) MarshalToSizedBuffer(dAtA []byte) (int, error) {
if len(m.QueueName) > 0 {
i -= len(m.QueueName)
copy(dAtA[i:], m.QueueName)
- i = encodeVarintReporting(dAtA, i, uint64(len(m.QueueName)))
+ i = encodeVarintSchedulerReporting(dAtA, i, uint64(len(m.QueueName)))
i--
dAtA[i] = 0xa
}
@@ -699,7 +707,7 @@ func (m *MostRecentForJob) MarshalToSizedBuffer(dAtA []byte) (int, error) {
if len(m.JobId) > 0 {
i -= len(m.JobId)
copy(dAtA[i:], m.JobId)
- i = encodeVarintReporting(dAtA, i, uint64(len(m.JobId)))
+ i = encodeVarintSchedulerReporting(dAtA, i, uint64(len(m.JobId)))
i--
dAtA[i] = 0xa
}
@@ -727,7 +735,7 @@ func (m *SchedulingReportRequest) MarshalToSizedBuffer(dAtA []byte) (int, error)
var l int
_ = l
if m.Verbosity != 0 {
- i = encodeVarintReporting(dAtA, i, uint64(m.Verbosity))
+ i = encodeVarintSchedulerReporting(dAtA, i, uint64(m.Verbosity))
i--
dAtA[i] = 0x18
}
@@ -757,7 +765,7 @@ func (m *SchedulingReportRequest_MostRecentForQueue) MarshalToSizedBuffer(dAtA [
return 0, err
}
i -= size
- i = encodeVarintReporting(dAtA, i, uint64(size))
+ i = encodeVarintSchedulerReporting(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
@@ -778,7 +786,7 @@ func (m *SchedulingReportRequest_MostRecentForJob) MarshalToSizedBuffer(dAtA []b
return 0, err
}
i -= size
- i = encodeVarintReporting(dAtA, i, uint64(size))
+ i = encodeVarintSchedulerReporting(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x12
@@ -808,7 +816,7 @@ func (m *SchedulingReport) MarshalToSizedBuffer(dAtA []byte) (int, error) {
if len(m.Report) > 0 {
i -= len(m.Report)
copy(dAtA[i:], m.Report)
- i = encodeVarintReporting(dAtA, i, uint64(len(m.Report)))
+ i = encodeVarintSchedulerReporting(dAtA, i, uint64(len(m.Report)))
i--
dAtA[i] = 0xa
}
@@ -836,14 +844,14 @@ func (m *QueueReportRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
var l int
_ = l
if m.Verbosity != 0 {
- i = encodeVarintReporting(dAtA, i, uint64(m.Verbosity))
+ i = encodeVarintSchedulerReporting(dAtA, i, uint64(m.Verbosity))
i--
dAtA[i] = 0x10
}
if len(m.QueueName) > 0 {
i -= len(m.QueueName)
copy(dAtA[i:], m.QueueName)
- i = encodeVarintReporting(dAtA, i, uint64(len(m.QueueName)))
+ i = encodeVarintSchedulerReporting(dAtA, i, uint64(len(m.QueueName)))
i--
dAtA[i] = 0xa
}
@@ -873,7 +881,7 @@ func (m *QueueReport) MarshalToSizedBuffer(dAtA []byte) (int, error) {
if len(m.Report) > 0 {
i -= len(m.Report)
copy(dAtA[i:], m.Report)
- i = encodeVarintReporting(dAtA, i, uint64(len(m.Report)))
+ i = encodeVarintSchedulerReporting(dAtA, i, uint64(len(m.Report)))
i--
dAtA[i] = 0xa
}
@@ -903,7 +911,7 @@ func (m *JobReportRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
if len(m.JobId) > 0 {
i -= len(m.JobId)
copy(dAtA[i:], m.JobId)
- i = encodeVarintReporting(dAtA, i, uint64(len(m.JobId)))
+ i = encodeVarintSchedulerReporting(dAtA, i, uint64(len(m.JobId)))
i--
dAtA[i] = 0xa
}
@@ -933,15 +941,15 @@ func (m *JobReport) MarshalToSizedBuffer(dAtA []byte) (int, error) {
if len(m.Report) > 0 {
i -= len(m.Report)
copy(dAtA[i:], m.Report)
- i = encodeVarintReporting(dAtA, i, uint64(len(m.Report)))
+ i = encodeVarintSchedulerReporting(dAtA, i, uint64(len(m.Report)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
-func encodeVarintReporting(dAtA []byte, offset int, v uint64) int {
- offset -= sovReporting(v)
+func encodeVarintSchedulerReporting(dAtA []byte, offset int, v uint64) int {
+ offset -= sovSchedulerReporting(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
@@ -959,7 +967,7 @@ func (m *MostRecentForQueue) Size() (n int) {
_ = l
l = len(m.QueueName)
if l > 0 {
- n += 1 + l + sovReporting(uint64(l))
+ n += 1 + l + sovSchedulerReporting(uint64(l))
}
return n
}
@@ -972,7 +980,7 @@ func (m *MostRecentForJob) Size() (n int) {
_ = l
l = len(m.JobId)
if l > 0 {
- n += 1 + l + sovReporting(uint64(l))
+ n += 1 + l + sovSchedulerReporting(uint64(l))
}
return n
}
@@ -987,7 +995,7 @@ func (m *SchedulingReportRequest) Size() (n int) {
n += m.Filter.Size()
}
if m.Verbosity != 0 {
- n += 1 + sovReporting(uint64(m.Verbosity))
+ n += 1 + sovSchedulerReporting(uint64(m.Verbosity))
}
return n
}
@@ -1000,7 +1008,7 @@ func (m *SchedulingReportRequest_MostRecentForQueue) Size() (n int) {
_ = l
if m.MostRecentForQueue != nil {
l = m.MostRecentForQueue.Size()
- n += 1 + l + sovReporting(uint64(l))
+ n += 1 + l + sovSchedulerReporting(uint64(l))
}
return n
}
@@ -1012,7 +1020,7 @@ func (m *SchedulingReportRequest_MostRecentForJob) Size() (n int) {
_ = l
if m.MostRecentForJob != nil {
l = m.MostRecentForJob.Size()
- n += 1 + l + sovReporting(uint64(l))
+ n += 1 + l + sovSchedulerReporting(uint64(l))
}
return n
}
@@ -1024,7 +1032,7 @@ func (m *SchedulingReport) Size() (n int) {
_ = l
l = len(m.Report)
if l > 0 {
- n += 1 + l + sovReporting(uint64(l))
+ n += 1 + l + sovSchedulerReporting(uint64(l))
}
return n
}
@@ -1037,10 +1045,10 @@ func (m *QueueReportRequest) Size() (n int) {
_ = l
l = len(m.QueueName)
if l > 0 {
- n += 1 + l + sovReporting(uint64(l))
+ n += 1 + l + sovSchedulerReporting(uint64(l))
}
if m.Verbosity != 0 {
- n += 1 + sovReporting(uint64(m.Verbosity))
+ n += 1 + sovSchedulerReporting(uint64(m.Verbosity))
}
return n
}
@@ -1053,7 +1061,7 @@ func (m *QueueReport) Size() (n int) {
_ = l
l = len(m.Report)
if l > 0 {
- n += 1 + l + sovReporting(uint64(l))
+ n += 1 + l + sovSchedulerReporting(uint64(l))
}
return n
}
@@ -1066,7 +1074,7 @@ func (m *JobReportRequest) Size() (n int) {
_ = l
l = len(m.JobId)
if l > 0 {
- n += 1 + l + sovReporting(uint64(l))
+ n += 1 + l + sovSchedulerReporting(uint64(l))
}
return n
}
@@ -1079,16 +1087,16 @@ func (m *JobReport) Size() (n int) {
_ = l
l = len(m.Report)
if l > 0 {
- n += 1 + l + sovReporting(uint64(l))
+ n += 1 + l + sovSchedulerReporting(uint64(l))
}
return n
}
-func sovReporting(x uint64) (n int) {
+func sovSchedulerReporting(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
-func sozReporting(x uint64) (n int) {
- return sovReporting(uint64((x << 1) ^ uint64((int64(x) >> 63))))
+func sozSchedulerReporting(x uint64) (n int) {
+ return sovSchedulerReporting(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *MostRecentForQueue) Unmarshal(dAtA []byte) error {
l := len(dAtA)
@@ -1098,7 +1106,7 @@ func (m *MostRecentForQueue) Unmarshal(dAtA []byte) error {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
- return ErrIntOverflowReporting
+ return ErrIntOverflowSchedulerReporting
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
@@ -1126,7 +1134,7 @@ func (m *MostRecentForQueue) Unmarshal(dAtA []byte) error {
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
- return ErrIntOverflowReporting
+ return ErrIntOverflowSchedulerReporting
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
@@ -1140,11 +1148,11 @@ func (m *MostRecentForQueue) Unmarshal(dAtA []byte) error {
}
intStringLen := int(stringLen)
if intStringLen < 0 {
- return ErrInvalidLengthReporting
+ return ErrInvalidLengthSchedulerReporting
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
- return ErrInvalidLengthReporting
+ return ErrInvalidLengthSchedulerReporting
}
if postIndex > l {
return io.ErrUnexpectedEOF
@@ -1153,12 +1161,12 @@ func (m *MostRecentForQueue) Unmarshal(dAtA []byte) error {
iNdEx = postIndex
default:
iNdEx = preIndex
- skippy, err := skipReporting(dAtA[iNdEx:])
+ skippy, err := skipSchedulerReporting(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
- return ErrInvalidLengthReporting
+ return ErrInvalidLengthSchedulerReporting
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
@@ -1180,7 +1188,7 @@ func (m *MostRecentForJob) Unmarshal(dAtA []byte) error {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
- return ErrIntOverflowReporting
+ return ErrIntOverflowSchedulerReporting
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
@@ -1208,7 +1216,7 @@ func (m *MostRecentForJob) Unmarshal(dAtA []byte) error {
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
- return ErrIntOverflowReporting
+ return ErrIntOverflowSchedulerReporting
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
@@ -1222,11 +1230,11 @@ func (m *MostRecentForJob) Unmarshal(dAtA []byte) error {
}
intStringLen := int(stringLen)
if intStringLen < 0 {
- return ErrInvalidLengthReporting
+ return ErrInvalidLengthSchedulerReporting
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
- return ErrInvalidLengthReporting
+ return ErrInvalidLengthSchedulerReporting
}
if postIndex > l {
return io.ErrUnexpectedEOF
@@ -1235,12 +1243,12 @@ func (m *MostRecentForJob) Unmarshal(dAtA []byte) error {
iNdEx = postIndex
default:
iNdEx = preIndex
- skippy, err := skipReporting(dAtA[iNdEx:])
+ skippy, err := skipSchedulerReporting(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
- return ErrInvalidLengthReporting
+ return ErrInvalidLengthSchedulerReporting
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
@@ -1262,7 +1270,7 @@ func (m *SchedulingReportRequest) Unmarshal(dAtA []byte) error {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
- return ErrIntOverflowReporting
+ return ErrIntOverflowSchedulerReporting
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
@@ -1290,7 +1298,7 @@ func (m *SchedulingReportRequest) Unmarshal(dAtA []byte) error {
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
- return ErrIntOverflowReporting
+ return ErrIntOverflowSchedulerReporting
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
@@ -1303,11 +1311,11 @@ func (m *SchedulingReportRequest) Unmarshal(dAtA []byte) error {
}
}
if msglen < 0 {
- return ErrInvalidLengthReporting
+ return ErrInvalidLengthSchedulerReporting
}
postIndex := iNdEx + msglen
if postIndex < 0 {
- return ErrInvalidLengthReporting
+ return ErrInvalidLengthSchedulerReporting
}
if postIndex > l {
return io.ErrUnexpectedEOF
@@ -1325,7 +1333,7 @@ func (m *SchedulingReportRequest) Unmarshal(dAtA []byte) error {
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
- return ErrIntOverflowReporting
+ return ErrIntOverflowSchedulerReporting
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
@@ -1338,11 +1346,11 @@ func (m *SchedulingReportRequest) Unmarshal(dAtA []byte) error {
}
}
if msglen < 0 {
- return ErrInvalidLengthReporting
+ return ErrInvalidLengthSchedulerReporting
}
postIndex := iNdEx + msglen
if postIndex < 0 {
- return ErrInvalidLengthReporting
+ return ErrInvalidLengthSchedulerReporting
}
if postIndex > l {
return io.ErrUnexpectedEOF
@@ -1360,7 +1368,7 @@ func (m *SchedulingReportRequest) Unmarshal(dAtA []byte) error {
m.Verbosity = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
- return ErrIntOverflowReporting
+ return ErrIntOverflowSchedulerReporting
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
@@ -1374,12 +1382,12 @@ func (m *SchedulingReportRequest) Unmarshal(dAtA []byte) error {
}
default:
iNdEx = preIndex
- skippy, err := skipReporting(dAtA[iNdEx:])
+ skippy, err := skipSchedulerReporting(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
- return ErrInvalidLengthReporting
+ return ErrInvalidLengthSchedulerReporting
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
@@ -1401,7 +1409,7 @@ func (m *SchedulingReport) Unmarshal(dAtA []byte) error {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
- return ErrIntOverflowReporting
+ return ErrIntOverflowSchedulerReporting
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
@@ -1429,7 +1437,7 @@ func (m *SchedulingReport) Unmarshal(dAtA []byte) error {
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
- return ErrIntOverflowReporting
+ return ErrIntOverflowSchedulerReporting
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
@@ -1443,11 +1451,11 @@ func (m *SchedulingReport) Unmarshal(dAtA []byte) error {
}
intStringLen := int(stringLen)
if intStringLen < 0 {
- return ErrInvalidLengthReporting
+ return ErrInvalidLengthSchedulerReporting
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
- return ErrInvalidLengthReporting
+ return ErrInvalidLengthSchedulerReporting
}
if postIndex > l {
return io.ErrUnexpectedEOF
@@ -1456,12 +1464,12 @@ func (m *SchedulingReport) Unmarshal(dAtA []byte) error {
iNdEx = postIndex
default:
iNdEx = preIndex
- skippy, err := skipReporting(dAtA[iNdEx:])
+ skippy, err := skipSchedulerReporting(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
- return ErrInvalidLengthReporting
+ return ErrInvalidLengthSchedulerReporting
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
@@ -1483,7 +1491,7 @@ func (m *QueueReportRequest) Unmarshal(dAtA []byte) error {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
- return ErrIntOverflowReporting
+ return ErrIntOverflowSchedulerReporting
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
@@ -1511,7 +1519,7 @@ func (m *QueueReportRequest) Unmarshal(dAtA []byte) error {
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
- return ErrIntOverflowReporting
+ return ErrIntOverflowSchedulerReporting
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
@@ -1525,11 +1533,11 @@ func (m *QueueReportRequest) Unmarshal(dAtA []byte) error {
}
intStringLen := int(stringLen)
if intStringLen < 0 {
- return ErrInvalidLengthReporting
+ return ErrInvalidLengthSchedulerReporting
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
- return ErrInvalidLengthReporting
+ return ErrInvalidLengthSchedulerReporting
}
if postIndex > l {
return io.ErrUnexpectedEOF
@@ -1543,7 +1551,7 @@ func (m *QueueReportRequest) Unmarshal(dAtA []byte) error {
m.Verbosity = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
- return ErrIntOverflowReporting
+ return ErrIntOverflowSchedulerReporting
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
@@ -1557,12 +1565,12 @@ func (m *QueueReportRequest) Unmarshal(dAtA []byte) error {
}
default:
iNdEx = preIndex
- skippy, err := skipReporting(dAtA[iNdEx:])
+ skippy, err := skipSchedulerReporting(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
- return ErrInvalidLengthReporting
+ return ErrInvalidLengthSchedulerReporting
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
@@ -1584,7 +1592,7 @@ func (m *QueueReport) Unmarshal(dAtA []byte) error {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
- return ErrIntOverflowReporting
+ return ErrIntOverflowSchedulerReporting
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
@@ -1612,7 +1620,7 @@ func (m *QueueReport) Unmarshal(dAtA []byte) error {
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
- return ErrIntOverflowReporting
+ return ErrIntOverflowSchedulerReporting
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
@@ -1626,11 +1634,11 @@ func (m *QueueReport) Unmarshal(dAtA []byte) error {
}
intStringLen := int(stringLen)
if intStringLen < 0 {
- return ErrInvalidLengthReporting
+ return ErrInvalidLengthSchedulerReporting
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
- return ErrInvalidLengthReporting
+ return ErrInvalidLengthSchedulerReporting
}
if postIndex > l {
return io.ErrUnexpectedEOF
@@ -1639,12 +1647,12 @@ func (m *QueueReport) Unmarshal(dAtA []byte) error {
iNdEx = postIndex
default:
iNdEx = preIndex
- skippy, err := skipReporting(dAtA[iNdEx:])
+ skippy, err := skipSchedulerReporting(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
- return ErrInvalidLengthReporting
+ return ErrInvalidLengthSchedulerReporting
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
@@ -1666,7 +1674,7 @@ func (m *JobReportRequest) Unmarshal(dAtA []byte) error {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
- return ErrIntOverflowReporting
+ return ErrIntOverflowSchedulerReporting
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
@@ -1694,7 +1702,7 @@ func (m *JobReportRequest) Unmarshal(dAtA []byte) error {
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
- return ErrIntOverflowReporting
+ return ErrIntOverflowSchedulerReporting
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
@@ -1708,11 +1716,11 @@ func (m *JobReportRequest) Unmarshal(dAtA []byte) error {
}
intStringLen := int(stringLen)
if intStringLen < 0 {
- return ErrInvalidLengthReporting
+ return ErrInvalidLengthSchedulerReporting
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
- return ErrInvalidLengthReporting
+ return ErrInvalidLengthSchedulerReporting
}
if postIndex > l {
return io.ErrUnexpectedEOF
@@ -1721,12 +1729,12 @@ func (m *JobReportRequest) Unmarshal(dAtA []byte) error {
iNdEx = postIndex
default:
iNdEx = preIndex
- skippy, err := skipReporting(dAtA[iNdEx:])
+ skippy, err := skipSchedulerReporting(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
- return ErrInvalidLengthReporting
+ return ErrInvalidLengthSchedulerReporting
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
@@ -1748,7 +1756,7 @@ func (m *JobReport) Unmarshal(dAtA []byte) error {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
- return ErrIntOverflowReporting
+ return ErrIntOverflowSchedulerReporting
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
@@ -1776,7 +1784,7 @@ func (m *JobReport) Unmarshal(dAtA []byte) error {
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
- return ErrIntOverflowReporting
+ return ErrIntOverflowSchedulerReporting
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
@@ -1790,11 +1798,11 @@ func (m *JobReport) Unmarshal(dAtA []byte) error {
}
intStringLen := int(stringLen)
if intStringLen < 0 {
- return ErrInvalidLengthReporting
+ return ErrInvalidLengthSchedulerReporting
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
- return ErrInvalidLengthReporting
+ return ErrInvalidLengthSchedulerReporting
}
if postIndex > l {
return io.ErrUnexpectedEOF
@@ -1803,12 +1811,12 @@ func (m *JobReport) Unmarshal(dAtA []byte) error {
iNdEx = postIndex
default:
iNdEx = preIndex
- skippy, err := skipReporting(dAtA[iNdEx:])
+ skippy, err := skipSchedulerReporting(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
- return ErrInvalidLengthReporting
+ return ErrInvalidLengthSchedulerReporting
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
@@ -1822,7 +1830,7 @@ func (m *JobReport) Unmarshal(dAtA []byte) error {
}
return nil
}
-func skipReporting(dAtA []byte) (n int, err error) {
+func skipSchedulerReporting(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
@@ -1830,7 +1838,7 @@ func skipReporting(dAtA []byte) (n int, err error) {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
- return 0, ErrIntOverflowReporting
+ return 0, ErrIntOverflowSchedulerReporting
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
@@ -1847,7 +1855,7 @@ func skipReporting(dAtA []byte) (n int, err error) {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
- return 0, ErrIntOverflowReporting
+ return 0, ErrIntOverflowSchedulerReporting
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
@@ -1863,7 +1871,7 @@ func skipReporting(dAtA []byte) (n int, err error) {
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
- return 0, ErrIntOverflowReporting
+ return 0, ErrIntOverflowSchedulerReporting
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
@@ -1876,14 +1884,14 @@ func skipReporting(dAtA []byte) (n int, err error) {
}
}
if length < 0 {
- return 0, ErrInvalidLengthReporting
+ return 0, ErrInvalidLengthSchedulerReporting
}
iNdEx += length
case 3:
depth++
case 4:
if depth == 0 {
- return 0, ErrUnexpectedEndOfGroupReporting
+ return 0, ErrUnexpectedEndOfGroupSchedulerReporting
}
depth--
case 5:
@@ -1892,7 +1900,7 @@ func skipReporting(dAtA []byte) (n int, err error) {
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if iNdEx < 0 {
- return 0, ErrInvalidLengthReporting
+ return 0, ErrInvalidLengthSchedulerReporting
}
if depth == 0 {
return iNdEx, nil
@@ -1902,7 +1910,7 @@ func skipReporting(dAtA []byte) (n int, err error) {
}
var (
- ErrInvalidLengthReporting = fmt.Errorf("proto: negative length found during unmarshaling")
- ErrIntOverflowReporting = fmt.Errorf("proto: integer overflow")
- ErrUnexpectedEndOfGroupReporting = fmt.Errorf("proto: unexpected end of group")
+ ErrInvalidLengthSchedulerReporting = fmt.Errorf("proto: negative length found during unmarshaling")
+ ErrIntOverflowSchedulerReporting = fmt.Errorf("proto: integer overflow")
+ ErrUnexpectedEndOfGroupSchedulerReporting = fmt.Errorf("proto: unexpected end of group")
)
diff --git a/pkg/api/schedulerobjects/scheduler_reporting.pb.gw.go b/pkg/api/schedulerobjects/scheduler_reporting.pb.gw.go
new file mode 100644
index 00000000000..724a16eadf6
--- /dev/null
+++ b/pkg/api/schedulerobjects/scheduler_reporting.pb.gw.go
@@ -0,0 +1,391 @@
+// Code generated by protoc-gen-grpc-gateway. DO NOT EDIT.
+// source: pkg/api/schedulerobjects/scheduler_reporting.proto
+
+/*
+Package schedulerobjects is a reverse proxy.
+
+It translates gRPC into RESTful JSON APIs.
+*/
+package schedulerobjects
+
+import (
+ "context"
+ "io"
+ "net/http"
+
+ "github.com/golang/protobuf/descriptor"
+ "github.com/golang/protobuf/proto"
+ "github.com/grpc-ecosystem/grpc-gateway/runtime"
+ "github.com/grpc-ecosystem/grpc-gateway/utilities"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/status"
+)
+
+// Suppress "imported and not used" errors
+var _ codes.Code
+var _ io.Reader
+var _ status.Status
+var _ = runtime.String
+var _ = utilities.NewDoubleArray
+var _ = descriptor.ForMessage
+var _ = metadata.Join
+
+var (
+ filter_SchedulerReporting_GetSchedulingReport_0 = &utilities.DoubleArray{Encoding: map[string]int{}, Base: []int(nil), Check: []int(nil)}
+)
+
+func request_SchedulerReporting_GetSchedulingReport_0(ctx context.Context, marshaler runtime.Marshaler, client SchedulerReportingClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
+ var protoReq SchedulingReportRequest
+ var metadata runtime.ServerMetadata
+
+ if err := req.ParseForm(); err != nil {
+ return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
+ }
+ if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_SchedulerReporting_GetSchedulingReport_0); err != nil {
+ return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
+ }
+
+ msg, err := client.GetSchedulingReport(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
+ return msg, metadata, err
+
+}
+
+func local_request_SchedulerReporting_GetSchedulingReport_0(ctx context.Context, marshaler runtime.Marshaler, server SchedulerReportingServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
+ var protoReq SchedulingReportRequest
+ var metadata runtime.ServerMetadata
+
+ if err := req.ParseForm(); err != nil {
+ return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
+ }
+ if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_SchedulerReporting_GetSchedulingReport_0); err != nil {
+ return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
+ }
+
+ msg, err := server.GetSchedulingReport(ctx, &protoReq)
+ return msg, metadata, err
+
+}
+
+var (
+ filter_SchedulerReporting_GetQueueReport_0 = &utilities.DoubleArray{Encoding: map[string]int{"queue_name": 0}, Base: []int{1, 1, 0}, Check: []int{0, 1, 2}}
+)
+
+func request_SchedulerReporting_GetQueueReport_0(ctx context.Context, marshaler runtime.Marshaler, client SchedulerReportingClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
+ var protoReq QueueReportRequest
+ var metadata runtime.ServerMetadata
+
+ var (
+ val string
+ ok bool
+ err error
+ _ = err
+ )
+
+ val, ok = pathParams["queue_name"]
+ if !ok {
+ return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "queue_name")
+ }
+
+ protoReq.QueueName, err = runtime.String(val)
+
+ if err != nil {
+ return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "queue_name", err)
+ }
+
+ if err := req.ParseForm(); err != nil {
+ return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
+ }
+ if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_SchedulerReporting_GetQueueReport_0); err != nil {
+ return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
+ }
+
+ msg, err := client.GetQueueReport(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
+ return msg, metadata, err
+
+}
+
+func local_request_SchedulerReporting_GetQueueReport_0(ctx context.Context, marshaler runtime.Marshaler, server SchedulerReportingServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
+ var protoReq QueueReportRequest
+ var metadata runtime.ServerMetadata
+
+ var (
+ val string
+ ok bool
+ err error
+ _ = err
+ )
+
+ val, ok = pathParams["queue_name"]
+ if !ok {
+ return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "queue_name")
+ }
+
+ protoReq.QueueName, err = runtime.String(val)
+
+ if err != nil {
+ return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "queue_name", err)
+ }
+
+ if err := req.ParseForm(); err != nil {
+ return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
+ }
+ if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_SchedulerReporting_GetQueueReport_0); err != nil {
+ return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
+ }
+
+ msg, err := server.GetQueueReport(ctx, &protoReq)
+ return msg, metadata, err
+
+}
+
+func request_SchedulerReporting_GetJobReport_0(ctx context.Context, marshaler runtime.Marshaler, client SchedulerReportingClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
+ var protoReq JobReportRequest
+ var metadata runtime.ServerMetadata
+
+ var (
+ val string
+ ok bool
+ err error
+ _ = err
+ )
+
+ val, ok = pathParams["job_id"]
+ if !ok {
+ return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "job_id")
+ }
+
+ protoReq.JobId, err = runtime.String(val)
+
+ if err != nil {
+ return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "job_id", err)
+ }
+
+ msg, err := client.GetJobReport(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
+ return msg, metadata, err
+
+}
+
+func local_request_SchedulerReporting_GetJobReport_0(ctx context.Context, marshaler runtime.Marshaler, server SchedulerReportingServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
+ var protoReq JobReportRequest
+ var metadata runtime.ServerMetadata
+
+ var (
+ val string
+ ok bool
+ err error
+ _ = err
+ )
+
+ val, ok = pathParams["job_id"]
+ if !ok {
+ return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "job_id")
+ }
+
+ protoReq.JobId, err = runtime.String(val)
+
+ if err != nil {
+ return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "job_id", err)
+ }
+
+ msg, err := server.GetJobReport(ctx, &protoReq)
+ return msg, metadata, err
+
+}
+
+// RegisterSchedulerReportingHandlerServer registers the http handlers for service SchedulerReporting to "mux".
+// UnaryRPC :call SchedulerReportingServer directly.
+// StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906.
+// Note that using this registration option will cause many gRPC library features to stop working. Consider using RegisterSchedulerReportingHandlerFromEndpoint instead.
+func RegisterSchedulerReportingHandlerServer(ctx context.Context, mux *runtime.ServeMux, server SchedulerReportingServer) error {
+
+ mux.Handle("GET", pattern_SchedulerReporting_GetSchedulingReport_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
+ ctx, cancel := context.WithCancel(req.Context())
+ defer cancel()
+ var stream runtime.ServerTransportStream
+ ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
+ inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
+ rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req)
+ if err != nil {
+ runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
+ return
+ }
+ resp, md, err := local_request_SchedulerReporting_GetSchedulingReport_0(rctx, inboundMarshaler, server, req, pathParams)
+ md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
+ ctx = runtime.NewServerMetadataContext(ctx, md)
+ if err != nil {
+ runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
+ return
+ }
+
+ forward_SchedulerReporting_GetSchedulingReport_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
+
+ })
+
+ mux.Handle("GET", pattern_SchedulerReporting_GetQueueReport_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
+ ctx, cancel := context.WithCancel(req.Context())
+ defer cancel()
+ var stream runtime.ServerTransportStream
+ ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
+ inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
+ rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req)
+ if err != nil {
+ runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
+ return
+ }
+ resp, md, err := local_request_SchedulerReporting_GetQueueReport_0(rctx, inboundMarshaler, server, req, pathParams)
+ md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
+ ctx = runtime.NewServerMetadataContext(ctx, md)
+ if err != nil {
+ runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
+ return
+ }
+
+ forward_SchedulerReporting_GetQueueReport_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
+
+ })
+
+ mux.Handle("GET", pattern_SchedulerReporting_GetJobReport_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
+ ctx, cancel := context.WithCancel(req.Context())
+ defer cancel()
+ var stream runtime.ServerTransportStream
+ ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
+ inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
+ rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req)
+ if err != nil {
+ runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
+ return
+ }
+ resp, md, err := local_request_SchedulerReporting_GetJobReport_0(rctx, inboundMarshaler, server, req, pathParams)
+ md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
+ ctx = runtime.NewServerMetadataContext(ctx, md)
+ if err != nil {
+ runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
+ return
+ }
+
+ forward_SchedulerReporting_GetJobReport_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
+
+ })
+
+ return nil
+}
+
+// RegisterSchedulerReportingHandlerFromEndpoint is same as RegisterSchedulerReportingHandler but
+// automatically dials to "endpoint" and closes the connection when "ctx" gets done.
+func RegisterSchedulerReportingHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) {
+ conn, err := grpc.Dial(endpoint, opts...)
+ if err != nil {
+ return err
+ }
+ defer func() {
+ if err != nil {
+ if cerr := conn.Close(); cerr != nil {
+ grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr)
+ }
+ return
+ }
+ go func() {
+ <-ctx.Done()
+ if cerr := conn.Close(); cerr != nil {
+ grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr)
+ }
+ }()
+ }()
+
+ return RegisterSchedulerReportingHandler(ctx, mux, conn)
+}
+
+// RegisterSchedulerReportingHandler registers the http handlers for service SchedulerReporting to "mux".
+// The handlers forward requests to the grpc endpoint over "conn".
+func RegisterSchedulerReportingHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error {
+ return RegisterSchedulerReportingHandlerClient(ctx, mux, NewSchedulerReportingClient(conn))
+}
+
+// RegisterSchedulerReportingHandlerClient registers the http handlers for service SchedulerReporting
+// to "mux". The handlers forward requests to the grpc endpoint over the given implementation of "SchedulerReportingClient".
+// Note: the gRPC framework executes interceptors within the gRPC handler. If the passed in "SchedulerReportingClient"
+// doesn't go through the normal gRPC flow (creating a gRPC client etc.) then it will be up to the passed in
+// "SchedulerReportingClient" to call the correct interceptors.
+func RegisterSchedulerReportingHandlerClient(ctx context.Context, mux *runtime.ServeMux, client SchedulerReportingClient) error {
+
+ mux.Handle("GET", pattern_SchedulerReporting_GetSchedulingReport_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
+ ctx, cancel := context.WithCancel(req.Context())
+ defer cancel()
+ inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
+ rctx, err := runtime.AnnotateContext(ctx, mux, req)
+ if err != nil {
+ runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
+ return
+ }
+ resp, md, err := request_SchedulerReporting_GetSchedulingReport_0(rctx, inboundMarshaler, client, req, pathParams)
+ ctx = runtime.NewServerMetadataContext(ctx, md)
+ if err != nil {
+ runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
+ return
+ }
+
+ forward_SchedulerReporting_GetSchedulingReport_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
+
+ })
+
+ mux.Handle("GET", pattern_SchedulerReporting_GetQueueReport_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
+ ctx, cancel := context.WithCancel(req.Context())
+ defer cancel()
+ inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
+ rctx, err := runtime.AnnotateContext(ctx, mux, req)
+ if err != nil {
+ runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
+ return
+ }
+ resp, md, err := request_SchedulerReporting_GetQueueReport_0(rctx, inboundMarshaler, client, req, pathParams)
+ ctx = runtime.NewServerMetadataContext(ctx, md)
+ if err != nil {
+ runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
+ return
+ }
+
+ forward_SchedulerReporting_GetQueueReport_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
+
+ })
+
+ mux.Handle("GET", pattern_SchedulerReporting_GetJobReport_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
+ ctx, cancel := context.WithCancel(req.Context())
+ defer cancel()
+ inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
+ rctx, err := runtime.AnnotateContext(ctx, mux, req)
+ if err != nil {
+ runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
+ return
+ }
+ resp, md, err := request_SchedulerReporting_GetJobReport_0(rctx, inboundMarshaler, client, req, pathParams)
+ ctx = runtime.NewServerMetadataContext(ctx, md)
+ if err != nil {
+ runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
+ return
+ }
+
+ forward_SchedulerReporting_GetJobReport_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
+
+ })
+
+ return nil
+}
+
+var (
+ pattern_SchedulerReporting_GetSchedulingReport_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "scheduling-report"}, "", runtime.AssumeColonVerbOpt(true)))
+
+ pattern_SchedulerReporting_GetQueueReport_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 1, 0, 4, 1, 5, 2, 2, 3}, []string{"v1", "queue", "queue_name", "scheduler-report"}, "", runtime.AssumeColonVerbOpt(true)))
+
+ pattern_SchedulerReporting_GetJobReport_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 1, 0, 4, 1, 5, 2, 2, 3}, []string{"v1", "job", "job_id", "scheduler-report"}, "", runtime.AssumeColonVerbOpt(true)))
+)
+
+var (
+ forward_SchedulerReporting_GetSchedulingReport_0 = runtime.ForwardResponseMessage
+
+ forward_SchedulerReporting_GetQueueReport_0 = runtime.ForwardResponseMessage
+
+ forward_SchedulerReporting_GetJobReport_0 = runtime.ForwardResponseMessage
+)
diff --git a/internal/scheduler/schedulerobjects/reporting.proto b/pkg/api/schedulerobjects/scheduler_reporting.proto
similarity index 53%
rename from internal/scheduler/schedulerobjects/reporting.proto
rename to pkg/api/schedulerobjects/scheduler_reporting.proto
index ed61242f8f4..2d1ac28a3d6 100644
--- a/internal/scheduler/schedulerobjects/reporting.proto
+++ b/pkg/api/schedulerobjects/scheduler_reporting.proto
@@ -1,18 +1,26 @@
syntax = 'proto3';
+
package schedulerobjects;
-option go_package = "github.com/armadaproject/armada/internal/scheduler/schedulerobjects";
+option go_package = "github.com/armadaproject/armada/pkg/api/schedulerobjects";
+
+import "google/api/annotations.proto";
+
+// Deprecated. This will be removed in a future release. Please use GetQueueReport instead.
message MostRecentForQueue {
string queue_name = 1;
}
+// Deprecated. This will be removed in a future release. Please use GetJobReport instead.
message MostRecentForJob {
string job_id = 1;
}
message SchedulingReportRequest {
oneof filter {
+ // Deprecated. This will be removed in a future release. Please use GetQueueReport instead.
MostRecentForQueue most_recent_for_queue = 1;
+ // Deprecated. This will be removed in a future release. Please use GetJobReport instead.
MostRecentForJob most_recent_for_job = 2;
}
@@ -43,9 +51,21 @@ message JobReport {
service SchedulerReporting {
// Return the most recent scheduling report for each executor.
- rpc GetSchedulingReport (SchedulingReportRequest) returns (SchedulingReport);
+ rpc GetSchedulingReport (SchedulingReportRequest) returns (SchedulingReport) {
+ option (google.api.http) = {
+ get: "/v1/scheduling-report"
+ };
+ }
// Return the most recent report scheduling for each executor for the given queue.
- rpc GetQueueReport (QueueReportRequest) returns (QueueReport);
+ rpc GetQueueReport (QueueReportRequest) returns (QueueReport) {
+ option (google.api.http) = {
+ get: "/v1/queue/{queue_name}/scheduler-report"
+ };
+ }
// Return the most recent scheduling report for each executor for the given job.
- rpc GetJobReport (JobReportRequest) returns (JobReport);
+ rpc GetJobReport (JobReportRequest) returns (JobReport) {
+ option (google.api.http) = {
+ get: "/v1/job/{job_id}/scheduler-report"
+ };
+ }
}
diff --git a/pkg/client/with_connection.go b/pkg/client/with_connection.go
index 68dfa9d0b20..4271a36e93b 100644
--- a/pkg/client/with_connection.go
+++ b/pkg/client/with_connection.go
@@ -3,8 +3,8 @@ package client
import (
"google.golang.org/grpc"
- "github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/pkg/api"
+ "github.com/armadaproject/armada/pkg/api/schedulerobjects"
)
func WithConnection(apiConnectionDetails *ApiConnectionDetails, action func(*grpc.ClientConn) error) error {
diff --git a/scripts/build-python-client.sh b/scripts/build-python-client.sh
index 5fd23818146..f569b134ae7 100755
--- a/scripts/build-python-client.sh
+++ b/scripts/build-python-client.sh
@@ -3,7 +3,7 @@
# make the python package armada.client, not pkg.api
mkdir -p proto/armada
-cp pkg/api/event.proto pkg/api/submit.proto pkg/api/health.proto pkg/api/job.proto pkg/api/binoculars/binoculars.proto proto/armada
+cp pkg/api/event.proto pkg/api/submit.proto pkg/api/health.proto pkg/api/job.proto pkg/api/schedulerobjects/scheduler_reporting.proto pkg/api/binoculars/binoculars.proto proto/armada
sed -i 's/\([^\/]\)pkg\/api/\1armada/g' proto/armada/*.proto
# generate python stubs
diff --git a/third_party/airflow/armada/__init__.py b/third_party/airflow/armada/__init__.py
index 8190451aae1..fbb216e9259 100644
--- a/third_party/airflow/armada/__init__.py
+++ b/third_party/airflow/armada/__init__.py
@@ -10,8 +10,8 @@ def get_provider_info():
"name": "Armada Airflow Operator",
"description": "Armada Airflow Operator.",
"extra-links": [
- "armada.operators.armada.LookoutLink",
- "armada.operators.armada.DynamicLink",
+ "armada.links.LookoutLink",
+ "armada.links.DynamicLink",
],
"versions": ["1.0.0"],
}
diff --git a/third_party/airflow/armada/links.py b/third_party/airflow/armada/links.py
new file mode 100644
index 00000000000..84d634aa2c3
--- /dev/null
+++ b/third_party/airflow/armada/links.py
@@ -0,0 +1,86 @@
+from __future__ import annotations
+
+import re
+import attrs
+
+from typing import Dict, Optional, Union
+from airflow.models import XCom
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.models import BaseOperator, BaseOperatorLink
+from airflow.models.taskinstancekey import TaskInstanceKey
+
+
+def get_link_value(ti_key: TaskInstanceKey, name: str) -> Optional[str]:
+ return XCom.get_value(ti_key=ti_key, key=f"armada_{name.lower()}_url")
+
+
+def persist_link_value(ti_key: TaskInstanceKey, name: str, value: str):
+ XCom.set(
+ key=f"armada_{name.lower()}_url",
+ value=value,
+ dag_id=ti_key.dag_id,
+ task_id=ti_key.task_id,
+ run_id=ti_key.run_id,
+ map_index=ti_key.map_index,
+ )
+
+
+class LookoutLink(BaseOperatorLink):
+ name = "Lookout"
+
+ def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
+ task_state = XCom.get_value(ti_key=ti_key, key="job_context")
+ if not task_state:
+ return ""
+
+ return task_state.get("armada_lookout_url", "")
+
+
+@attrs.define(init=True)
+class DynamicLink(BaseOperatorLink, LoggingMixin):
+ name: str
+
+ def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
+ url = get_link_value(ti_key, self.name)
+ if not url:
+ return ""
+ return url
+
+
+class UrlFromLogsExtractor:
+ """Extracts and persists URLs from log messages based on regex patterns."""
+
+ def __init__(self, extra_links: Dict[str, re.Pattern], ti_key: TaskInstanceKey):
+ """
+ :param extra_links: Dictionary of link names to regex patterns for URLs
+ :param ti_key: TaskInstanceKey for XCom
+ """
+ self._extra_links = extra_links
+ self._ti_key = ti_key
+
+ @staticmethod
+ def create(
+ extra_links: Dict[str, Union[str, re.Pattern]], ti_key: TaskInstanceKey
+ ) -> UrlFromLogsExtractor:
+ valid_links = {
+ name: pattern
+ for name, pattern in extra_links.items()
+ if isinstance(pattern, re.Pattern) and not get_link_value(ti_key, name)
+ }
+ return UrlFromLogsExtractor(valid_links, ti_key)
+
+ def extract_and_persist_urls(self, message: str):
+ if not self._extra_links:
+ return
+
+ matches = []
+ for name in self._extra_links:
+ pattern = self._extra_links[name]
+ match = re.search(pattern, message)
+ if match:
+ url = match.group(0)
+ persist_link_value(self._ti_key, name, url)
+ matches.append(name)
+
+ for m in matches:
+ del self._extra_links[m]
diff --git a/third_party/airflow/armada/log_manager.py b/third_party/airflow/armada/log_manager.py
index 2e64e306718..fc9175a419f 100644
--- a/third_party/airflow/armada/log_manager.py
+++ b/third_party/airflow/armada/log_manager.py
@@ -7,12 +7,14 @@
import pendulum
import tenacity
from airflow.utils.log.logging_mixin import LoggingMixin
-from armada.auth import TokenRetriever
from kubernetes import client, config
from pendulum import DateTime
from pendulum.parsing.exceptions import ParserError
from urllib3.exceptions import HTTPError
+from .auth import TokenRetriever
+from .links import UrlFromLogsExtractor
+
class KubernetesPodLogManager(LoggingMixin):
"""Monitor logs of Kubernetes pods asynchronously."""
@@ -54,6 +56,7 @@ def fetch_container_logs(
pod: str,
container: str,
since_time: Optional[DateTime],
+ link_extractor: UrlFromLogsExtractor,
) -> Optional[DateTime]:
"""
Fetches container logs, do not follow container logs.
@@ -81,10 +84,14 @@ def fetch_container_logs(
self.log.exception(f"There was an error reading the kubernetes API: {e}.")
raise
- return self._stream_logs(container, since_time, logs)
+ return self._stream_logs(container, since_time, logs, link_extractor)
def _stream_logs(
- self, container: str, since_time: Optional[DateTime], logs: HTTPResponse
+ self,
+ container: str,
+ since_time: Optional[DateTime],
+ logs: HTTPResponse,
+ link_extractor: UrlFromLogsExtractor,
) -> Optional[DateTime]:
messages: List[str] = []
message_timestamp = None
@@ -97,8 +104,7 @@ def _stream_logs(
if line_timestamp: # detect new log-line (starts with timestamp)
if since_time and line_timestamp <= since_time:
continue
- self._log_container_message(container, messages)
- messages.clear()
+ self._log_container_message(container, messages, link_extractor)
message_timestamp = line_timestamp
messages.append(message)
except HTTPError as e:
@@ -106,12 +112,17 @@ def _stream_logs(
f"Reading of logs interrupted for container {container} with error {e}."
)
- self._log_container_message(container, messages)
+ self._log_container_message(container, messages, link_extractor)
return message_timestamp
- def _log_container_message(self, container: str, messages: List[str]):
- if messages:
- self.log.info("[%s] %s", container, "\n".join(messages))
+ def _log_container_message(
+ self, container: str, messages: List[str], link_extractor: UrlFromLogsExtractor
+ ):
+ message = "\n".join(messages)
+ if message:
+ link_extractor.extract_and_persist_urls(message)
+ self.log.info("[%s] %s", container, message)
+ messages.clear()
def _parse_log_line(self, line: bytes) -> Tuple[DateTime | None, str]:
"""
diff --git a/third_party/airflow/armada/operators/armada.py b/third_party/airflow/armada/operators/armada.py
index 5f6152a2588..094850c91e8 100644
--- a/third_party/airflow/armada/operators/armada.py
+++ b/third_party/airflow/armada/operators/armada.py
@@ -17,12 +17,11 @@
# under the License.
from __future__ import annotations
-import attrs
import dataclasses
import datetime
import os
import time
-from typing import Any, Callable, Dict, Optional, Sequence, Tuple
+from typing import Any, Callable, Dict, Optional, Sequence, Tuple, Union
import jinja2
import tenacity
@@ -30,9 +29,8 @@
from airflow.configuration import conf
from airflow.exceptions import AirflowFailException
-from airflow.models import BaseOperator, BaseOperatorLink, XCom
+from airflow.models import BaseOperator
from airflow.models.taskinstance import TaskInstance
-from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.serialization.serde import deserialize
from airflow.utils.context import Context
from airflow.utils.log.logging_mixin import LoggingMixin
@@ -50,26 +48,7 @@
from ..policies.reattach import external_job_uri, policy
from ..triggers import ArmadaPollJobTrigger
from ..utils import log_exceptions, xcom_pull_for_ti, resolve_parameter_value
-
-
-class LookoutLink(BaseOperatorLink):
- name = "Lookout"
-
- def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
- task_state = XCom.get_value(ti_key=ti_key, key="job_context")
- if not task_state:
- return ""
-
- return task_state.get("armada_lookout_url", "")
-
-
-@attrs.define(init=True)
-class DynamicLink(BaseOperatorLink, LoggingMixin):
- name: str
-
- def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
- url = XCom.get_value(ti_key=ti_key, key=f"armada_{self.name.lower()}_url")
- return url
+from ..links import LookoutLink, DynamicLink, persist_link_value, UrlFromLogsExtractor
class ArmadaOperator(BaseOperator, LoggingMixin):
@@ -118,8 +97,9 @@ class ArmadaOperator(BaseOperator, LoggingMixin):
:param reattach_policy: Operator reattach policy to use (defaults to: never)
:type reattach_policy: Optional[str] | Callable[[JobState, str], bool]
:param kwargs: Additional keyword arguments to pass to the BaseOperator.
-:param extra_links: Extra links to be shown in addition to Lookout URL.
-:type extra_links: Optional[Dict[str, str]]
+:param extra_links: Extra links to be shown in addition to Lookout URL. \
+Regex patterns will be extracted from container logs (taking first match).
+:type extra_links: Optional[Dict[str, Union[str, re.Pattern]]]
:param kwargs: Additional keyword arguments to pass to the BaseOperator.
"""
@@ -149,7 +129,7 @@ def __init__(
"armada_operator", "default_dry_run", fallback=False
),
reattach_policy: Optional[str] | Callable[[JobState, str], bool] = None,
- extra_links: Optional[Dict[str, str]] = None,
+ extra_links: Optional[Dict[str, Union[str, re.Pattern]]] = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -326,9 +306,7 @@ def render_extra_links_urls(
continue
try:
rendered_url = jinja_env.from_string(url).render(context)
- self._xcom_push(
- context, key=f"armada_{name.lower()}_url", value=rendered_url
- )
+ persist_link_value(context["ti"].key, name, rendered_url)
except jinja2.TemplateError as e:
self.log.error(f"Error rendering template for {name} ({url}): {e}")
@@ -485,12 +463,16 @@ def _check_job_status_and_fetch_logs(self, context) -> None:
if self._should_have_a_pod_in_k8s() and self.container_logs:
try:
+ link_extractor = UrlFromLogsExtractor.create(
+ self.extra_links, context["ti"].key
+ )
last_log_time = self.pod_manager.fetch_container_logs(
k8s_context=self.job_context.cluster,
namespace=self.job_request.namespace,
pod=f"armada-{self.job_context.job_id}-0",
container=self.container_logs,
since_time=self.job_context.last_log_time,
+ link_extractor=link_extractor,
)
if last_log_time:
self.job_context = dataclasses.replace(
diff --git a/third_party/airflow/test/unit/operators/test_armada.py b/third_party/airflow/test/unit/operators/test_armada.py
index 3706276455c..539f7c9635c 100644
--- a/third_party/airflow/test/unit/operators/test_armada.py
+++ b/third_party/airflow/test/unit/operators/test_armada.py
@@ -1,7 +1,7 @@
import dataclasses
from datetime import timedelta
from typing import Optional
-from unittest.mock import MagicMock, patch
+from unittest.mock import MagicMock, patch, ANY
import pytest
from airflow.exceptions import TaskDeferred
@@ -226,6 +226,7 @@ def test_polls_for_logs(context):
pod="armada-test_job-0",
container="alpine",
since_time=None,
+ link_extractor=ANY,
)