Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Call an optional webhook before killing a pod #149

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ Use `UTC`, `Local` or pick a timezone name from the [(IANA) tz database](https:/
| `--dry-run` | don't kill pods, only log what would have been done | true |
| `--log-format` | specify the format of the log messages. Options are text and json | text |
| `--log-caller` | include the calling function name and location in the log messages | false |
| `--webhook` | filter pods via a POST webhook; pods for which the webhook does not return HTTP 200 are excluded | no webhook calls |

## Related work

Expand Down
39 changes: 38 additions & 1 deletion chaoskube/chaoskube.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package chaoskube

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"regexp"
"time"

Expand Down Expand Up @@ -67,6 +71,8 @@ type Chaoskube struct {
Now func() time.Time

MaxKill int
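// Webhook is the optional URL each candidate pod is POSTed to as JSON; only pods
// for which it answers HTTP 200 remain candidates. An empty URL disables the check.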
Webhook url.URL
}

var (
Expand All @@ -90,7 +96,7 @@ var (
// * a logger implementing logrus.FieldLogger to send log output to
// * what specific terminator to use to imbue chaos on victim pods
// * whether to enable/disable dry-run mode
func New(client kubernetes.Interface, labels, annotations, namespaces, namespaceLabels labels.Selector, includedPodNames, excludedPodNames *regexp.Regexp, excludedWeekdays []time.Weekday, excludedTimesOfDay []util.TimePeriod, excludedDaysOfYear []time.Time, timezone *time.Location, minimumAge time.Duration, logger log.FieldLogger, dryRun bool, terminator terminator.Terminator, maxKill int) *Chaoskube {
func New(client kubernetes.Interface, labels, annotations, namespaces, namespaceLabels labels.Selector, includedPodNames, excludedPodNames *regexp.Regexp, excludedWeekdays []time.Weekday, excludedTimesOfDay []util.TimePeriod, excludedDaysOfYear []time.Time, timezone *time.Location, minimumAge time.Duration, logger log.FieldLogger, dryRun bool, terminator terminator.Terminator, maxKill int, Webhook url.URL) *Chaoskube {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: client.CoreV1().Events(v1.NamespaceAll)})
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "chaoskube"})
Expand All @@ -114,6 +120,7 @@ func New(client kubernetes.Interface, labels, annotations, namespaces, namespace
EventRecorder: recorder,
Now: time.Now,
MaxKill: maxKill,
Webhook: Webhook,
}
}

Expand Down Expand Up @@ -224,6 +231,7 @@ func (c *Chaoskube) Candidates() ([]v1.Pod, error) {
pods = filterByMinimumAge(pods, c.MinimumAge, c.Now())
pods = filterByPodName(pods, c.IncludedPodNames, c.ExcludedPodNames)
pods = filterByOwnerReference(pods)
pods = filterByWebhook(pods, c.Webhook)

return pods, nil
}
Expand Down Expand Up @@ -455,3 +463,32 @@ func filterByOwnerReference(pods []v1.Pod) []v1.Pod {

return filteredList
}

// filterByWebhook filters pods by a POST webhook. Only pods for which the webhook returns an
// HTTP 200 are returned.
func filterByWebhook(pods []v1.Pod, url url.URL) []v1.Pod {
// return early if url is not given
if url.String() == "" {
return pods
}

filteredList := []v1.Pod{}

for _, pod := range pods {
postData := new(bytes.Buffer)
err := json.NewEncoder(postData).Encode(pod)
if err != nil {
continue
}
resp, err := http.Post(url.String(), "application/json", postData)
if err != nil {
continue
}

if resp.StatusCode == http.StatusOK {
filteredList = append(filteredList, pod)
}
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will make one request for each candidate pod. If you have a cluster with 1000 pods and none of them are filtered out by the previous filters, this will call your endpoint 1000 times.

Is that what you had in mind?

If we want to go with the current behaviour we should document it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that was the idea. I understand that with a large number of pods that might not be workable. Another option would be to call the webhook only for the randomly selected pod in Victim().

Either way would work for my use case, so maybe it's wiser to move things to Victim()? What do you think?


return filteredList
}
53 changes: 39 additions & 14 deletions chaoskube/chaoskube_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package chaoskube
import (
"context"
"math/rand"
"net/url"
"regexp"
"testing"
"time"
Expand Down Expand Up @@ -53,14 +54,15 @@ func (suite *Suite) TestNew() {
includedPodNames = regexp.MustCompile("foo")
excludedPodNames = regexp.MustCompile("bar")
excludedWeekdays = []time.Weekday{time.Friday}
excludedTimesOfDay = []util.TimePeriod{util.TimePeriod{}}
excludedTimesOfDay = []util.TimePeriod{{}}
excludedDaysOfYear = []time.Time{time.Now()}
minimumAge = time.Duration(42)
dryRun = true
terminator = terminator.NewDeletePodTerminator(client, logger, 10*time.Second)
maxKill = 1
)

webhook, _ := url.Parse("")
chaoskube := New(
client,
labelSelector,
Expand All @@ -78,6 +80,7 @@ func (suite *Suite) TestNew() {
dryRun,
terminator,
maxKill,
*webhook,
)
suite.Require().NotNil(chaoskube)

Expand Down Expand Up @@ -115,6 +118,7 @@ func (suite *Suite) TestRunContextCanceled() {
false,
10,
1,
url.URL{},
)

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -132,19 +136,24 @@ func (suite *Suite) TestCandidates() {
labelSelector string
annotationSelector string
namespaceSelector string
webhook string
pods []map[string]string
}{
{"", "", "", []map[string]string{foo, bar}},
{"app=foo", "", "", []map[string]string{foo}},
{"app!=foo", "", "", []map[string]string{bar}},
{"", "chaos=foo", "", []map[string]string{foo}},
{"", "chaos!=foo", "", []map[string]string{bar}},
{"", "", "default", []map[string]string{foo}},
{"", "", "default,testing", []map[string]string{foo, bar}},
{"", "", "!testing", []map[string]string{foo}},
{"", "", "!default,!testing", []map[string]string{}},
{"", "", "default,!testing", []map[string]string{foo}},
{"", "", "default,!default", []map[string]string{}},
{"", "", "", "", []map[string]string{foo, bar}},
{"app=foo", "", "", "", []map[string]string{foo}},
{"app!=foo", "", "", "", []map[string]string{bar}},
{"", "chaos=foo", "", "", []map[string]string{foo}},
{"", "chaos!=foo", "", "", []map[string]string{bar}},
{"", "", "default", "", []map[string]string{foo}},
{"", "", "default,testing", "", []map[string]string{foo, bar}},
{"", "", "!testing", "", []map[string]string{foo}},
{"", "", "!default,!testing", "", []map[string]string{}},
{"", "", "default,!testing", "", []map[string]string{foo}},
{"", "", "default,!default", "", []map[string]string{}},
{"", "", "", "https://httpbin.org/status/404", []map[string]string{}},
{"", "", "", "https://httpbin.org/get", []map[string]string{}},
{"", "", "", "https://httpbin.org/status/200", []map[string]string{foo, bar}},
{"", "", "", "https://httpbin.org/post", []map[string]string{foo, bar}},
} {
labelSelector, err := labels.Parse(tt.labelSelector)
suite.Require().NoError(err)
Expand All @@ -155,6 +164,9 @@ func (suite *Suite) TestCandidates() {
namespaceSelector, err := labels.Parse(tt.namespaceSelector)
suite.Require().NoError(err)

webhook, err := url.Parse(tt.webhook)
suite.Require().NoError(err)

chaoskube := suite.setupWithPods(
labelSelector,
annotationSelector,
Expand All @@ -169,6 +181,7 @@ func (suite *Suite) TestCandidates() {
time.Duration(0),
false,
10,
*webhook,
)

suite.assertCandidates(chaoskube, tt.pods)
Expand Down Expand Up @@ -213,6 +226,7 @@ func (suite *Suite) TestCandidatesNamespaceLabels() {
time.Duration(0),
false,
10,
url.URL{},
)

suite.assertCandidates(chaoskube, tt.pods)
Expand Down Expand Up @@ -255,6 +269,7 @@ func (suite *Suite) TestCandidatesPodNameRegexp() {
time.Duration(0),
false,
10,
url.URL{},
)

suite.assertCandidates(chaoskube, tt.pods)
Expand Down Expand Up @@ -294,6 +309,7 @@ func (suite *Suite) TestVictim() {
time.Duration(0),
false,
10,
url.URL{},
)

suite.assertVictim(chaoskube, tt.victim)
Expand Down Expand Up @@ -347,6 +363,7 @@ func (suite *Suite) TestVictims() {
false,
10,
tt.maxKill,
url.URL{},
)
suite.createPods(chaoskube.Client, podsInfo)

Expand All @@ -371,6 +388,7 @@ func (suite *Suite) TestNoVictimReturnsError() {
false,
10,
1,
url.URL{},
)

_, err := chaoskube.Victims()
Expand Down Expand Up @@ -404,6 +422,7 @@ func (suite *Suite) TestDeletePod() {
time.Duration(0),
tt.dryRun,
10,
url.URL{},
)

victim := util.NewPod("default", "foo", v1.PodRunning)
Expand Down Expand Up @@ -433,6 +452,7 @@ func (suite *Suite) TestDeletePodNotFound() {
false,
10,
1,
url.URL{},
)

victim := util.NewPod("default", "foo", v1.PodRunning)
Expand Down Expand Up @@ -663,6 +683,7 @@ func (suite *Suite) TestTerminateVictim() {
time.Duration(0),
false,
10,
url.URL{},
)
chaoskube.Now = tt.now

Expand Down Expand Up @@ -693,6 +714,7 @@ func (suite *Suite) TestTerminateNoVictimLogsInfo() {
false,
10,
1,
url.URL{},
)

err := chaoskube.TerminateVictims()
Expand Down Expand Up @@ -723,7 +745,7 @@ func (suite *Suite) assertVictim(chaoskube *Chaoskube, expected map[string]strin
suite.assertVictims(chaoskube, []map[string]string{expected})
}

func (suite *Suite) setupWithPods(labelSelector labels.Selector, annotations labels.Selector, namespaces labels.Selector, namespaceLabels labels.Selector, includedPodNames *regexp.Regexp, excludedPodNames *regexp.Regexp, excludedWeekdays []time.Weekday, excludedTimesOfDay []util.TimePeriod, excludedDaysOfYear []time.Time, timezone *time.Location, minimumAge time.Duration, dryRun bool, gracePeriod time.Duration) *Chaoskube {
func (suite *Suite) setupWithPods(labelSelector labels.Selector, annotations labels.Selector, namespaces labels.Selector, namespaceLabels labels.Selector, includedPodNames *regexp.Regexp, excludedPodNames *regexp.Regexp, excludedWeekdays []time.Weekday, excludedTimesOfDay []util.TimePeriod, excludedDaysOfYear []time.Time, timezone *time.Location, minimumAge time.Duration, dryRun bool, gracePeriod time.Duration, webhook url.URL) *Chaoskube {
chaoskube := suite.setup(
labelSelector,
annotations,
Expand All @@ -739,6 +761,7 @@ func (suite *Suite) setupWithPods(labelSelector labels.Selector, annotations lab
dryRun,
gracePeriod,
1,
webhook,
)

for _, namespace := range []v1.Namespace{
Expand Down Expand Up @@ -774,7 +797,7 @@ func (suite *Suite) createPods(client kubernetes.Interface, podsInfo []podInfo)
}
}

func (suite *Suite) setup(labelSelector labels.Selector, annotations labels.Selector, namespaces labels.Selector, namespaceLabels labels.Selector, includedPodNames *regexp.Regexp, excludedPodNames *regexp.Regexp, excludedWeekdays []time.Weekday, excludedTimesOfDay []util.TimePeriod, excludedDaysOfYear []time.Time, timezone *time.Location, minimumAge time.Duration, dryRun bool, gracePeriod time.Duration, maxKill int) *Chaoskube {
func (suite *Suite) setup(labelSelector labels.Selector, annotations labels.Selector, namespaces labels.Selector, namespaceLabels labels.Selector, includedPodNames *regexp.Regexp, excludedPodNames *regexp.Regexp, excludedWeekdays []time.Weekday, excludedTimesOfDay []util.TimePeriod, excludedDaysOfYear []time.Time, timezone *time.Location, minimumAge time.Duration, dryRun bool, gracePeriod time.Duration, maxKill int, webhook url.URL) *Chaoskube {
logOutput.Reset()

client := fake.NewSimpleClientset()
Expand All @@ -797,6 +820,7 @@ func (suite *Suite) setup(labelSelector labels.Selector, annotations labels.Sele
dryRun,
terminator.NewDeletePodTerminator(client, nullLogger, gracePeriod),
maxKill,
webhook,
)
}

Expand Down Expand Up @@ -901,6 +925,7 @@ func (suite *Suite) TestMinimumAge() {
false,
10,
1,
url.URL{},
)
chaoskube.Now = tt.now

Expand Down
2 changes: 2 additions & 0 deletions examples/chaoskube.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ spec:
- --timezone=UTC
# exclude all pods that haven't been running for at least one hour
- --minimum-age=1h
# check with a webhook before killing a pod
- --webhook=https://httpbin.org/post
# terminate pods for real: this disables dry-run mode which is on by default
- --no-dry-run
securityContext:
Expand Down
13 changes: 13 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math/rand"
"net/http"
_ "net/http/pprof"
"net/url"
"os"
"os/signal"
"path"
Expand Down Expand Up @@ -53,6 +54,7 @@ var (
dryRun bool
debug bool
metricsAddress string
webhook string
gracePeriod time.Duration
logFormat string
logCaller bool
Expand All @@ -68,6 +70,7 @@ func init() {
kingpin.Flag("namespace-labels", "A set of labels to restrict the list of affected namespaces. Defaults to everything.").StringVar(&nsLabelString)
kingpin.Flag("included-pod-names", "Regular expression that defines which pods to include. All included by default.").RegexpVar(&includedPodNames)
kingpin.Flag("excluded-pod-names", "Regular expression that defines which pods to exclude. None excluded by default.").RegexpVar(&excludedPodNames)
kingpin.Flag("webhook", "An HTTP webhook to execute before killing a pod").StringVar(&webhook)
kingpin.Flag("excluded-weekdays", "A list of weekdays when termination is suspended, e.g. Sat,Sun").StringVar(&excludedWeekdays)
kingpin.Flag("excluded-times-of-day", "A list of time periods of a day when termination is suspended, e.g. 22:00-08:00").StringVar(&excludedTimesOfDay)
kingpin.Flag("excluded-days-of-year", "A list of days of a year when termination is suspended, e.g. Apr1,Dec24").StringVar(&excludedDaysOfYear)
Expand Down Expand Up @@ -123,6 +126,7 @@ func main() {
"metricsAddress": metricsAddress,
"gracePeriod": gracePeriod,
"logFormat": logFormat,
"webhook": webhook,
}).Debug("reading config")

log.WithFields(log.Fields{
Expand Down Expand Up @@ -185,6 +189,14 @@ func main() {
}
timezoneName, offset := time.Now().In(parsedTimezone).Zone()

parsedWebhook, err := url.Parse(webhook)
if err != nil {
log.WithFields(log.Fields{
"webhook": webhook,
"err": err,
}).Fatal("failed to parse webhook")
}

log.WithFields(log.Fields{
"name": timezoneName,
"location": parsedTimezone,
Expand All @@ -208,6 +220,7 @@ func main() {
dryRun,
terminator.NewDeletePodTerminator(client, log.StandardLogger(), gracePeriod),
maxKill,
*parsedWebhook,
)

if metricsAddress != "" {
Expand Down