Skip to content

Commit

Permalink
Merge branch 'master' into add-jsonb-backend
Browse files Browse the repository at this point in the history
  • Loading branch information
zuqq authored Feb 26, 2024
2 parents bfb9368 + 817f7d3 commit cd07748
Show file tree
Hide file tree
Showing 19 changed files with 138 additions and 111 deletions.
2 changes: 1 addition & 1 deletion client/python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "armada_client"
version = "0.2.8"
version = "0.2.9"
description = "Armada gRPC API python client"
readme = "README.md"
requires-python = ">=3.7"
Expand Down
10 changes: 6 additions & 4 deletions cmd/armada/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,12 @@ func main() {
})

// Expose profiling endpoints if enabled.
pprofServer := profiling.SetupPprofHttpServer(config.PprofPort)
g.Go(func() error {
return serve.ListenAndServe(ctx, pprofServer)
})
if config.PprofPort != nil {
pprofServer := profiling.SetupPprofHttpServer(*config.PprofPort)
g.Go(func() error {
return serve.ListenAndServe(ctx, pprofServer)
})
}

// TODO This starts a separate HTTP server. Is that intended? Should we have a single mux for everything?
// TODO: Run in errgroup
Expand Down
16 changes: 9 additions & 7 deletions cmd/binoculars/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@ func main() {
log.Info("Starting...")

// Expose profiling endpoints if enabled.
pprofServer := profiling.SetupPprofHttpServer(config.PprofPort)
go func() {
ctx := armadacontext.Background()
if err := serve.ListenAndServe(ctx, pprofServer); err != nil {
logging.WithStacktrace(ctx, err).Error("pprof server failure")
}
}()
if config.PprofPort != nil {
pprofServer := profiling.SetupPprofHttpServer(*config.PprofPort)
go func() {
ctx := armadacontext.Background()
if err := serve.ListenAndServe(ctx, pprofServer); err != nil {
logging.WithStacktrace(ctx, err).Error("pprof server failure")
}
}()
}

stopSignal := make(chan os.Signal, 1)
signal.Notify(stopSignal, syscall.SIGINT, syscall.SIGTERM)
Expand Down
16 changes: 9 additions & 7 deletions cmd/executor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ func main() {
common.LoadConfig(&config, "./config/executor", userSpecifiedConfigs)

// Expose profiling endpoints if enabled.
pprofServer := profiling.SetupPprofHttpServer(config.PprofPort)
go func() {
ctx := armadacontext.Background()
if err := serve.ListenAndServe(ctx, pprofServer); err != nil {
logging.WithStacktrace(ctx, err).Error("pprof server failure")
}
}()
if config.PprofPort != nil {
pprofServer := profiling.SetupPprofHttpServer(*config.PprofPort)
go func() {
ctx := armadacontext.Background()
if err := serve.ListenAndServe(ctx, pprofServer); err != nil {
logging.WithStacktrace(ctx, err).Error("pprof server failure")
}
}()
}

mux := http.NewServeMux()
startupCompleteCheck := health.NewStartupCompleteChecker()
Expand Down
16 changes: 9 additions & 7 deletions cmd/fakeexecutor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ func main() {
v := common.LoadConfig(&config, "./config/executor", userSpecifiedConfigs)

// Expose profiling endpoints if enabled.
pprofServer := profiling.SetupPprofHttpServer(config.PprofPort)
go func() {
ctx := armadacontext.Background()
if err := serve.ListenAndServe(ctx, pprofServer); err != nil {
logging.WithStacktrace(ctx, err).Error("pprof server failure")
}
}()
if config.PprofPort != nil {
pprofServer := profiling.SetupPprofHttpServer(*config.PprofPort)
go func() {
ctx := armadacontext.Background()
if err := serve.ListenAndServe(ctx, pprofServer); err != nil {
logging.WithStacktrace(ctx, err).Error("pprof server failure")
}
}()
}

var nodes []*context.NodeSpec
e := common.UnmarshalKey(v, "nodes", &nodes)
Expand Down
16 changes: 9 additions & 7 deletions cmd/lookoutv2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,15 @@ func main() {
common.LoadConfig(&config, "./config/lookoutv2", userSpecifiedConfigs)

// Expose profiling endpoints if enabled.
pprofServer := profiling.SetupPprofHttpServer(config.PprofPort)
go func() {
ctx := armadacontext.Background()
if err := serve.ListenAndServe(ctx, pprofServer); err != nil {
logging.WithStacktrace(ctx, err).Error("pprof server failure")
}
}()
if config.PprofPort != nil {
pprofServer := profiling.SetupPprofHttpServer(*config.PprofPort)
go func() {
ctx := armadacontext.Background()
if err := serve.ListenAndServe(ctx, pprofServer); err != nil {
logging.WithStacktrace(ctx, err).Error("pprof server failure")
}
}()
}

log.SetLevel(log.DebugLevel)

Expand Down
5 changes: 5 additions & 0 deletions deployment/armada/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ spec:
- containerPort: {{ .Values.applicationConfig.httpPort }}
protocol: TCP
name: rest
{{- if .Values.applicationConfig.pprofPort }}
- containerPort: {{ .Values.applicationConfig.pprofPort }}
protocol: TCP
name: pprof
{{- end }}
volumeMounts:
- name: user-config
mountPath: /config/application_config.yaml
Expand Down
5 changes: 5 additions & 0 deletions deployment/binoculars/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ spec:
- containerPort: {{ .Values.applicationConfig.httpPort }}
protocol: TCP
name: web
{{- if .Values.applicationConfig.pprofPort }}
- containerPort: {{ .Values.applicationConfig.pprofPort }}
protocol: TCP
name: pprof
{{- end }}
volumeMounts:
- name: user-config
mountPath: /config/application_config.yaml
Expand Down
6 changes: 6 additions & 0 deletions deployment/event-ingester/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ spec:
{{- end }}
resources:
{{- toYaml .Values.resources | nindent 12 }}
ports:
{{- if .Values.applicationConfig.pprofPort }}
- containerPort: {{ .Values.applicationConfig.pprofPort }}
protocol: TCP
name: pprof
{{- end }}
volumeMounts:
- name: user-config
mountPath: /config/application_config.yaml
Expand Down
5 changes: 5 additions & 0 deletions deployment/executor/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ spec:
- containerPort: 9001
protocol: TCP
name: metrics
{{- if .Values.applicationConfig.pprofPort }}
- containerPort: {{ .Values.applicationConfig.pprofPort }}
protocol: TCP
name: pprof
{{- end }}
volumeMounts:
- name: user-config
mountPath: /config/application_config.yaml
Expand Down
6 changes: 6 additions & 0 deletions deployment/lookout-ingester-v2/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ spec:
{{- end }}
resources:
{{- toYaml .Values.resources | nindent 12 }}
ports:
{{- if .Values.applicationConfig.pprofPort }}
- containerPort: {{ .Values.applicationConfig.pprofPort }}
protocol: TCP
name: pprof
{{- end }}
volumeMounts:
- name: user-config
mountPath: /config/application_config.yaml
Expand Down
5 changes: 5 additions & 0 deletions deployment/lookout-v2/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ spec:
- containerPort: {{ .Values.applicationConfig.apiPort }}
protocol: TCP
name: web
{{- if .Values.applicationConfig.pprofPort }}
- containerPort: {{ .Values.applicationConfig.pprofPort }}
protocol: TCP
name: pprof
{{- end }}
volumeMounts:
- name: user-config
mountPath: /config/application_config.yaml
Expand Down
5 changes: 5 additions & 0 deletions deployment/scheduler/templates/scheduler-statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ spec:
- containerPort: {{ .Values.scheduler.applicationConfig.metrics.port }}
protocol: TCP
name: metrics
{{- if .Values.scheduler.applicationConfig.pprofPort }}
- containerPort: {{ .Values.scheduler.applicationConfig.pprofPort }}
protocol: TCP
name: pprof
{{- end }}
volumeMounts:
- name: user-config
mountPath: /config/application_config.yaml
Expand Down
5 changes: 2 additions & 3 deletions internal/common/profiling/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ import (
// this function replaces http.DefaultServeMux with a new mux. This to ensure profiling endpoints
// are exposed on a separate mux available only from localhost.Hence, this function should be called
// before adding any other endpoints to http.DefaultServeMux.
// 2. If port is non-nil, returns a http server serving net/http/pprof endpoints on localhost:port.
// If port is nil, returns nil.
func SetupPprofHttpServer(port *uint16) *http.Server {
// 2. Returns a http server serving net/http/pprof endpoints on localhost:port.
func SetupPprofHttpServer(port uint16) *http.Server {
pprofMux := http.DefaultServeMux
http.DefaultServeMux = http.NewServeMux()
return &http.Server{
Expand Down
16 changes: 9 additions & 7 deletions internal/eventingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ func Run(config *configuration.EventIngesterConfiguration) {
log.Info("Event Ingester Starting")

// Expose profiling endpoints if enabled.
pprofServer := profiling.SetupPprofHttpServer(config.PprofPort)
go func() {
ctx := armadacontext.Background()
if err := serve.ListenAndServe(ctx, pprofServer); err != nil {
logging.WithStacktrace(ctx, err).Error("pprof server failure")
}
}()
if config.PprofPort != nil {
pprofServer := profiling.SetupPprofHttpServer(*config.PprofPort)
go func() {
ctx := armadacontext.Background()
if err := serve.ListenAndServe(ctx, pprofServer); err != nil {
logging.WithStacktrace(ctx, err).Error("pprof server failure")
}
}()
}

metrics := metrics.Get()

Expand Down
64 changes: 18 additions & 46 deletions internal/lookout/ui/src/hooks/useJobsTableData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,37 +83,29 @@ const columnToGroupSortFieldMap = new Map<ColumnId, string>([
[StandardColumnId.JobSet, "jobSet"],
])

// Return ordering to request to API based on column
function getOrder(
lookoutOrder: LookoutColumnOrder,
lookoutFilters: JobFilter[],
groupedColumns: ColumnId[],
isJobFetch: boolean,
): JobOrder {
const defaultJobOrder: JobOrder = {
field: "jobId",
direction: "DESC",
}
const defaultJobOrder: JobOrder = {
field: "jobId",
direction: "DESC",
}

const defaultGroupOrder: JobOrder = {
field: "count",
direction: "DESC",
}
const defaultGroupOrder: JobOrder = {
field: "count",
direction: "DESC",
}

// Return ordering to request to API based on column
function getOrder(lookoutOrder: LookoutColumnOrder, isJobFetch: boolean): JobOrder {
let field = ""
if (isJobFetch) {
if (!columnToJobSortFieldMap.has(lookoutOrder.id as ColumnId)) {
return defaultJobOrder
}
field = columnToJobSortFieldMap.get(lookoutOrder.id as ColumnId) as string
} else {
if (
!canOrderByField(lookoutFilters, groupedColumns, lookoutOrder.id) ||
!columnToGroupSortFieldMap.has(lookoutOrder.id as ColumnId)
) {
// If it is JobGroups, always return the order value here which might be overridden later
if (!columnToGroupSortFieldMap.has(lookoutOrder.id as ColumnId)) {
return defaultGroupOrder
}

field = columnToGroupSortFieldMap.get(lookoutOrder.id as ColumnId) as string
}

Expand All @@ -123,31 +115,6 @@ function getOrder(
}
}

function canOrderByField(lookoutFilters: JobFilter[], groupedColumns: string[], id: string): boolean {
// A function that determines whether we can see the field in the UI, therefore allowing us to sort by it
// Extract 'field' values from lookoutFilters, excluding the column
const filterFields = lookoutFilters.map((filter) => filter.field).filter((field) => field !== id)

// Exclude column and everything after from groupedColumns for direct comparison
const index = groupedColumns.findIndex((col) => col === id)
const filteredGroupedColumns = index >= 0 ? groupedColumns.slice(0, index) : groupedColumns

// Check if the lengths are different
if (filterFields.length !== filteredGroupedColumns.length) {
return false
}

// Check each element for an exact match
for (let i = 0; i < filterFields.length; i++) {
if (filterFields[i] !== filteredGroupedColumns[i]) {
return false
}
}

// If all checks pass, return true
return true
}

export const useFetchJobsTableData = ({
groupedColumns,
visibleColumns,
Expand Down Expand Up @@ -185,7 +152,7 @@ export const useFetchJobsTableData = ({
const isJobFetch = expandedLevel === groupingLevel

const filterValues = getFiltersForRows(lookoutFilters, columnMatches, parentRowInfo?.rowIdPartsPath ?? [])
const order = getOrder(lookoutOrder, filterValues, groupedColumns, isJobFetch)
const order = getOrder(lookoutOrder, isJobFetch)
const rowRequest: FetchRowRequest = {
filters: filterValues,
activeJobSets: activeJobSets,
Expand All @@ -205,6 +172,11 @@ export const useFetchJobsTableData = ({
const groupedCol = groupedColumns[expandedLevel]
const groupedField = columnToGroupedField(groupedCol)

// Override the group order if needed
if (rowRequest.order.field !== groupedCol) {
rowRequest.order = defaultGroupOrder
}

// Only relevant if we are grouping by annotations: Filter by all remaining annotations in the group by filter
rowRequest.filters.push(...getFiltersForGroupedAnnotations(groupedColumns.slice(expandedLevel + 1)))

Expand Down
16 changes: 9 additions & 7 deletions internal/lookoutingesterv2/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,15 @@ func Run(config *configuration.LookoutIngesterV2Configuration) {
}

// Expose profiling endpoints if enabled.
pprofServer := profiling.SetupPprofHttpServer(config.PprofPort)
go func() {
ctx := armadacontext.Background()
if err := serve.ListenAndServe(ctx, pprofServer); err != nil {
logging.WithStacktrace(ctx, err).Error("pprof server failure")
}
}()
if config.PprofPort != nil {
pprofServer := profiling.SetupPprofHttpServer(*config.PprofPort)
go func() {
ctx := armadacontext.Background()
if err := serve.ListenAndServe(ctx, pprofServer); err != nil {
logging.WithStacktrace(ctx, err).Error("pprof server failure")
}
}()
}

converter := instructions.NewInstructionConverter(m, config.UserAnnotationPrefix, compressor, config.UseLegacyEventConversion)

Expand Down
10 changes: 6 additions & 4 deletions internal/scheduler/schedulerapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@ func Run(config schedulerconfig.Configuration) error {
// ////////////////////////////////////////////////////////////////////////
// Profiling
// ////////////////////////////////////////////////////////////////////////
pprofServer := profiling.SetupPprofHttpServer(config.PprofPort)
g.Go(func() error {
return serve.ListenAndServe(ctx, pprofServer)
})
if config.PprofPort != nil {
pprofServer := profiling.SetupPprofHttpServer(*config.PprofPort)
g.Go(func() error {
return serve.ListenAndServe(ctx, pprofServer)
})
}

// ////////////////////////////////////////////////////////////////////////
// Health Checks
Expand Down
Loading

0 comments on commit cd07748

Please sign in to comment.