From 042d979db94c5ad6334d2225071f90497fd43469 Mon Sep 17 00:00:00 2001 From: 5uhwann <106325839+5uhwann@users.noreply.github.com> Date: Thu, 9 Jan 2025 14:08:10 +0900 Subject: [PATCH] =?UTF-8?q?[DDING-49]=20SSE=20=EC=97=B0=EA=B2=B0=20API=20?= =?UTF-8?q?=EA=B5=AC=ED=98=84=20(#202)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ddingdongBE/sse/api/SseConnectionApi.java | 32 +++++++++ .../controller/SseConnectionController.java | 23 +++++++ .../SseConnectionMapRepository.java | 36 ++++++++++ .../repository/SseConnectionRepository.java | 15 ++++ .../sse/service/SseConnectionService.java | 56 +++++++++++++++ .../sse/service/SseConnectionServiceTest.java | 68 +++++++++++++++++++ 6 files changed, 230 insertions(+) create mode 100644 src/main/java/ddingdong/ddingdongBE/sse/api/SseConnectionApi.java create mode 100644 src/main/java/ddingdong/ddingdongBE/sse/controller/SseConnectionController.java create mode 100644 src/main/java/ddingdong/ddingdongBE/sse/repository/SseConnectionMapRepository.java create mode 100644 src/main/java/ddingdong/ddingdongBE/sse/repository/SseConnectionRepository.java create mode 100644 src/main/java/ddingdong/ddingdongBE/sse/service/SseConnectionService.java create mode 100644 src/test/java/ddingdong/ddingdongBE/sse/service/SseConnectionServiceTest.java diff --git a/src/main/java/ddingdong/ddingdongBE/sse/api/SseConnectionApi.java b/src/main/java/ddingdong/ddingdongBE/sse/api/SseConnectionApi.java new file mode 100644 index 00000000..93f50d81 --- /dev/null +++ b/src/main/java/ddingdong/ddingdongBE/sse/api/SseConnectionApi.java @@ -0,0 +1,32 @@ +package ddingdong.ddingdongBE.sse.api; + +import ddingdong.ddingdongBE.auth.PrincipalDetails; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.security.SecurityRequirement; +import io.swagger.v3.oas.annotations.tags.Tag; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.security.core.annotation.AuthenticationPrincipal; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.ResponseStatus; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +@Tag(name = "SSE", description = "SSE subscribe API") +@RequestMapping("/server/sse") +public interface SseConnectionApi { + + @Operation( + summary = "SSE 구독", + description = "SSE 연결을 설정합니다. 연결 타임아웃은 60초입니다." + ) + @ApiResponse(responseCode = "200", description = "SSE 구독 연결 성공") + @ApiResponse(responseCode = "401", description = "인증되지 않은 사용자") + @ApiResponse(responseCode = "500", description = "서버 오류") + @ResponseStatus(HttpStatus.OK) + @SecurityRequirement(name = "AccessToken") + @GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + SseEmitter subscribe(@AuthenticationPrincipal PrincipalDetails principalDetails); + +} diff --git a/src/main/java/ddingdong/ddingdongBE/sse/controller/SseConnectionController.java b/src/main/java/ddingdong/ddingdongBE/sse/controller/SseConnectionController.java new file mode 100644 index 00000000..f8d5ea6d --- /dev/null +++ b/src/main/java/ddingdong/ddingdongBE/sse/controller/SseConnectionController.java @@ -0,0 +1,23 @@ +package ddingdong.ddingdongBE.sse.controller; + +import ddingdong.ddingdongBE.auth.PrincipalDetails; +import ddingdong.ddingdongBE.domain.user.entity.User; +import ddingdong.ddingdongBE.sse.api.SseConnectionApi; +import ddingdong.ddingdongBE.sse.service.SseConnectionService; +import lombok.RequiredArgsConstructor; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +@RestController +@RequiredArgsConstructor +public class SseConnectionController implements SseConnectionApi { + + private final SseConnectionService sseConnectionService; + + @Override + public SseEmitter subscribe(PrincipalDetails principalDetails) { + User user = principalDetails.getUser(); + Long timeout = 60L * 1000; // 1분 (60초 * 1000밀리초) + return sseConnectionService.subscribe(user.getId().toString(), timeout); + } +} diff --git a/src/main/java/ddingdong/ddingdongBE/sse/repository/SseConnectionMapRepository.java b/src/main/java/ddingdong/ddingdongBE/sse/repository/SseConnectionMapRepository.java new file mode 100644 index 00000000..1b6d518c --- /dev/null +++ b/src/main/java/ddingdong/ddingdongBE/sse/repository/SseConnectionMapRepository.java @@ -0,0 +1,36 @@ +package ddingdong.ddingdongBE.sse.repository; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.springframework.stereotype.Repository; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +@Repository +public class SseConnectionMapRepository implements SseConnectionRepository { + + private final Map emitters = new ConcurrentHashMap<>(); + + @Override + public void save(String id, SseEmitter sseEmitter) { + emitters.put(id, sseEmitter); + } + + @Override + public SseEmitter findById(String id) { + return emitters.get(id); + } + + @Override + public void deleteById(String id) { + SseEmitter removed = this.emitters.remove(id); + if (removed != null) { + removed.complete(); + } + } + + @Override + public void deleteAll() { + emitters.values().forEach(SseEmitter::complete); + emitters.clear(); + } +} diff --git a/src/main/java/ddingdong/ddingdongBE/sse/repository/SseConnectionRepository.java b/src/main/java/ddingdong/ddingdongBE/sse/repository/SseConnectionRepository.java new file mode 100644 index 00000000..47a37bd2 --- /dev/null +++ b/src/main/java/ddingdong/ddingdongBE/sse/repository/SseConnectionRepository.java @@ -0,0 +1,15 @@ +package ddingdong.ddingdongBE.sse.repository; + +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +public interface SseConnectionRepository { + + void save(String id, SseEmitter sseEmitter); + + SseEmitter findById(String id); + + void deleteById(String id); + + void deleteAll(); + +} diff --git a/src/main/java/ddingdong/ddingdongBE/sse/service/SseConnectionService.java b/src/main/java/ddingdong/ddingdongBE/sse/service/SseConnectionService.java new file mode 100644 index 00000000..d0eeaec4 --- /dev/null +++ b/src/main/java/ddingdong/ddingdongBE/sse/service/SseConnectionService.java @@ -0,0 +1,56 @@ +package ddingdong.ddingdongBE.sse.service; + +import ddingdong.ddingdongBE.sse.repository.SseConnectionRepository; +import java.io.IOException; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +@Slf4j +@Service +@RequiredArgsConstructor +public class SseConnectionService { + + private final SseConnectionRepository sseConnectionRepository; + + public SseEmitter subscribe(String id, Long timeout) { + SseEmitter sseEmitter = new SseEmitter(timeout); + checkExistingEmitter(id); + log.info("SSE Connection established for user: {}", id); + sseConnectionRepository.save(id, sseEmitter); + + // 연결 완료 콜백 + sseEmitter.onCompletion(() -> sseConnectionRepository.deleteById(id)); + + // 연결 타임아웃 콜백 + sseEmitter.onTimeout(sseEmitter::complete); + + // 연결 에러 콜백 + sseEmitter.onError((ex) -> { + log.warn("SSE Connection error: {}", ex.getMessage()); + sseConnectionRepository.deleteById(id); + }); + + try { + sseEmitter.send( + SseEmitter.event() + .id(id) + .name("connect") + .data("Connected successfully!") + ); + } catch (IOException e) { + log.error("Error sending initial SSE event to user: {}", id, e); + sseConnectionRepository.deleteById(id); // 초기 이벤트 전송 실패 시 제거 + } + return sseEmitter; + } + + private void checkExistingEmitter(String id) { + SseEmitter oldEmitter = sseConnectionRepository.findById(id); + if (oldEmitter != null) { + oldEmitter.complete(); + } + } + +} diff --git a/src/test/java/ddingdong/ddingdongBE/sse/service/SseConnectionServiceTest.java b/src/test/java/ddingdong/ddingdongBE/sse/service/SseConnectionServiceTest.java new file mode 100644 index 00000000..fd15e4d7 --- /dev/null +++ b/src/test/java/ddingdong/ddingdongBE/sse/service/SseConnectionServiceTest.java @@ -0,0 +1,68 @@ +package ddingdong.ddingdongBE.sse.service; + +import static org.assertj.core.api.Assertions.assertThat; + +import ddingdong.ddingdongBE.common.support.TestContainerSupport; +import ddingdong.ddingdongBE.sse.repository.SseConnectionRepository; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +@SpringBootTest +@Slf4j +class SseConnectionServiceTest extends TestContainerSupport { + + @Autowired + private SseConnectionService sseConnectionService; + + @Autowired + private SseConnectionRepository sseConnectionRepository; + + private final String TEST_ID = "testId"; + private final long TEST_TIMEOUT = 60000L; + + @DisplayName("sse 구독 요청 시 sse 연결이 성공적으로 생성되어야 한다.") + @Test + void subscribe_Success() { + // when + SseEmitter emitter = sseConnectionService.subscribe(TEST_ID, TEST_TIMEOUT); + + // then + assertThat(emitter).isNotNull(); + assertThat(sseConnectionRepository.findById(TEST_ID)).isNotNull(); + } + + @DisplayName("동일한 ID로 재구독 시 기존 연결은 종료되고 새로운 연결이 생성되어야 한다") + @Test + void subscribe_ExistingEmitter() { + // given + SseEmitter firstEmitter = sseConnectionService.subscribe(TEST_ID, TEST_TIMEOUT); + + // when + SseEmitter secondEmitter = sseConnectionService.subscribe(TEST_ID, TEST_TIMEOUT); + + // then + assertThat(firstEmitter).isNotEqualTo(secondEmitter); + assertThat(sseConnectionRepository.findById(TEST_ID)).isEqualTo(secondEmitter); + } + + @DisplayName("서로 다른 ID로 구독 시 각각 독립적인 SSE 연결이 생성되어야 한다") + @Test + void multipleSubscribers_WorkIndependently() { + // given + String TEST_ID_2 = "testId2"; + + // when + sseConnectionService.subscribe(TEST_ID, TEST_TIMEOUT); + sseConnectionService.subscribe(TEST_ID_2, TEST_TIMEOUT); + + // then + assertThat(sseConnectionRepository.findById(TEST_ID)).isNotNull(); + assertThat(sseConnectionRepository.findById(TEST_ID_2)).isNotNull(); + assertThat(sseConnectionRepository.findById(TEST_ID)) + .isNotEqualTo(sseConnectionRepository.findById(TEST_ID_2)); + } +}