Merge remote-tracking branch 'apache-github/trunk' into kafka-18465-remove-metadata-version-methods-for-versions-older-than-3.0

* apache-github/trunk:
  KAFKA-18340: Change Dockerfile to use log4j2 yaml instead log4j properties (apache#18378)
  MINOR: fix flaky RemoteLogManagerTest#testStopPartitionsWithDeletion (apache#18474)
  KAFKA-18311: Enforcing copartitioned topics (4/N) (apache#18397)
  KAFKA-18308; Update CoordinatorSerde (apache#18455)
  KAFKA-18440: Convert AuthorizationException to fatal error in AdminClient (apache#18435)
  KAFKA-17671: Create better documentation for transactions (apache#17454)
  KAFKA-18304; Introduce json converter generator (apache#18458)
  MINOR: Clean up classic group tests (apache#18473)
  KAFKA-18399 Remove ZooKeeper from KafkaApis (2/N): CONTROLLED_SHUTDOWN and ENVELOPE (apache#18422)
  MINOR: improve StreamThread periodic processing log (apache#18430)
ijuma committed Jan 10, 2025
2 parents d6c543d + cd061c8 commit cf995bc
Showing 27 changed files with 965 additions and 529 deletions.
4 changes: 2 additions & 2 deletions build.gradle
@@ -1548,7 +1548,7 @@ project(':group-coordinator') {
       "-o", "${projectDir}/build/generated/main/java/org/apache/kafka/coordinator/group/generated",
       "-i", "src/main/resources/common/message",
       "-m", "MessageDataGenerator", "JsonConverterGenerator",
-      "-t", "CoordinatorRecordTypeGenerator"
+      "-t", "CoordinatorRecordTypeGenerator", "CoordinatorRecordJsonConvertersGenerator"
     ]
     inputs.dir("src/main/resources/common/message")
       .withPropertyName("messages")
@@ -1810,7 +1810,7 @@ project(':share-coordinator') {
       "-o", "${projectDir}/build/generated/main/java/org/apache/kafka/coordinator/share/generated",
       "-i", "src/main/resources/common/message",
       "-m", "MessageDataGenerator", "JsonConverterGenerator",
-      "-t", "CoordinatorRecordTypeGenerator"
+      "-t", "CoordinatorRecordTypeGenerator", "CoordinatorRecordJsonConvertersGenerator"
     ]
     inputs.dir("src/main/resources/common/message")
       .withPropertyName("messages")
@@ -23,11 +23,10 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
-import org.apache.kafka.common.errors.MismatchedEndpointTypeException;
-import org.apache.kafka.common.errors.UnsupportedEndpointTypeException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.RequestUtils;
import org.apache.kafka.common.utils.LogContext;

import org.slf4j.Logger;
@@ -277,23 +276,21 @@ public void updateFailed(Throwable exception) {
        // We depend on pending calls to request another metadata update
        this.state = State.QUIESCENT;

-        if (exception instanceof AuthenticationException) {
-            log.warn("Metadata update failed due to authentication error", exception);
-            this.fatalException = (ApiException) exception;
-        } else if (exception instanceof MismatchedEndpointTypeException) {
-            log.warn("Metadata update failed due to mismatched endpoint type error", exception);
-            this.fatalException = (ApiException) exception;
-        } else if (exception instanceof UnsupportedEndpointTypeException) {
-            log.warn("Metadata update failed due to unsupported endpoint type error", exception);
-            this.fatalException = (ApiException) exception;
-        } else if (exception instanceof UnsupportedVersionException) {
-            if (usingBootstrapControllers) {
-                log.warn("The remote node is not a CONTROLLER that supports the KIP-919 " +
-                    "DESCRIBE_CLUSTER api.", exception);
-            } else {
-                log.warn("The remote node is not a BROKER that supports the METADATA api.", exception);
-            }
-            this.fatalException = (ApiException) exception;
+        if (RequestUtils.isFatalException(exception)) {
+            log.warn("Fatal error during metadata update", exception);
+            // avoid unchecked/unconfirmed cast to ApiException
+            if (exception instanceof ApiException) {
+                this.fatalException = (ApiException) exception;
+            }
+
+            if (exception instanceof UnsupportedVersionException) {
+                if (usingBootstrapControllers) {
+                    log.warn("The remote node is not a CONTROLLER that supports the KIP-919 " +
+                        "DESCRIBE_CLUSTER api.", exception);
+                } else {
+                    log.warn("The remote node is not a BROKER that supports the METADATA api.", exception);
+                }
+            }
        } else {
            log.info("Metadata update failed", exception);
        }
@@ -16,6 +16,13 @@
 */
package org.apache.kafka.common.requests;

+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.MismatchedEndpointTypeException;
+import org.apache.kafka.common.errors.SecurityDisabledException;
+import org.apache.kafka.common.errors.UnsupportedEndpointTypeException;
+import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Message;
@@ -77,4 +84,14 @@ public static ByteBuffer serialize(
        writable.flip();
        return writable.buffer();
    }
+
+    public static boolean isFatalException(Throwable e) {
+        return e instanceof AuthenticationException ||
+            e instanceof AuthorizationException ||
+            e instanceof MismatchedEndpointTypeException ||
+            e instanceof SecurityDisabledException ||
+            e instanceof UnsupportedVersionException ||
+            e instanceof UnsupportedEndpointTypeException ||
+            e instanceof UnsupportedForMessageFormatException;
+    }
}
@@ -20,6 +20,7 @@
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;

@@ -98,6 +99,16 @@ public void testAuthenticationFailure() {
        assertTrue(mgr.isReady());
    }

+    @Test
+    public void testAuthorizationFailure() {
+        mgr.transitionToUpdatePending(time.milliseconds());
+        mgr.updateFailed(new AuthorizationException("Authorization failed"));
+        assertEquals(refreshBackoffMs, mgr.metadataFetchDelayMs(time.milliseconds()));
+        assertThrows(AuthorizationException.class, mgr::isReady);
+        mgr.update(mockCluster(), time.milliseconds());
+        assertTrue(mgr.isReady());
+    }
+
    @Test
    public void testNeedsRebootstrap() {
        long rebootstrapTriggerMs = 1000;
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.requests;

import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.MismatchedEndpointTypeException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.UnsupportedEndpointTypeException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedVersionException;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class RequestUtilsTest {
    @Test
    public void testIsFatalException() {
        assertTrue(RequestUtils.isFatalException(new AuthenticationException("")));
        assertTrue(RequestUtils.isFatalException(new AuthorizationException("")));
        assertTrue(RequestUtils.isFatalException(new MismatchedEndpointTypeException("")));
        assertTrue(RequestUtils.isFatalException(new SecurityDisabledException("")));
        assertTrue(RequestUtils.isFatalException(new UnsupportedEndpointTypeException("")));
        assertTrue(RequestUtils.isFatalException(new UnsupportedForMessageFormatException("")));
        assertTrue(RequestUtils.isFatalException(new UnsupportedVersionException("")));

        // retriable exceptions
        assertFalse(RequestUtils.isFatalException(new DisconnectException("")));
    }
}
@@ -43,7 +43,7 @@ public abstract class CoordinatorRecordSerde implements Serializer<CoordinatorRe
    @Override
    public byte[] serializeKey(CoordinatorRecord record) {
        // Record does not accept a null key.
-        return MessageUtil.toVersionPrefixedBytes(
+        return MessageUtil.toCoordinatorTypePrefixedBytes(
            record.key().version(),
            record.key().message()
        );
@@ -106,10 +106,10 @@ private void readMessage(ApiMessage message, ByteBuffer buffer, short version, S
     * Concrete child class must provide implementation which returns appropriate
     * type of {@link ApiMessage} objects representing the key.
     *
-     * @param recordVersion - short representing version
+     * @param recordType - short representing type
     * @return ApiMessage object
     */
-    protected abstract ApiMessage apiMessageKeyFor(short recordVersion);
+    protected abstract ApiMessage apiMessageKeyFor(short recordType);

    /**
     * Concrete child class must provide implementation which returns appropriate
57 changes: 0 additions & 57 deletions core/src/main/scala/kafka/server/KafkaApis.scala
@@ -89,7 +89,6 @@ import scala.annotation.nowarn
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Map, Seq, Set, mutable}
import scala.jdk.CollectionConverters._
-import scala.util.{Failure, Success, Try}

/**
* Logic to handle the various Kafka requests
@@ -136,10 +135,6 @@ class KafkaApis(val requestChannel: RequestChannel,
    info("Shutdown complete.")
  }

-  private def isForwardingEnabled(request: RequestChannel.Request): Boolean = {
-    metadataSupport.forwardingManager.isDefined && request.context.principalSerde.isPresent
-  }
-
  private def maybeForwardToController(
    request: RequestChannel.Request,
    handler: RequestChannel.Request => Unit
@@ -195,7 +190,6 @@ class KafkaApis(val requestChannel: RequestChannel,
        case ApiKeys.FETCH => handleFetchRequest(request)
        case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
        case ApiKeys.METADATA => handleTopicMetadataRequest(request)
-        case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
        case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request, requestLocal).exceptionally(handleError)
        case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request).exceptionally(handleError)
        case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
@@ -244,7 +238,6 @@ class KafkaApis(val requestChannel: RequestChannel,
        case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => maybeForwardToController(request, handleAlterUserScramCredentialsRequest)
        case ApiKeys.ALTER_PARTITION => handleAlterPartitionRequest(request)
        case ApiKeys.UPDATE_FEATURES => maybeForwardToController(request, handleUpdateFeatures)
-        case ApiKeys.ENVELOPE => handleEnvelope(request, requestLocal)
        case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
        case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request)
        case ApiKeys.UNREGISTER_BROKER => forwardToControllerOrFail(request)
@@ -290,27 +283,6 @@ class KafkaApis(val requestChannel: RequestChannel,
    replicaManager.tryCompleteActions()
  }

-  def handleControlledShutdownRequest(request: RequestChannel.Request): Unit = {
-    val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
-    // ensureTopicExists is only for client facing requests
-    // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
-    // stop serving data to clients for the topic being deleted
-    val controlledShutdownRequest = request.body[ControlledShutdownRequest]
-    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
-
-    def controlledShutdownCallback(controlledShutdownResult: Try[Set[TopicPartition]]): Unit = {
-      val response = controlledShutdownResult match {
-        case Success(partitionsRemaining) =>
-          ControlledShutdownResponse.prepareResponse(Errors.NONE, partitionsRemaining.asJava)
-
-        case Failure(throwable) =>
-          controlledShutdownRequest.getErrorResponse(throwable)
-      }
-      requestHelper.sendResponseExemptThrottle(request, response)
-    }
-    zkSupport.controller.controlledShutdown(controlledShutdownRequest.data.brokerId, controlledShutdownRequest.data.brokerEpoch, controlledShutdownCallback)
-  }
-
  /**
   * Handle an offset commit request
   */
@@ -3312,35 +3284,6 @@ class KafkaApis(val requestChannel: RequestChannel,
      new DescribeClusterResponse(response.setThrottleTimeMs(requestThrottleMs)))
  }

-  def handleEnvelope(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
-
-    // If forwarding is not yet enabled or this request has been received on an invalid endpoint,
-    // then we treat the request as unparsable and close the connection.
-    if (!isForwardingEnabled(request)) {
-      info(s"Closing connection ${request.context.connectionId} because it sent an `Envelope` " +
-        "request even though forwarding has not been enabled")
-      requestChannel.closeConnection(request, Collections.emptyMap())
-      return
-    } else if (!request.context.fromPrivilegedListener) {
-      info(s"Closing connection ${request.context.connectionId} from listener ${request.context.listenerName} " +
-        s"because it sent an `Envelope` request, which is only accepted on the inter-broker listener " +
-        s"${config.interBrokerListenerName}.")
-      requestChannel.closeConnection(request, Collections.emptyMap())
-      return
-    } else if (!authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
-      requestHelper.sendErrorResponseMaybeThrottle(request, new ClusterAuthorizationException(
-        s"Principal ${request.context.principal} does not have required CLUSTER_ACTION for envelope"))
-      return
-    } else if (!zkSupport.controller.isActive) {
-      requestHelper.sendErrorResponseMaybeThrottle(request, new NotControllerException(
-        s"Broker $brokerId is not the active controller"))
-      return
-    }
-
-    EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle(_, requestLocal))
-  }
-
  def handleDescribeProducersRequest(request: RequestChannel.Request): Unit = {
    val describeProducersRequest = request.body[DescribeProducersRequest]

4 changes: 2 additions & 2 deletions core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -2150,9 +2150,9 @@ public void testStopPartitionsWithDeletion() throws RemoteStorageException {
        partitions.add(new StopPartition(followerTopicIdPartition.topicPartition(), true, true, true));

        when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition)))
-            .thenReturn(listRemoteLogSegmentMetadata(leaderTopicIdPartition, 5, 100, 1024, RemoteLogSegmentState.DELETE_SEGMENT_FINISHED).iterator());
+            .thenAnswer(invocation -> listRemoteLogSegmentMetadata(leaderTopicIdPartition, 5, 100, 1024, RemoteLogSegmentState.DELETE_SEGMENT_FINISHED).iterator());
        when(remoteLogMetadataManager.listRemoteLogSegments(eq(followerTopicIdPartition)))
-            .thenReturn(listRemoteLogSegmentMetadata(followerTopicIdPartition, 3, 100, 1024, RemoteLogSegmentState.DELETE_SEGMENT_FINISHED).iterator());
+            .thenAnswer(invocation -> listRemoteLogSegmentMetadata(followerTopicIdPartition, 3, 100, 1024, RemoteLogSegmentState.DELETE_SEGMENT_FINISHED).iterator());
        CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
        dummyFuture.complete(null);
        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any()))