Skip to content

Commit

Permalink
[DDING-49] SSE 연결 API 구현 (#202)
Browse files Browse the repository at this point in the history
  • Loading branch information
5uhwann authored Jan 9, 2025
1 parent 0704a01 commit 042d979
Show file tree
Hide file tree
Showing 6 changed files with 230 additions and 0 deletions.
32 changes: 32 additions & 0 deletions src/main/java/ddingdong/ddingdongBE/sse/api/SseConnectionApi.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package ddingdong.ddingdongBE.sse.api;

import ddingdong.ddingdongBE.auth.PrincipalDetails;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.security.SecurityRequirement;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.security.core.annotation.AuthenticationPrincipal;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Tag(name = "SSE", description = "SSE subscribe API")
@RequestMapping("/server/sse")
public interface SseConnectionApi {

@Operation(
summary = "SSE 구독",
description = "SSE 연결을 설정합니다. 연결 타임아웃은 60초입니다."
)
@ApiResponse(responseCode = "200", description = "SSE 구독 연결 성공")
@ApiResponse(responseCode = "401", description = "인증되지 않은 사용자")
@ApiResponse(responseCode = "500", description = "서버 오류")
@ResponseStatus(HttpStatus.OK)
@SecurityRequirement(name = "AccessToken")
@GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
SseEmitter subscribe(@AuthenticationPrincipal PrincipalDetails principalDetails);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package ddingdong.ddingdongBE.sse.controller;

import ddingdong.ddingdongBE.auth.PrincipalDetails;
import ddingdong.ddingdongBE.domain.user.entity.User;
import ddingdong.ddingdongBE.sse.api.SseConnectionApi;
import ddingdong.ddingdongBE.sse.service.SseConnectionService;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RestController
@RequiredArgsConstructor
public class SseConnectionController implements SseConnectionApi {

private final SseConnectionService sseConnectionService;

@Override
public SseEmitter subscribe(PrincipalDetails principalDetails) {
User user = principalDetails.getUser();
Long timeout = 60L * 1000; // 1분 (60초 * 1000밀리초)
return sseConnectionService.subscribe(user.getId().toString(), timeout);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package ddingdong.ddingdongBE.sse.repository;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.stereotype.Repository;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Repository
public class SseConnectionMapRepository implements SseConnectionRepository {

private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();

@Override
public void save(String id, SseEmitter sseEmitter) {
emitters.put(id, sseEmitter);
}

@Override
public SseEmitter findById(String id) {
return emitters.get(id);
}

@Override
public void deleteById(String id) {
SseEmitter removed = this.emitters.remove(id);
if (removed != null) {
removed.complete();
}
}

@Override
public void deleteAll() {
emitters.values().forEach(SseEmitter::complete);
emitters.clear();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package ddingdong.ddingdongBE.sse.repository;

import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

public interface SseConnectionRepository {

void save(String id, SseEmitter sseEmitter);

SseEmitter findById(String id);

void deleteById(String id);

void deleteAll();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package ddingdong.ddingdongBE.sse.service;

import ddingdong.ddingdongBE.sse.repository.SseConnectionRepository;
import java.io.IOException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Slf4j
@Service
@RequiredArgsConstructor
public class SseConnectionService {

private final SseConnectionRepository sseConnectionRepository;

public SseEmitter subscribe(String id, Long timeout) {
SseEmitter sseEmitter = new SseEmitter(timeout);
checkExistingEmitter(id);
log.info("SSE Connection established for user: {}", id);
sseConnectionRepository.save(id, sseEmitter);

// 연결 완료 콜백
sseEmitter.onCompletion(() -> sseConnectionRepository.deleteById(id));

// 연결 타임아웃 콜백
sseEmitter.onTimeout(sseEmitter::complete);

// 연결 에러 콜백
sseEmitter.onError((ex) -> {
log.warn("SSE Connection error: {}", ex.getMessage());
sseConnectionRepository.deleteById(id);
});

try {
sseEmitter.send(
SseEmitter.event()
.id(id)
.name("connect")
.data("Connected successfully!")
);
} catch (IOException e) {
log.error("Error sending initial SSE event to user: {}", id, e);
sseConnectionRepository.deleteById(id); // 초기 이벤트 전송 실패 시 제거
}
return sseEmitter;
}

private void checkExistingEmitter(String id) {
SseEmitter oldEmitter = sseConnectionRepository.findById(id);
if (oldEmitter != null) {
oldEmitter.complete();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package ddingdong.ddingdongBE.sse.service;

import static org.assertj.core.api.Assertions.assertThat;

import ddingdong.ddingdongBE.common.support.TestContainerSupport;
import ddingdong.ddingdongBE.sse.repository.SseConnectionRepository;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@SpringBootTest
@Slf4j
class SseConnectionServiceTest extends TestContainerSupport {

@Autowired
private SseConnectionService sseConnectionService;

@Autowired
private SseConnectionRepository sseConnectionRepository;

private final String TEST_ID = "testId";
private final long TEST_TIMEOUT = 60000L;

@DisplayName("sse 구독 요청 시 sse 연결이 성공적으로 생성되어야 한다.")
@Test
void subscribe_Success() {
// when
SseEmitter emitter = sseConnectionService.subscribe(TEST_ID, TEST_TIMEOUT);

// then
assertThat(emitter).isNotNull();
assertThat(sseConnectionRepository.findById(TEST_ID)).isNotNull();
}

@DisplayName("동일한 ID로 재구독 시 기존 연결은 종료되고 새로운 연결이 생성되어야 한다")
@Test
void subscribe_ExistingEmitter() {
// given
SseEmitter firstEmitter = sseConnectionService.subscribe(TEST_ID, TEST_TIMEOUT);

// when
SseEmitter secondEmitter = sseConnectionService.subscribe(TEST_ID, TEST_TIMEOUT);

// then
assertThat(firstEmitter).isNotEqualTo(secondEmitter);
assertThat(sseConnectionRepository.findById(TEST_ID)).isEqualTo(secondEmitter);
}

@DisplayName("서로 다른 ID로 구독 시 각각 독립적인 SSE 연결이 생성되어야 한다")
@Test
void multipleSubscribers_WorkIndependently() {
// given
String TEST_ID_2 = "testId2";

// when
sseConnectionService.subscribe(TEST_ID, TEST_TIMEOUT);
sseConnectionService.subscribe(TEST_ID_2, TEST_TIMEOUT);

// then
assertThat(sseConnectionRepository.findById(TEST_ID)).isNotNull();
assertThat(sseConnectionRepository.findById(TEST_ID_2)).isNotNull();
assertThat(sseConnectionRepository.findById(TEST_ID))
.isNotEqualTo(sseConnectionRepository.findById(TEST_ID_2));
}
}

0 comments on commit 042d979

Please sign in to comment.