Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add check for final logs once a pod is finished to ensure we got them… #1153

Draft
wants to merge 3 commits into
base: devel
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 126 additions & 1 deletion pkg/workceptor/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,10 @@
// known issues around this, as logstream can terminate due to log rotation
// or 4 hr timeout
defer streamWait.Done()
var sinceTime time.Time
podNamespace := kw.pod.Namespace
podName := kw.pod.Name
logStream, err := kw.kubeLoggingConnectionHandler(false, time.Time{})
logStream, err := kw.kubeLoggingConnectionHandler(true, sinceTime)
if err != nil {
return
}
Expand All @@ -292,6 +293,16 @@
*stdoutErr,
)
}
// After primary log streaming, retrieve remaining logs
err = kw.retrieveRemainingLogs(stdout, &sinceTime)
if err != nil {
kw.GetWorkceptor().nc.GetLogger().Error(
"Error retrieving remaining logs for pod %s/%s. Error: %s",
podNamespace,
podName,
err,
)
}
}

func (kw *KubeUnit) kubeLoggingWithReconnect(streamWait *sync.WaitGroup, stdout *STDoutWriter, stdinErr *error, stdoutErr *error) {
Expand Down Expand Up @@ -404,6 +415,29 @@
}

logStream.Close()

// Check if the pod has terminated
podIsTerminated, err := kw.isPodTerminated()
if err != nil {
kw.GetWorkceptor().nc.GetLogger().Error("Error checking pod status for %s/%s: %s", podNamespace, podName, err)
*stdoutErr = err

return
}
if podIsTerminated {
// Retrieve any remaining logs
err = kw.retrieveRemainingLogs(stdout, &sinceTime)
if err != nil {
kw.GetWorkceptor().nc.GetLogger().Error(
"Error retrieving remaining logs for pod %s/%s. Error: %s",
podNamespace,
podName,
err,
)
}

break
}
}
}

Expand Down Expand Up @@ -596,6 +630,97 @@
return nil
}

func (kw *KubeUnit) retrieveRemainingLogs(stdout io.Writer, sinceTime *time.Time) error {
podNamespace := kw.pod.Namespace
podName := kw.pod.Name

// Set PodLogOptions to retrieve logs since the last timestamp
podOptions := &corev1.PodLogOptions{
Container: "worker",
Follow: false,
Previous: false,
Timestamps: true,
}

if sinceTime != nil {
podOptions.SinceTime = &metav1.Time{Time: *sinceTime}
}

// Get the logs
logReq := KubeAPIWrapperInstance.GetLogs(kw.clientset, podNamespace, podName, podOptions)
logStream, err := logReq.Stream(kw.GetContext())
if err != nil {
// Handle case where no logs are available
if apierrors.IsNotFound(err) {
kw.GetWorkceptor().nc.GetLogger().Info("No additional logs to retrieve for pod %s/%s", podNamespace, podName)

return nil
}
kw.GetWorkceptor().nc.GetLogger().Error("Error retrieving remaining logs for pod %s/%s: %s", podNamespace, podName, err)

return err
}
defer logStream.Close()

// Read and process the logs
streamReader := bufio.NewReader(logStream)
logsRetrieved := false

for {
line, err := streamReader.ReadString('\n')
if err != nil {
if err == io.EOF {
break
}
kw.GetWorkceptor().nc.GetLogger().Error("Error reading remaining logs for pod %s/%s: %s", podNamespace, podName, err)
return err

Check failure on line 676 in pkg/workceptor/kubernetes.go

View workflow job for this annotation

GitHub Actions / lint-receptor

return with no blank line before (nlreturn)
}

// Process the log line
split := strings.SplitN(line, " ", 2)
if len(split) != 2 {
continue // Skip malformed lines
}

timeStamp := ParseTime(split[0])
if timeStamp.After(*sinceTime) {
msg := split[1]

// Write the log message to stdout
_, err = stdout.Write([]byte(msg))
if err != nil {
kw.GetWorkceptor().nc.GetLogger().Error("Error writing remaining logs for pod %s/%s: %s", podNamespace, podName, err)
return err

Check failure on line 693 in pkg/workceptor/kubernetes.go

View workflow job for this annotation

GitHub Actions / lint-receptor

return with no blank line before (nlreturn)
}
logsRetrieved = true
}
}

if !logsRetrieved {
kw.GetWorkceptor().nc.GetLogger().Info("No new logs retrieved for pod %s/%s", podNamespace, podName)
}

return nil
}

func (kw *KubeUnit) isPodTerminated() (bool, error) {
podNamespace := kw.pod.Namespace
podName := kw.pod.Name

// Get the latest pod status
pod, err := KubeAPIWrapperInstance.Get(kw.GetContext(), kw.clientset, podNamespace, podName, metav1.GetOptions{})
if err != nil {
return false, err
}

// Check if the pod has terminated
if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed {
return true, nil
}

return false, nil
}

func (kw *KubeUnit) runWorkUsingLogger() {
skipStdin := true

Expand Down
Loading