Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] #103 kafka 게시물 상태변경 test #104

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ dependencies {
annotationProcessor "com.querydsl:querydsl-apt:${dependencyManagement.importedProperties['querydsl.version']}:jakarta" // querydsl JPAAnnotationProcessor 사용 지정
annotationProcessor "jakarta.annotation:jakarta.annotation-api" // java.lang.NoClassDefFoundError (javax.annotation.Generated) 대응 코드
annotationProcessor "jakarta.persistence:jakarta.persistence-api" // java.lang.NoClassDefFoundError (javax.annotation.Entity) 대응 코드

// kafka
implementation 'org.springframework.kafka:spring-kafka'
// Kafka 테스트를 위한 의존성 추가
testImplementation 'org.springframework.kafka:spring-kafka-test'

}

tasks.named('test') {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package ussum.homepage.application.consumer;

import lombok.RequiredArgsConstructor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import ussum.homepage.domain.event.LikeCountChangedEvent;
import ussum.homepage.domain.post.Post;
import ussum.homepage.domain.post.PostRepository;
import ussum.homepage.domain.post.exception.PostException;
import ussum.homepage.domain.post.service.PetitionPostProcessor;
import ussum.homepage.domain.postlike.service.PostReactionReader;
import ussum.homepage.infra.jpa.post.entity.Category;
import ussum.homepage.infra.utils.DateUtils;

import java.time.LocalDateTime;

import static ussum.homepage.global.error.status.ErrorStatus.POST_NOT_FOUND;

@Service
@RequiredArgsConstructor
public class PostLikeCountConsumer {

private final PostRepository postRepository;

private final PostReactionReader postReactionReader;
private final PetitionPostProcessor petitionPostProcessor;

@KafkaListener(topics = "post-like-count-changed", groupId = "post-group")
public void onLikeCountChanged(LikeCountChangedEvent event) {
Long postId = event.getPostId();
Post post = postRepository.findById(postId)
.orElseThrow(() -> new PostException(POST_NOT_FOUND));

if (post.getCategory().equals("종료됨") || post.getCategory().equals("답변완료")) {
return; // 최종 상태("종료됨", "답변완료")인 경우 상태를 업데이트하지 않음
}
Integer likeCount = postReactionReader.countPostReactionsByType(postId, "like");
petitionPostProcessor.processStatus(post, likeCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import ussum.homepage.domain.postlike.service.PostReactionManager;
import ussum.homepage.domain.postlike.service.PostReactionReader;
import ussum.homepage.domain.postlike.service.PostReactionModifier;
import ussum.homepage.infra.kafka.KafkaProducer;


import java.util.Optional;
Expand All @@ -25,6 +26,7 @@ public class PostReactionService {
private final PostReactionManager postReactionManager;
private final PostReactionReader postReactionReader;
private final PetitionPostProcessor petitionPostProcessor;
private final KafkaProducer kafkaProducer;

@Transactional
public void postReactionToggle(Long userId, Long postId, CreatePostReactionReq createPostReactionReq) {
Expand All @@ -35,7 +37,9 @@ public void postReactionToggle(Long userId, Long postId, CreatePostReactionReq c
reaction -> handleExistingReaction(reaction, newReaction),
() -> createNewReaction(newReaction)
);
petitionPostProcessor.onLikeCountChanged(postId);
// petitionPostProcessor.onLikeCountChanged(postId);
// Kafka 이벤트 발행
kafkaProducer.sendLikeCountChangedEvent(postId);
}

//기존 반응이 있고 같은 종류면 삭제
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package ussum.homepage.domain.event;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;

@Builder
@AllArgsConstructor
@NoArgsConstructor // 기본 생성자 추가
@Getter
public class LikeCountChangedEvent {
private Long postId;
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void processStatus(Post post, Integer likeCount) {
private void handleInProgressStatus(Post post, Integer likeCountOfPost) {
LocalDateTime createdAt = DateUtils.parseHourMinSecFromCustomString(post.getCreatedAt());
LocalDateTime now = LocalDateTime.now();
if (now.isBefore(createdAt.plusDays(30)) && likeCountOfPost >= 100) {
if (now.isBefore(createdAt.plusDays(30)) && likeCountOfPost >= 1) {
updatePostCategoryAndOngoingStatus(post.getId(), "접수완료");
}
else if (now.isAfter(createdAt.plusDays(30))) {
Expand Down
45 changes: 45 additions & 0 deletions src/main/java/ussum/homepage/infra/config/KafkaProducerConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package ussum.homepage.infra.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import ussum.homepage.domain.event.LikeCountChangedEvent;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Bean
public ProducerFactory<String, LikeCountChangedEvent> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public NewTopic postLikeCountChangedTopic() {
return TopicBuilder.name("post-like-count-changed")
.partitions(1)
.replicas(1) // 복제 인수를 1로 설정
.build();
}

@Bean
public KafkaTemplate<String, LikeCountChangedEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class PostEntity extends BaseEntity {
private String title;

@Lob
@Column(columnDefinition = "longtext")
private String content;

private Integer viewCount;
Expand Down
32 changes: 32 additions & 0 deletions src/main/java/ussum/homepage/infra/kafka/KafkaProducer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package ussum.homepage.infra.kafka;

import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import ussum.homepage.domain.event.LikeCountChangedEvent;

import java.util.concurrent.CompletableFuture;

@Service
@RequiredArgsConstructor
public class KafkaProducer {

private static final String TOPIC = "post-like-count-changed";

@Autowired
private KafkaTemplate<String, LikeCountChangedEvent> kafkaTemplate;

public void sendLikeCountChangedEvent(Long postId) {
LikeCountChangedEvent event = new LikeCountChangedEvent(postId);
CompletableFuture<SendResult<String, LikeCountChangedEvent>> future = kafkaTemplate.send(TOPIC, event);
future.whenComplete((result, ex) -> {
if (ex == null) {
System.out.println("Sent message=[" + event + "] with offset=[" + result.getRecordMetadata().offset() + "]");
} else {
System.out.println("Unable to send message=[" + event + "] due to : " + ex.getMessage());
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package ussum.homepage.domain.post.service;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.test.annotation.DirtiesContext;
import ussum.homepage.domain.event.LikeCountChangedEvent;
import ussum.homepage.domain.post.Post;
import ussum.homepage.domain.post.PostRepository;
import ussum.homepage.infra.kafka.KafkaProducer;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
@DisplayName("<Kafka Petition Post Processor Test>")
public class PetitionPostProcessorKafkaTest {

private static final String TOPIC = "post-like-count-changed";

@Autowired
private KafkaProducer kafkaProducer;

@MockBean
private PostRepository postRepository;

@MockBean
private PetitionPostProcessor petitionPostProcessor;

@Autowired
private KafkaTemplate<String, LikeCountChangedEvent> kafkaTemplate;

private CountDownLatch latch = new CountDownLatch(1);
private LikeCountChangedEvent receivedEvent;

@Test
@DisplayName("좋아요 수 변경 이벤트가 Kafka를 통해 정상적으로 전송되고 처리되어야 한다")
public void testLikeCountChangedEventKafkaFlow() throws InterruptedException {
// given
Long postId = 1L;
LikeCountChangedEvent event = new LikeCountChangedEvent(postId);
Post mockPost = mock(Post.class);

when(postRepository.findById(postId)).thenReturn(java.util.Optional.of(mockPost));

// when
kafkaProducer.sendLikeCountChangedEvent(postId);

// then
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(receivedEvent).isNotNull();
assertThat(receivedEvent.getPostId()).isEqualTo(postId);

verify(petitionPostProcessor, timeout(5000)).onLikeCountChanged(postId);
}

@KafkaListener(topics = TOPIC, groupId = "test-group")
public void listen(@Payload LikeCountChangedEvent event) {
System.out.println("Received event: " + event);
this.receivedEvent = event;
latch.countDown();
}
}