Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add and distribute IQv2 information in KIP-1071 #18278

Open
wants to merge 8 commits into
base: kip1071
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
import org.apache.kafka.coordinator.group.streams.StreamsTopology;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
import org.apache.kafka.coordinator.group.streams.topics.EndpointToPartitionsManager;
import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
import org.apache.kafka.coordinator.group.streams.topics.TopicConfigurationException;
import org.apache.kafka.coordinator.group.taskassignor.TaskAssignor;
Expand Down Expand Up @@ -2340,12 +2341,13 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
);

scheduleStreamsGroupSessionTimeout(groupId, memberId);

List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> endpointToPartitions = maybeBuildEndpointToPartitions(group);
// Prepare the response.
StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData()
.setMemberId(updatedMember.memberId())
.setMemberEpoch(updatedMember.memberEpoch())
.setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs);
.setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs)
.setPartitionsByUserEndpoint(endpointToPartitions);

// The assignment is only provided in the following cases:
// 1. The member sent a full request.
Expand Down Expand Up @@ -2378,6 +2380,27 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
return new CoordinatorResult<>(records, new StreamsGroupHeartbeatResult(response, internalTopicsToBeCreated));
}

/**
 * Builds the endpoint-to-partitions entries for all members of the group that have a
 * configured user endpoint, for inclusion in the streams group heartbeat response
 * (used to route interactive queries to the instance hosting a partition).
 *
 * @param group The streams group whose members are inspected.
 * @return One {@code EndpointToPartitions} entry per member with a non-null user endpoint;
 *         an empty list if no member has an endpoint configured.
 */
private List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> maybeBuildEndpointToPartitions(StreamsGroup group) {
    final List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> endpointToPartitionsList = new ArrayList<>();
    final EndpointToPartitionsManager endpointToPartitionsManager = new EndpointToPartitionsManager();
    for (Map.Entry<String, StreamsGroupMember> entry : group.members().entrySet()) {
        // The entry value already holds the member; no need for a second map lookup by key.
        final StreamsGroupMember groupMember = entry.getValue();
        final StreamsGroupMemberMetadataValue.Endpoint endpoint = groupMember.userEndpoint();
        // Members without a user endpoint cannot serve queries, so they are omitted.
        if (endpoint != null) {
            final StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint = new StreamsGroupHeartbeatResponseData.Endpoint();
            responseEndpoint.setHost(endpoint.host());
            responseEndpoint.setPort(endpoint.port());
            endpointToPartitionsList.add(
                endpointToPartitionsManager.endpointToPartitions(groupMember, responseEndpoint, group));
        }
    }
    return endpointToPartitionsList;
}


private List<StreamsGroupHeartbeatResponseData.TaskIds> createStreamsGroupHeartbeatResponseTaskIds(final Map<String, Set<Integer>> taskIds) {
return taskIds.entrySet().stream()
.map(entry -> new StreamsGroupHeartbeatResponseData.TaskIds()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.coordinator.group.streams.topics;

import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.coordinator.group.streams.StreamsGroup;
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
import org.apache.kafka.coordinator.group.streams.TopicMetadata;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;


/**
 * Computes, for a single streams group member, the mapping from the member's user endpoint
 * to the topic partitions of the member's assigned active tasks. Instances are stateless.
 */
public class EndpointToPartitionsManager {

    public EndpointToPartitionsManager() {}

    /**
     * Builds the {@code EndpointToPartitions} response entry for the given member.
     *
     * @param streamsGroupMember The member whose assigned active tasks are mapped to partitions.
     * @param responseEndpoint   The member's user endpoint, already converted to response form.
     * @param streamsGroup       The group, used for its configured topology and partition metadata.
     * @return The response entry holding the endpoint and all of the member's active-task partitions.
     */
    public StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions(final StreamsGroupMember streamsGroupMember,
                                                                                       final StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint,
                                                                                       final StreamsGroup streamsGroup) {

        StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions = new StreamsGroupHeartbeatResponseData.EndpointToPartitions();
        // Attach the endpoint once, up front, so it is present even when the topology has
        // no subtopologies (the original set it repeatedly inside the loop, and never for
        // an empty topology).
        endpointToPartitions.setUserEndpoint(responseEndpoint);
        // Partition metadata is loop-invariant; fetch it once.
        final Map<String, TopicMetadata> groupTopicMetadata = streamsGroup.partitionMetadata();
        List<StreamsGroupHeartbeatResponseData.TopicPartition> allTopicPartitions = new ArrayList<>();
        for (ConfiguredSubtopology configuredSubtopology : streamsGroup.configuredTopology().subtopologies().values()) {
            getActiveTaskTopicPartitions(configuredSubtopology, groupTopicMetadata, streamsGroupMember, allTopicPartitions);
        }
        endpointToPartitions.setPartitions(allTopicPartitions);
        return endpointToPartitions;
    }

    /**
     * Appends, for every assigned active task of the member, one {@code TopicPartition}
     * per source/repartition-source topic of the given subtopology.
     *
     * NOTE(review): every active-task entry is combined with every subtopology's topics;
     * if {@code assignedActiveTasks()} is keyed by subtopology id, entries should likely
     * be filtered to the current subtopology — confirm intended semantics.
     */
    private void getActiveTaskTopicPartitions(final ConfiguredSubtopology configuredSubtopology,
                                              final Map<String, TopicMetadata> groupTopicMetadata,
                                              final StreamsGroupMember streamsGroupMember,
                                              final List<StreamsGroupHeartbeatResponseData.TopicPartition> allTopicPartitions) {

        // Removed an unused local ("standbyTopicNames" built from nonSourceChangelogTopics):
        // it was computed but never read; standby handling is not implemented here yet.
        for (Map.Entry<String, Set<Integer>> taskEntry : streamsGroupMember.assignedActiveTasks().entrySet()) {
            Set<Integer> taskPartitions = taskEntry.getValue();
            List<StreamsGroupHeartbeatResponseData.TopicPartition> topicPartitionList =
                    Stream.concat(
                        configuredSubtopology.sourceTopics().stream(),
                        configuredSubtopology.repartitionSourceTopics().keySet().stream()
                    ).map(topic -> {
                        int numPartitionsForTopic = groupTopicMetadata.get(topic).numPartitions();
                        StreamsGroupHeartbeatResponseData.TopicPartition tp = new StreamsGroupHeartbeatResponseData.TopicPartition();
                        tp.setTopic(topic);
                        List<Integer> tpPartitions = new ArrayList<>(taskPartitions);
                        // A topic may have fewer partitions than the member has task ids;
                        // in that case report only the lowest-numbered valid partitions.
                        if (numPartitionsForTopic < taskPartitions.size()) {
                            Collections.sort(tpPartitions);
                            tp.setPartitions(tpPartitions.subList(0, numPartitionsForTopic));
                        } else {
                            tp.setPartitions(tpPartitions);
                        }
                        return tp;
                    }).toList();
            allTopicPartitions.addAll(topicPartitionList);
        }
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.server.common.MetadataVersion;

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checkstyle fix

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.coordinator.group.streams.topics;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.streams.StreamsGroup;
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
import org.apache.kafka.coordinator.group.streams.StreamsTopology;
import org.apache.kafka.coordinator.group.streams.TopicMetadata;
import org.apache.kafka.timeline.SnapshotRegistry;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.Mockito.mock;

class EndpointToPartitionsManagerTest {

    private static final LogContext LOG_CONTEXT = new LogContext();

    // NOTE(review): passing null for the StreamsGroupMember will NPE inside
    // endpointToPartitions if the configured topology has subtopologies; a real
    // member with assigned active tasks should probably be supplied — confirm.
    @ParameterizedTest(name = "{4}")
    @MethodSource("argsProvider")
    void testEndpointToPartitionsWithTwoTopicsAndDifferentPartitions(int topicAPartitions,
                                                                     int topicBPartitions,
                                                                     List<Integer> topicAExpectedPartitions,
                                                                     List<Integer> topicBExpectedPartitions,
                                                                     String testName
    ) {
        Uuid topicAId = Uuid.randomUuid();
        Uuid topicBId = Uuid.randomUuid();

        Map<Integer, Set<String>> emptyRackMap = Collections.emptyMap();
        StreamsGroupHeartbeatResponseData.Endpoint endpoint = new StreamsGroupHeartbeatResponseData.Endpoint();
        endpoint.setPort(8080);
        endpoint.setHost("localhost");

        Map<String, TopicMetadata> topicMetadata = new HashMap<>();
        topicMetadata.put("Topic-A", new TopicMetadata(topicAId, "Topic-A", topicAPartitions, emptyRackMap));
        topicMetadata.put("Topic-B", new TopicMetadata(topicBId, "Topic-B", topicBPartitions, emptyRackMap));

        StreamsGroup streamsGroup = createStreamsGroup("streamsGroup");
        streamsGroup.setPartitionMetadata(topicMetadata);
        streamsGroup.setGroupEpoch(1);
        streamsGroup.setTopology(topology());

        EndpointToPartitionsManager endpointToPartitionsManager = new EndpointToPartitionsManager();

        StreamsGroupHeartbeatResponseData.EndpointToPartitions result = endpointToPartitionsManager.endpointToPartitions(null, endpoint, streamsGroup);

        assertEquals(endpoint, result.userEndpoint());
        assertEquals(2, result.partitions().size());

        StreamsGroupHeartbeatResponseData.TopicPartition topicAPartition = result.partitions().get(0);
        assertEquals("Topic-A", topicAPartition.topic());
        assertEquals(topicAExpectedPartitions, topicAPartition.partitions());

        StreamsGroupHeartbeatResponseData.TopicPartition topicBPartition = result.partitions().get(1);
        assertEquals("Topic-B", topicBPartition.topic());
        assertEquals(topicBExpectedPartitions, topicBPartition.partitions());
    }

    // Arguments must match the test method's five parameters positionally.
    // (The original supplied a sixth, stray TreeSet argument, which misaligned
    // every parameter after the partition counts and broke the name pattern index.)
    static Stream<Arguments> argsProvider() {
        return Stream.of(
            arguments(2, 5, List.of(0, 1), List.of(0, 1, 2, 3, 4), "Should assign correct partitions when partitions differ between topics"),
            arguments(3, 3, List.of(0, 1, 2), List.of(0, 1, 2), "Should assign correct partitions when partitions same between topics")
        );
    }

    // Minimal topology: one subtopology reading from both source topics.
    private StreamsTopology topology() {
        StreamsGroupTopologyValue.Subtopology subtopology = new StreamsGroupTopologyValue.Subtopology();
        subtopology.setSubtopologyId("subtopology-1");
        subtopology.setSourceTopics(List.of("Topic-A", "Topic-B"));
        return new StreamsTopology(1, Map.of("subtopology-1", subtopology));
    }

    // Builds a bare StreamsGroup with a mocked metrics shard for the tests.
    private StreamsGroup createStreamsGroup(String groupId) {
        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
        return new StreamsGroup(
            LOG_CONTEXT,
            snapshotRegistry,
            groupId,
            mock(GroupCoordinatorMetricsShard.class)
        );
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.kafka.coordinator.group.taskassignor;

import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checkstyle fix

import org.junit.jupiter.api.Test;

import java.util.ArrayList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
Expand Down Expand Up @@ -74,15 +76,20 @@ public class ConsistencyVectorIntegrationTest {
private static final int KEY = 1;
private static final int NUMBER_OF_MESSAGES = 100;

public final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS);
public EmbeddedKafkaCluster cluster;

private final List<KafkaStreams> streamsToCleanup = new ArrayList<>();
private final MockTime mockTime = cluster.time;
private MockTime mockTime;

@BeforeEach
public void before() throws InterruptedException, IOException {
final Properties props = new Properties();
props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,streams");
props.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true");
cluster = new EmbeddedKafkaCluster(NUM_BROKERS, props);
cluster.start();
cluster.createTopic(INPUT_TOPIC_NAME, 1, 1);
mockTime = cluster.time;
}

@AfterEach
Expand Down Expand Up @@ -213,6 +220,7 @@ private Properties streamsConfiguration(final String safeTestName) {
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
config.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
config.put(InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED, true);
return config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
Expand Down Expand Up @@ -122,7 +124,9 @@ public class EosIntegrationTest {

public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
NUM_BROKERS,
Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "true"))
Utils.mkProperties(Map.of("auto.create.topics.enable", "true",
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,streams",
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true"))
);

@BeforeAll
Expand Down Expand Up @@ -248,6 +252,8 @@ private void runSimpleCopyTest(final int numberOfRestarts,
final Properties properties = new Properties();
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:7777");
properties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
Expand Down
Loading
Loading