[CELEBORN-894] Add checksum for shuffle data #2979
Conversation
Hi, @jiang13021
Thanks for your contribution!
I have two questions, PTAL.
@@ -79,6 +76,7 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl {
  private ConcurrentHashMap<String, TransportClient> currentClient =
      JavaUtils.newConcurrentHashMap();
  private long driverTimestamp;
  private final int BATCH_HEADER_SIZE = 4 * 4;
Does this feature currently support only Spark? Will Flink be supported in the future?
Yes. Currently, this feature does not support Flink, but I will submit other PRs in the future to add support. The current changes should be compatible with Flink.
@@ -293,14 +293,15 @@ public void flush(boolean finalFlush, boolean fromEvict) throws IOException {
      // read flush buffer to generate correct chunk offsets
      // data header layout (mapId, attemptId, nextBatchId, length)
      if (numBytes > chunkSize) {
-       ByteBuffer headerBuf = ByteBuffer.allocate(16);
+       ByteBuffer headerBuf = ByteBuffer.allocate(PushDataHeaderUtils.BATCH_HEADER_SIZE);
If a worker receives a data buffer from an old-version client, or from a client of an unsupported engine, will this modification still yield accurate results?
If a worker receives a data buffer from an old-version client, we check the checksum flag in the highest bit of batchId for compatibility. Similarly, if the data comes from an unsupported engine's client but follows the old data header layout, it will also be compatible.
@@ -79,6 +76,7 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl {
  private ConcurrentHashMap<String, TransportClient> currentClient =
      JavaUtils.newConcurrentHashMap();
  private long driverTimestamp;
  private final int BATCH_HEADER_SIZE = 4 * 4;
Ditto.
  }

  public static int getLength(byte[] data) {
    return Platform.getInt(data, LENGTH_OFFSET) - 4;
This will cause errors if an old client reads from Celeborn workers with this feature enabled.
+1, we should think about how to maintain compatibility with older versions.
@FMX @zwangsheng Thank you for reviewing this. We will check the checksum flag in the highest bit of batchId for compatibility. I have updated the code here, please take another look.
  public static final int LENGTH_OFFSET = Platform.BYTE_ARRAY_OFFSET + 12;
  public static final int CHECKSUM_OFFSET = Platform.BYTE_ARRAY_OFFSET + 16;
  public static final int POSITIVE_MASK = 0x7FFFFFFF;
  public static final int HIGHEST_1_BIT_FLAG_MASK = 0x80000000;
I agree with this design, since the batchId cannot be a negative number. But this class needs to stay compatible with old clients: during a cluster upgrade there will be a moment when old clients are talking to new servers.
The worker will check the checksum flag in the highest bit of batchId for compatibility.
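For readers following the thread, here is a minimal sketch (not the PR's actual code) of how a flag bit in batchId can be set, detected, and stripped. The constant names mirror the diff above; everything else is illustrative.

```java
// Illustrative only: batchId is always non-negative, so its highest bit is free
// to carry a "header has a checksum" flag without breaking old readers.
public class BatchIdFlagSketch {
  static final int POSITIVE_MASK = 0x7FFFFFFF;
  static final int HIGHEST_1_BIT_FLAG_MASK = 0x80000000;

  // New clients set the flag when they append a checksum to the header.
  static int markWithChecksumFlag(int batchId) {
    return batchId | HIGHEST_1_BIT_FLAG_MASK;
  }

  // Old clients never set this bit, so their data is handled as before.
  static boolean hasChecksumFlag(int rawBatchId) {
    return (rawBatchId & HIGHEST_1_BIT_FLAG_MASK) != 0;
  }

  // Strip the flag to recover the original, non-negative batchId.
  static int originalBatchId(int rawBatchId) {
    return rawBatchId & POSITIVE_MASK;
  }
}
```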
  public static int computeHeaderChecksum32(byte[] data) {
    assert data.length >= BATCH_HEADER_SIZE_WITHOUT_CHECKSUM;
    CRC32 crc32 = new CRC32();
    crc32.update(data, 0, BATCH_HEADER_SIZE_WITHOUT_CHECKSUM);
Although a 16-byte CRC calculation is trivial on its own, a Celeborn worker can have an enormous number of push-data buffers to handle. I think it would be better to add a switch so users can enable or disable this feature. Some users never hit this kind of issue and could simply disable the feature to save CPU.
I have received reports from users that Celeborn workers consume too much CPU during peak hours. If this feature is on by default, it will surely make things worse.
You can make this a client-side control config and pass it in the ReserveSlots request.
OK, I will add a config this week.
@FMX Hi, I have added a config to disable the header checksum, PTAL.
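For context, the switch being discussed could look roughly like the sketch below. The class and method names are hypothetical (the actual on/off knob lives in CelebornConf); the point is only that the CRC work is skipped entirely when the feature is off.

```java
import java.util.zip.CRC32;

// Sketch only: pay the CRC cost solely when the client-side switch is on.
public class HeaderChecksumSwitchSketch {
  static final int HEADER_SIZE_WITHOUT_CHECKSUM = 16; // mapId, attemptId, batchId, length

  static int maybeChecksum(byte[] header, boolean headerChecksumEnabled) {
    if (!headerChecksumEnabled) {
      // Feature off: no CRC computed, no flag bit set, worker behaves as before.
      return 0;
    }
    CRC32 crc32 = new CRC32();
    crc32.update(header, 0, HEADER_SIZE_WITHOUT_CHECKSUM);
    return (int) crc32.getValue();
  }
}
```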
CRC32 is not the fastest choice. I haven't gone through the whole design yet, so I'm not sure whether the algorithm can be made configurable like Spark's implementation.
apache/spark#47929 apache/spark#49258
@jiang13021 CRC32C seems to be an optimized variant; I think this can be done in a follow-up PR, WDYT?
@RexXiong the current design has no extra space to carry the algorithm type, so we should choose the most efficient and future-proof one.
BTW, I found that Kafka also uses CRC32C in message format v2.
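For reference, if the algorithm were ever swapped, java.util.zip already ships CRC32C (since Java 9) behind the same Checksum interface as CRC32, so the change would mostly be a question of which instance gets constructed. A purely illustrative sketch:

```java
import java.util.zip.CRC32;
import java.util.zip.CRC32C;
import java.util.zip.Checksum;

// Illustrative only: both classes implement java.util.zip.Checksum.
public class ChecksumAlgoSketch {
  static int checksum32(byte[] data, int len, boolean useCrc32c) {
    Checksum c = useCrc32c ? new CRC32C() : new CRC32();
    c.update(data, 0, len);
    return (int) c.getValue(); // a CRC fits in 32 bits; narrowing keeps the low word
  }
}
```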
To answer @pan3793's query - there are a bunch of implementations which are pretty fast and still give good error detection. Adler32 is not as robust from an error-detection point of view IIRC - murmurhash or xxhash are good candidates.
From my understanding, this is primarily just for the header and not the data, right? In which case, initialization overhead and per-call cost would be dimensions to consider - pure Java-based implementations in particular might help.
this is primarily just for header and not data, right ?
The method name is misleading; it calculates the data checksum and stores the result in the header.
@@ -1470,7 +1470,24 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
      shuffleKey: String,
      index: Int): Unit = {
    try {
      fileWriter.write(body)
      val header = new Array[Byte](PushDataHeaderUtils.BATCH_HEADER_SIZE_WITHOUT_CHECKSUM)
We could consider implementing a dedicated method for verifying data checksums. If checksum validation fails, it would throw a CelebornChecksumException. This approach would be easier for unit testing, and we could handle the exception within the catch block alongside the other exceptions.
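A minimal sketch of the kind of verification method being suggested; the class name, exception, and header size here are illustrative, not necessarily what the PR ends up with.

```java
import java.util.zip.CRC32;

// Sketch only: verify the header checksum up front and fail with a dedicated
// exception, so the handler's catch block (and unit tests) can target it directly.
public class ChecksumVerificationSketch {
  static final int HEADER_SIZE_WITHOUT_CHECKSUM = 16;

  public static class CelebornChecksumException extends java.io.IOException {
    public CelebornChecksumException(String message) { super(message); }
  }

  static void verifyHeaderChecksum(byte[] header, int expected) throws CelebornChecksumException {
    CRC32 crc32 = new CRC32();
    crc32.update(header, 0, HEADER_SIZE_WITHOUT_CHECKSUM);
    int actual = (int) crc32.getValue();
    if (actual != expected) {
      throw new CelebornChecksumException(
          "Header checksum mismatch: expected " + expected + " but got " + actual);
    }
  }
}
```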
Fixed, thank you.
2eef55a to f719b49
LGTM.
LGTM
Unfortunately I have not been able to do a very close read of the PR, but trying to understand - is this not an incompatible change which will require all clients and the server side to be upgraded in lock step?
The reason for the query - when @otterc and I were initially designing TLS/authn, we discussed in depth whether we should have a 'declared features' mechanism for clients and servers to advertise and negotiate what is supported, given what we observed to be the inflexibility of other shuffle protocols.
We did not add it at that time for Celeborn given the complexity (and lack of immediate need) - but I am wondering if we now have more use cases that need it.
@mridulm the current impl allows old clients to communicate with newer servers, because it uses the highest bit of an int in the chunk header (which represents the data size, so the highest bit is always zero) to indicate whether the checksum is enabled; old clients therefore always behave as if the checksum were disabled. This design is backward compatible and efficient, but it also means we cannot ship checksum algorithm info, so choosing an efficient algorithm is important.
After a deep check of Flink's implementation, I found that the header/data written by Flink is in big-endian format, while the header written by Spark using Platform (Unsafe) may be in little-endian format. This could lead to compatibility issues if the server reads the header the same way in both cases. Possible solutions include selecting the appropriate way to read the data based on the PartitionType, or adding a new field, byteOrder, to explicitly inform the server of the endianness used when writing. Personally I prefer the first solution. Also cc @reswqa @codenohup
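To make the endianness concern concrete, here is an illustrative snippet (not Celeborn code): ByteBuffer defaults to big-endian, which matches Flink's writer, while Unsafe-based reads such as Platform.getInt follow the platform's native byte order, typically little-endian on x86.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Illustrative only: the same four header bytes decode to different ints
// depending on the byte order the reader assumes.
public class EndiannessSketch {
  public static void main(String[] args) {
    byte[] header = new byte[] {0x00, 0x00, 0x00, 0x2A}; // the value 42, written big-endian

    int bigEndian = ByteBuffer.wrap(header).order(ByteOrder.BIG_ENDIAN).getInt();       // 42
    int littleEndian = ByteBuffer.wrap(header).order(ByteOrder.LITTLE_ENDIAN).getInt(); // 704643072

    System.out.println(bigEndian + " vs " + littleEndian);
    // A worker reading a Flink-written (big-endian) header with a native-order
    // (little-endian) accessor would see the second value, hence the suggestion to
    // pick the read path per PartitionType or record the byte order explicitly.
  }
}
```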
read data based on
@pan3793 thanks for clarifying, will go over the PR later this week - have been a bit swamped lately.
What changes were proposed in this pull request?
Why are the changes needed?
Sometimes the transmitted data may be corrupted in transit, leading to errors, so it is necessary to add a checksum to detect this. I only added a checksum to the header because, in a production environment, the data body is usually compressed, and compression typically generates its own checksum. Therefore, in scenarios where compression is enabled, adding a checksum to the header is sufficient.
Does this PR introduce any user-facing change?
No, this change is compatible.
How was this patch tested?
unit test: org.apache.celeborn.service.deploy.cluster.PushDataWithChecksumSuite