Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Hotfix][Zeta][Log] Fix zeta parallelStream log trace for job #8456

Merged
merged 2 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,34 @@
package org.apache.seatunnel.api.tracing;

import java.util.concurrent.Callable;
import java.util.function.Supplier;

/**
* Callable that sets MDC context before calling the delegate and clears it afterwards.
*
* @param <V>
*/
public class MDCCallable<V> implements Callable<V> {
private final MDCContext context;
private final Supplier<MDCContext> contextSupplier;
private final Callable<V> delegate;

public MDCCallable(Callable<V> delegate) {
this(MDCContext.current(), delegate);
}

public MDCCallable(MDCContext context, Callable<V> delegate) {
this.context = context;
this(() -> context, delegate);
}

public MDCCallable(Supplier<MDCContext> contextSupplier, Callable<V> delegate) {
this.contextSupplier = contextSupplier;
this.delegate = delegate;
}

@Override
public V call() throws Exception {
try {
context.put();

try (MDCContext ignored = contextSupplier.get().activate()) {
return delegate.call();
} finally {
context.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,29 @@
package org.apache.seatunnel.api.tracing;

import java.util.Comparator;
import java.util.function.Supplier;

public class MDCComparator<T> implements Comparator<T> {
private final MDCContext context;
private final Supplier<MDCContext> contextSupplier;
private final Comparator<T> delegate;

public MDCComparator(Comparator<T> delegate) {
this(MDCContext.current(), delegate);
}

public MDCComparator(MDCContext context, Comparator<T> delegate) {
this.context = context;
this(() -> context, delegate);
}

public MDCComparator(Supplier<MDCContext> contextSupplier, Comparator<T> delegate) {
this.contextSupplier = contextSupplier;
this.delegate = delegate;
}

@Override
public int compare(T o1, T o2) {
try {
context.put();
try (MDCContext ignored = contextSupplier.get().activate()) {
return delegate.compare(o1, o2);
} finally {
context.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,29 @@
package org.apache.seatunnel.api.tracing;

import java.util.function.Consumer;
import java.util.function.Supplier;

public class MDCConsumer<T> implements Consumer<T> {
private final MDCContext context;
private final Supplier<MDCContext> contextSupplier;
private final Consumer<T> delegate;

public MDCConsumer(Consumer<T> delegate) {
this(MDCContext.current(), delegate);
}

public MDCConsumer(MDCContext context, Consumer<T> delegate) {
this.context = context;
this(() -> context, delegate);
}

public MDCConsumer(Supplier<MDCContext> contextSupplier, Consumer<T> delegate) {
this.contextSupplier = contextSupplier;
this.delegate = delegate;
}

@Override
public void accept(T t) {
try {
context.put();
try (MDCContext ignored = contextSupplier.get().activate()) {
delegate.accept(t);
} finally {
context.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,37 @@

import org.slf4j.MDC;

import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;

import java.io.Closeable;
import java.io.Serializable;

/**
* MDC context for tracing.
*
* <p>reference: https://www.slf4j.org/manual.html#mdc
*
* <p>Example:
*
* <pre>
* try (MDCContext ctx = MDCContext.of(jobId, pipelineId, taskId).activate()) {
* // do something
* new Thread(new MDCRunnable(MDCContext.current(), new Runnable() {
* @Override
* public void run() {
* // do something
* }
* }))
* .start();
* }
* // MDC context will be restored after the try block
* </pre>
*/
@Slf4j
@Builder
@EqualsAndHashCode
public class MDCContext implements Serializable {
private static final MDCContext EMPTY = MDCContext.builder().build();
public class MDCContext implements Serializable, Closeable {
private static final MDCContext EMPTY = new MDCContext(null, null, null);
private static final String EMPTY_TO_STRING = "NA";

public static final String JOB_ID = "ST-JID";
Expand All @@ -44,55 +59,24 @@ public class MDCContext implements Serializable {
private final Long jobId;
private final Long pipelineId;
private final Long taskId;
private transient volatile MDCContext toRestore;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cache parent context


public static MDCContext of(long jobId) {
return MDCContext.builder().jobId(jobId).build();
}

public static MDCContext of(long jobId, long pipelineId) {
return MDCContext.builder().jobId(jobId).pipelineId(pipelineId).build();
}

public static MDCContext of(long jobId, long pipelineId, long taskId) {
return MDCContext.builder().jobId(jobId).pipelineId(pipelineId).taskId(taskId).build();
}

public static MDCContext current() {
return MDCContext.builder()
.jobId(MDC.get(JOB_ID) != null ? Long.parseLong(MDC.get(JOB_ID)) : null)
.pipelineId(
MDC.get(PIPELINE_ID) != null ? Long.parseLong(MDC.get(PIPELINE_ID)) : null)
.taskId(MDC.get(TASK_ID) != null ? Long.parseLong(MDC.get(TASK_ID)) : null)
.build();
public MDCContext(Long jobId, Long pipelineId, Long taskId) {
this.jobId = jobId;
this.pipelineId = pipelineId;
this.taskId = taskId;
}

public static MDCContext valueOf(String s) {
if (EMPTY_TO_STRING.equals(s)) {
return EMPTY;
public synchronized MDCContext activate() {
if (this == EMPTY) {
return this;
}

String[] arr = s.split("/");
Long jobId = Long.parseLong(arr[0]);
Long pipelineId = Long.parseLong(arr[1]);
Long taskId = Long.parseLong(arr[2]);
if (pipelineId == 0 || taskId == 0) {
return MDCContext.of(jobId);
}
return MDCContext.of(jobId, pipelineId, taskId);
}

@Override
public String toString() {
if (jobId != null) {
return String.format(
"%d/%d/%d",
jobId, pipelineId == null ? 0 : pipelineId, taskId == null ? 0 : taskId);
} else {
return EMPTY_TO_STRING;
if (this.toRestore != null) {
throw new IllegalStateException("MDCContext is already activated");
}
}
this.toRestore = current();

public void put() {
try {
if (jobId != null) {
MDC.put(JOB_ID, String.valueOf(jobId));
Expand All @@ -107,9 +91,18 @@ public void put() {
log.error("Failed to put MDC context", e);
throw e;
}
return this;
}

public void clear() {
public synchronized MDCContext deactivate() {
if (this == EMPTY) {
return this;
}

if (this.toRestore == null) {
throw new IllegalStateException("MDCContext is not activated");
}

try {
MDC.remove(JOB_ID);
MDC.remove(PIPELINE_ID);
Expand All @@ -118,5 +111,71 @@ public void clear() {
log.error("Failed to clear MDC context", e);
throw e;
}

if (this.toRestore != null) {
this.toRestore.activate();
}

return this;
}

@Override
public void close() {
deactivate();
}

@Override
public String toString() {
if (this == EMPTY) {
return EMPTY_TO_STRING;
}
return String.format(
"%d/%d/%d",
jobId, pipelineId == null ? 0 : pipelineId, taskId == null ? 0 : taskId);
}

public static MDCContext of(long jobId) {
return new MDCContext(jobId, null, null);
}

public static MDCContext of(long jobId, long pipelineId) {
return new MDCContext(jobId, pipelineId, null);
}

public static MDCContext of(long jobId, long pipelineId, long taskId) {
return new MDCContext(jobId, pipelineId, taskId);
}

public static MDCContext of(MDCContext context) {
return new MDCContext(context.jobId, context.pipelineId, context.taskId);
}

public static MDCContext current() {
String jobId = MDC.get(JOB_ID);
if (jobId == null) {
return EMPTY;
}

String pipelineId = MDC.get(PIPELINE_ID);
String taskId = MDC.get(TASK_ID);
return new MDCContext(
Long.parseLong(jobId),
pipelineId != null ? Long.parseLong(pipelineId) : null,
taskId != null ? Long.parseLong(taskId) : null);
}

public static MDCContext valueOf(String s) {
if (EMPTY_TO_STRING.equals(s)) {
return EMPTY;
}

String[] arr = s.split("/");
Long jobId = Long.parseLong(arr[0]);
Long pipelineId = Long.parseLong(arr[1]);
Long taskId = Long.parseLong(arr[2]);
if (pipelineId == 0 || taskId == 0) {
return MDCContext.of(jobId);
}
return MDCContext.of(jobId, pipelineId, taskId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ public MDCExecutor(MDCContext context, Executor delegate) {

@Override
public void execute(Runnable command) {
delegate.execute(new MDCRunnable(context, command));
delegate.execute(new MDCRunnable(MDCContext.of(context), command));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,25 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE

@Override
public <T> Future<T> submit(Callable<T> task) {
return delegate.submit(new MDCCallable<>(context, task));
return delegate.submit(new MDCCallable<>(MDCContext.of(context), task));
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return delegate.submit(new MDCRunnable(context, task), result);
return delegate.submit(new MDCRunnable(MDCContext.of(context), task), result);
}

@Override
public Future<?> submit(Runnable task) {
return delegate.submit(new MDCRunnable(context, task));
return delegate.submit(new MDCRunnable(MDCContext.of(context), task));
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return delegate.invokeAll(
tasks.stream()
.map(task -> new MDCCallable<>(context, task))
.map(task -> new MDCCallable<>(MDCContext.of(context), task))
.collect(Collectors.toList()));
}

Expand All @@ -93,7 +93,7 @@ public <T> List<Future<T>> invokeAll(
throws InterruptedException {
return delegate.invokeAll(
tasks.stream()
.map(task -> new MDCCallable<>(context, task))
.map(task -> new MDCCallable<>(MDCContext.of(context), task))
.collect(Collectors.toList()),
timeout,
unit);
Expand All @@ -104,7 +104,7 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return delegate.invokeAny(
tasks.stream()
.map(task -> new MDCCallable<>(context, task))
.map(task -> new MDCCallable<>(MDCContext.of(context), task))
.collect(Collectors.toList()));
}

Expand All @@ -113,7 +113,7 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, Ti
throws InterruptedException, ExecutionException, TimeoutException {
return delegate.invokeAny(
tasks.stream()
.map(task -> new MDCCallable<>(context, task))
.map(task -> new MDCCallable<>(MDCContext.of(context), task))
.collect(Collectors.toList()),
timeout,
unit);
Expand Down
Loading
Loading