diff --git a/client/src/containers/Topic/Topic/TopicData/TopicData.jsx b/client/src/containers/Topic/Topic/TopicData/TopicData.jsx
index 25e9b612d..4739b723d 100644
--- a/client/src/containers/Topic/Topic/TopicData/TopicData.jsx
+++ b/client/src/containers/Topic/Topic/TopicData/TopicData.jsx
@@ -452,7 +452,7 @@ class TopicData extends Root {
timestamp: message.timestamp,
partition: JSON.stringify(message.partition) || '',
offset: JSON.stringify(message.offset) || '',
- headers: message.headers || {},
+ headers: message.headers || [],
schema: { key: message.keySchemaId, value: message.valueSchemaId },
exceptions: message.exceptions || []
};
@@ -1018,7 +1018,7 @@ class TopicData extends Root {
type: 'text',
expand: true,
cell: obj => {
- return
{Object.keys(obj.headers).length}
;
+ return {obj.headers.length}
;
}
},
{
@@ -1078,7 +1078,7 @@ class TopicData extends Root {
}}
actions={actions}
onExpand={obj => {
- return Object.keys(obj.headers).map(header => {
+ return obj.headers.map(header => {
return (
- {header}
+ {header.key}
- {obj.headers[header]}
+ {header.value}
|
);
diff --git a/client/src/containers/Topic/TopicProduce/TopicProduce.jsx b/client/src/containers/Topic/TopicProduce/TopicProduce.jsx
index 1850d5b0c..ee6f32ace 100644
--- a/client/src/containers/Topic/TopicProduce/TopicProduce.jsx
+++ b/client/src/containers/Topic/TopicProduce/TopicProduce.jsx
@@ -185,11 +185,14 @@ class TopicProduce extends Form {
keyValueSeparator: formData.keyValueSeparator
};
- let headers = {};
+ const headers = [];
Object.keys(formData).forEach(key => {
if (key.includes('hKey')) {
let keyNumbers = key.replace(/\D/g, '');
- headers[formData[key]] = formData[`hValue${keyNumbers}`];
+ headers.push({
+ key: formData[key],
+ value: formData[`hValue${keyNumbers}`]
+ });
}
});
diff --git a/src/main/java/org/akhq/controllers/TopicController.java b/src/main/java/org/akhq/controllers/TopicController.java
index 02e42be22..c170f2687 100644
--- a/src/main/java/org/akhq/controllers/TopicController.java
+++ b/src/main/java/org/akhq/controllers/TopicController.java
@@ -145,7 +145,7 @@ public List<Record> produce(
Optional<String> key,
Optional<Integer> partition,
Optional<Long> timestamp,
- Map<String, String> headers,
+ List<KeyValue<String, String>> headers,
Optional keySchema,
Optional valueSchema,
Boolean multiMessage,
@@ -300,7 +300,7 @@ public Record deleteRecordApi(String cluster, String topicName, Integer partitio
schemaRegistryRepository.getSchemaRegistryType(cluster),
Base64.getDecoder().decode(key),
null,
- new HashMap<>(),
+ new ArrayList<>(),
topicRepository.findByName(cluster, topicName)
);
}
diff --git a/src/main/java/org/akhq/models/Record.java b/src/main/java/org/akhq/models/Record.java
index 53fd6fec6..243eb1939 100644
--- a/src/main/java/org/akhq/models/Record.java
+++ b/src/main/java/org/akhq/models/Record.java
@@ -29,6 +29,7 @@
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.*;
+import java.util.stream.Collectors;
@ToString
@EqualsAndHashCode
@@ -43,7 +44,7 @@ public class Record {
private TimestampType timestampType;
private Integer keySchemaId;
private Integer valueSchemaId;
- private Map<String, String> headers = new HashMap<>();
+ private List<KeyValue<String, String>> headers = new ArrayList<>();
@JsonIgnore
private Deserializer kafkaAvroDeserializer;
@JsonIgnore
@@ -81,7 +82,7 @@ public class Record {
private Boolean truncated;
- public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte[] bytesKey, byte[] bytesValue, Map<String, String> headers, Topic topic) {
+ public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte[] bytesKey, byte[] bytesValue, List<KeyValue<String, String>> headers, Topic topic) {
this.MAGIC_BYTE = schemaRegistryType.getMagicByte();
this.topic = topic;
this.partition = record.partition();
@@ -114,7 +115,8 @@ public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record
this.bytesValue = bytesValue;
this.valueSchemaId = getAvroSchemaId(this.bytesValue);
for (Header header: record.headers()) {
- this.headers.put(header.key(), header.value() != null ? new String(header.value()) : null);
+ String headerValue = header.value() != null ? new String(header.value()) : null;
+ this.headers.add(new KeyValue<>(header.key(), headerValue));
}
this.kafkaAvroDeserializer = kafkaAvroDeserializer;
@@ -264,6 +266,20 @@ private String convertToString(byte[] payload, Integer schemaId, boolean isKey)
}
}
+ public Collection<String> getHeadersKeySet() {
+ return headers
+ .stream()
+ .map(KeyValue::getKey)
+ .collect(Collectors.toList());
+ }
+
+ public Collection<String> getHeadersValues() {
+ return headers
+ .stream()
+ .map(KeyValue::getValue)
+ .collect(Collectors.toList());
+ }
+
private Integer getAvroSchemaId(byte[] payload) {
if (topic.isInternalTopic()) {
return null;
@@ -281,4 +297,5 @@ private Integer getAvroSchemaId(byte[] payload) {
}
return null;
}
+
}
diff --git a/src/main/java/org/akhq/repositories/RecordRepository.java b/src/main/java/org/akhq/repositories/RecordRepository.java
index c00b19ab4..1c6a57720 100644
--- a/src/main/java/org/akhq/repositories/RecordRepository.java
+++ b/src/main/java/org/akhq/repositories/RecordRepository.java
@@ -481,7 +481,7 @@ public List<RecordMetadata> produce(
String clusterId,
String topic,
Optional<String> value,
- Map<String, String> headers,
+ List<KeyValue<String, String>> headers,
Optional<String> key,
Optional<Integer> partition,
Optional<Long> timestamp,
@@ -509,7 +509,7 @@ private RecordMetadata produce(
String clusterId,
String topic, byte[] value,
- Map<String, String> headers,
+ List<KeyValue<String, String>> headers,
byte[] key,
Optional<Integer> partition,
Optional<Long> timestamp
@@ -522,8 +522,7 @@ private RecordMetadata produce(
timestamp.orElse(null),
key,
value,
- (headers == null ? ImmutableMap.of() : headers)
- .entrySet()
+ (headers == null ? Collections.<KeyValue<String, String>>emptyList() : headers)
.stream()
.filter(entry -> StringUtils.isNotEmpty(entry.getKey()))
.map(entry -> new RecordHeader(
@@ -592,7 +591,7 @@ public RecordMetadata produce(
String clusterId,
String topic,
Optional<String> value,
- Map<String, String> headers,
+ List<KeyValue<String, String>> headers,
Optional<String> key,
Optional<Integer> partition,
Optional<Long> timestamp,
@@ -743,13 +742,13 @@ private static boolean searchFilter(BaseOptions options, Record record) {
}
if (options.getSearchByHeaderKey() != null) {
- if (!search(options.getSearchByHeaderKey(), record.getHeaders().keySet())) {
+ if (!search(options.getSearchByHeaderKey(), record.getHeadersKeySet())) {
return false;
}
}
if (options.getSearchByHeaderValue() != null) {
- return search(options.getSearchByHeaderValue(), record.getHeaders().values());
+ return search(options.getSearchByHeaderValue(), record.getHeadersValues());
}
}
return true;
diff --git a/src/test/java/org/akhq/controllers/TopicControllerTest.java b/src/test/java/org/akhq/controllers/TopicControllerTest.java
index d1f3193b3..861625f14 100644
--- a/src/test/java/org/akhq/controllers/TopicControllerTest.java
+++ b/src/test/java/org/akhq/controllers/TopicControllerTest.java
@@ -177,9 +177,9 @@ void produce() {
paramMap.put("value", "my-value");
paramMap.put("key", "my-key");
paramMap.put("partition", 1);
- paramMap.put("headers", ImmutableMap.of(
- "my-header-1", "1",
- "my-header-2", "2"));
+ paramMap.put("headers", List.of(
+ new KeyValue<>("my-header-1", "1"),
+ new KeyValue<>("my-header-2", "2")));
paramMap.put("multiMessage", false);
List<Record> response = this.retrieveList(HttpRequest.POST(
CREATE_TOPIC_URL + "/data", paramMap
@@ -190,7 +190,7 @@ void produce() {
assertEquals("my-value", response.get(0).getValue());
assertEquals(1, response.get(0).getPartition());
assertEquals(2, response.get(0).getHeaders().size());
- assertEquals("1", response.get(0).getHeaders().get("my-header-1"));
+ assertEquals("1", response.get(0).getHeaders().get(0).getValue());
}
@Test
diff --git a/src/test/java/org/akhq/repositories/RecordRepositoryTest.java b/src/test/java/org/akhq/repositories/RecordRepositoryTest.java
index 9bf7b10f6..8d98832c6 100644
--- a/src/test/java/org/akhq/repositories/RecordRepositoryTest.java
+++ b/src/test/java/org/akhq/repositories/RecordRepositoryTest.java
@@ -251,7 +251,7 @@ void produceAndConsumeRecordUsingJsonSchema() throws ExecutionException, Interru
KafkaTestCluster.CLUSTER_ID,
KafkaTestCluster.TOPIC_JSON_SCHEMA,
Optional.of(recordAsJsonString),
- Collections.emptyMap(),
+ Collections.emptyList(),
Optional.of(keyJsonString),
Optional.empty(),
Optional.empty(),