Skip to content

Commit

Permalink
[INLONG-11307][Manager] Fix the problem of unable to obtain extparams…
Browse files Browse the repository at this point in the history
… from multiple data sources (#11308)
  • Loading branch information
fuweng11 authored Oct 9, 2024
1 parent efd7a23 commit 6ce73d2
Showing 1 changed file with 10 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;

Expand Down Expand Up @@ -119,6 +120,9 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
private InlongStreamEntityMapper streamMapper;
@Autowired
private ObjectMapper objectMapper;
@Autowired
private AutowireCapableBeanFactory autowireCapableBeanFactory;
private SourceOperatorFactory operatorFactory;

/**
* Getting the source type.
Expand Down Expand Up @@ -523,7 +527,12 @@ private DataConfig getDataConfig(StreamSourceEntity entity, int op) {

InlongGroupEntity groupEntity = groupMapper.selectByGroupIdWithoutTenant(groupId);
InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
String extParams = getExtParams(entity);
if (operatorFactory == null) {
operatorFactory = new SourceOperatorFactory();
autowireCapableBeanFactory.autowireBean(operatorFactory);
}
StreamSourceOperator sourceOperator = operatorFactory.getInstance(entity.getSourceType());
String extParams = sourceOperator.getExtParams(entity);
if (groupEntity != null && streamEntity != null) {
dataConfig.setState(
SourceStatus.NORMAL_STATUS_SET.contains(SourceStatus.forCode(entity.getStatus()))
Expand Down

0 comments on commit 6ce73d2

Please sign in to comment.