Skip to content

Commit

Permalink
Merge pull request #226 from SolaceProducts/crushton/DATAGO-89835
Browse files Browse the repository at this point in the history
DATAGO-89835: Public DC C-EMA config push
  • Loading branch information
CameronRushton authored Jan 31, 2025
2 parents 3b74e50 + fdbaedc commit a8c0b75
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void handleError(Exception e, CommandMessage message) {
@SuppressWarnings("PMD")
private void configPush(CommandRequest request) {
List<Path> executionLogFilesToClean = new ArrayList<>();
boolean attacheErrorToTerraformCommand = false;
boolean attachErrorToTerraformCommand = false;
try {
// if the serviceId is not found, messagingServiceDelegateService will most likely throw an exception (which is not guaranteed
// based on the interface definition) and we need to catch it here.
Expand All @@ -129,13 +129,14 @@ private void configPush(CommandRequest request) {
boolean exitEarlyOnFailedCommand = bundle.getExitOnFailure();
// For now everything is run serially
for (Command command : bundle.getCommands()) {
attacheErrorToTerraformCommand = false;
attachErrorToTerraformCommand = false;
if (command.getCommandType() == semp) {
executeSempCommand(command, solaceClient);
} else if (command.getCommandType() == terraform) {
attacheErrorToTerraformCommand = true;
attachErrorToTerraformCommand = true;
Path executionLog = executeTerraformCommand(request, command, envVars);
if (executionLog != null) {
// Only stream logs for self-managed EMAs not in standalone mode
if (commandLogStreamingProcessorOpt.isPresent()) {
streamCommandExecutionLogToEpCore(request, command, executionLog);
}
Expand All @@ -152,7 +153,7 @@ private void configPush(CommandRequest request) {
}
} catch (Exception e) {
log.error("ConfigPush command not executed successfully", e);
attachErrorLogToCommand(attacheErrorToTerraformCommand, e, request);
attachErrorLogToCommand(attachErrorToTerraformCommand, e, request);
} finally {
try {
finalizeAndSendResponse(request);
Expand Down Expand Up @@ -204,19 +205,19 @@ private void attachErrorLogToCommand(boolean isTerraformRelatedError, Exception
}
}

private void finalizeAndSendResponse(CommandRequest request) {
request.determineStatus();
private void finalizeAndSendResponse(CommandRequest requestBO) {
requestBO.determineStatus();
Map<String, String> topicVars = Map.of(
"orgId", eventPortalProperties.getOrganizationId(),
"orgId", requestBO.getOrgId(),
"runtimeAgentId", eventPortalProperties.getRuntimeAgentId(),
COMMAND_CORRELATION_ID, request.getCommandCorrelationId()
COMMAND_CORRELATION_ID, requestBO.getCommandCorrelationId()
);
CommandMessage response = new CommandMessage(request.getServiceId(),
request.getCommandCorrelationId(),
request.getContext(),
request.getStatus(),
request.getCommandBundles());
response.setOrgId(eventPortalProperties.getOrganizationId());
CommandMessage response = new CommandMessage(requestBO.getServiceId(),
requestBO.getCommandCorrelationId(),
requestBO.getContext(),
requestBO.getStatus(),
requestBO.getCommandBundles());
response.setOrgId(requestBO.getOrgId());
response.setTraceId(MDC.get(TRACE_ID));
response.setActorId(MDC.get(ACTOR_ID));
commandPublisher.sendCommandResponse(response, topicVars);
Expand All @@ -225,13 +226,12 @@ private void finalizeAndSendResponse(CommandRequest request) {
Timer jobCycleTime = Timer
.builder(MAAS_EMA_CONFIG_PUSH_EVENT_CYCLE_TIME)
.tag(ORG_ID_TAG, response.getOrgId())
.tag(STATUS_TAG, request.getStatus().name())
.tag(STATUS_TAG, requestBO.getStatus().name())
.register(meterRegistry);
jobCycleTime.record(request.getLifetime(ChronoUnit.MILLIS), TimeUnit.MILLISECONDS);
jobCycleTime.record(requestBO.getLifetime(ChronoUnit.MILLIS), TimeUnit.MILLISECONDS);
}

private Path executeTerraformCommand(CommandRequest request, Command command, Map<String, String> envVars) {
Path executionLog = null;
try {
Validate.isTrue(command.getCommandType().equals(terraform), "Command type must be terraform");
return terraformManager.execute(request, command, envVars);
Expand All @@ -240,7 +240,7 @@ private Path executeTerraformCommand(CommandRequest request, Command command, Ma
setCommandError(command, e);

}
return executionLog;
return null;
}

private void executeSempCommand(Command command, SolaceHttpSemp solaceClient) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.solace.messaging.publisher.DirectMessagePublisher;
import com.solace.messaging.publisher.OutboundMessageBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
Expand Down Expand Up @@ -48,14 +49,19 @@ public SolaceConfiguration(Properties vmrConfig, ArrayList<String> sessionConfig
this.eventPortalProperties = eventPortalProperties;
}

public String getTopicPrefix() {

if (topicPrefix == null) {
topicPrefix = String.format(TOPIC_PREFIX_FORMAT,
eventPortalProperties.getOrganizationId(),
eventPortalProperties.getRuntimeAgentId());
public String getTopicPrefix(String orgId) {
if (StringUtils.isEmpty(orgId)) {
log.debug("Attempted to get topic prefix with empty orgId. Defaulting to application properties org ID {}",
eventPortalProperties.getOrganizationId());
orgId = eventPortalProperties.getOrganizationId();
}
return topicPrefix;
return String.format(TOPIC_PREFIX_FORMAT,
orgId,
eventPortalProperties.getRuntimeAgentId());
}

public String getTopicPrefix() {
return getTopicPrefix(null);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ public CommandLogStreamingProcessor(CommandLogsPublisher commandLogsPublisher,
this.objectMapper = objectMapper;
}


public void streamLogsToEP(CommandRequest request, Command executedCommand, Path commandExecutionLog) {

log.info("Streaming logs to EP for command {} with commandCorrelationId {} for orgId {}", executedCommand.getCommand(),
request.getCommandCorrelationId(), request.getOrgId());
if (executedCommand.getIgnoreResult()) {
log.debug("Skipping log streaming to ep for command {} with commandCorrelationId {} as ignoreResult is set to true",
executedCommand.getCommand(), request.getCommandCorrelationId());
Expand Down Expand Up @@ -81,7 +81,7 @@ public void streamLogsToEP(CommandRequest request, Command executedCommand, Path

})
.map(strLog -> toCommandLogMessage(
eventPortalProperties.getOrganizationId(),
request.getOrgId(),
strLog,
request.getCommandCorrelationId(),
eventPortalProperties.getRuntimeAgentId()
Expand All @@ -92,7 +92,8 @@ public void streamLogsToEP(CommandRequest request, Command executedCommand, Path
log -> sendLogToEpCore(
log,
request.getCommandCorrelationId(),
request.getServiceId()
request.getServiceId(),
request.getOrgId()
)
);
}
Expand All @@ -111,10 +112,11 @@ private List<String> readAllExecutionLogs(Path path) {

private void sendLogToEpCore(CommandLogMessage logDataMessage,
String commandCorrelationId,
String messagingServiceId) {
String messagingServiceId,
String organizationId) {
try {
Map<String, String> topicDetails = new HashMap<>();
topicDetails.put("orgId", eventPortalProperties.getOrganizationId());
topicDetails.put("orgId", organizationId);
topicDetails.put("runtimeAgentId", eventPortalProperties.getRuntimeAgentId());
topicDetails.put("messagingServiceId", messagingServiceId);
topicDetails.put(COMMAND_CORRELATION_ID, commandCorrelationId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ public CommandLogsPublisher(SolacePublisher solacePublisher, SolaceConfiguration
}

public void sendCommandLogData(CommandLogMessage message, Map<String, String> topicDetails) {
String topicString = solaceConfiguration.getTopicPrefix() +
String topicString = solaceConfiguration.getTopicPrefix(topicDetails.get("orgId")) +
String.format("commandLogs/v1/%s", topicDetails.get(Command.COMMAND_CORRELATION_ID));

solacePublisher.publish(message, topicString);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
import com.solace.maas.ep.event.management.agent.constants.Command;
import com.solace.maas.ep.event.management.agent.plugin.mop.MOPMessage;
import com.solace.maas.ep.event.management.agent.plugin.publisher.SolacePublisher;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import java.util.Map;

@Slf4j
@Component
@ConditionalOnProperty(name = "event-portal.gateway.messaging.standalone", havingValue = "false")
public class CommandPublisher {
Expand All @@ -27,14 +29,11 @@ public CommandPublisher(SolacePublisher solacePublisher, SolaceConfiguration sol
* The topic for command response:
* sc/ep/runtime/{orgId}/{runtimeAgentId}/commandResponse/v1/{commandCorrelationId}
*/

public void sendCommandResponse(MOPMessage message, Map<String, String> topicDetails) {

String topicString =
String.format("%scommandResponse/v1/%s",
solaceConfiguration.getTopicPrefix(),
solaceConfiguration.getTopicPrefix(topicDetails.get("orgId")),
topicDetails.get(Command.COMMAND_CORRELATION_ID));

solacePublisher.publish(message, topicString);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public CommandMessageHandler(
SolaceSubscriber solaceSubscriber,
CommandMessageProcessor commandMessageProcessor) {
super(solaceConfiguration.getTopicPrefix() + "command/v1/>", solaceSubscriber);
log.debug("CommandMessageHandler created with subscription {}", getTopicString());
this.commandMessageProcessor = commandMessageProcessor;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public CommandMessageProcessor(CommandManager commandManager,

@Override
public void processMessage(CommandMessage message) {
log.info("Config push command processor started. context={} actorId={} ", message.getContext(), message.getActorId());
log.info("Config push command processor started. context={} orgId={} actorId={} ",
message.getContext(), message.getOrgId(), message.getActorId());
logConfigPushMetric(message);
if (CollectionUtils.isNotEmpty(message.getResources())) {
dynamicResourceConfigurationHelper.loadSolaceBrokerResourceConfigurations(message.getResources());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import com.solace.maas.ep.event.management.agent.plugin.terraform.manager.TerraformManager;
import com.solace.maas.ep.event.management.agent.publisher.CommandPublisher;
import lombok.extern.slf4j.Slf4j;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
Expand All @@ -28,6 +27,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -89,6 +89,28 @@ void setUp() {
responseCaptor = ArgumentCaptor.forClass(CommandMessage.class);
}

@Test
void orgIdFromMessageIsUsedThroughout(@TempDir Path basePath) {
// We're not going to need this in the flow, so to ensure it's not used, we'll set it to null
eventPortalProperties.setOrganizationId(null);
doAnswer((Answer<Path>) invocation -> {
Command command = invocation.getArgument(1);
return CommandManagerTestHelper.setCommandStatusAndReturnExecutionLog(command, JobStatus.success, true, basePath);
}).when(terraformManager).execute(any(), any(), any());

commandManager.execute(message);

// Wait for the command thread to complete
await().atMost(5, TimeUnit.SECONDS).until(() -> CommandManagerTestHelper.verifyCommandPublisherIsInvoked(commandPublisher, 1));

verify(commandPublisher, times(1)).sendCommandResponse(responseCaptor.capture(), topicArgCaptor.capture());
Map<String, String> topicVariables = topicArgCaptor.getValue();
CommandMessage mopMessageResponse = responseCaptor.getValue();
// With this, we know the input mop message has orgId set, and it's used until the end of the flow.
assertThat(mopMessageResponse.getOrgId()).isEqualTo("myOrg123");
assertThat(topicVariables).containsEntry("orgId", mopMessageResponse.getOrgId());
}

@Test
void noLogsStreamingToEP(@TempDir Path basePath) {
doAnswer((Answer<Path>) invocation -> {
Expand All @@ -109,7 +131,7 @@ void noLogsStreamingToEP(@TempDir Path basePath) {

//Logs will be cleaned up anyway
verify(commandManager, times(1)).deleteExecutionLogFiles(executionLogFileCaptor.capture());
Assertions.assertThat(executionLogFileCaptor.getValue())
assertThat(executionLogFileCaptor.getValue())
.containsExactlyInAnyOrder(
basePath.resolve("apply"),
basePath.resolve("write_HCL"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class CommandRequest {
private List<CommandBundle> commandBundles;
private Instant createdTime;
private Instant updatedTime;
private String orgId;

public long getLifetime(TemporalUnit timeUnit) {
if (createdTime == null) {
Expand Down

0 comments on commit a8c0b75

Please sign in to comment.