diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRecordSerde.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRecordSerde.java index 56f9a6cae1313..9e5afc3b89f2a 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRecordSerde.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRecordSerde.java @@ -43,7 +43,7 @@ public abstract class CoordinatorRecordSerde implements Serializer entry : records.entrySet()) { + buffer.printf("case %d:%n", entry.getKey()); + buffer.incrementIndent(); + buffer.printf("return new %s();%n", + MessageGenerator.capitalizeFirst(entry.getValue().key.name())); + buffer.decrementIndent(); + } + buffer.printf("default:%n"); + buffer.incrementIndent(); + headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS); + buffer.printf("throw new UnsupportedVersionException(\"Unknown record id \"" + + " + id);%n"); + buffer.decrementIndent(); + buffer.decrementIndent(); + buffer.printf("}%n"); + buffer.decrementIndent(); + buffer.printf("}%n"); + } + + private void generateNewRecordValue() { + headerGenerator.addImport(MessageGenerator.API_MESSAGE_CLASS); + buffer.printf("public ApiMessage newRecordValue() {%n"); + buffer.incrementIndent(); + buffer.printf("switch (id) {%n"); + buffer.incrementIndent(); + for (Map.Entry entry : records.entrySet()) { + buffer.printf("case %d:%n", entry.getKey()); + buffer.incrementIndent(); + buffer.printf("return new %s();%n", + MessageGenerator.capitalizeFirst(entry.getValue().value.name())); + buffer.decrementIndent(); + } + buffer.printf("default:%n"); + buffer.incrementIndent(); + headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS); + buffer.printf("throw new UnsupportedVersionException(\"Unknown record id \"" + " + id);%n"); buffer.decrementIndent(); buffer.decrementIndent(); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerde.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerde.java index afa489502d8b0..9143079aa115b 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerde.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerde.java @@ -16,138 +16,31 @@ */ package org.apache.kafka.coordinator.group; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.ApiMessage; -import org.apache.kafka.common.protocol.MessageUtil; import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader; -import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionValue; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; -import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; -import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; -import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; -import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType; /** * Please ensure any new record added here stays in sync with DumpLogSegments. */ public class GroupCoordinatorRecordSerde extends CoordinatorRecordSerde { - // This method is temporary until the share coordinator is converted to - // using the new coordinator records. @Override - public byte[] serializeKey(CoordinatorRecord record) { - // Record does not accept a null key. - return MessageUtil.toCoordinatorTypePrefixedBytes( - record.key().version(), - record.key().message() - ); - } - - @Override - protected ApiMessage apiMessageKeyFor(short recordVersion) { - switch (recordVersion) { - case 0: - case 1: - return new OffsetCommitKey(); - case 2: - return new GroupMetadataKey(); - case 3: - return new ConsumerGroupMetadataKey(); - case 4: - return new ConsumerGroupPartitionMetadataKey(); - case 5: - return new ConsumerGroupMemberMetadataKey(); - case 6: - return new ConsumerGroupTargetAssignmentMetadataKey(); - case 7: - return new ConsumerGroupTargetAssignmentMemberKey(); - case 8: - return new ConsumerGroupCurrentMemberAssignmentKey(); - case 9: - return new ShareGroupPartitionMetadataKey(); - case 10: - return new ShareGroupMemberMetadataKey(); - case 11: - return new ShareGroupMetadataKey(); - case 12: - return new ShareGroupTargetAssignmentMetadataKey(); - case 13: - return new ShareGroupTargetAssignmentMemberKey(); - case 14: - return new ShareGroupCurrentMemberAssignmentKey(); - case 15: - return new ShareGroupStatePartitionMetadataKey(); - case 16: - return new ConsumerGroupRegularExpressionKey(); - default: - throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion); + protected ApiMessage apiMessageKeyFor(short recordType) { + try { + return CoordinatorRecordType.fromId(recordType).newRecordKey(); + } catch (UnsupportedVersionException ex) { + throw new CoordinatorLoader.UnknownRecordTypeException(recordType); } } @Override protected ApiMessage apiMessageValueFor(short recordVersion) { - switch (recordVersion) { - case 0: - case 1: - return new OffsetCommitValue(); - case 2: - return new GroupMetadataValue(); - case 3: - return new ConsumerGroupMetadataValue(); - case 4: - return new ConsumerGroupPartitionMetadataValue(); - case 5: - return new ConsumerGroupMemberMetadataValue(); - case 6: - return new ConsumerGroupTargetAssignmentMetadataValue(); - case 7: - return new ConsumerGroupTargetAssignmentMemberValue(); - case 8: - return new ConsumerGroupCurrentMemberAssignmentValue(); - case 9: - return new ShareGroupPartitionMetadataValue(); - case 10: - return new ShareGroupMemberMetadataValue(); - case 11: - return new ShareGroupMetadataValue(); - case 12: - return new ShareGroupTargetAssignmentMetadataValue(); - case 13: - return new ShareGroupTargetAssignmentMemberValue(); - case 14: - return new ShareGroupCurrentMemberAssignmentValue(); - case 15: - return new ShareGroupStatePartitionMetadataValue(); - case 16: - return new ConsumerGroupRegularExpressionValue(); - default: - throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion); + try { + return CoordinatorRecordType.fromId(recordVersion).newRecordValue(); + } catch (UnsupportedVersionException ex) { + throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion); } } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerdeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerdeTest.java index 94eb326af3856..319cc9358a2b7 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerdeTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerdeTest.java @@ -20,38 +20,9 @@ import org.apache.kafka.common.protocol.MessageUtil; import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionValue; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; -import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; -import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; -import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; -import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue; -import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey; -import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.junit.jupiter.api.Test; @@ -245,23 +216,9 @@ public void testDeserializeWithInvalidValueBytes() { @Test public void testDeserializeAllRecordTypes() { - roundTrip((short) 0, new OffsetCommitKey(), new OffsetCommitValue()); - roundTrip((short) 1, new OffsetCommitKey(), new OffsetCommitValue()); - roundTrip((short) 2, new GroupMetadataKey(), new GroupMetadataValue()); - roundTrip((short) 3, new ConsumerGroupMetadataKey(), new ConsumerGroupMetadataValue()); - roundTrip((short) 4, new ConsumerGroupPartitionMetadataKey(), new ConsumerGroupPartitionMetadataValue()); - roundTrip((short) 5, new ConsumerGroupMemberMetadataKey(), new ConsumerGroupMemberMetadataValue()); - roundTrip((short) 6, new ConsumerGroupTargetAssignmentMetadataKey(), new ConsumerGroupTargetAssignmentMetadataValue()); - roundTrip((short) 7, new ConsumerGroupTargetAssignmentMemberKey(), new ConsumerGroupTargetAssignmentMemberValue()); - roundTrip((short) 8, new ConsumerGroupCurrentMemberAssignmentKey(), new ConsumerGroupCurrentMemberAssignmentValue()); - roundTrip((short) 9, new ShareGroupPartitionMetadataKey(), new ShareGroupPartitionMetadataValue()); - roundTrip((short) 10, new ShareGroupMemberMetadataKey(), new ShareGroupMemberMetadataValue()); - roundTrip((short) 11, new ShareGroupMetadataKey(), new ShareGroupMetadataValue()); - roundTrip((short) 12, new ShareGroupTargetAssignmentMetadataKey(), new ShareGroupTargetAssignmentMetadataValue()); - roundTrip((short) 13, new ShareGroupTargetAssignmentMemberKey(), new ShareGroupTargetAssignmentMemberValue()); - roundTrip((short) 14, new ShareGroupCurrentMemberAssignmentKey(), new ShareGroupCurrentMemberAssignmentValue()); - roundTrip((short) 15, new ShareGroupStatePartitionMetadataKey(), new ShareGroupStatePartitionMetadataValue()); - roundTrip((short) 16, new ConsumerGroupRegularExpressionKey(), new ConsumerGroupRegularExpressionValue()); + for (CoordinatorRecordType record : CoordinatorRecordType.values()) { + roundTrip(record.id(), record.newRecordKey(), record.newRecordValue()); + } } private void roundTrip( diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerde.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerde.java index 28f59e57d336a..1fbfabb98f201 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerde.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerde.java @@ -17,36 +17,28 @@ package org.apache.kafka.coordinator.share; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde; -import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey; -import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue; -import org.apache.kafka.coordinator.share.generated.ShareUpdateKey; -import org.apache.kafka.coordinator.share.generated.ShareUpdateValue; +import org.apache.kafka.coordinator.share.generated.CoordinatorRecordType; public class ShareCoordinatorRecordSerde extends CoordinatorRecordSerde { @Override - protected ApiMessage apiMessageKeyFor(short recordVersion) { - switch (recordVersion) { - case 0: - return new ShareSnapshotKey(); - case 1: - return new ShareUpdateKey(); - default: - throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion); + protected ApiMessage apiMessageKeyFor(short recordType) { + try { + return CoordinatorRecordType.fromId(recordType).newRecordKey(); + } catch (UnsupportedVersionException ex) { + throw new CoordinatorLoader.UnknownRecordTypeException(recordType); } } @Override protected ApiMessage apiMessageValueFor(short recordVersion) { - switch (recordVersion) { - case 0: - return new ShareSnapshotValue(); - case 1: - return new ShareUpdateValue(); - default: - throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion); + try { + return CoordinatorRecordType.fromId(recordVersion).newRecordValue(); + } catch (UnsupportedVersionException ex) { + throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion); } } } diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java index c11df0c19bb11..3f40924028bc3 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java @@ -24,8 +24,6 @@ import org.apache.kafka.coordinator.share.generated.CoordinatorRecordType; import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey; import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue; -import org.apache.kafka.coordinator.share.generated.ShareUpdateKey; -import org.apache.kafka.coordinator.share.generated.ShareUpdateValue; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.junit.jupiter.api.BeforeEach; @@ -199,8 +197,9 @@ public void testDeserializeWithInvalidValueBytes() { @Test public void testDeserializeAllRecordTypes() { - roundTrip((short) 0, new ShareSnapshotKey(), new ShareSnapshotValue()); - roundTrip((short) 1, new ShareUpdateKey(), new ShareUpdateValue()); + for (CoordinatorRecordType record : CoordinatorRecordType.values()) { + roundTrip(record.id(), record.newRecordKey(), record.newRecordValue()); + } } private void roundTrip( diff --git a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionCoordinatorRecordSerde.java b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionCoordinatorRecordSerde.java index f8d1c72f4ef4b..07387948c6cad 100644 --- a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionCoordinatorRecordSerde.java +++ b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionCoordinatorRecordSerde.java @@ -25,12 +25,12 @@ public class TransactionCoordinatorRecordSerde extends CoordinatorRecordSerde { @Override - protected ApiMessage apiMessageKeyFor(short recordVersion) { - switch (recordVersion) { + protected ApiMessage apiMessageKeyFor(short recordType) { + switch (recordType) { case 0: return new TransactionLogKey(); default: - throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion); + throw new CoordinatorLoader.UnknownRecordTypeException(recordType); } }