Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick QueryLogger #9681 #9689

Open
wants to merge 3 commits into
base: 0.28.3-cc-docker-ksql.129.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ksqldb-cli/src/main/java/io/confluent/ksql/cli/Cli.java
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public int runScript(final String scriptFile) {
LOGGER.error("An error occurred while running a script file. Error = "
+ exception.getMessage(), exception);

terminal.printError(ErrorMessageUtil.buildErrorMessage(exception),
terminal.printError(ErrorMessageUtil.buildErrorMessageWithStatements(exception),
exception.toString());
}

Expand All @@ -296,7 +296,7 @@ public int runCommand(final String command) {
LOGGER.error("An error occurred while running a command. Error = "
+ exception.getMessage(), exception);

terminal.printError(ErrorMessageUtil.buildErrorMessage(exception),
terminal.printError(ErrorMessageUtil.buildErrorMessageWithStatements(exception),
exception.toString());
}

Expand All @@ -319,7 +319,7 @@ public int runInteractively() {
} catch (final Exception exception) {
LOGGER.error("An error occurred while running a command. Error = "
+ exception.getMessage(), exception);
terminal.printError(ErrorMessageUtil.buildErrorMessage(exception),
terminal.printError(ErrorMessageUtil.buildErrorMessageWithStatements(exception),
exception.toString());
}
terminal.flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ public void printErrorMessage(final KsqlErrorMessage errorMessage) {
if (errorMessage instanceof KsqlStatementErrorMessage) {
printKsqlEntityList(((KsqlStatementErrorMessage) errorMessage).getEntities());
}
printError(errorMessage.getMessage(), errorMessage.toString());
printError(errorMessage.toString(), errorMessage.toString());
}

public void printError(final String shortMsg, final String fullMsg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ private void addToHistory(final String line) {
history.save();
} catch (final IOException e) {
LOGGER.error("Error saving history file", e);
terminal.writer()
.println("Error saving history file:" + ErrorMessageUtil.buildErrorMessage(e));
terminal.writer().println(
"Error saving history file:" + ErrorMessageUtil.buildErrorMessageWithStatements(e)
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public static void validateClient(

writer.println("");
writer.println("The server responded with the following error: ");
writer.println(ErrorMessageUtil.buildErrorMessage(exception));
writer.println(ErrorMessageUtil.buildErrorMessageWithStatements(exception));
writer.println(StringUtils.repeat('*', CONSOLE_WIDTH));
writer.println();
} finally {
Expand Down
28 changes: 28 additions & 0 deletions ksqldb-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import io.confluent.ksql.rest.entity.ConnectorList;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.entity.KsqlStatementErrorMessage;
import io.confluent.ksql.rest.entity.ServerInfo;
import io.confluent.ksql.rest.entity.StreamedRow.DataRow;
import io.confluent.ksql.rest.server.KsqlRestConfig;
Expand Down Expand Up @@ -1169,6 +1170,33 @@ public void shouldSubstituteVariablesOnRunCommand() {
containsString("Created query with ID CSAS_SHOULDRUNCOMMAND"));
}

@Test
public void shouldPrintStatementInError() throws Exception {
// Given:
final KsqlRestClient mockRestClient = givenMockRestClient();
when(mockRestClient.makeKsqlRequest(anyString(), anyLong()))
.thenReturn(RestResponse.erroneous(
NOT_ACCEPTABLE.code(),
new KsqlStatementErrorMessage(
Errors.toErrorCode(NOT_ACCEPTABLE.code()),
"error message",
"this is a statement",
new KsqlEntityList()
)
));

final StringBuilder builder = new StringBuilder();
builder.append("CREATE STREAM shouldRunCommand AS SELECT * FROM Asdf;");

// When:
localCli.runCommand(builder.toString());

// Then:
final String outputString = terminal.getOutputString();
assertThat(outputString, containsString("error message"));
assertThat(outputString, containsString("this is a statement"));
}

@Test
public void shouldThrowWhenTryingToSetDeniedProperty() throws Exception {
// Given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,32 @@ public static String buildErrorMessage(final Throwable throwable) {
return causeMsg.isEmpty() ? msg : msg + System.lineSeparator() + causeMsg;
}

/**
* Build an error message containing the message and potential query text
* of each throwable in the chain.
*
* <p>Throwable messages are separated by new lines.
*
* @param throwable the top level error.
* @return the error message.
*/
public static String buildErrorMessageWithStatements(final Throwable throwable) {
if (throwable == null) {
return "";
}

final List<String> messages = dedup(getErrorMessagesWithStatements(throwable));

final String msg = messages.remove(0);

final String causeMsg = messages.stream()
.filter(s -> !s.isEmpty())
.map(cause -> WordUtils.wrap(PREFIX + cause, 80, "\n\t", true))
.collect(Collectors.joining(System.lineSeparator()));

return causeMsg.isEmpty() ? msg : msg + System.lineSeparator() + causeMsg;
}

/**
* Build a list containing the error message for each throwable in the chain.
*
Expand All @@ -66,6 +92,18 @@ public static List<String> getErrorMessages(final Throwable e) {
.collect(Collectors.toList());
}

/**
* Build a list containing the error message for each throwable in the chain.
*
* @param e the top level error.
* @return the list of error messages.
*/
public static List<String> getErrorMessagesWithStatements(final Throwable e) {
return getThrowables(e).stream()
.map(ErrorMessageUtil::getErrorMessageWithStatement)
.collect(Collectors.toList());
}

private static String getErrorMessage(final Throwable e) {
if (e instanceof ConnectException) {
return "Could not connect to the server. "
Expand All @@ -75,6 +113,25 @@ private static String getErrorMessage(final Throwable e) {
}
}

private static String getErrorMessageWithStatement(final Throwable e) {
if (e instanceof ConnectException) {
return "Could not connect to the server. "
+ "Please check the server details are correct and that the server is running.";
} else if (e instanceof KsqlStatementException) {
final String message = e.getMessage() == null ? e.toString() : e.getMessage();
final String statement = ((KsqlStatementException) e).getSqlStatement();
final String result;
if (statement == null || statement.isEmpty()) {
result = message;
} else {
result = message + "\nStatement: " + statement;
}
return result;
} else {
return e.getMessage() == null ? e.toString() : e.getMessage();
}
}

private static List<Throwable> getThrowables(final Throwable e) {
final List<Throwable> list = new ArrayList<>();
Throwable cause = e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,33 @@
public class KsqlStatementException extends KsqlException {

private final String sqlStatement;
private final boolean isProblemWithStatement;
private final String rawMessage;

public KsqlStatementException(final String message, final String sqlStatement) {
super(buildMessage(message, sqlStatement));
super(message);
this.rawMessage = message == null ? "" : message;
this.sqlStatement = sqlStatement == null ? "" : sqlStatement;
this.isProblemWithStatement = true;
}

public KsqlStatementException(final String message,
final String sqlStatement,
final boolean isProblemWithStatement) {
super(message);
this.rawMessage = message == null ? "" : message;
this.sqlStatement = sqlStatement == null ? "" : sqlStatement;
this.isProblemWithStatement = isProblemWithStatement;
}

public KsqlStatementException(
final String message,
final String sqlStatement,
final Throwable cause) {
super(buildMessage(message, sqlStatement), cause);
super(message, cause);
this.rawMessage = message == null ? "" : message;
this.sqlStatement = sqlStatement == null ? "" : sqlStatement;
this.isProblemWithStatement = true;
}

public String getSqlStatement() {
Expand All @@ -43,12 +55,7 @@ public String getRawMessage() {
return rawMessage;
}

private static String buildMessage(final String message, final String sqlStatement) {
return message + System.lineSeparator() + "Statement: " + sqlStatement;
}

@Override
public String toString() {
return buildMessage(rawMessage, "<retracted>");
public boolean isProblemWithStatement() {
return isProblemWithStatement;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AlanConfluent @vpapavas Does this need to be in the final PR?

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import io.confluent.ksql.function.InternalFunctionRegistry;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.internal.ScalablePushQueryMetrics;
import io.confluent.ksql.logging.query.QueryLogger;
import io.confluent.ksql.logicalplanner.LogicalPlan;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.MetaStoreImpl;
Expand Down Expand Up @@ -220,10 +221,11 @@ ExecuteResult execute(final KsqlPlan plan, final boolean restoreInProgress) {
// must be executed.
if (persistentQueryType == KsqlConstants.PersistentQueryType.CREATE_SOURCE
&& !isSourceTableMaterializationEnabled()) {
LOG.info(String.format(
"Source table query '%s' won't be materialized because '%s' is disabled.",
plan.getStatementText(),
KsqlConfig.KSQL_SOURCE_TABLE_MATERIALIZATION_ENABLED));
final String message = String.format(
"Source table query won't be materialized because '%s' is disabled.",
KsqlConfig.KSQL_SOURCE_TABLE_MATERIALIZATION_ENABLED
);
QueryLogger.info(message, plan.getStatementText());
return ExecuteResult.of(ddlResult.get());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,18 @@ public String visitListStreams(final ListStreamsContext context) {
return stringBuilder.toString();
}

@Override
public String visitListTables(final SqlBaseParser.ListTablesContext context) {
final TerminalNode listOrVisit = context.LIST() != null ? context.LIST() : context.SHOW();
final StringBuilder stringBuilder = new StringBuilder(listOrVisit.toString() + " TABLES");

if (context.EXTENDED() != null) {
stringBuilder.append(" EXTENDED");
}

return stringBuilder.toString();
}

@Override
public String visitListFunctions(final ListFunctionsContext context) {
final TerminalNode listOrVisit = context.LIST() != null ? context.LIST() : context.SHOW();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.engine.rewrite.QueryAnonymizer;
import io.confluent.ksql.parser.ParsingException;
import io.confluent.ksql.parser.SqlFormatter;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.util.KsqlConfig;
Expand Down Expand Up @@ -60,33 +59,52 @@ public static void configure(final KsqlConfig config) {
anonymizeQueries = config.getBoolean(KsqlConfig.KSQL_QUERYANONYMIZER_ENABLED);
}

private static void log(final Level level, final Object message, final Statement query) {
final String queryString = SqlFormatter.formatSql(query);
log(level, message, queryString);
}

private static void log(final Level level, final Object message, final String query) {
log(level, message, query, null);
}

private static void log(final Level level,
final Object message,
final String query,
final Throwable t) {
try {
final String anonQuery = anonymizeQueries
? anonymizer.anonymize(query) : query;
final QueryGuid queryGuids = buildGuids(query, anonQuery);
logger.log(level, buildPayload(message, anonQuery, queryGuids));
} catch (ParsingException e) {
if (logger.isDebugEnabled()) {
Logger.getRootLogger()
.log(
Level.DEBUG,
String.format("Failed to parse a query in query logger, message: %s", message));
}
final QueryLoggerMessage payload = buildPayload(message, anonQuery, queryGuids);
innerLog(level, payload, t);
} catch (final Exception e) {
final String unparsable = "<unparsable query>";
final QueryLoggerMessage payload = buildPayload(
message,
unparsable,
buildGuids(query, unparsable)
);
innerLog(level, payload, t);
}
}

private static void log(final Level level, final Object message, final Statement query) {
final String queryString = SqlFormatter.formatSql(query);
log(level, message, queryString);
private static void innerLog(final Level level,
final QueryLoggerMessage payload,
final Throwable t) {
if (t == null) {
logger.log(level, payload);
} else {
logger.log(level, payload, t);
}
}

private static QueryGuid buildGuids(final String query, final String anonymizedQuery) {
return new QueryGuid(namespace, query, anonymizedQuery);
}

private static QueryLoggerMessage buildPayload(final Object message, final String query,
final QueryGuid guid) {
final QueryGuid guid) {
return new QueryLoggerMessage(message, query, guid);
}

Expand All @@ -110,6 +128,10 @@ public static void warn(final Object message, final String query) {
log(Level.WARN, message, query);
}

public static void warn(final Object message, final String query, final Throwable t) {
log(Level.WARN, message, query, t);
}

public static void warn(final Object message, final Statement query) {
log(Level.WARN, message, query);
}
Expand All @@ -121,4 +143,8 @@ public static void error(final Object message, final String query) {
public static void error(final Object message, final Statement query) {
log(Level.ERROR, message, query);
}

public static void error(final String message, final String query, final Throwable t) {
log(Level.ERROR, message, query, t);
}
}
Loading