diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java index 5d4a77ff78..3fb5ea2b74 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java +++ b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java @@ -67,7 +67,7 @@ public static CloudEvent convertRecordToEvent(ConnectRecord connectRecord) { public static ConnectRecord convertEventToRecord(CloudEvent event) { byte[] body = Objects.requireNonNull(event.getData()).toBytes(); LogUtil.info(log, "handle receive events {}", () -> new String(event.getData().toBytes(), Constants.DEFAULT_CHARSET)); - // todo: recordPartition & recordOffset + ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis(), body); for (String extensionName : event.getExtensionNames()) { connectRecord.addExtension(extensionName, Objects.requireNonNull(event.getExtension(extensionName)).toString()); diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageWriterImpl.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageWriterImpl.java index 682205c4a6..76931d85a9 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageWriterImpl.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/storage/OffsetStorageWriterImpl.java @@ -53,9 +53,12 @@ public OffsetStorageWriterImpl(String connectorName, OffsetManagementService off } @Override - public void writeOffset(RecordPartition partition, RecordOffset position) { - ConnectorRecordPartition extendRecordPartition = new ConnectorRecordPartition(connectorName, partition.getPartition()); - data.put(extendRecordPartition, position); + public void writeOffset(RecordPartition partition, RecordOffset offset) { + ConnectorRecordPartition extendRecordPartition; + if (partition != null) { + extendRecordPartition = new ConnectorRecordPartition(connectorName, partition.getPartition()); + data.put(extendRecordPartition, offset); + } } /**