Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

experiment - use first taskrun as anchor pod #1

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/config-feature-flags.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ data:
# See more in the workspace documentation about Affinity Assistant
# https://github.com/tektoncd/pipeline/blob/main/docs/workspaces.md#affinity-assistant-and-specifying-workspace-order-in-a-pipeline
# or https://github.com/tektoncd/pipeline/pull/2630 for more info.
disable-affinity-assistant: "false"
disable-affinity-assistant: "true"
# Setting this flag to "true" will prevent Tekton scanning attached
# service accounts and injecting any credentials it finds into your
# Steps.
Expand Down
34 changes: 34 additions & 0 deletions examples/experiment/basic.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
name: demo-pipeline-four-tasks
spec:
tasks:
- name: say-hello
taskSpec:
steps:
- image: busybox
command: ["/bin/sh", "-c"]
args:
- echo hello
- name: say-word
taskSpec:
steps:
- image: busybox
command: ["/bin/sh", "-c"]
args:
- echo word
- name: say-hello-again
taskSpec:
steps:
- image: busybox
command: ["/bin/sh", "-c"]
args:
- echo hello again
- name: say-word-again
taskSpec:
steps:
- image: busybox
command: ["/bin/sh", "-c"]
args:
- echo world again
8 changes: 8 additions & 0 deletions examples/experiment/basicrun.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
generateName: demo-pipeline-four-tasks-
spec:
pipelineRef:
name: demo-pipeline-four-tasks

69 changes: 69 additions & 0 deletions pkg/internal/experimentpod/transformer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
Copyright 2021 The Tekton Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package experimentpod

import (
"context"
"fmt"

"github.com/tektoncd/pipeline/pkg/pod"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// NewTransformer returns a pod.Transformer that will pod affinity if needed
func NewTransformer(_ context.Context, annotations map[string]string) pod.Transformer {
return func(p *corev1.Pod) (*corev1.Pod, error) {
// if it is an anchor pod, don't append pod affinity
if isFirstPod := annotations["first-pod"]; isFirstPod == "true" {
return p, nil
}

if p.Spec.Affinity == nil {
p.Spec.Affinity = &corev1.Affinity{}
}

anchorPod, ok := annotations["anchor-pod"]
if !ok {
return p, fmt.Errorf("missing anchor pod")
}

mergeAffinityWithAnchorPod(p.Spec.Affinity, anchorPod)
return p, nil
}
}

func mergeAffinityWithAnchorPod(affinity *corev1.Affinity, anchorPodName string) {
podAffinityTerm := podAffinityTermUsingAnchorPod(anchorPodName)

if affinity.PodAffinity == nil {
affinity.PodAffinity = &corev1.PodAffinity{}
}

affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution =
append(affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution, *podAffinityTerm)
}

func podAffinityTermUsingAnchorPod(anchorPodName string) *corev1.PodAffinityTerm {
return &corev1.PodAffinityTerm{LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"anchor-pod": anchorPodName,
},
},
TopologyKey: "kubernetes.io/hostname",
}
}
6 changes: 6 additions & 0 deletions pkg/reconciler/pipelinerun/affinity_assistant.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ func (c *Reconciler) cleanupAffinityAssistants(ctx context.Context, pr *v1beta1.
return errorutils.NewAggregate(errs)
}

func getExperimentAnchorPodName(pipelineRunName string) string {
hashBytes := sha256.Sum256([]byte(pipelineRunName))
hashString := fmt.Sprintf("%x", hashBytes)
return fmt.Sprintf("%s-%s", "anchor-pod", hashString[:10])
}

func getAffinityAssistantName(pipelineWorkspaceName string, pipelineRunName string) string {
hashBytes := sha256.Sum256([]byte(pipelineWorkspaceName + pipelineRunName))
hashString := fmt.Sprintf("%x", hashBytes)
Expand Down
19 changes: 16 additions & 3 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ const (
ReasonResourceVerificationFailed = "ResourceVerificationFailed"
// ReasonCreateRunFailed indicates that the pipeline fails to create the taskrun or other run resources
ReasonCreateRunFailed = "CreateRunFailed"
FirstPod = "first-pod"
AnchorPod = "anchor-pod"
)

// constants used as kind descriptors for various types of runs; these constants
Expand Down Expand Up @@ -784,7 +786,11 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip
return err
}
default:
rpt.TaskRun, err = c.createTaskRun(ctx, rpt.TaskRunName, nil, rpt, pr)
isFirstTaskRun := false
if pipelineRunFacts.State.IsBeforeFirstTaskRun() {
isFirstTaskRun = true
}
rpt.TaskRun, err = c.createTaskRun(ctx, rpt.TaskRunName, nil, rpt, pr, isFirstTaskRun)
if err != nil {
recorder.Eventf(pr, corev1.EventTypeWarning, "TaskRunCreationFailed", "Failed to create TaskRun %q: %v", rpt.TaskRunName, err)
err = fmt.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %w", rpt.TaskRunName, rpt.PipelineTask.Name, pr.Name, err)
Expand Down Expand Up @@ -812,7 +818,9 @@ func (c *Reconciler) createTaskRuns(ctx context.Context, rpt *resources.Resolved
matrixCombinations := rpt.PipelineTask.Matrix.FanOut()
for i, taskRunName := range rpt.TaskRunNames {
params := matrixCombinations[i]
taskRun, err := c.createTaskRun(ctx, taskRunName, params, rpt, pr)

//ignore the matrixed case for prototype
taskRun, err := c.createTaskRun(ctx, taskRunName, params, rpt, pr, false)
if err != nil {
return nil, err
}
Expand All @@ -821,7 +829,7 @@ func (c *Reconciler) createTaskRuns(ctx context.Context, rpt *resources.Resolved
return taskRuns, nil
}

func (c *Reconciler) createTaskRun(ctx context.Context, taskRunName string, params v1beta1.Params, rpt *resources.ResolvedPipelineTask, pr *v1beta1.PipelineRun) (*v1beta1.TaskRun, error) {
func (c *Reconciler) createTaskRun(ctx context.Context, taskRunName string, params v1beta1.Params, rpt *resources.ResolvedPipelineTask, pr *v1beta1.PipelineRun, isFirstTaskrRun bool) (*v1beta1.TaskRun, error) {
ctx, span := c.tracerProvider.Tracer(TracerName).Start(ctx, "createTaskRun")
defer span.End()
logger := logging.FromContext(ctx)
Expand All @@ -847,6 +855,11 @@ func (c *Reconciler) createTaskRun(ctx context.Context, taskRunName string, para
ComputeResources: taskRunSpec.ComputeResources,
}}

if isFirstTaskrRun {
tr.Annotations[FirstPod] = "true"
}
tr.Annotations[AnchorPod] = getExperimentAnchorPodName(pr.Name)

// Add current spanContext as annotations to TaskRun
// so that tracing can be continued under the same traceId
if spanContext, err := getMarshalledSpanFromContext(ctx); err == nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/reconciler/taskrun/taskrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
listers "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1beta1"
"github.com/tektoncd/pipeline/pkg/internal/affinityassistant"
"github.com/tektoncd/pipeline/pkg/internal/computeresources"
"github.com/tektoncd/pipeline/pkg/internal/experimentpod"
resolutionutil "github.com/tektoncd/pipeline/pkg/internal/resolution"
podconvert "github.com/tektoncd/pipeline/pkg/pod"
tknreconciler "github.com/tektoncd/pipeline/pkg/reconciler"
Expand Down Expand Up @@ -719,6 +720,7 @@ func (c *Reconciler) createPod(ctx context.Context, ts *v1beta1.TaskSpec, tr *v1
pod, err := podbuilder.Build(ctx, tr, *ts,
computeresources.NewTransformer(ctx, tr.Namespace, c.limitrangeLister),
affinityassistant.NewTransformer(ctx, tr.Annotations),
experimentpod.NewTransformer(ctx, tr.Annotations),
)
if err != nil {
return nil, fmt.Errorf("translating TaskSpec to Pod: %w", err)
Expand Down