Skip to content

Commit

Permalink
KAFKA-18308; Update CoordinatorSerde (#18455)
Browse files Browse the repository at this point in the history
This patch updates the GroupCoordinatorSerde and the ShareGroupCoordinatorSerde to leverage the CoordinatorRecordType to deserialize records. With this, newly added record are automatically picked up. In other words, the serdes work with all defined records without doing anything.

Reviewers: Andrew Schofield <[email protected]>
  • Loading branch information
dajac authored Jan 10, 2025
1 parent 2b7c039 commit 87334e6
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public abstract class CoordinatorRecordSerde implements Serializer<CoordinatorRe
@Override
public byte[] serializeKey(CoordinatorRecord record) {
// Record does not accept a null key.
return MessageUtil.toVersionPrefixedBytes(
return MessageUtil.toCoordinatorTypePrefixedBytes(
record.key().version(),
record.key().message()
);
Expand Down Expand Up @@ -106,10 +106,10 @@ private void readMessage(ApiMessage message, ByteBuffer buffer, short version, S
* Concrete child class must provide implementation which returns appropriate
* type of {@link ApiMessage} objects representing the key.
*
* @param recordVersion - short representing version
* @param recordType - short representing type
* @return ApiMessage object
*/
protected abstract ApiMessage apiMessageKeyFor(short recordVersion);
protected abstract ApiMessage apiMessageKeyFor(short recordType);

/**
* Concrete child class must provide implementation which returns appropriate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ private void generate() {
buffer.printf("%n");
generateFromApiKey();
buffer.printf("%n");
generateNewRecordKey();
buffer.printf("%n");
generateNewRecordValue();
buffer.printf("%n");
generateAccessor("id", "short");
buffer.printf("%n");
generateAccessor("lowestSupportedVersion", "short");
Expand Down Expand Up @@ -171,7 +175,57 @@ private void generateFromApiKey() {
buffer.printf("default:%n");
buffer.incrementIndent();
headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
buffer.printf("throw new UnsupportedVersionException(\"Unknown metadata id \"" +
buffer.printf("throw new UnsupportedVersionException(\"Unknown record id \"" +
" + id);%n");
buffer.decrementIndent();
buffer.decrementIndent();
buffer.printf("}%n");
buffer.decrementIndent();
buffer.printf("}%n");
}

private void generateNewRecordKey() {
headerGenerator.addImport(MessageGenerator.API_MESSAGE_CLASS);
buffer.printf("public ApiMessage newRecordKey() {%n");
buffer.incrementIndent();
buffer.printf("switch (id) {%n");
buffer.incrementIndent();
for (Map.Entry<Short, CoordinatorRecord> entry : records.entrySet()) {
buffer.printf("case %d:%n", entry.getKey());
buffer.incrementIndent();
buffer.printf("return new %s();%n",
MessageGenerator.capitalizeFirst(entry.getValue().key.name()));
buffer.decrementIndent();
}
buffer.printf("default:%n");
buffer.incrementIndent();
headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
buffer.printf("throw new UnsupportedVersionException(\"Unknown record id \"" +
" + id);%n");
buffer.decrementIndent();
buffer.decrementIndent();
buffer.printf("}%n");
buffer.decrementIndent();
buffer.printf("}%n");
}

private void generateNewRecordValue() {
headerGenerator.addImport(MessageGenerator.API_MESSAGE_CLASS);
buffer.printf("public ApiMessage newRecordValue() {%n");
buffer.incrementIndent();
buffer.printf("switch (id) {%n");
buffer.incrementIndent();
for (Map.Entry<Short, CoordinatorRecord> entry : records.entrySet()) {
buffer.printf("case %d:%n", entry.getKey());
buffer.incrementIndent();
buffer.printf("return new %s();%n",
MessageGenerator.capitalizeFirst(entry.getValue().value.name()));
buffer.decrementIndent();
}
buffer.printf("default:%n");
buffer.incrementIndent();
headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
buffer.printf("throw new UnsupportedVersionException(\"Unknown record id \"" +
" + id);%n");
buffer.decrementIndent();
buffer.decrementIndent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,138 +16,31 @@
*/
package org.apache.kafka.coordinator.group;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType;

/**
* Please ensure any new record added here stays in sync with DumpLogSegments.
*/
public class GroupCoordinatorRecordSerde extends CoordinatorRecordSerde {
// This method is temporary until the share coordinator is converted to
// using the new coordinator records.
@Override
public byte[] serializeKey(CoordinatorRecord record) {
// Record does not accept a null key.
return MessageUtil.toCoordinatorTypePrefixedBytes(
record.key().version(),
record.key().message()
);
}

@Override
protected ApiMessage apiMessageKeyFor(short recordVersion) {
switch (recordVersion) {
case 0:
case 1:
return new OffsetCommitKey();
case 2:
return new GroupMetadataKey();
case 3:
return new ConsumerGroupMetadataKey();
case 4:
return new ConsumerGroupPartitionMetadataKey();
case 5:
return new ConsumerGroupMemberMetadataKey();
case 6:
return new ConsumerGroupTargetAssignmentMetadataKey();
case 7:
return new ConsumerGroupTargetAssignmentMemberKey();
case 8:
return new ConsumerGroupCurrentMemberAssignmentKey();
case 9:
return new ShareGroupPartitionMetadataKey();
case 10:
return new ShareGroupMemberMetadataKey();
case 11:
return new ShareGroupMetadataKey();
case 12:
return new ShareGroupTargetAssignmentMetadataKey();
case 13:
return new ShareGroupTargetAssignmentMemberKey();
case 14:
return new ShareGroupCurrentMemberAssignmentKey();
case 15:
return new ShareGroupStatePartitionMetadataKey();
case 16:
return new ConsumerGroupRegularExpressionKey();
default:
throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion);
protected ApiMessage apiMessageKeyFor(short recordType) {
try {
return CoordinatorRecordType.fromId(recordType).newRecordKey();
} catch (UnsupportedVersionException ex) {
throw new CoordinatorLoader.UnknownRecordTypeException(recordType);
}
}

@Override
protected ApiMessage apiMessageValueFor(short recordVersion) {
switch (recordVersion) {
case 0:
case 1:
return new OffsetCommitValue();
case 2:
return new GroupMetadataValue();
case 3:
return new ConsumerGroupMetadataValue();
case 4:
return new ConsumerGroupPartitionMetadataValue();
case 5:
return new ConsumerGroupMemberMetadataValue();
case 6:
return new ConsumerGroupTargetAssignmentMetadataValue();
case 7:
return new ConsumerGroupTargetAssignmentMemberValue();
case 8:
return new ConsumerGroupCurrentMemberAssignmentValue();
case 9:
return new ShareGroupPartitionMetadataValue();
case 10:
return new ShareGroupMemberMetadataValue();
case 11:
return new ShareGroupMetadataValue();
case 12:
return new ShareGroupTargetAssignmentMetadataValue();
case 13:
return new ShareGroupTargetAssignmentMemberValue();
case 14:
return new ShareGroupCurrentMemberAssignmentValue();
case 15:
return new ShareGroupStatePartitionMetadataValue();
case 16:
return new ConsumerGroupRegularExpressionValue();
default:
throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion);
try {
return CoordinatorRecordType.fromId(recordVersion).newRecordValue();
} catch (UnsupportedVersionException ex) {
throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,38 +20,9 @@
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType;
import org.apache.kafka.server.common.ApiMessageAndVersion;

import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -245,23 +216,9 @@ public void testDeserializeWithInvalidValueBytes() {

@Test
public void testDeserializeAllRecordTypes() {
roundTrip((short) 0, new OffsetCommitKey(), new OffsetCommitValue());
roundTrip((short) 1, new OffsetCommitKey(), new OffsetCommitValue());
roundTrip((short) 2, new GroupMetadataKey(), new GroupMetadataValue());
roundTrip((short) 3, new ConsumerGroupMetadataKey(), new ConsumerGroupMetadataValue());
roundTrip((short) 4, new ConsumerGroupPartitionMetadataKey(), new ConsumerGroupPartitionMetadataValue());
roundTrip((short) 5, new ConsumerGroupMemberMetadataKey(), new ConsumerGroupMemberMetadataValue());
roundTrip((short) 6, new ConsumerGroupTargetAssignmentMetadataKey(), new ConsumerGroupTargetAssignmentMetadataValue());
roundTrip((short) 7, new ConsumerGroupTargetAssignmentMemberKey(), new ConsumerGroupTargetAssignmentMemberValue());
roundTrip((short) 8, new ConsumerGroupCurrentMemberAssignmentKey(), new ConsumerGroupCurrentMemberAssignmentValue());
roundTrip((short) 9, new ShareGroupPartitionMetadataKey(), new ShareGroupPartitionMetadataValue());
roundTrip((short) 10, new ShareGroupMemberMetadataKey(), new ShareGroupMemberMetadataValue());
roundTrip((short) 11, new ShareGroupMetadataKey(), new ShareGroupMetadataValue());
roundTrip((short) 12, new ShareGroupTargetAssignmentMetadataKey(), new ShareGroupTargetAssignmentMetadataValue());
roundTrip((short) 13, new ShareGroupTargetAssignmentMemberKey(), new ShareGroupTargetAssignmentMemberValue());
roundTrip((short) 14, new ShareGroupCurrentMemberAssignmentKey(), new ShareGroupCurrentMemberAssignmentValue());
roundTrip((short) 15, new ShareGroupStatePartitionMetadataKey(), new ShareGroupStatePartitionMetadataValue());
roundTrip((short) 16, new ConsumerGroupRegularExpressionKey(), new ConsumerGroupRegularExpressionValue());
for (CoordinatorRecordType record : CoordinatorRecordType.values()) {
roundTrip(record.id(), record.newRecordKey(), record.newRecordValue());
}
}

private void roundTrip(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,28 @@

package org.apache.kafka.coordinator.share;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
import org.apache.kafka.coordinator.share.generated.ShareUpdateKey;
import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
import org.apache.kafka.coordinator.share.generated.CoordinatorRecordType;

public class ShareCoordinatorRecordSerde extends CoordinatorRecordSerde {
@Override
protected ApiMessage apiMessageKeyFor(short recordVersion) {
switch (recordVersion) {
case 0:
return new ShareSnapshotKey();
case 1:
return new ShareUpdateKey();
default:
throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion);
protected ApiMessage apiMessageKeyFor(short recordType) {
try {
return CoordinatorRecordType.fromId(recordType).newRecordKey();
} catch (UnsupportedVersionException ex) {
throw new CoordinatorLoader.UnknownRecordTypeException(recordType);
}
}

@Override
protected ApiMessage apiMessageValueFor(short recordVersion) {
switch (recordVersion) {
case 0:
return new ShareSnapshotValue();
case 1:
return new ShareUpdateValue();
default:
throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion);
try {
return CoordinatorRecordType.fromId(recordVersion).newRecordValue();
} catch (UnsupportedVersionException ex) {
throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import org.apache.kafka.coordinator.share.generated.CoordinatorRecordType;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
import org.apache.kafka.coordinator.share.generated.ShareUpdateKey;
import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
import org.apache.kafka.server.common.ApiMessageAndVersion;

import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -199,8 +197,9 @@ public void testDeserializeWithInvalidValueBytes() {

@Test
public void testDeserializeAllRecordTypes() {
roundTrip((short) 0, new ShareSnapshotKey(), new ShareSnapshotValue());
roundTrip((short) 1, new ShareUpdateKey(), new ShareUpdateValue());
for (CoordinatorRecordType record : CoordinatorRecordType.values()) {
roundTrip(record.id(), record.newRecordKey(), record.newRecordValue());
}
}

private void roundTrip(
Expand Down
Loading

0 comments on commit 87334e6

Please sign in to comment.