Skip to content

Commit

Permalink
added logs
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsilaghi committed Feb 23, 2024
1 parent 6225855 commit 0b672c6
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ public class PulsarCommandExecutor<Q extends Request<R>, R extends Response> imp
@Value("${webprotege.pulsar.tenant}")
private String tenant;

private String replySubscriptionName;


public PulsarCommandExecutor(Class<R> responseClass) {
this.responseClass = responseClass;
Expand All @@ -82,6 +80,10 @@ public CompletableFuture<R> execute(Q request, ExecutionContext executionContext
var correlationId = UUID.randomUUID().toString();
var replyFuture = new CompletableFuture<R>();
replyHandlers.put(correlationId, replyFuture);
logger.info("ALEX avem consumer si producers. " +
"Consumer pe topic {} iar producer pe topic {} si correlation id la mesaj {}.\n" +
" reply handlers are dimensiunea {} si chei are {} ",
consumer.getTopic(), producer.getTopic(), correlationId, replyHandlers.size(), replyHandlers.keySet());
var messageBuilder = producer.newMessage()
.value(json)
.property(Headers.CORRELATION_ID, correlationId)
Expand Down Expand Up @@ -121,6 +123,8 @@ private synchronized Producer<byte[]> getProducer(Q request) {
var topicName = "persistent://" + tenant + "/" + COMMAND_REQUESTS + "/" + requestChannel;
// TODO: Consider exposing settings as configuration properties
var producerName = applicationName + "--CommandExecutor--" + request.getChannel();
logger.info("ALEX post to topic {} and producerName {}", topicName, producerName);

return producer = pulsarClient.newProducer()
.producerName(producerName)
.topic(topicName)
Expand All @@ -147,9 +151,9 @@ private void ensureConsumerIsListeningForRepliesToRequest(Q request) {
// Replies need to go to all instances of our application/service. In other words we have a pub/sub
// situation. In this case we need unique subscription names with exclusive subscriptions for each
// consumer.
replySubscriptionName = applicationName + "--" + replyChannel + "--" + UUID.randomUUID();
String replySubscriptionName = applicationName + "--" + replyChannel + "--" + UUID.randomUUID();
logger.info("Setting up consumer with subscription {} to listen for replies at {}",
replySubscriptionName,
replySubscriptionName,
replyTopic);
consumer = pulsarClient.newConsumer()
.subscriptionName(replySubscriptionName)
Expand All @@ -169,6 +173,8 @@ private void handleReplyMessageReceived(Consumer<byte[]> consumer, Message<byte[
logger.info("CorrelationId in reply message is missing. Cannot handle reply. Ignoring reply.");
return;
}
logger.info("Alex a venit raspunsul pe correlationId {}", correlationId);

var error = msg.getProperty(Headers.ERROR);
if (error != null) {
var executionException = objectMapper.readValue(error, CommandExecutionException.class);
Expand All @@ -179,6 +185,7 @@ private void handleReplyMessageReceived(Consumer<byte[]> consumer, Message<byte[
else {
var replyHandler = replyHandlers.remove(correlationId);
var response = objectMapper.readValue(msg.getData(), responseClass);
logger.info("ALEX raspund la reply handler {} cu response {}", replyHandler, response);
consumer.acknowledge(msg);
replyHandler.complete(response);
}
Expand All @@ -188,6 +195,9 @@ private void handleReplyMessageReceived(Consumer<byte[]> consumer, Message<byte[
} catch (IOException e) {
logger.error("Cannot deserialize reply message on topic {}", consumer.getTopic(), e);
consumer.negativeAcknowledge(msg);
} catch (Exception e) {
logger.error("Unknown error {}", consumer.getTopic(), e);
consumer.negativeAcknowledge(msg);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ private void parseAndHandleRequest(Consumer<byte[]> consumer,
try {
var payload = message.getData();
var request = objectMapper.readValue(payload, handler.getRequestClass());
logger.info("ALEX correlationID {} handler {}" ,correlationId, handler.getClass());
// The request has successfully been read. All required headers are present and the request body
// is well-formed so acknowledge the request (i.e. it shouldn't be dead-lettered)
consumer.acknowledgeAsync(message);
Expand Down Expand Up @@ -227,7 +228,7 @@ private void handleAndReplyToRequest(String replyChannel, String correlationId,
var response = handler.handleRequest(request, executionContext);
response.subscribe(r -> {
replyWithSuccessResponse(replyChannel, correlationId, userId, r);
logger.info("Sent reply to {}", replyChannel);
logger.info("Sent reply correlationId {} to {}",correlationId, replyChannel);
}, throwable -> {
if (throwable instanceof CommandExecutionException ex) {
logger.info(
Expand Down Expand Up @@ -264,29 +265,36 @@ private void replyWithInternalServerError(String replyChannel, String correlatio
* @param status The status that describes the error
*/
private void replyWithErrorResponse(String replyChannel, String correlationId, String userId, HttpStatus status) {
var replyTopicUrl = getReplyTopicUrl(replyChannel);
var replyProducer = producersManager.getProducer(replyTopicUrl);
var executionException = new CommandExecutionException(status);
var value = serializeCommandExecutionException(executionException);
replyProducer.newMessage()
.property(Headers.CORRELATION_ID, correlationId)
.property(USER_ID, userId)
.property(ERROR, value)
.sendAsync();
try {
var replyTopicUrl = getReplyTopicUrl(replyChannel);
var replyProducer = producersManager.getProducer(replyTopicUrl);
var executionException = new CommandExecutionException(status);
var value = serializeCommandExecutionException(executionException);
replyProducer.newMessage()
.property(Headers.CORRELATION_ID, correlationId)
.property(USER_ID, userId)
.property(ERROR, value)
.send();
} catch (Exception e){
logger.error("Am erroare ", e);
}
}

private void replyWithSuccessResponse(String replyChannel, String correlationId, String userId, R response) {
try {
var topicUrl = getReplyTopicUrl(replyChannel);
var producer = producersManager.getProducer(topicUrl);
var value = objectMapper.writeValueAsBytes(response);
logger.info("ALEX reply correlationId {} pe topic {} cu {}",correlationId, producer.getTopic(), objectMapper.writeValueAsString(response));
producer.newMessage()
.property(Headers.CORRELATION_ID, correlationId)
.property(USER_ID, userId)
.value(value)
.sendAsync();
.send();
} catch (JsonProcessingException e) {
replyWithErrorResponse(replyChannel, correlationId, userId, HttpStatus.INTERNAL_SERVER_ERROR);
} catch (Exception e){
logger.error("Am erroare ", e);
}
}

Expand All @@ -304,7 +312,9 @@ private String getConsumerName(CommandHandler<?, ?> handler) {

private String getRequestsTopicUrl(CommandHandler<?, ?> handler) {
var channelName = handler.getChannelName();
return tenant + "/" + PulsarNamespaces.COMMAND_REQUESTS + "/" + channelName;
String response = tenant + "/" + PulsarNamespaces.COMMAND_REQUESTS + "/" + channelName;
logger.info("ALEX getRequestsTopicUrl " + response);
return response;
}

private String serializeCommandExecutionException(CommandExecutionException exception) {
Expand Down

0 comments on commit 0b672c6

Please sign in to comment.