diff --git a/src/services/gcp/Gopkg.lock b/src/services/gcp/Gopkg.lock index 98b9e1879..9a5d8e662 100644 --- a/src/services/gcp/Gopkg.lock +++ b/src/services/gcp/Gopkg.lock @@ -25,6 +25,21 @@ revision = "346938d642f2ec3594ed81d874461961cd0faa76" version = "v1.1.0" +[[projects]] + digest = "1:ba9a3ee3359edeeec2c6c1ddb6976db4b7ca6ad045e73b43471e32f01b5786e4" + name = "github.com/dsnet/compress" + packages = [ + ".", + "bzip2", + "bzip2/internal/sais", + "internal", + "internal/errors", + "internal/prefix", + ] + pruneopts = "" + revision = "da652975a8eea9fa0735aba8056747a751db0bd3" + version = "v0.0.1" + [[projects]] digest = "1:1f8d5d186d2f0da45060d0383dcf4c165f17bcc3d309f3a509e7df206c5a4f37" name = "github.com/emicklei/go-restful" @@ -117,6 +132,14 @@ revision = "b4deda0973fb4c70b50d226b1af49f3da59f5265" version = "v1.1.0" +[[projects]] + digest = "1:6a6322a15aa8e99bd156fbba0aae4e5d67b4bb05251d860b348a45dfdcba9cce" + name = "github.com/golang/snappy" + packages = ["."] + pruneopts = "" + revision = "2a8bb927dd31d8daada140a5d09578521ce5c36a" + version = "v0.0.1" + [[projects]] branch = "master" digest = "1:1e5b1e14524ed08301977b7b8e10c719ed853cbf3f24ecb66fae783a46f207a6" @@ -211,6 +234,14 @@ pruneopts = "" revision = "60711f1a8329503b04e1c88535f419d0bb440bff" +[[projects]] + digest = "1:e26c759de83ce6fe27f99082458a7ad9fcc51a95215b4f5bbacd1345b61bc6ba" + name = "github.com/mholt/archiver" + packages = ["."] + pruneopts = "" + revision = "d572b2e8b82726cee9476d1b9d63a7fe9b601ff1" + version = "v3.1.1" + [[projects]] digest = "1:0c0ff2a89c1bb0d01887e1dac043ad7efbf3ec77482ef058ac423d13497e16fd" name = "github.com/modern-go/concurrent" @@ -227,6 +258,14 @@ revision = "1df9eeb2bb81f327b96228865c5687bc2194af3f" version = "1.0.0" +[[projects]] + digest = "1:b93a939355dc613ca1c23bf39f6d4c207da431a31dd8af001cc334ad24cab9d3" + name = "github.com/nwaples/rardecode" + packages = ["."] + pruneopts = "" + revision = "cc3e4b2381762c56dbcdf9f93be03349a1dc1c14" + version = "v1.0.0" + [[projects]] branch = "master" digest = "1:c1700a35272968086ace964c327b07427e3a62065314287a26684ce12af415bb" @@ -260,6 +299,17 @@ revision = "5f041e8faa004a95c88a202771f4cc3e991971e6" version = "v2.0.1" +[[projects]] + digest = "1:02e6dc9c030387868a684f7754fd88c98ae9fc04c27f04aedceae998f00b9bbf" + name = "github.com/pierrec/lz4" + packages = [ + ".", + "internal/xxh32", + ] + pruneopts = "" + revision = "d705d4371bfccdf47f10e45584e896026c83616f" + version = "v2.2.3" + [[projects]] digest = "1:256484dbbcd271f9ecebc6795b2df8cad4c458dd0f5fd82a8c2fa0c29f233411" name = "github.com/pmezard/go-difflib" @@ -303,6 +353,27 @@ revision = "ffdc059bfe9ce6a4e144ba849dbedead332c6053" version = "v1.3.0" +[[projects]] + digest = "1:cc4c87dc4fa2a87abd2a0901cbd8c0ca10a4a83929d62947de0ad111ab830e01" + name = "github.com/ulikunitz/xz" + packages = [ + ".", + "internal/hash", + "internal/xlog", + "lzma", + ] + pruneopts = "" + revision = "6f934d456d51e742b4eeab20d925a827ef22320a" + version = "v0.5.6" + +[[projects]] + branch = "master" + digest = "1:5d5ea0c53c32b0465b910eb1d98b045c2f14c416880c54dd5356a1c6b4569041" + name = "github.com/xi2/xz" + packages = ["."] + pruneopts = "" + revision = "48954b6210f8d154cb5f8484d3a3e1f83489309e" + [[projects]] branch = "master" digest = "1:23475506e81e16ca25540cb95db19d0746a7e9ca001e7a06463493bb58761754" @@ -565,6 +636,7 @@ analyzer-version = 1 input-imports = [ "github.com/golang/mock/gomock", + "github.com/mholt/archiver", "github.com/operator-framework/operator-sdk/pkg/sdk", 
"github.com/operator-framework/operator-sdk/pkg/sdk/action", "github.com/operator-framework/operator-sdk/pkg/sdk/handler", @@ -576,6 +648,8 @@ "github.com/stretchr/testify/assert", "github.com/stretchr/testify/require", "k8s.io/api/apps/v1", + "k8s.io/api/apps/v1beta1", + "k8s.io/api/apps/v1beta2", "k8s.io/api/batch/v1", "k8s.io/api/core/v1", "k8s.io/api/extensions/v1beta1", @@ -597,6 +671,8 @@ "k8s.io/client-go/kubernetes", "k8s.io/client-go/kubernetes/fake", "k8s.io/client-go/kubernetes/typed/apps/v1", + "k8s.io/client-go/kubernetes/typed/apps/v1beta1", + "k8s.io/client-go/kubernetes/typed/apps/v1beta2", "k8s.io/client-go/kubernetes/typed/batch/v1", "k8s.io/client-go/kubernetes/typed/core/v1", "k8s.io/client-go/kubernetes/typed/extensions/v1beta1", diff --git a/src/services/gcp/cmd/gcp/main.go b/src/services/gcp/cmd/gcp/main.go index fec869cc3..021654bd2 100644 --- a/src/services/gcp/cmd/gcp/main.go +++ b/src/services/gcp/cmd/gcp/main.go @@ -3,6 +3,7 @@ package main import ( "encoding/json" "context" + "os" "runtime" "os/exec" "io/ioutil" @@ -24,7 +25,7 @@ func printVersion() { func main() { printVersion() - gcpserviceaccount := "/var/run/infrabox.net/gcp/service_account.json" + gcpserviceaccount := "/var/run/infrabox.net/gcp/service_account.json" logrus.Info("Activating GCP service account") authCmd := exec.Command("gcloud", "auth", "activate-service-account", "--key-file", gcpserviceaccount) @@ -57,10 +58,23 @@ func main() { if err != nil { logrus.Fatalf("Failed to get watch gcp: %v", err) } - resyncPeriod := 5 + resyncPeriod := 20 logrus.Infof("Watching %s, %s, %s, %d", resource, kind, namespace, resyncPeriod) - logrus.SetLevel(logrus.WarnLevel) + logLevel := os.Getenv("LOG_LEVEL") + + switch logLevel { + case "debug": + logrus.SetLevel(logrus.DebugLevel) + case "info": + logrus.SetLevel(logrus.InfoLevel) + case "warn": + logrus.SetLevel(logrus.WarnLevel) + case "error": + logrus.SetLevel(logrus.ErrorLevel) + default: + logrus.SetLevel(logrus.InfoLevel) + } sdk.Watch(resource, kind, namespace, resyncPeriod) sdk.Handle(stub.NewHandler()) diff --git a/src/services/gcp/infrabox-service-gcp/templates/deployment.yaml b/src/services/gcp/infrabox-service-gcp/templates/deployment.yaml index 3db18a52a..51d8037e8 100644 --- a/src/services/gcp/infrabox-service-gcp/templates/deployment.yaml +++ b/src/services/gcp/infrabox-service-gcp/templates/deployment.yaml @@ -20,6 +20,10 @@ spec: release: {{ .Release.Name }} spec: serviceAccountName: infrabox-service-gcp + {{ if .Values.imagePullSecret }} + imagePullSecrets: + - name: {{ .Values.imagePullSecret }} + {{ end }} containers: - name: {{ .Chart.Name }} command: ["./gcp"] @@ -36,6 +40,8 @@ spec: value: infrabox-worker - name: MAX_NUM_CLUSTERS value: {{ .Values.max_clusters | quote }} + - name: LOG_LEVEL + value: {{ .Values.log_level | quote }} volumes: - name: service-account secret: diff --git a/src/services/gcp/infrabox-service-gcp/values.yaml b/src/services/gcp/infrabox-service-gcp/values.yaml index cc3cce963..5968d9b48 100644 --- a/src/services/gcp/infrabox-service-gcp/values.yaml +++ b/src/services/gcp/infrabox-service-gcp/values.yaml @@ -4,4 +4,9 @@ image: repository: quay.io/infrabox/service-gcp tag: latest +imagePullSecret: + max_clusters: 30 + +# info | debug | warn | error +log_level: info diff --git a/src/services/gcp/pkg/stub/handler.go b/src/services/gcp/pkg/stub/handler.go index 380548006..8f552aec0 100644 --- a/src/services/gcp/pkg/stub/handler.go +++ b/src/services/gcp/pkg/stub/handler.go @@ -1,951 +1,1042 @@ package stub import ( - 
"bytes" - "crypto/tls" - "crypto/x509" - b64 "encoding/base64" - "encoding/json" - "fmt" - "io/ioutil" - "mime/multipart" - "net/http" - "os" - "os/exec" - "strconv" - "strings" - "time" - - uuid "github.com/satori/go.uuid" - - "github.com/sap/infrabox/src/services/gcp/pkg/apis/gcp/v1alpha1" - "github.com/sap/infrabox/src/services/gcp/pkg/stub/cleaner" - - goerrors "errors" - - "k8s.io/client-go/discovery" - "k8s.io/client-go/discovery/cached" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - - "github.com/operator-framework/operator-sdk/pkg/sdk/action" - "github.com/operator-framework/operator-sdk/pkg/sdk/handler" - "github.com/operator-framework/operator-sdk/pkg/sdk/types" - "github.com/operator-framework/operator-sdk/pkg/util/k8sutil" - - "github.com/sirupsen/logrus" - - appsv1 "k8s.io/api/apps/v1" - v1 "k8s.io/api/core/v1" - rbacv1 "k8s.io/api/rbac/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/intstr" + "bytes" + "crypto/tls" + "crypto/x509" + b64 "encoding/base64" + "encoding/json" + "fmt" + "io/ioutil" + "mime/multipart" + "net/http" + "os" + "os/exec" + "path" + "strconv" + "strings" + "time" + + uuid "github.com/satori/go.uuid" + + "github.com/sap/infrabox/src/services/gcp/pkg/apis/gcp/v1alpha1" + "github.com/sap/infrabox/src/services/gcp/pkg/stub/cleaner" + + goerrors "errors" + + "k8s.io/client-go/discovery" + "k8s.io/client-go/discovery/cached" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + + "github.com/operator-framework/operator-sdk/pkg/sdk/action" + "github.com/operator-framework/operator-sdk/pkg/sdk/handler" + "github.com/operator-framework/operator-sdk/pkg/sdk/types" + "github.com/operator-framework/operator-sdk/pkg/util/k8sutil" + + "github.com/sirupsen/logrus" + + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/intstr" + + "github.com/mholt/archiver" ) type MasterAuth struct { - ClientCertificate string - ClientKey string - ClusterCaCertificate string - Username string - Password string + ClientCertificate string + ClientKey string + ClusterCaCertificate string + Username string + Password string } type RemoteCluster struct { - Name string - Status string - Endpoint string - MasterAuth MasterAuth + Name string + Status string + Endpoint string + MasterAuth MasterAuth } func NewHandler() handler.Handler { - return &Handler{} + return &Handler{} } type Handler struct{} +func setClusterName(cr *v1alpha1.GKECluster, log *logrus.Entry) error { + finalizers := cr.GetFinalizers() + if len(finalizers) == 0 { + cr.SetFinalizers([]string{"gcp.service.infrabox.net"}) + cr.Status.Status = "pending" + u := uuid.NewV4() + + cr.Status.ClusterName = "ib-" + u.String() + return action.Update(cr) + } + return nil +} + +func createCluster(cr *v1alpha1.GKECluster, log *logrus.Entry) (*v1alpha1.GKEClusterStatus, error) { + limit := os.Getenv("MAX_NUM_CLUSTERS") + status := cr.Status + + if limit != "" { + gkeclusters, err := getRemoteClusters(log) + if err != nil && !errors.IsNotFound(err) { + log.Errorf("Could not get GKE Clusters: %v", err) 
+ return nil, err + } + + l, err := strconv.Atoi(limit) + + if err != nil { + log.Errorf("Failed to parse cluster limit: %v", err) + return nil, err + } + + if len(gkeclusters) >= l { + status.Status = "pending" + status.Message = "Cluster limit reached, waiting..." + log.Debug(status.Message) + return &status, nil + } + } + + log.Infof("Create GKE cluster %s", cr.Status.ClusterName) + + args := []string{"container", "clusters", + "create", cr.Status.ClusterName, + "--async", + "--enable-autorepair", + "--scopes=gke-default,storage-rw", + "--zone", cr.Spec.Zone, + } + + if cr.Spec.DiskSize != 0 { + args = append(args, "--disk-size") + args = append(args, strconv.Itoa(int(cr.Spec.DiskSize))) + } + + if cr.Spec.MachineType != "" { + args = append(args, "--machine-type") + args = append(args, cr.Spec.MachineType) + } + + if cr.Spec.EnableNetworkPolicy { + args = append(args, "--enable-network-policy") + } + + if cr.Spec.NumNodes != 0 { + args = append(args, "--num-nodes") + args = append(args, strconv.Itoa(int(cr.Spec.NumNodes))) + } + + if cr.Spec.Preemptible { + args = append(args, "--preemptible") + } + + if cr.Spec.EnableAutoscaling { + args = append(args, "--enable-autoscaling") + + if cr.Spec.MaxNodes != 0 { + args = append(args, "--max-nodes") + args = append(args, strconv.Itoa(int(cr.Spec.MaxNodes))) + } + + if cr.Spec.MinNodes != 0 { + args = append(args, "--min-nodes") + args = append(args, strconv.Itoa(int(cr.Spec.MinNodes))) + } + } + + if cr.Spec.ClusterVersion != "" { + // find out the exact cluster version + version, err := getExactClusterVersion(cr, log) + + if err != nil { + return nil, err + } + + args = append(args, "--cluster-version", version) + } + + cmd := exec.Command("gcloud", args...) + out, err := cmd.CombinedOutput() + + if err != nil { + log.Errorf("Failed to create GKE Cluster: %v", err) + log.Error(string(out)) + return nil, err + } + + status.Status = "pending" + status.Message = "Cluster is being created" + return &status, nil +} + func syncGKECluster(cr *v1alpha1.GKECluster, log *logrus.Entry) (*v1alpha1.GKEClusterStatus, error) { - if cr.Status.Status == "ready" || cr.Status.Status == "error" { - return &cr.Status, nil - } - - finalizers := cr.GetFinalizers() - if len(finalizers) == 0 { - cr.SetFinalizers([]string{"gcp.service.infrabox.net"}) - cr.Status.Status = "pending" - u := uuid.NewV4() - - cr.Status.ClusterName = "ib-" + u.String() - err := action.Update(cr) - if err != nil { - log.Errorf("Failed to set finalizers: %v", err) - return nil, err - } - } - - // Get the GKE Cluster - gkecluster, err := getRemoteCluster(cr.Status.ClusterName, log) - if err != nil && !errors.IsNotFound(err) { - log.Errorf("Could not get GKE Cluster: %v", err) - return nil, err - } - - if gkecluster == nil { - // Check if we already reached the maximum number of running clusters - limit := os.Getenv("MAX_NUM_CLUSTERS") - create := true - - if limit != "" { - gkeclusters, err := getRemoteClusters(log) - if err != nil && !errors.IsNotFound(err) { - log.Errorf("Could not get GKE Clusters: %v", err) - return nil, err - } - - l, err := strconv.Atoi(limit) - - if err != nil { - log.Errorf("Failed to parse cluster limit: %v", err) - return nil, err - } - - if len(gkeclusters) >= l { - create = false - } - } - - // Create the cluser - if create { - args := []string{"container", "clusters", - "create", cr.Status.ClusterName, - "--async", - "--enable-autorepair", - "--scopes=gke-default,storage-rw", - "--zone", cr.Spec.Zone, - } - - if cr.Spec.DiskSize != 0 { - args = append(args, 
"--disk-size") - args = append(args, strconv.Itoa(int(cr.Spec.DiskSize))) - } - - if cr.Spec.MachineType != "" { - args = append(args, "--machine-type") - args = append(args, cr.Spec.MachineType) - } - - if cr.Spec.EnableNetworkPolicy { - args = append(args, "--enable-network-policy") - } - - if cr.Spec.NumNodes != 0 { - args = append(args, "--num-nodes") - args = append(args, strconv.Itoa(int(cr.Spec.NumNodes))) - } - - if cr.Spec.Preemptible { - args = append(args, "--preemptible") - } - - if cr.Spec.EnableAutoscaling { - args = append(args, "--enable-autoscaling") - - if cr.Spec.MaxNodes != 0 { - args = append(args, "--max-nodes") - args = append(args, strconv.Itoa(int(cr.Spec.MaxNodes))) - } - - if cr.Spec.MinNodes != 0 { - args = append(args, "--min-nodes") - args = append(args, strconv.Itoa(int(cr.Spec.MinNodes))) - } - } - - if cr.Spec.ClusterVersion != "" { - // find out the exact cluster version - version, err := getExactClusterVersion(cr, log) - - if err != nil { - return nil, err - } - - args = append(args, "--cluster-version", version) - } - - cmd := exec.Command("gcloud", args...) - out, err := cmd.CombinedOutput() - - if err != nil { - log.Errorf("Failed to create GKE Cluster: %v", err) - log.Error(string(out)) - return nil, err - } - - status := cr.Status - status.Status = "pending" - status.Message = "Cluster is being created" - return &status, nil - } else { - status := cr.Status - status.Status = "pending" - status.Message = "Cluster limit reached, waiting..." - return &status, nil - } - } else { - if err != nil { - log.Errorf("Failed to create secret: %v", err) - return nil, err - } - - if gkecluster.Status == "RUNNING" { - err = injectCollector(gkecluster, log) - if err != nil { - log.Errorf("Failed to inject collector: %v", err) - return nil, err - } - - err = action.Create(newSecret(cr, gkecluster)) - if err != nil && !errors.IsAlreadyExists(err) { - log.Errorf("Failed to create secret: %v", err) - return nil, err - } - - status := cr.Status - status.Status = "ready" - status.Message = "Cluster ready" - return &status, nil - } - } - - return &cr.Status, nil + if cr.Status.Status == "ready" || cr.Status.Status == "error" { + return &cr.Status, nil + } + + if err := setClusterName(cr, log); err != nil { + log.Errorf("Failed to set finalizers: %v", err) + return nil, err + } + // Get the GKE Cluster + gkecluster, err := getRemoteCluster(cr.Status.ClusterName, log) + if err != nil && !errors.IsNotFound(err) { + log.Errorf("Could not get GKE Cluster: %v", err) + return nil, err + } + + if gkecluster == nil { + return createCluster(cr, log) + } else { + if gkecluster.Status == "RUNNING" { + err = injectCollector(gkecluster, log) + if err != nil { + log.Errorf("Failed to inject collector: %v", err) + return nil, err + } + + err = action.Create(newSecret(cr, gkecluster)) + if err != nil && !errors.IsAlreadyExists(err) { + log.Errorf("Failed to create secret: %v", err) + return nil, err + } + + log.Infof("GKE cluster %s is ready", cr.Status.ClusterName) + + status := cr.Status + status.Status = "ready" + status.Message = "Cluster ready" + return &status, nil + } + if gkecluster.Status == "ERROR" { + log.Errorf("Error creating cluster %s", cr.Status.ClusterName) + return nil, goerrors.New("error creating GKE cluster") + } + } + + return &cr.Status, nil +} + +func deleteRemoteCluster(cr *v1alpha1.GKECluster, log *logrus.Entry) error { + log.Infof("Deleting cluster %s", cr.Status.ClusterName) + cmd := exec.Command("gcloud", "-q", "container", "clusters", "delete", 
cr.Status.ClusterName, "--async", "--zone", cr.Spec.Zone) + out, err := cmd.CombinedOutput() + + if err != nil { + log.Errorf("Failed to delete cluster: %v", err) + log.Error(string(out)) + } + return err +} + +func collectLogs(c *RemoteCluster, cr *v1alpha1.GKECluster, log *logrus.Entry, started chan int) { + logPath := path.Join("/tmp", cr.Status.ClusterName) + err := os.Mkdir(logPath, os.ModePerm) + if err != nil { + log.Warningf("Failed to create pod logs dir, won't collect pod logs %v", err) + close(started) + return + } + close(started) + + done := make(chan error) + go retrieveLogs(cr, c, log, logPath, done) + + defer func() { + if _, err := os.Stat(logPath); !os.IsNotExist(err) { + _ = os.RemoveAll(logPath) + } + }() + + for { + select { + case <-time.After(time.Minute * 5): + log.Infof("timeout collecting logs for %s", cr.Status.ClusterName) + return + case <-done: + log.Infof("finished collecting logs for %s", cr.Status.ClusterName) + return + } + } +} + +func cleanUpCrd(cr *v1alpha1.GKECluster, log *logrus.Entry) error { + secretName := cr.ObjectMeta.Labels["service.infrabox.net/secret-name"] + secret := v1.Secret{ + TypeMeta: metav1.TypeMeta{ + Kind: "Secret", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: secretName, + Namespace: cr.Namespace, + }, + } + + err := action.Delete(&secret) + if err != nil && !errors.IsNotFound(err) { + log.Errorf("Failed to delete secret: %v", err) + return err + } + + cr.SetFinalizers([]string{}) + err = action.Update(cr) + if err != nil { + log.Errorf("Failed to remove finalizers: %v", err) + return err + } + + err = action.Delete(cr) + if err != nil && !errors.IsNotFound(err) { + log.Errorf("Failed to delete cr: %v", err) + return err + } + + return nil +} + +func checkTimeout(cr *v1alpha1.GKECluster, log *logrus.Entry) error { + t, err := time.Parse(time.RFC1123, cr.Status.FirstCleanedAt) + if err != nil { + log.Debugf("couldn't parse stored timestamp ('%s', err: %s) => reset it", cr.Status.FirstCleanedAt, err.Error()) + if err = setAndUpdateFirstCleaned(cr, log); err != nil { + log.Errorf("couldn't set first cleaned timestamp: %v", err) + return err + } + } else { + waitDur := time.Minute * 5 + sinceFirstCleaned := time.Since(t).Truncate(time.Second) + if sinceFirstCleaned < waitDur { + log.Debugf("timestamp FirstCleaned: %s => %s since then. Wait until %s have elapsed since first cleaning", cr.Status.FirstCleanedAt, sinceFirstCleaned, waitDur) + } else { + log.Debugf("timestamp FirstCleaned: %s => %s since then. 
Proceed with deleting cluster", cr.Status.FirstCleanedAt, sinceFirstCleaned) + cr.Status.Message = "deleting cluster" + if err = action.Update(cr); err != nil { + log.Errorf("Failed to update status: %v", err) + return err + } + } + } + return nil } func deleteGKECluster(cr *v1alpha1.GKECluster, log *logrus.Entry) error { - if cr.Status.Status != "deleting" { - cr.Status.Status = "deleting" - cr.Status.Message = "collecting logs" - - err := action.Update(cr) - if err != nil { - log.Errorf("Failed to update status: %v", err) - return err - } - } - - // Get the GKE Cluster - gkecluster, err := getRemoteCluster(cr.Status.ClusterName, log) - if err != nil && !errors.IsNotFound(err) { - log.Errorf("Failed to get GKE Cluster: %v", err) - return err - } - - if gkecluster != nil { - if cr.Status.Message == "collecting logs" { - // only try it once when the cluster is still running - retrieveLogs(cr, gkecluster, log) - - cr.Status.Message = "cleaning cluster" - err := action.Update(cr) - if err != nil { - log.Errorf("Failed to update status: %v", err) - return err - } - } - - if cr.Status.Message == "cleaning cluster" { - if cr.Status.FirstCleanedAt == "" { - isClean, err := cleanupK8s(gkecluster, log) - if err != nil { - return err - } else if !isClean { // don't proceed if cluster isn't clean - return nil - } - - if err = setAndUpdateFirstCleaned(cr, log); err != nil { - return err - } - - } else { // cluster was cleaned before -> check if it happened more than 5 mins ago - t, err := time.Parse(time.RFC1123, cr.Status.FirstCleanedAt) - if err != nil { - log.Debugf("couldn't parse stored timestamp ('%s', err: %s) => reset it", cr.Status.FirstCleanedAt, err.Error()) - if err = setAndUpdateFirstCleaned(cr, log); err != nil { - log.Errorf("couldn't set first cleaned timestamp: %v", err) - return err - } - } else { - waitDur := time.Minute * 5 - sinceFirstCleaned := time.Since(t).Truncate(time.Second) - if sinceFirstCleaned < waitDur { - log.Debugf("timestamp FirstCleaned: %s => %s since then. Wait until %s have elapsed since first cleaning", cr.Status.FirstCleanedAt, sinceFirstCleaned, waitDur) - } else { - log.Debug("timestamp FirstCleaned: %s => %s since then. 
Proceed with deleting cluster", cr.Status.FirstCleanedAt, sinceFirstCleaned) - cr.Status.Message = "deleting cluster" - if err = action.Update(cr); err != nil { - log.Errorf("Failed to update status: %v", err) - return err - } - } - } - } - } - - // Cluster still exists, delete it - if cr.Status.Message == "deleting cluster" { - cmd := exec.Command("gcloud", "-q", "container", "clusters", "delete", cr.Status.ClusterName, "--async", "--zone", cr.Spec.Zone) - out, err := cmd.CombinedOutput() - - if err != nil { - log.Errorf("Failed to delete cluster: %v", err) - log.Error(string(out)) - return err - } - } - - return nil - } - - secretName := cr.ObjectMeta.Labels["service.infrabox.net/secret-name"] - secret := v1.Secret{ - TypeMeta: metav1.TypeMeta{ - Kind: "Secret", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: secretName, - Namespace: cr.Namespace, - }, - } - - err = action.Delete(&secret) - if err != nil && !errors.IsNotFound(err) { - log.Errorf("Failed to delete secret: %v", err) - return err - } - - cr.SetFinalizers([]string{}) - err = action.Update(cr) - if err != nil { - log.Errorf("Failed to remove finalizers: %v", err) - return err - } - - err = action.Delete(cr) - if err != nil && !errors.IsNotFound(err) { - log.Errorf("Failed to delete cr: %v", err) - return err - } - - return nil + // Get the GKE Cluster + gkecluster, err := getRemoteCluster(cr.Status.ClusterName, log) + if err != nil && !errors.IsNotFound(err) { + log.Errorf("Failed to get GKE Cluster: %v", err) + return err + } + + if gkecluster == nil { + if err = cleanUpCrd(cr, log); err != nil { + log.Errorf("Failed to delete GKECluster CRD") + } + log.Infof("GKE cluster %s removed", cr.Status.ClusterName) + return err + } + + if cr.Status.Status != "deleting" { + cr.Status.Status = "deleting" + + // Don't collect logs for abnormal clusters + if gkecluster.Status != "RUNNING" { + cr.Status.Message = "cleaning cluster" + if err = action.Update(cr); err != nil { + log.Errorf("Failed to update status: %v", err) + return err + } + return nil + } + + cr.Status.Message = "collecting logs" + + if err := setAndUpdateFirstCleaned(cr, log); err != nil { + return err + } + + log.Infof("Start clean up GKE cluster %s", cr.Status.ClusterName) + + started := make(chan int) + go collectLogs(gkecluster, cr, log, started) + <- started + } + + switch cr.Status.Message { + case "collecting logs": + if _, err := os.Stat(path.Join("/tmp", cr.Status.ClusterName)); os.IsNotExist(err) { + cr.Status.Message = "cleaning cluster" + err := action.Update(cr) + if err != nil { + log.Errorf("Failed to update status: %v", err) + return err + } + } + + case "cleaning cluster": + isClean, err := cleanupK8s(gkecluster, log) + if err != nil { + _ = checkTimeout(cr, log) + return err + } else if !isClean { // don't proceed if cluster isn't clean + _ = checkTimeout(cr, log) + return nil + } + + cr.Status.Message = "deleting cluster" + if err = action.Update(cr); err != nil { + log.Errorf("Failed to update status: %v", err) + return err + } + + case "deleting cluster": + // cluster is being deleted + if gkecluster.Status == "STOPPING" { + return nil + } + if err = deleteRemoteCluster(cr, log); err != nil { + log.Errorf("Error delete gke cluster %s", cr.Status.ClusterName) + return err + } + } + + return nil } func setAndUpdateFirstCleaned(cr *v1alpha1.GKECluster, log *logrus.Entry) error { - cr.Status.FirstCleanedAt = time.Now().Format(time.RFC1123) - log.Debug("set first-cleaned timestamp to ", cr.Status.FirstCleanedAt) - err := 
action.Update(cr) - if err != nil { - log.Errorf("Failed to update status: %v", err) - } - return err + cr.Status.FirstCleanedAt = time.Now().Format(time.RFC1123) + log.Debug("set first-cleaned timestamp to ", cr.Status.FirstCleanedAt) + err := action.Update(cr) + if err != nil { + log.Errorf("Failed to update status: %v", err) + } + return err } func cleanupK8s(cluster *RemoteCluster, log *logrus.Entry) (bool, error) { - remoteClusterSdk, err := newRemoteClusterSDK(cluster) - if err != nil { - return false, err - } - - cs, err := kubernetes.NewForConfig(remoteClusterSdk.kubeConfig) - if err != nil { - log.Errorf("Failed to create clientset from given kubeconfig: %v", err) - return false, err - } - - isClean, err := cleaner.NewK8sCleaner(cs, log).Cleanup() - return isClean, err + remoteClusterSdk, err := newRemoteClusterSDK(cluster) + if err != nil { + return false, err + } + + cs, err := kubernetes.NewForConfig(remoteClusterSdk.kubeConfig) + if err != nil { + log.Errorf("Failed to create clientset from given kubeconfig: %v", err) + return false, err + } + + isClean, err := cleaner.NewK8sCleaner(cs, log).Cleanup() + return isClean, err } func (h *Handler) Handle(ctx types.Context, event types.Event) error { - switch o := event.Object.(type) { - case *v1alpha1.GKECluster: - ns := o - if event.Deleted { - return nil - } - - log := logrus.WithFields(logrus.Fields{ - "namespace": ns.Namespace, - "name": ns.Name, - }) - - delTimestamp := ns.GetDeletionTimestamp() - if delTimestamp != nil { - return deleteGKECluster(ns, log) - } else { - status, err := syncGKECluster(ns, log) - - if err != nil { - ns.Status.Status = "error" - ns.Status.Message = err.Error() - err = action.Update(ns) - return err - } else { - if ns.Status.Status != status.Status || ns.Status.Message != status.Message { - ns.Status = *status - err = action.Update(ns) - return err - } - } - } - } - return nil + switch o := event.Object.(type) { + case *v1alpha1.GKECluster: + ns := o + if event.Deleted { + return nil + } + + log := logrus.WithFields(logrus.Fields{ + "namespace": ns.Namespace, + "name": ns.Name, + }) + + delTimestamp := ns.GetDeletionTimestamp() + if delTimestamp != nil { + return deleteGKECluster(ns, log) + } else { + status, err := syncGKECluster(ns, log) + + if err != nil { + ns.Status.Status = "error" + ns.Status.Message = err.Error() + err = action.Update(ns) + return err + } else { + if ns.Status.Status != status.Status || ns.Status.Message != status.Message { + ns.Status = *status + err = action.Update(ns) + return err + } + } + } + } + return nil } func getLabels(cr *v1alpha1.GKECluster) map[string]string { - return map[string]string{} + return map[string]string{} } type ServerConfig struct { - ValidMasterVersions []string `json:"validMasterVersions"` - ValidNodeVersions []string `json:"validNodeVersions"` + ValidMasterVersions []string `json:"validMasterVersions"` + ValidNodeVersions []string `json:"validNodeVersions"` } func getExactClusterVersion(cr *v1alpha1.GKECluster, log *logrus.Entry) (string, error) { - cmd := exec.Command("gcloud", "container", "get-server-config", - "--format", "json", - "--zone", cr.Spec.Zone) + cmd := exec.Command("gcloud", "container", "get-server-config", + "--format", "json", + "--zone", cr.Spec.Zone) - out, err := cmd.Output() + out, err := cmd.Output() - if err != nil { - log.Errorf("Could not get server config: %v", err) - return "", err - } + if err != nil { + log.Errorf("Could not get server config: %v", err) + return "", err + } - var config ServerConfig - err = 
json.Unmarshal(out, &config) + var config ServerConfig + err = json.Unmarshal(out, &config) - if err != nil { - log.Errorf("Could not parse cluster config: %v", err) - return "", err - } + if err != nil { + log.Errorf("Could not parse cluster config: %v", err) + return "", err + } - for _, v := range config.ValidMasterVersions { - if strings.HasPrefix(v, cr.Spec.ClusterVersion) { - return v, nil - } - } + for _, v := range config.ValidMasterVersions { + if strings.HasPrefix(v, cr.Spec.ClusterVersion) { + return v, nil + } + } - return "", fmt.Errorf("Could not find a valid cluster version match for %v", cr.Spec.ClusterVersion) + return "", fmt.Errorf("Could not find a valid cluster version match for %v", cr.Spec.ClusterVersion) } func getRemoteCluster(name string, log *logrus.Entry) (*RemoteCluster, error) { - cmd := exec.Command("gcloud", "container", "clusters", "list", - "--filter", "name="+name, "--format", "json") + cmd := exec.Command("gcloud", "container", "clusters", "list", + "--filter", "name="+name, "--format", "json") - out, err := cmd.Output() + out, err := cmd.Output() - if err != nil { - log.Errorf("Could not list clusters: %v", err) - return nil, err - } + if err != nil { + log.Errorf("Could not list clusters: %v", err) + return nil, err + } - var gkeclusters []RemoteCluster - err = json.Unmarshal(out, &gkeclusters) + var gkeclusters []RemoteCluster + err = json.Unmarshal(out, &gkeclusters) - if err != nil { - log.Errorf("Could not parse cluster list: %v", err) - return nil, err - } + if err != nil { + log.Errorf("Could not parse cluster list: %v", err) + return nil, err + } - if len(gkeclusters) == 0 { - return nil, nil - } + if len(gkeclusters) == 0 { + return nil, nil + } - return &gkeclusters[0], nil + return &gkeclusters[0], nil } func getRemoteClusters(log *logrus.Entry) ([]RemoteCluster, error) { - cmd := exec.Command("gcloud", "container", "clusters", "list", - "--format", "json") + cmd := exec.Command("gcloud", "container", "clusters", "list", + "--format", "json") - out, err := cmd.Output() + out, err := cmd.Output() - if err != nil { - log.Errorf("Could not list clusters: %v", err) - return nil, err - } + if err != nil { + log.Errorf("Could not list clusters: %v", err) + return nil, err + } - var gkeclusters []RemoteCluster - err = json.Unmarshal(out, &gkeclusters) + var gkeclusters []RemoteCluster + err = json.Unmarshal(out, &gkeclusters) - if err != nil { - log.Errorf("Could not parse cluster list: %v", err) - return nil, err - } + if err != nil { + log.Errorf("Could not parse cluster list: %v", err) + return nil, err + } - return gkeclusters, nil + return gkeclusters, nil } func newSecret(cluster *v1alpha1.GKECluster, gke *RemoteCluster) *v1.Secret { - caCrt, _ := b64.StdEncoding.DecodeString(gke.MasterAuth.ClusterCaCertificate) - clientKey, _ := b64.StdEncoding.DecodeString(gke.MasterAuth.ClientKey) - clientCrt, _ := b64.StdEncoding.DecodeString(gke.MasterAuth.ClientCertificate) - - secretName := cluster.ObjectMeta.Labels["service.infrabox.net/secret-name"] - - return &v1.Secret{ - TypeMeta: metav1.TypeMeta{ - Kind: "Secret", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: secretName, - Namespace: cluster.Namespace, - OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(cluster, schema.GroupVersionKind{ - Group: v1alpha1.SchemeGroupVersion.Group, - Version: v1alpha1.SchemeGroupVersion.Version, - Kind: "Cluster", - }), - }, - }, - Type: "Opaque", - Data: map[string][]byte{ - "ca.crt": []byte(caCrt), - "client.key": 
[]byte(clientKey), - "client.crt": []byte(clientCrt), - "username": []byte(gke.MasterAuth.Username), - "password": []byte(gke.MasterAuth.Password), - "endpoint": []byte("https://" + gke.Endpoint), - }, - } + caCrt, _ := b64.StdEncoding.DecodeString(gke.MasterAuth.ClusterCaCertificate) + clientKey, _ := b64.StdEncoding.DecodeString(gke.MasterAuth.ClientKey) + clientCrt, _ := b64.StdEncoding.DecodeString(gke.MasterAuth.ClientCertificate) + + secretName := cluster.ObjectMeta.Labels["service.infrabox.net/secret-name"] + + return &v1.Secret{ + TypeMeta: metav1.TypeMeta{ + Kind: "Secret", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: secretName, + Namespace: cluster.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(cluster, schema.GroupVersionKind{ + Group: v1alpha1.SchemeGroupVersion.Group, + Version: v1alpha1.SchemeGroupVersion.Version, + Kind: "Cluster", + }), + }, + }, + Type: "Opaque", + Data: map[string][]byte{ + "ca.crt": []byte(caCrt), + "client.key": []byte(clientKey), + "client.crt": []byte(clientCrt), + "username": []byte(gke.MasterAuth.Username), + "password": []byte(gke.MasterAuth.Password), + "endpoint": []byte("https://" + gke.Endpoint), + }, + } } func doCollectorRequest(cluster *RemoteCluster, log *logrus.Entry, endpoint string) (*[]byte, error) { - caCrt, _ := b64.StdEncoding.DecodeString(cluster.MasterAuth.ClusterCaCertificate) - - caCertPool := x509.NewCertPool() - caCertPool.AppendCertsFromPEM(caCrt) - - tlsConfig := &tls.Config{ - RootCAs: caCertPool, - } - tlsConfig.BuildNameToCertificate() - transport := &http.Transport{TLSClientConfig: tlsConfig} - client := &http.Client{Transport: transport} - - req, err := http.NewRequest("GET", "https://"+cluster.Endpoint+"/api/v1/namespaces/infrabox-collector/services/infrabox-collector-api:80/proxy"+endpoint, nil) - if err != nil { - log.Errorf("Failed to create new request: %v", err) - return nil, err - } - - req.SetBasicAuth(cluster.MasterAuth.Username, cluster.MasterAuth.Password) - - resp, err := client.Do(req) - if err != nil { - log.Errorf("Failed to GET remote pod list: %v", err) - return nil, err - } - - bodyText, err := ioutil.ReadAll(resp.Body) - if err != nil { - log.Errorf("Failed to read response body: %v", err) - return nil, err - } - - if resp.StatusCode != 200 { - return &bodyText, goerrors.New(string(bodyText)) - } - - return &bodyText, nil + caCrt, _ := b64.StdEncoding.DecodeString(cluster.MasterAuth.ClusterCaCertificate) + + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCrt) + + tlsConfig := &tls.Config{ + RootCAs: caCertPool, + } + tlsConfig.BuildNameToCertificate() + transport := &http.Transport{TLSClientConfig: tlsConfig} + client := &http.Client{Transport: transport} + + req, err := http.NewRequest("GET", "https://"+cluster.Endpoint+"/api/v1/namespaces/infrabox-collector/services/infrabox-collector-api:80/proxy"+endpoint, nil) + if err != nil { + log.Errorf("Failed to create new request: %v", err) + return nil, err + } + + req.SetBasicAuth(cluster.MasterAuth.Username, cluster.MasterAuth.Password) + + resp, err := client.Do(req) + if err != nil { + log.Errorf("Failed to GET remote pod list: %v", err) + return nil, err + } + + bodyText, err := ioutil.ReadAll(resp.Body) + if err != nil { + log.Errorf("Failed to read response body: %v", err) + return nil, err + } + + if resp.StatusCode != 200 { + return &bodyText, goerrors.New(string(bodyText)) + } + + return &bodyText, nil } func uploadToArchive(cr *v1alpha1.GKECluster, log *logrus.Entry, 
data *[]byte, filename string) error { - annotations := cr.GetAnnotations() - root_url, _ := annotations["infrabox.net/root-url"] - job_token, _ := annotations["infrabox.net/job-token"] - - body := new(bytes.Buffer) - writer := multipart.NewWriter(body) - part, err := writer.CreateFormFile(filename, filename) - if err != nil { - log.Warningf("Failed to create form file: %v", err) - return err - } - - part.Write(*data) - err = writer.Close() - if err != nil { - log.Warningf("Failed to clise writer: %v", err) - return err - } - - req, err := http.NewRequest("POST", root_url+"/api/job/archive", body) - - if err != nil { - log.Warningf("Failed to create request: %v", err) - return err - } - - req.Header.Set("Content-Type", writer.FormDataContentType()) - req.Header.Set("Authorization", "token "+job_token) - tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - } - client := &http.Client{Transport: tr} - response, err := client.Do(req) - - if err != nil { - log.Warningf("Failed to execute request: %v", err) - return err - } - - bodyText, err := ioutil.ReadAll(response.Body) - - if response.StatusCode != 200 { - return goerrors.New(string(bodyText)) - } - - return nil + annotations := cr.GetAnnotations() + root_url, _ := annotations["infrabox.net/root-url"] + job_token, _ := annotations["infrabox.net/job-token"] + + body := new(bytes.Buffer) + writer := multipart.NewWriter(body) + part, err := writer.CreateFormFile(filename, filename) + if err != nil { + log.Warningf("Failed to create form file: %v", err) + return err + } + + part.Write(*data) + err = writer.Close() + if err != nil { + log.Warningf("Failed to close writer: %v", err) + return err + } + + req, err := http.NewRequest("POST", root_url+"/api/job/archive", body) + + if err != nil { + log.Warningf("Failed to create request: %v", err) + return err + } + + req.Header.Set("Content-Type", writer.FormDataContentType()) + req.Header.Set("Authorization", "token "+job_token) + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + client := &http.Client{Transport: tr} + response, err := client.Do(req) + + if err != nil { + log.Warningf("Failed to execute request: %v", err) + return err + } + + bodyText, err := ioutil.ReadAll(response.Body) + + if response.StatusCode != 200 { + return goerrors.New(string(bodyText)) + } + + return nil } type CollectedPod struct { - NamespaceID string `json:"namespace_id"` - PodID string `json:"pod_id"` - Pod string `json:"pod_name"` - Containers []string `json:"containers"` - Namespace string `json:"namespace_name"` + NamespaceID string `json:"namespace_id"` + PodID string `json:"pod_id"` + Pod string `json:"pod_name"` + Containers []string `json:"containers"` + Namespace string `json:"namespace_name"` } -func retrieveLogs(cr *v1alpha1.GKECluster, cluster *RemoteCluster, log *logrus.Entry) { - log.Info("Collecting data from remote cluster") - - annotations := cr.GetAnnotations() - _, ok := annotations["infrabox.net/root-url"] - if !ok { - log.Warning("infrabox.net/root-url not set, not retrieving logs") - return - } - - _, ok = annotations["infrabox.net/job-id"] - if !ok { - log.Warning("infrabox.net/job-id not set, not retrieving logs") - return - } - - _, ok = annotations["infrabox.net/job-token"] - if !ok { - log.Warning("infrabox.net/job-token not set, not retrieving logs") - return - } - - var pods []CollectedPod - data, err := doCollectorRequest(cluster, log, "/api/pods") - - if err != nil { - log.Errorf("Failed to get collected pod list: %v", 
err) - return - } - - err = json.Unmarshal(*data, &pods) - if err != nil { - log.Errorf("Failed to collected pod list: %v", err) - return - } - - for _, pod := range pods { - for _, container := range pod.Containers { - log.Info("Collecting logs for pod: ", pod.PodID) - data, err := doCollectorRequest(cluster, log, "/api/pods/"+pod.PodID+"/log/"+container) - - if err != nil { - log.Warningf("Failed to get collected pod logs: %v", err) - continue - } - - filename := "pod_" + pod.Namespace + "_" + pod.Pod + "_" + container + ".log" - err = uploadToArchive(cr, log, data, filename) - if err != nil { - log.Warningf("Failed to upload log to archive: %v", err) - continue - } - } - } +func retrieveLogs(cr *v1alpha1.GKECluster, cluster *RemoteCluster, log *logrus.Entry, logPath string, done chan error) { + log.Infof("Collecting data from GKE cluster %s", cluster.Name) + defer close(done) + + annotations := cr.GetAnnotations() + _, ok := annotations["infrabox.net/root-url"] + if !ok { + log.Warning("infrabox.net/root-url not set, not retrieving logs") + return + } + + _, ok = annotations["infrabox.net/job-id"] + if !ok { + log.Warning("infrabox.net/job-id not set, not retrieving logs") + return + } + + _, ok = annotations["infrabox.net/job-token"] + if !ok { + log.Warning("infrabox.net/job-token not set, not retrieving logs") + return + } + + var pods []CollectedPod + data, err := doCollectorRequest(cluster, log, "/api/pods") + + if err != nil { + log.Errorf("Failed to get collected pod list: %v", err) + return + } + + err = json.Unmarshal(*data, &pods) + if err != nil { + log.Errorf("Failed to parse collected pod list: %v", err) + return + } + + for _, pod := range pods { + for _, container := range pod.Containers { + log.Debug("Collecting logs for pod: ", pod.PodID) + data, err := doCollectorRequest(cluster, log, "/api/pods/"+pod.PodID+"/log/"+container) + + if err != nil { + log.Warningf("Failed to get collected pod logs: %v", err) + continue + } + + filename := "pod_" + pod.Namespace + "_" + pod.Pod + "_" + container + ".txt" + filename = path.Join(logPath, filename) + if err := ioutil.WriteFile(filename, *data, os.ModePerm); err != nil { + log.Debugf("Failed to write pod logs: %v", err) + continue + } + } + } + + archivePath := path.Join(logPath, "pods_log.zip") + err = archiver.Archive([]string{logPath}, archivePath) + if err != nil { + log.Debugf("Failed to archive log: %v", err) + return + } + + archiveData, err := ioutil.ReadFile(archivePath) + if err != nil { + log.Debugf("Failed to read log archive: %v", err) + return + } + err = uploadToArchive(cr, log, &archiveData, archivePath) + if err != nil { + log.Warningf("Failed to upload log to archive: %v", err) + } } func injectCollector(cluster *RemoteCluster, log *logrus.Entry) error { - client, err := newRemoteClusterSDK(cluster) - - if err != nil { - log.Errorf("Failed to create remote cluster client: %v", err) - return err - } - - err = client.Create(newCollectorNamespace(), log) - if err != nil && !errors.IsAlreadyExists(err) { - log.Errorf("Failed to create collector deployment: %v", err) - return err - } - - err = client.Create(newCollectorCRB(), log) - if err != nil && !errors.IsAlreadyExists(err) { - log.Errorf("Failed to create collector crb: %v", err) - return err - } - - err = client.Create(newCollectorDeployment(), log) - if err != nil && !errors.IsAlreadyExists(err) { - log.Errorf("Failed to create collector deployment: %v", err) - return err - } - - err = client.Create(newCollectorService(), log) - if err != nil && 
!errors.IsAlreadyExists(err) { - log.Errorf("Failed to create collector service: %v", err) - return err - } - - err = client.Create(newFluentbitConfigMap(), log) - if err != nil && !errors.IsAlreadyExists(err) { - log.Errorf("Failed to create collector fluentbit config map: %v", err) - return err - } - - err = client.Create(newCollectorDaemonSet(), log) - if err != nil && !errors.IsAlreadyExists(err) { - log.Errorf("Failed to create collector daemon set: %v", err) - return err - } - - return nil + client, err := newRemoteClusterSDK(cluster) + + if err != nil { + log.Errorf("Failed to create remote cluster client: %v", err) + return err + } + + err = client.Create(newCollectorNamespace(), log) + if err != nil && !errors.IsAlreadyExists(err) { + log.Errorf("Failed to create collector namespace: %v", err) + return err + } + + err = client.Create(newCollectorCRB(), log) + if err != nil && !errors.IsAlreadyExists(err) { + log.Errorf("Failed to create collector crb: %v", err) + return err + } + + err = client.Create(newCollectorDeployment(), log) + if err != nil && !errors.IsAlreadyExists(err) { + log.Errorf("Failed to create collector deployment: %v", err) + return err + } + + err = client.Create(newCollectorService(), log) + if err != nil && !errors.IsAlreadyExists(err) { + log.Errorf("Failed to create collector service: %v", err) + return err + } + + err = client.Create(newFluentbitConfigMap(), log) + if err != nil && !errors.IsAlreadyExists(err) { + log.Errorf("Failed to create collector fluentbit config map: %v", err) + return err + } + + err = client.Create(newCollectorDaemonSet(), log) + if err != nil && !errors.IsAlreadyExists(err) { + log.Errorf("Failed to create collector daemon set: %v", err) + return err + } + + return nil } type RemoteClusterSDK struct { - kubeConfig *rest.Config - cluster *RemoteCluster - clientPool dynamic.ClientPool - restMapper *discovery.DeferredDiscoveryRESTMapper + kubeConfig *rest.Config + cluster *RemoteCluster + clientPool dynamic.ClientPool + restMapper *discovery.DeferredDiscoveryRESTMapper } func (r *RemoteClusterSDK) Create(object types.Object, log *logrus.Entry) (err error) { - _, namespace, err := k8sutil.GetNameAndNamespace(object) - - if err != nil { - log.Errorf("Failed to get namespace: %v", err) - return err - } - - gvk := object.GetObjectKind().GroupVersionKind() - apiVersion, kind := gvk.ToAPIVersionAndKind() - - resourceClient, _, err := r.getRemoteResourceClient(apiVersion, kind, namespace) - if err != nil { - return fmt.Errorf("failed to get resource client: %v", err) - } - - unstructObj := k8sutil.UnstructuredFromRuntimeObject(object) - unstructObj, err = resourceClient.Create(unstructObj) - if err != nil { - log.Errorf("Failed to create object: %v", err) - return err - } - - // Update the arg object with the result - err = k8sutil.UnstructuredIntoRuntimeObject(unstructObj, object) - if err != nil { - return fmt.Errorf("failed to unmarshal the retrieved data: %v", err) - } - - return nil + _, namespace, err := k8sutil.GetNameAndNamespace(object) + + if err != nil { + log.Errorf("Failed to get namespace: %v", err) + return err + } + + gvk := object.GetObjectKind().GroupVersionKind() + apiVersion, kind := gvk.ToAPIVersionAndKind() + + resourceClient, _, err := r.getRemoteResourceClient(apiVersion, kind, namespace) + if err != nil { + return fmt.Errorf("failed to get resource client: %v", err) + } + + unstructObj := k8sutil.UnstructuredFromRuntimeObject(object) + unstructObj, err = resourceClient.Create(unstructObj) + if err != nil { + 
log.Errorf("Failed to create object: %v", err) + return err + } + + // Update the arg object with the result + err = k8sutil.UnstructuredIntoRuntimeObject(unstructObj, object) + if err != nil { + return fmt.Errorf("failed to unmarshal the retrieved data: %v", err) + } + + return nil } func newRemoteClusterSDK(cluster *RemoteCluster) (*RemoteClusterSDK, error) { - caCrt, err := b64.StdEncoding.DecodeString(cluster.MasterAuth.ClusterCaCertificate) - clientKey, _ := b64.StdEncoding.DecodeString(cluster.MasterAuth.ClientKey) - clientCrt, _ := b64.StdEncoding.DecodeString(cluster.MasterAuth.ClientCertificate) - - if err != nil { - return nil, err - } - - tlsClientConfig := rest.TLSClientConfig{} - tlsClientConfig.CAData = caCrt - tlsClientConfig.CertData = clientCrt - tlsClientConfig.KeyData = clientKey - - kubeConfig := &rest.Config{ - Host: cluster.Endpoint, - TLSClientConfig: tlsClientConfig, - Username: cluster.MasterAuth.Username, - Password: cluster.MasterAuth.Password, - } - - kubeClient := kubernetes.NewForConfigOrDie(kubeConfig) - - cachedDiscoveryClient := cached.NewMemCacheClient(kubeClient.Discovery()) - restMapper := discovery.NewDeferredDiscoveryRESTMapper(cachedDiscoveryClient, meta.InterfacesForUnstructured) - restMapper.Reset() - kubeConfig.ContentConfig = dynamic.ContentConfig() - clientPool := dynamic.NewClientPool(kubeConfig, restMapper, dynamic.LegacyAPIPathResolverFunc) - - return &RemoteClusterSDK{ - kubeConfig: kubeConfig, - clientPool: clientPool, - cluster: cluster, - restMapper: restMapper, - }, nil + caCrt, err := b64.StdEncoding.DecodeString(cluster.MasterAuth.ClusterCaCertificate) + clientKey, _ := b64.StdEncoding.DecodeString(cluster.MasterAuth.ClientKey) + clientCrt, _ := b64.StdEncoding.DecodeString(cluster.MasterAuth.ClientCertificate) + + if err != nil { + return nil, err + } + + tlsClientConfig := rest.TLSClientConfig{} + tlsClientConfig.CAData = caCrt + tlsClientConfig.CertData = clientCrt + tlsClientConfig.KeyData = clientKey + + kubeConfig := &rest.Config{ + Host: cluster.Endpoint, + TLSClientConfig: tlsClientConfig, + Username: cluster.MasterAuth.Username, + Password: cluster.MasterAuth.Password, + } + + kubeClient := kubernetes.NewForConfigOrDie(kubeConfig) + + cachedDiscoveryClient := cached.NewMemCacheClient(kubeClient.Discovery()) + restMapper := discovery.NewDeferredDiscoveryRESTMapper(cachedDiscoveryClient, meta.InterfacesForUnstructured) + restMapper.Reset() + kubeConfig.ContentConfig = dynamic.ContentConfig() + clientPool := dynamic.NewClientPool(kubeConfig, restMapper, dynamic.LegacyAPIPathResolverFunc) + + return &RemoteClusterSDK{ + kubeConfig: kubeConfig, + clientPool: clientPool, + cluster: cluster, + restMapper: restMapper, + }, nil } func apiResource(gvk schema.GroupVersionKind, restMapper *discovery.DeferredDiscoveryRESTMapper) (*metav1.APIResource, error) { - mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version) - if err != nil { - return nil, fmt.Errorf("failed to get the resource REST mapping for GroupVersionKind(%s): %v", gvk.String(), err) - } - resource := &metav1.APIResource{ - Name: mapping.Resource, - Namespaced: mapping.Scope == meta.RESTScopeNamespace, - Kind: gvk.Kind, - } - return resource, nil + mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version) + if err != nil { + return nil, fmt.Errorf("failed to get the resource REST mapping for GroupVersionKind(%s): %v", gvk.String(), err) + } + resource := &metav1.APIResource{ + Name: mapping.Resource, + Namespaced: mapping.Scope == 
meta.RESTScopeNamespace, + Kind: gvk.Kind, + } + return resource, nil } func (r *RemoteClusterSDK) getRemoteResourceClient(apiVersion, kind, namespace string) (dynamic.ResourceInterface, string, error) { - gv, err := schema.ParseGroupVersion(apiVersion) - if err != nil { - return nil, "", fmt.Errorf("failed to parse apiVersion: %v", err) - } - - gvk := schema.GroupVersionKind{ - Group: gv.Group, - Version: gv.Version, - Kind: kind, - } - - client, err := r.clientPool.ClientForGroupVersionKind(gvk) - if err != nil { - return nil, "", fmt.Errorf("failed to get client for GroupVersionKind(%s): %v", gvk.String(), err) - } - resource, err := apiResource(gvk, r.restMapper) - if err != nil { - return nil, "", fmt.Errorf("failed to get resource type: %v", err) - } - pluralName := resource.Name - resourceClient := client.Resource(resource, namespace) - return resourceClient, pluralName, nil + gv, err := schema.ParseGroupVersion(apiVersion) + if err != nil { + return nil, "", fmt.Errorf("failed to parse apiVersion: %v", err) + } + + gvk := schema.GroupVersionKind{ + Group: gv.Group, + Version: gv.Version, + Kind: kind, + } + + client, err := r.clientPool.ClientForGroupVersionKind(gvk) + if err != nil { + return nil, "", fmt.Errorf("failed to get client for GroupVersionKind(%s): %v", gvk.String(), err) + } + resource, err := apiResource(gvk, r.restMapper) + if err != nil { + return nil, "", fmt.Errorf("failed to get resource type: %v", err) + } + pluralName := resource.Name + resourceClient := client.Resource(resource, namespace) + return resourceClient, pluralName, nil } func newCollectorNamespace() *v1.Namespace { - return &v1.Namespace{ - TypeMeta: metav1.TypeMeta{ - Kind: "Namespace", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "infrabox-collector", - }, - } + return &v1.Namespace{ + TypeMeta: metav1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "infrabox-collector", + }, + } } func newCollectorCRB() *rbacv1.ClusterRoleBinding { - return &rbacv1.ClusterRoleBinding{ - TypeMeta: metav1.TypeMeta{ - Kind: "ClusterRoleBinding", - APIVersion: "rbac.authorization.k8s.io/v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "infrabox-collector-crb", - Namespace: "infrabox-collector", - }, - Subjects: []rbacv1.Subject{{ - Kind: "ServiceAccount", - Name: "default", - Namespace: "infrabox-collector", - }}, - RoleRef: rbacv1.RoleRef{ - Kind: "ClusterRole", - Name: "cluster-admin", - APIGroup: "rbac.authorization.k8s.io", - }, - } + return &rbacv1.ClusterRoleBinding{ + TypeMeta: metav1.TypeMeta{ + Kind: "ClusterRoleBinding", + APIVersion: "rbac.authorization.k8s.io/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "infrabox-collector-crb", + Namespace: "infrabox-collector", + }, + Subjects: []rbacv1.Subject{{ + Kind: "ServiceAccount", + Name: "default", + Namespace: "infrabox-collector", + }}, + RoleRef: rbacv1.RoleRef{ + Kind: "ClusterRole", + Name: "cluster-admin", + APIGroup: "rbac.authorization.k8s.io", + }, + } } func newCollectorService() *v1.Service { - return &v1.Service{ - TypeMeta: metav1.TypeMeta{ - Kind: "Service", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "infrabox-collector-api", - Namespace: "infrabox-collector", - }, - Spec: v1.ServiceSpec{ - Ports: []v1.ServicePort{{ - Name: "http", - Port: 80, - TargetPort: intstr.FromInt(8080), - }}, - Selector: map[string]string{ - "app": "api.collector.infrabox.net", - }, - }, - } + return &v1.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + 
APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "infrabox-collector-api", + Namespace: "infrabox-collector", + }, + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{{ + Name: "http", + Port: 80, + TargetPort: intstr.FromInt(8080), + }}, + Selector: map[string]string{ + "app": "api.collector.infrabox.net", + }, + }, + } } func newCollectorDeployment() *appsv1.Deployment { - var replicas int32 = 1 - return &appsv1.Deployment{ - TypeMeta: metav1.TypeMeta{ - Kind: "Deployment", - APIVersion: "extensions/v1beta1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "infrabox-collector-api", - Namespace: "infrabox-collector", - }, - Spec: appsv1.DeploymentSpec{ - Replicas: &replicas, - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "app": "api.collector.infrabox.net", - }, - }, - Template: v1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "app": "api.collector.infrabox.net", - }, - }, - Spec: v1.PodSpec{ - Containers: []v1.Container{{ - Name: "api", - Image: "quay.io/infrabox/collector-api", - }}, - }, - }, - }, - } + var replicas int32 = 1 + return &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + Kind: "Deployment", + APIVersion: "extensions/v1beta1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "infrabox-collector-api", + Namespace: "infrabox-collector", + }, + Spec: appsv1.DeploymentSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "api.collector.infrabox.net", + }, + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "api.collector.infrabox.net", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{{ + Name: "api", + Image: "quay.io/infrabox/collector-api", + }}, + }, + }, + }, + } } func newFluentbitConfigMap() *v1.ConfigMap { - return &v1.ConfigMap{ - TypeMeta: metav1.TypeMeta{ - Kind: "ConfigMap", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "infrabox-fluent-bit", - Namespace: "infrabox-collector", - }, - Data: map[string]string{ - "parsers.conf": ` + return &v1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + Kind: "ConfigMap", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "infrabox-fluent-bit", + Namespace: "infrabox-collector", + }, + Data: map[string]string{ + "parsers.conf": ` [PARSER] Name docker_utf8 Format json @@ -955,7 +1046,7 @@ func newFluentbitConfigMap() *v1.ConfigMap { Decode_Field_as escaped_utf8 log do_next Decode_Field_as escaped log `, - "fluent-bit.conf": ` + "fluent-bit.conf": ` [SERVICE] Flush 2 Daemon Off @@ -983,83 +1074,83 @@ func newFluentbitConfigMap() *v1.ConfigMap { URI /api/log Format json `, - }, - } + }, + } } func newCollectorDaemonSet() *appsv1.DaemonSet { - return &appsv1.DaemonSet{ - TypeMeta: metav1.TypeMeta{ - Kind: "DaemonSet", - APIVersion: "extensions/v1beta1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "infrabox-collector-fluent-bit", - Namespace: "infrabox-collector", - }, - Spec: appsv1.DaemonSetSpec{ - Template: v1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "app": "fluentbit.collector.infrabox.net", - }, - }, - Spec: v1.PodSpec{ - Containers: []v1.Container{{ - Name: "fluent-bit", - Image: "fluent/fluent-bit:0.13", - Resources: v1.ResourceRequirements{ - Limits: v1.ResourceList{ - "memory": resource.MustParse("100Mi"), - }, - Requests: v1.ResourceList{ - "cpu": resource.MustParse("100m"), - "memory": resource.MustParse("100Mi"), - }, - }, - VolumeMounts: []v1.VolumeMount{{ - Name: 
"varlog", - MountPath: "/var/log", - }, { - Name: "varlibdockercontainers", - MountPath: "/var/lib/docker/containers", - ReadOnly: true, - }, { - Name: "config", - MountPath: "/fluent-bit/etc/parsers.conf", - SubPath: "parsers.conf", - }, { - Name: "config", - MountPath: "/fluent-bit/etc/fluent-bit.conf", - SubPath: "fluent-bit.conf", - }}, - }}, - Volumes: []v1.Volume{{ - Name: "varlibdockercontainers", - VolumeSource: v1.VolumeSource{ - HostPath: &v1.HostPathVolumeSource{ - Path: "/var/lib/docker/containers", - }, - }, - }, { - Name: "varlog", - VolumeSource: v1.VolumeSource{ - HostPath: &v1.HostPathVolumeSource{ - Path: "/var/log", - }, - }, - }, { - Name: "config", - VolumeSource: v1.VolumeSource{ - ConfigMap: &v1.ConfigMapVolumeSource{ - LocalObjectReference: v1.LocalObjectReference{ - Name: "infrabox-fluent-bit", - }, - }, - }, - }}, - }, - }, - }, - } + return &appsv1.DaemonSet{ + TypeMeta: metav1.TypeMeta{ + Kind: "DaemonSet", + APIVersion: "extensions/v1beta1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "infrabox-collector-fluent-bit", + Namespace: "infrabox-collector", + }, + Spec: appsv1.DaemonSetSpec{ + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "fluentbit.collector.infrabox.net", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{{ + Name: "fluent-bit", + Image: "fluent/fluent-bit:0.13", + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + "memory": resource.MustParse("100Mi"), + }, + Requests: v1.ResourceList{ + "cpu": resource.MustParse("100m"), + "memory": resource.MustParse("100Mi"), + }, + }, + VolumeMounts: []v1.VolumeMount{{ + Name: "varlog", + MountPath: "/var/log", + }, { + Name: "varlibdockercontainers", + MountPath: "/var/lib/docker/containers", + ReadOnly: true, + }, { + Name: "config", + MountPath: "/fluent-bit/etc/parsers.conf", + SubPath: "parsers.conf", + }, { + Name: "config", + MountPath: "/fluent-bit/etc/fluent-bit.conf", + SubPath: "fluent-bit.conf", + }}, + }}, + Volumes: []v1.Volume{{ + Name: "varlibdockercontainers", + VolumeSource: v1.VolumeSource{ + HostPath: &v1.HostPathVolumeSource{ + Path: "/var/lib/docker/containers", + }, + }, + }, { + Name: "varlog", + VolumeSource: v1.VolumeSource{ + HostPath: &v1.HostPathVolumeSource{ + Path: "/var/log", + }, + }, + }, { + Name: "config", + VolumeSource: v1.VolumeSource{ + ConfigMap: &v1.ConfigMapVolumeSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: "infrabox-fluent-bit", + }, + }, + }, + }}, + }, + }, + }, + } }