Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18308; Update CoordinatorSerde #18455

Merged
merged 2 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public abstract class CoordinatorRecordSerde implements Serializer<CoordinatorRe
@Override
public byte[] serializeKey(CoordinatorRecord record) {
// Record does not accept a null key.
return MessageUtil.toVersionPrefixedBytes(
return MessageUtil.toCoordinatorTypePrefixedBytes(
AndrewJSchofield marked this conversation as resolved.
Show resolved Hide resolved
record.key().version(),
record.key().message()
);
Expand Down Expand Up @@ -106,10 +106,10 @@ private void readMessage(ApiMessage message, ByteBuffer buffer, short version, S
* Concrete child class must provide implementation which returns appropriate
* type of {@link ApiMessage} objects representing the key.
*
* @param recordVersion - short representing version
* @param recordType - short representing type
* @return ApiMessage object
*/
protected abstract ApiMessage apiMessageKeyFor(short recordVersion);
protected abstract ApiMessage apiMessageKeyFor(short recordType);

/**
* Concrete child class must provide implementation which returns appropriate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ private void generate() {
buffer.printf("%n");
generateFromApiKey();
buffer.printf("%n");
generateNewRecordKey();
buffer.printf("%n");
generateNewRecordValue();
buffer.printf("%n");
generateAccessor("id", "short");
buffer.printf("%n");
generateAccessor("lowestSupportedVersion", "short");
Expand Down Expand Up @@ -171,7 +175,57 @@ private void generateFromApiKey() {
buffer.printf("default:%n");
buffer.incrementIndent();
headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
buffer.printf("throw new UnsupportedVersionException(\"Unknown metadata id \"" +
buffer.printf("throw new UnsupportedVersionException(\"Unknown record id \"" +
" + id);%n");
buffer.decrementIndent();
buffer.decrementIndent();
buffer.printf("}%n");
buffer.decrementIndent();
buffer.printf("}%n");
}

/**
 * Emits the generated {@code newRecordKey()} method, which returns a freshly
 * constructed key {@code ApiMessage} for this record type's id, or throws
 * {@code UnsupportedVersionException} for an unknown id.
 */
private void generateNewRecordKey() {
    headerGenerator.addImport(MessageGenerator.API_MESSAGE_CLASS);
    buffer.printf("public ApiMessage newRecordKey() {%n");
    buffer.incrementIndent();
    buffer.printf("switch (id) {%n");
    buffer.incrementIndent();
    // One case per declared record type; each returns a new key message instance.
    records.forEach((id, record) -> {
        buffer.printf("case %d:%n", id);
        buffer.incrementIndent();
        buffer.printf("return new %s();%n",
            MessageGenerator.capitalizeFirst(record.key.name()));
        buffer.decrementIndent();
    });
    buffer.printf("default:%n");
    buffer.incrementIndent();
    headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
    buffer.printf("throw new UnsupportedVersionException(\"Unknown record id \"" +
        " + id);%n");
    buffer.decrementIndent();
    buffer.decrementIndent();
    buffer.printf("}%n");
    buffer.decrementIndent();
    buffer.printf("}%n");
}

private void generateNewRecordValue() {
headerGenerator.addImport(MessageGenerator.API_MESSAGE_CLASS);
buffer.printf("public ApiMessage newRecordValue() {%n");
buffer.incrementIndent();
buffer.printf("switch (id) {%n");
buffer.incrementIndent();
for (Map.Entry<Short, CoordinatorRecord> entry : records.entrySet()) {
buffer.printf("case %d:%n", entry.getKey());
buffer.incrementIndent();
buffer.printf("return new %s();%n",
MessageGenerator.capitalizeFirst(entry.getValue().value.name()));
buffer.decrementIndent();
}
buffer.printf("default:%n");
buffer.incrementIndent();
headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
buffer.printf("throw new UnsupportedVersionException(\"Unknown record id \"" +
" + id);%n");
buffer.decrementIndent();
buffer.decrementIndent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,138 +16,31 @@
*/
package org.apache.kafka.coordinator.group;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType;

/**
* Please ensure any new record added here stays in sync with DumpLogSegments.
*/
public class GroupCoordinatorRecordSerde extends CoordinatorRecordSerde {
// This method is temporary until the share coordinator is converted to
// using the new coordinator records.
// Serializes the record key as: [record-type prefix][key message bytes].
// NOTE(review): the first argument is record.key().version() — for coordinator
// key schemas the key's "version" field appears to carry the record *type* id;
// confirm against the key schema definitions before relying on this.
@Override
public byte[] serializeKey(CoordinatorRecord record) {
// Record does not accept a null key.
return MessageUtil.toCoordinatorTypePrefixedBytes(
record.key().version(),
record.key().message()
);
}

@Override
protected ApiMessage apiMessageKeyFor(short recordVersion) {
switch (recordVersion) {
case 0:
case 1:
return new OffsetCommitKey();
case 2:
return new GroupMetadataKey();
case 3:
return new ConsumerGroupMetadataKey();
case 4:
return new ConsumerGroupPartitionMetadataKey();
case 5:
return new ConsumerGroupMemberMetadataKey();
case 6:
return new ConsumerGroupTargetAssignmentMetadataKey();
case 7:
return new ConsumerGroupTargetAssignmentMemberKey();
case 8:
return new ConsumerGroupCurrentMemberAssignmentKey();
case 9:
return new ShareGroupPartitionMetadataKey();
case 10:
return new ShareGroupMemberMetadataKey();
case 11:
return new ShareGroupMetadataKey();
case 12:
return new ShareGroupTargetAssignmentMetadataKey();
case 13:
return new ShareGroupTargetAssignmentMemberKey();
case 14:
return new ShareGroupCurrentMemberAssignmentKey();
case 15:
return new ShareGroupStatePartitionMetadataKey();
case 16:
return new ConsumerGroupRegularExpressionKey();
default:
throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion);
protected ApiMessage apiMessageKeyFor(short recordType) {
try {
return CoordinatorRecordType.fromId(recordType).newRecordKey();
} catch (UnsupportedVersionException ex) {
throw new CoordinatorLoader.UnknownRecordTypeException(recordType);
}
}

@Override
protected ApiMessage apiMessageValueFor(short recordVersion) {
switch (recordVersion) {
case 0:
case 1:
return new OffsetCommitValue();
case 2:
return new GroupMetadataValue();
case 3:
return new ConsumerGroupMetadataValue();
case 4:
return new ConsumerGroupPartitionMetadataValue();
case 5:
return new ConsumerGroupMemberMetadataValue();
case 6:
return new ConsumerGroupTargetAssignmentMetadataValue();
case 7:
return new ConsumerGroupTargetAssignmentMemberValue();
case 8:
return new ConsumerGroupCurrentMemberAssignmentValue();
case 9:
return new ShareGroupPartitionMetadataValue();
case 10:
return new ShareGroupMemberMetadataValue();
case 11:
return new ShareGroupMetadataValue();
case 12:
return new ShareGroupTargetAssignmentMetadataValue();
case 13:
return new ShareGroupTargetAssignmentMemberValue();
case 14:
return new ShareGroupCurrentMemberAssignmentValue();
case 15:
return new ShareGroupStatePartitionMetadataValue();
case 16:
return new ConsumerGroupRegularExpressionValue();
default:
throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion);
try {
return CoordinatorRecordType.fromId(recordVersion).newRecordValue();
} catch (UnsupportedVersionException ex) {
throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,38 +20,9 @@
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType;
import org.apache.kafka.server.common.ApiMessageAndVersion;

import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -245,23 +216,9 @@ public void testDeserializeWithInvalidValueBytes() {

@Test
public void testDeserializeAllRecordTypes() {
    // Iterate the generated registry so newly added record types are covered
    // automatically, instead of a hand-maintained roundTrip call per type.
    // (This span contained both the old per-type calls and the replacement
    // loop from the diff; collapsed to the final form.)
    for (CoordinatorRecordType record : CoordinatorRecordType.values()) {
        roundTrip(record.id(), record.newRecordKey(), record.newRecordValue());
    }
}

private void roundTrip(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,28 @@

package org.apache.kafka.coordinator.share;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
import org.apache.kafka.coordinator.share.generated.ShareUpdateKey;
import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
import org.apache.kafka.coordinator.share.generated.CoordinatorRecordType;

public class ShareCoordinatorRecordSerde extends CoordinatorRecordSerde {
@Override
protected ApiMessage apiMessageKeyFor(short recordVersion) {
switch (recordVersion) {
case 0:
return new ShareSnapshotKey();
case 1:
return new ShareUpdateKey();
default:
throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion);
protected ApiMessage apiMessageKeyFor(short recordType) {
try {
return CoordinatorRecordType.fromId(recordType).newRecordKey();
} catch (UnsupportedVersionException ex) {
throw new CoordinatorLoader.UnknownRecordTypeException(recordType);
}
}

@Override
protected ApiMessage apiMessageValueFor(short recordVersion) {
switch (recordVersion) {
case 0:
return new ShareSnapshotValue();
case 1:
return new ShareUpdateValue();
default:
throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion);
try {
return CoordinatorRecordType.fromId(recordVersion).newRecordValue();
} catch (UnsupportedVersionException ex) {
throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import org.apache.kafka.coordinator.share.generated.CoordinatorRecordType;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
import org.apache.kafka.coordinator.share.generated.ShareUpdateKey;
import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
import org.apache.kafka.server.common.ApiMessageAndVersion;

import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -199,8 +197,9 @@ public void testDeserializeWithInvalidValueBytes() {

@Test
public void testDeserializeAllRecordTypes() {
    // Iterate the generated registry so newly added share-coordinator record
    // types are covered automatically. (This span contained both the old
    // per-type calls and the replacement loop from the diff; collapsed to the
    // final form.)
    for (CoordinatorRecordType record : CoordinatorRecordType.values()) {
        roundTrip(record.id(), record.newRecordKey(), record.newRecordValue());
    }
}

private void roundTrip(
Expand Down
Loading
Loading