diff --git a/ib.py b/ib.py
index cbc99010d..bd706691a 100755
--- a/ib.py
+++ b/ib.py
@@ -20,7 +20,7 @@
     {'name': 'gerrit-review', 'depends_on': ['images-base']},
     {'name': 'github-trigger', 'depends_on': ['images-base']},
     {'name': 'github-review', 'depends_on': ['images-base']},
-    {'name': 'collector-api'},
+    {'name': 'collector-api', 'depends_on': ['images-base']},
     {'name': 'job'},
     {'name': 'opa'},
     {'name': 'gc', 'depends_on': ['images-base']},
diff --git a/src/api/handlers/job_api.py b/src/api/handlers/job_api.py
index dc18713ec..4fb4feb2d 100644
--- a/src/api/handlers/job_api.py
+++ b/src/api/handlers/job_api.py
@@ -480,6 +480,16 @@ class Archive(Resource):
     def post(self):
         job_id = g.token['job']['id']
 
+        j = g.db.execute_one_dict('''
+            SELECT id
+            FROM job
+            WHERE id = %s
+            AND (state = 'running' OR end_date > NOW() - INTERVAL '5 minutes')
+        ''', [job_id])
+
+        if not j:
+            abort(401, 'Unauthorized')
+
         for f in request.files:
             stream = request.files[f].stream
             key = '%s/%s' % (job_id, f)
diff --git a/src/api/handlers/projects/jobs.py b/src/api/handlers/projects/jobs.py
index 716435ed7..8f88f4281 100644
--- a/src/api/handlers/projects/jobs.py
+++ b/src/api/handlers/projects/jobs.py
@@ -307,15 +307,11 @@ def get(self, project_id, job_id):
                 # First restart
                 j['name'] = j['name'] + '.1'
 
-        logger.error(json.dumps(old_id_job, indent=4))
-
         for j in jobs:
             for dep in j['dependencies']:
                 if dep['job-id'] in old_id_job:
                     dep['job'] = old_id_job[dep['job-id']]['name']
                     dep['job-id'] = old_id_job[dep['job-id']]['id']
-                else:
-                    logger.error('%s not found', dep['job'])
 
         for j in jobs:
             g.db.execute('''
diff --git a/src/collector-api/server.py b/src/collector-api/server.py
index e6f102e66..76fa9b9e4 100644
--- a/src/collector-api/server.py
+++ b/src/collector-api/server.py
@@ -26,6 +26,9 @@ def get(self):
         return {'status': 200}
 
 def handle_entry(entry):
+    if 'kubernetes' not in entry:
+        return
+
     e = entry['kubernetes']
 
     pod_path = os.path.join(storage_path, e['pod_id'])
@@ -35,11 +38,9 @@ def handle_entry(entry):
 
     metadata_path = os.path.join(pod_path, "metadata.json")
     log_path = os.path.join(pod_path, e['container_name'] +".log")
-
     if not os.path.exists(metadata_path):
         with open(metadata_path, 'w+') as metadata_file:
             md = {
-                'namespace_id': e['namespace_id'],
                 'namespace_name': e['namespace_name'],
                 'pod_id': e['pod_id'],
                 'pod_name': e['pod_name'],
@@ -58,7 +59,9 @@ def handle_entry(entry):
 
     if 'log' in entry:
         with open(log_path, 'a+') as log_file:
-            log_file.write(entry['log'])
+            log = entry['log']
+            log = log.replace('\x00', '\n')
+            log_file.write(log)
 
 @api.route('/api/log')
 class Console(Resource):
@@ -114,6 +117,9 @@ def get(self, pod_id, container_name):
 
 def main(): # pragma: no cover
     app.config['MAX_CONTENT_LENGTH'] = 1024 * 1024 * 1024 * 4
+    if not os.path.exists(storage_path):
+        os.makedirs(storage_path)
+
     port = int(os.environ.get('INFRABOX_PORT', 8080))
     logger.info('Starting Server on port %s', port)
     app.run(host='0.0.0.0', port=port)
diff --git a/src/dashboard-client/src/models/Job.js b/src/dashboard-client/src/models/Job.js
index 7945875e8..438e611c8 100644
--- a/src/dashboard-client/src/models/Job.js
+++ b/src/dashboard-client/src/models/Job.js
@@ -3,6 +3,7 @@ import Notification from '../models/Notification'
 import NotificationService from '../services/NotificationService'
 import NewAPIService from '../services/NewAPIService'
 import store from '../store'
+import router from '../router'
 
 const Convert = require('ansi-to-html')
 class Section {
diff --git a/src/openpolicyagent/policies/job.rego b/src/openpolicyagent/policies/job.rego
index 13476db97..20648f200 100644
--- a/src/openpolicyagent/policies/job.rego
+++ b/src/openpolicyagent/policies/job.rego
@@ -57,8 +57,15 @@ allow {
 allow {
     api.method = "POST"
     api.path = ["api", "job", suffix]
-    job_suffix := {"cache", "archive", "output", "create_jobs", "consoleupdate", "stats", "markup", "badge", "testresult"}
+    job_suffix := {"cache", "output", "create_jobs", "consoleupdate", "stats", "markup", "badge", "testresult"}
     suffix = job_suffix[_]
     api.token.type = "job"
     api.token.job.state = job_state[_]
-}
\ No newline at end of file
+}
+
+# Allow POST access to /api/job/archive for valid job tokens (for service uploads)
+allow {
+    api.method = "POST"
+    api.path = ["api", "job", "archive"]
+    api.token.type = "job"
+}
diff --git a/src/services/aks/pkg/controller/akscluster/akscluster_controller.go b/src/services/aks/pkg/controller/akscluster/akscluster_controller.go
index 3e481636e..5cad2505b 100644
--- a/src/services/aks/pkg/controller/akscluster/akscluster_controller.go
+++ b/src/services/aks/pkg/controller/akscluster/akscluster_controller.go
@@ -514,8 +514,6 @@ func retrieveLogs(cr *v1alpha1.AKSCluster, cluster *RemoteCluster, log *logrus.E
         return
     }
 
-    log.Info(string(*data))
-
     err = json.Unmarshal(*data, &pods)
     if err != nil {
         log.Errorf("Failed to collected pod list: %v", err)
@@ -532,7 +530,7 @@ func retrieveLogs(cr *v1alpha1.AKSCluster, cluster *RemoteCluster, log *logrus.E
             continue
         }
 
-        filename := "pod_" + pod.Namespace + "_" + pod.Pod + "_" + pod.PodID + ".txt"
+        filename := "pod_" + pod.Namespace + "_" + pod.Pod + "_" + container + ".txt"
         err = uploadToArchive(cr, log, data, filename)
         if err != nil {
             log.Warningf("Failed to upload log to archive: %v", err)
@@ -568,6 +566,12 @@ func injectCollector(cluster *RemoteCluster, log *logrus.Entry) error {
         return err
     }
 
+    err = kubectlApply(cluster, newFluentbitConfigMap(), log)
+    if err != nil {
+        log.Errorf("Failed to create fluent bit config map: %v", err)
+        return err
+    }
+
     err = kubectlApply(cluster, newCollectorDaemonSet(), log)
     if err != nil {
         log.Errorf("Failed to create collector daemon set: %v", err)
@@ -691,6 +695,59 @@ func newCollectorDeployment() *appsv1.Deployment {
     }
 }
 
+func newFluentbitConfigMap() *v1.ConfigMap{
+    return &v1.ConfigMap{
+        TypeMeta: metav1.TypeMeta{
+            Kind:       "ConfigMap",
+            APIVersion: "v1",
+        },
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      "infrabox-fluent-bit",
+            Namespace: "infrabox-collector",
+        },
+        Data: map[string]string {
+            "parsers.conf": `
+[PARSER]
+    Name        docker_utf8
+    Format      json
+    Time_Key    time
+    Time_Format %Y-%m-%dT%H:%M:%S.%L
+    Time_Keep   On
+    Decode_Field_as escaped_utf8 log do_next
+    Decode_Field_as escaped log
+`,
+            "fluent-bit.conf": `
+[SERVICE]
+    Flush        2
+    Daemon       Off
+    Log_Level    info
+    Parsers_File parsers.conf
+[INPUT]
+    Name             tail
+    Path             /var/log/containers/*.log
+    Parser           docker_utf8
+    Tag              kube.*
+    Refresh_Interval 2
+    Mem_Buf_Limit    50MB
+    Skip_Long_Lines  On
+[FILTER]
+    Name            kubernetes
+    Match           kube.*
+    Kube_URL        https://kubernetes.default.svc.cluster.local:443
+    Kube_CA_File    /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
+    Kube_Token_File /var/run/secrets/kubernetes.io/serviceaccount/token
+[OUTPUT]
+    Name   http
+    Match  *
+    Host   infrabox-collector-api.infrabox-collector
+    Port   80
+    URI    /api/log
+    Format json
+`,
+        },
+    }
+}
+
 func newCollectorDaemonSet() *appsv1.DaemonSet {
     return &appsv1.DaemonSet{
         TypeMeta: metav1.TypeMeta{
@@ -698,23 +755,23 @@ func newCollectorDaemonSet() *appsv1.DaemonSet {
             APIVersion: "extensions/v1beta1",
         },
         ObjectMeta: metav1.ObjectMeta{
-            Name:      "infrabox-collector-fluentd",
+            Name:      "infrabox-collector-fluent-bit",
             Namespace: "infrabox-collector",
         },
         Spec: appsv1.DaemonSetSpec{
             Template: v1.PodTemplateSpec{
                 ObjectMeta: metav1.ObjectMeta{
                     Labels: map[string]string{
-                        "app": "fluentd.collector.infrabox.net",
+                        "app": "fluentbit.collector.infrabox.net",
                     },
                 },
                 Spec: v1.PodSpec{
                     Containers: []v1.Container{{
-                        Name:  "fluentd",
-                        Image: "quay.io/infrabox/collector-fluentd",
+                        Name:  "fluent-bit",
+                        Image: "fluent/fluent-bit:0.13",
                         Resources: v1.ResourceRequirements{
                             Limits: v1.ResourceList{
-                                "memory": resource.MustParse("200Mi"),
+                                "memory": resource.MustParse("100Mi"),
                             },
                             Requests: v1.ResourceList{
                                 "cpu": resource.MustParse("100m"),
@@ -728,10 +785,14 @@ func newCollectorDaemonSet() *appsv1.DaemonSet {
                             Name:      "varlibdockercontainers",
                             MountPath: "/var/lib/docker/containers",
                             ReadOnly:  true,
-                        }},
-                        Env: []v1.EnvVar{{
-                            Name:  "INFRABOX_COLLECTOR_ENDPOINT",
-                            Value: "http://infrabox-collector-api.infrabox-collector/api/log",
+                        }, {
+                            Name:      "config",
+                            MountPath: "/fluent-bit/etc/parsers.conf",
+                            SubPath:   "parsers.conf",
+                        }, {
+                            Name:      "config",
+                            MountPath: "/fluent-bit/etc/fluent-bit.conf",
+                            SubPath:   "fluent-bit.conf",
                         }},
                     }},
                     Volumes: []v1.Volume{{
@@ -748,11 +809,18 @@ func newCollectorDaemonSet() *appsv1.DaemonSet {
                                 Path: "/var/log",
                             },
                         },
+                    }, {
+                        Name: "config",
+                        VolumeSource: v1.VolumeSource{
+                            ConfigMap: &v1.ConfigMapVolumeSource{
+                                LocalObjectReference: v1.LocalObjectReference{
+                                    Name: "infrabox-fluent-bit",
+                                },
+                            },
+                        },
                     }},
                 },
             },
         },
     }
 }
-
-// newPodForCR returns a busybox pod with the same name/namespace as the cr
diff --git a/src/services/gardener/pkg/stub/handler.go b/src/services/gardener/pkg/stub/handler.go
index 8859dfd1c..fd1938150 100644
--- a/src/services/gardener/pkg/stub/handler.go
+++ b/src/services/gardener/pkg/stub/handler.go
@@ -408,8 +408,6 @@ func retrieveLogs(cr *v1alpha1.ShootCluster, cluster *RemoteCluster, log *logrus
         return
     }
 
-    log.Info(string(*data))
-
     err = json.Unmarshal(*data, &pods)
     if err != nil {
         log.Errorf("Failed to collected pod list: %v", err)
@@ -426,7 +424,7 @@ func retrieveLogs(cr *v1alpha1.ShootCluster, cluster *RemoteCluster, log *logrus
             continue
         }
 
-        filename := "pod_" + pod.Namespace + "_" + pod.Pod + "_" + pod.PodID + ".txt"
+        filename := "pod_" + pod.Namespace + "_" + pod.Pod + "_" + container + ".txt"
         err = uploadToArchive(cr, log, data, filename)
         if err != nil {
             log.Warningf("Failed to upload log to archive: %v", err)
@@ -469,6 +467,12 @@ func injectCollector(cluster *RemoteCluster, log *logrus.Entry) error {
         return err
     }
 
+    err = client.Create(newFluentbitConfigMap(), log)
+    if err != nil && !errors.IsAlreadyExists(err) {
+        log.Errorf("Failed to create collector fluentbit config map: %v", err)
+        return err
+    }
+
     err = client.Create(newCollectorDaemonSet(), log)
     if err != nil && !errors.IsAlreadyExists(err) {
         log.Errorf("Failed to create collector daemon set: %v", err)
@@ -478,6 +482,59 @@ func injectCollector(cluster *RemoteCluster, log *logrus.Entry) error {
     return nil
 }
 
+func newFluentbitConfigMap() *v1.ConfigMap{
+    return &v1.ConfigMap{
+        TypeMeta: metav1.TypeMeta{
+            Kind:       "ConfigMap",
+            APIVersion: "v1",
+        },
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      "infrabox-fluent-bit",
+            Namespace: "infrabox-collector",
+        },
+        Data: map[string]string {
+            "parsers.conf": `
+[PARSER]
+    Name        docker_utf8
+    Format      json
+    Time_Key    time
+    Time_Format %Y-%m-%dT%H:%M:%S.%L
+    Time_Keep   On
+    Decode_Field_as escaped_utf8 log do_next
+    Decode_Field_as escaped log
+`,
+            "fluent-bit.conf": `
+[SERVICE]
+    Flush        2
+    Daemon       Off
+    Log_Level    info
+    Parsers_File parsers.conf
+[INPUT]
+    Name             tail
+    Path             /var/log/containers/*.log
+    Parser           docker_utf8
+    Tag              kube.*
+    Refresh_Interval 2
+    Mem_Buf_Limit    50MB
+    Skip_Long_Lines  On
+[FILTER]
+    Name            kubernetes
+    Match           kube.*
+    Kube_URL        https://kubernetes.default.svc.cluster.local:443
+    Kube_CA_File    /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
+    Kube_Token_File /var/run/secrets/kubernetes.io/serviceaccount/token
+[OUTPUT]
+    Name   http
+    Match  *
+    Host   infrabox-collector-api.infrabox-collector
+    Port   80
+    URI    /api/log
+    Format json
+`,
+        },
+    }
+}
+
 func newCollectorNamespace() *v1.Namespace {
     return &v1.Namespace{
         TypeMeta: metav1.TypeMeta{
@@ -578,23 +635,23 @@ func newCollectorDaemonSet() *appsv1.DaemonSet {
             APIVersion: "extensions/v1beta1",
         },
         ObjectMeta: metav1.ObjectMeta{
-            Name:      "infrabox-collector-fluentd",
+            Name:      "infrabox-collector-fluent-bit",
             Namespace: "infrabox-collector",
         },
         Spec: appsv1.DaemonSetSpec{
             Template: v1.PodTemplateSpec{
                 ObjectMeta: metav1.ObjectMeta{
                     Labels: map[string]string{
-                        "app": "fluentd.collector.infrabox.net",
+                        "app": "fluentbit.collector.infrabox.net",
                     },
                 },
                 Spec: v1.PodSpec{
                     Containers: []v1.Container{{
-                        Name:  "fluentd",
-                        Image: "quay.io/infrabox/collector-fluentd",
+                        Name:  "fluent-bit",
+                        Image: "fluent/fluent-bit:0.13",
                         Resources: v1.ResourceRequirements{
                             Limits: v1.ResourceList{
-                                "memory": resource.MustParse("200Mi"),
+                                "memory": resource.MustParse("100Mi"),
                             },
                             Requests: v1.ResourceList{
                                 "cpu": resource.MustParse("100m"),
@@ -608,10 +665,14 @@ func newCollectorDaemonSet() *appsv1.DaemonSet {
                             Name:      "varlibdockercontainers",
                             MountPath: "/var/lib/docker/containers",
                             ReadOnly:  true,
-                        }},
-                        Env: []v1.EnvVar{{
-                            Name:  "INFRABOX_COLLECTOR_ENDPOINT",
-                            Value: "http://infrabox-collector-api.infrabox-collector/api/log",
+                        }, {
+                            Name:      "config",
+                            MountPath: "/fluent-bit/etc/parsers.conf",
+                            SubPath:   "parsers.conf",
+                        }, {
+                            Name:      "config",
+                            MountPath: "/fluent-bit/etc/fluent-bit.conf",
+                            SubPath:   "fluent-bit.conf",
                         }},
                     }},
                     Volumes: []v1.Volume{{
@@ -628,6 +689,15 @@ func newCollectorDaemonSet() *appsv1.DaemonSet {
                                 Path: "/var/log",
                             },
                         },
+                    }, {
+                        Name: "config",
+                        VolumeSource: v1.VolumeSource{
+                            ConfigMap: &v1.ConfigMapVolumeSource{
+                                LocalObjectReference: v1.LocalObjectReference{
+                                    Name: "infrabox-fluent-bit",
+                                },
+                            },
+                        },
                     }},
                 },
             },
diff --git a/src/services/gcp/pkg/stub/handler.go b/src/services/gcp/pkg/stub/handler.go
index f304e63c2..50623e375 100644
--- a/src/services/gcp/pkg/stub/handler.go
+++ b/src/services/gcp/pkg/stub/handler.go
@@ -16,6 +16,8 @@ import (
     "strconv"
     "strings"
 
+    goerrors "errors"
+
     "k8s.io/client-go/discovery"
     "k8s.io/client-go/discovery/cached"
     "k8s.io/client-go/dynamic"
@@ -425,6 +427,10 @@ func doCollectorRequest(cluster *RemoteCluster, log *logrus.Entry, endpoint stri
         return nil, err
     }
 
+    if resp.StatusCode != 200 {
+        return &bodyText, goerrors.New(string(bodyText))
+    }
+
     return &bodyText, nil
 }
 
@@ -469,7 +475,10 @@ func uploadToArchive(cr *v1alpha1.GKECluster, log *logrus.Entry, data *[]byte, f
     }
 
     bodyText, err := ioutil.ReadAll(response.Body)
-    log.Info(string(bodyText))
+
+    if response.StatusCode != 200 {
+        return goerrors.New(string(bodyText))
+    }
 
     return nil
 }
@@ -512,8 +521,6 @@ func retrieveLogs(cr *v1alpha1.GKECluster, cluster *RemoteCluster, log *logrus.E
         return
     }
 
-    log.Info(string(*data))
-
     err = json.Unmarshal(*data, &pods)
     if err != nil {
         log.Errorf("Failed to collected pod list: %v", err)
@@ -530,7 +537,7 @@ func retrieveLogs(cr *v1alpha1.GKECluster, cluster *RemoteCluster, log *logrus.E
             continue
         }
 
-        filename := "pod_" + pod.Namespace + "_" + pod.Pod + "_" + pod.PodID + ".txt"
+        filename := "pod_" + pod.Namespace + "_" + pod.Pod + "_" + container + ".txt"
         err = uploadToArchive(cr, log, data, filename)
         if err != nil {
             log.Warningf("Failed to upload log to archive: %v", err)
@@ -572,6 +579,12 @@ func injectCollector(cluster *RemoteCluster, log *logrus.Entry) error {
         return err
     }
 
+    err = client.Create(newFluentbitConfigMap(), log)
+    if err != nil && !errors.IsAlreadyExists(err) {
+        log.Errorf("Failed to create collector fluentbit config map: %v", err)
+        return err
+    }
+
     err = client.Create(newCollectorDaemonSet(), log)
     if err != nil && !errors.IsAlreadyExists(err) {
         log.Errorf("Failed to create collector daemon set: %v", err)
@@ -780,7 +793,7 @@ func newCollectorDeployment() *appsv1.Deployment {
             Spec: v1.PodSpec{
                 Containers: []v1.Container{{
                     Name:  "api",
-                    Image: "quay.io/infrabox/collector-api",
+                    Image: "quay.io/infrabox/collector-api",
                 }},
             },
         },
@@ -788,6 +801,59 @@ func newCollectorDeployment() *appsv1.Deployment {
     }
 }
 
+func newFluentbitConfigMap() *v1.ConfigMap{
+    return &v1.ConfigMap{
+        TypeMeta: metav1.TypeMeta{
+            Kind:       "ConfigMap",
+            APIVersion: "v1",
+        },
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      "infrabox-fluent-bit",
+            Namespace: "infrabox-collector",
+        },
+        Data: map[string]string {
+            "parsers.conf": `
+[PARSER]
+    Name        docker_utf8
+    Format      json
+    Time_Key    time
+    Time_Format %Y-%m-%dT%H:%M:%S.%L
+    Time_Keep   On
+    Decode_Field_as escaped_utf8 log do_next
+    Decode_Field_as escaped log
+`,
+            "fluent-bit.conf": `
+[SERVICE]
+    Flush        2
+    Daemon       Off
+    Log_Level    info
+    Parsers_File parsers.conf
+[INPUT]
+    Name             tail
+    Path             /var/log/containers/*.log
+    Parser           docker_utf8
+    Tag              kube.*
+    Refresh_Interval 2
+    Mem_Buf_Limit    50MB
+    Skip_Long_Lines  On
+[FILTER]
+    Name            kubernetes
+    Match           kube.*
+    Kube_URL        https://kubernetes.default.svc.cluster.local:443
+    Kube_CA_File    /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
+    Kube_Token_File /var/run/secrets/kubernetes.io/serviceaccount/token
+[OUTPUT]
+    Name   http
+    Match  *
+    Host   infrabox-collector-api.infrabox-collector
+    Port   80
+    URI    /api/log
+    Format json
+`,
+        },
+    }
+}
+
 func newCollectorDaemonSet() *appsv1.DaemonSet {
     return &appsv1.DaemonSet{
         TypeMeta: metav1.TypeMeta{
@@ -795,23 +861,23 @@ func newCollectorDaemonSet() *appsv1.DaemonSet {
             APIVersion: "extensions/v1beta1",
         },
         ObjectMeta: metav1.ObjectMeta{
-            Name:      "infrabox-collector-fluentd",
+            Name:      "infrabox-collector-fluent-bit",
             Namespace: "infrabox-collector",
         },
         Spec: appsv1.DaemonSetSpec{
             Template: v1.PodTemplateSpec{
                 ObjectMeta: metav1.ObjectMeta{
                     Labels: map[string]string{
-                        "app": "fluentd.collector.infrabox.net",
+                        "app": "fluentbit.collector.infrabox.net",
                     },
                 },
                 Spec: v1.PodSpec{
                     Containers: []v1.Container{{
-                        Name:  "fluentd",
-                        Image: "quay.io/infrabox/collector-fluentd",
+                        Name:  "fluent-bit",
+                        Image: "fluent/fluent-bit:0.13",
                         Resources: v1.ResourceRequirements{
                             Limits: v1.ResourceList{
-                                "memory": resource.MustParse("200Mi"),
+                                "memory": resource.MustParse("100Mi"),
                             },
                             Requests: v1.ResourceList{
                                 "cpu": resource.MustParse("100m"),
@@ -825,10 +891,14 @@ func newCollectorDaemonSet() *appsv1.DaemonSet {
                             Name:      "varlibdockercontainers",
                             MountPath: "/var/lib/docker/containers",
                             ReadOnly:  true,
-                        }},
-                        Env: []v1.EnvVar{{
-                            Name:  "INFRABOX_COLLECTOR_ENDPOINT",
-                            Value: "http://infrabox-collector-api.infrabox-collector/api/log",
+                        }, {
+                            Name:      "config",
+                            MountPath: "/fluent-bit/etc/parsers.conf",
+                            SubPath:   "parsers.conf",
+                        }, {
+                            Name:      "config",
+                            MountPath: "/fluent-bit/etc/fluent-bit.conf",
+                            SubPath:   "fluent-bit.conf",
                         }},
                     }},
                     Volumes: []v1.Volume{{
@@ -845,6 +915,15 @@ func newCollectorDaemonSet() *appsv1.DaemonSet {
                                 Path: "/var/log",
                             },
                         },
+                    }, {
+                        Name: "config",
+                        VolumeSource: v1.VolumeSource{
+                            ConfigMap: &v1.ConfigMapVolumeSource{
+                                LocalObjectReference: v1.LocalObjectReference{
+                                    Name: "infrabox-fluent-bit",
+                                },
+                            },
+                        },
                     }},
                 },
             },