Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DDING-51] VodProcessingJob 완료 시 notification 전송 구현 #205

Merged
merged 13 commits into from
Jan 11, 2025
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package ddingdong.ddingdongBE.common.exception;

import static org.springframework.http.HttpStatus.INTERNAL_SERVER_ERROR;

sealed public class SseException extends CustomException {

public static final String SSE_NOT_FOUND_ERROR_MESSAGE = "SSE Not Found = ";

public SseException(String message, int errorCode) {
super(message, errorCode);
}

public static final class SseEmitterNotFound extends SseException {

public SseEmitterNotFound(String sseId) {
super(SSE_NOT_FOUND_ERROR_MESSAGE + sseId, INTERNAL_SERVER_ERROR.value());
}
}
5uhwann marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public class Feed extends BaseEntity {
private LocalDateTime deletedAt;

@Builder
private Feed(String activityContent, Club club, FeedType feedType) {
private Feed(Long id, String activityContent, Club club, FeedType feedType) {
this.id = id;
5uhwann marked this conversation as resolved.
Show resolved Hide resolved
this.activityContent = activityContent;
this.club = club;
this.feedType = feedType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,27 @@
import ddingdong.ddingdongBE.domain.feed.service.dto.command.UpdateFeedCommand;
import ddingdong.ddingdongBE.domain.filemetadata.entity.DomainType;
import ddingdong.ddingdongBE.domain.filemetadata.service.FileMetaDataService;
import ddingdong.ddingdongBE.domain.vodprocessing.entity.ConvertJobStatus;
import ddingdong.ddingdongBE.domain.vodprocessing.entity.VodProcessingJob;
import ddingdong.ddingdongBE.domain.vodprocessing.entity.VodProcessingNotification;
import ddingdong.ddingdongBE.domain.vodprocessing.service.VodProcessingJobService;
import ddingdong.ddingdongBE.sse.service.SseConnectionService;
import ddingdong.ddingdongBE.sse.service.dto.SseEvent;
import java.time.LocalDateTime;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@RequiredArgsConstructor
@Transactional(readOnly = true)
public class FacadeClubFeedServiceImpl implements FacadeClubFeedService{
public class FacadeClubFeedServiceImpl implements FacadeClubFeedService {

private final ClubService clubService;
private final FileMetaDataService fileMetaDataService;
private final FeedService feedService;
private final VodProcessingJobService vodProcessingJobService;
private final SseConnectionService sseConnectionService;

@Override
@Transactional
Expand All @@ -34,6 +43,7 @@ public void create(CreateFeedCommand command) {

if (feed.isVideo()) {
fileMetaDataService.updateStatusToCoupled(command.mediaId(), DomainType.FEED_VIDEO, createdId);
checkVodProcessingJobAndNotify(feed);
}
}

Expand All @@ -52,4 +62,16 @@ public void delete(Long feedId) {
feedService.delete(feed);
fileMetaDataService.updateStatusToDelete(feed.getFeedType().getDomainType(), feed.getId());
}

private void checkVodProcessingJobAndNotify(Feed feed) {
VodProcessingJob vodProcessingJob = vodProcessingJobService.getByVideoFeedId(feed.getId());
if (vodProcessingJob.isPossibleNotify()) {
SseEvent<ConvertJobStatus> sseEvent = SseEvent.of(
"vod-processing",
5uhwann marked this conversation as resolved.
Show resolved Hide resolved
vodProcessingJob.getConvertJobStatus(),
LocalDateTime.now()
);
sseConnectionService.sendVodProcessingNotification(vodProcessingJob, sseEvent);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import ddingdong.ddingdongBE.domain.feed.entity.Feed;
import java.util.List;
import java.util.Optional;

public interface FeedService {

Expand All @@ -11,6 +12,8 @@ public interface FeedService {

Feed getById(Long feedId);

Optional<Feed> findById(Long feedId);

Long create(Feed feed);

void delete(Feed feed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import ddingdong.ddingdongBE.domain.feed.entity.Feed;
import ddingdong.ddingdongBE.domain.feed.repository.FeedRepository;
import java.util.List;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
Expand Down Expand Up @@ -31,6 +32,11 @@ public Feed getById(Long feedId) {
.orElseThrow(() -> new ResourceNotFound("Feed(id: " + feedId + ")를 찾을 수 없습니다."));
}

@Override
public Optional<Feed> findById(Long feedId) {
return feedRepository.findById(feedId);
}
5uhwann marked this conversation as resolved.
Show resolved Hide resolved

@Override
@Transactional
public Long create(Feed feed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
import ddingdong.ddingdongBE.domain.vodprocessing.entity.ConvertJobStatus;
import ddingdong.ddingdongBE.domain.vodprocessing.service.dto.command.UpdateVodProcessingJobStatusCommand;
import jakarta.validation.constraints.NotNull;
import org.hibernate.validator.constraints.UUID;

public record UpdateVodProcessingJobStatusRequest(
@NotNull(message = "변환 작업 ID는 필수입니다")
@UUID
String convertJobId,
@NotNull(message = "상태는 필수입니다")
ConvertJobStatus status
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
package ddingdong.ddingdongBE.domain.vodprocessing.entity;

public enum VodNotificationStatus {
PENDING,
SENT,
COMPLETED,
FAILED,
EXPIRED
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,8 @@ private VodProcessingJob(Long id, VodProcessingNotification vodProcessingNotific
public void updateConvertJobStatus(ConvertJobStatus convertJobStatus) {
this.convertJobStatus = convertJobStatus;
}

public boolean isPossibleNotify() {
return this.convertJobStatus == ConvertJobStatus.COMPLETE || this.convertJobStatus == ConvertJobStatus.ERROR;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import jakarta.persistence.Id;
import java.time.LocalDateTime;
import lombok.AccessLevel;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;

Expand All @@ -33,4 +34,26 @@ public class VodProcessingNotification extends BaseEntity {
@Column(name = "notification_status", nullable = false)
private VodNotificationStatus vodNotificationStatus;

@Builder
private VodProcessingNotification(Long id, LocalDateTime expiredAt, LocalDateTime sentAt, int retryCount,
VodNotificationStatus vodNotificationStatus) {
this.id = id;
this.expiredAt = expiredAt;
this.sentAt = sentAt;
this.retryCount = retryCount;
this.vodNotificationStatus = vodNotificationStatus;
}

public static VodProcessingNotification creatPending() {
return VodProcessingNotification.builder()
.retryCount(0)
.vodNotificationStatus(VodNotificationStatus.PENDING)
.build();
}

public void updateVodNotificationStatusToSent(LocalDateTime sentAt) {
this.sentAt = sentAt;
this.vodNotificationStatus = VodNotificationStatus.SENT;
}
5uhwann marked this conversation as resolved.
Show resolved Hide resolved

}
Original file line number Diff line number Diff line change
@@ -1,13 +1,27 @@
package ddingdong.ddingdongBE.domain.vodprocessing.repository;

import ddingdong.ddingdongBE.domain.filemetadata.entity.DomainType;
import ddingdong.ddingdongBE.domain.vodprocessing.entity.VodProcessingJob;
import java.util.Optional;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;

@Repository
public interface VodProcessingJobRepository extends JpaRepository<VodProcessingJob, Long> {

Optional<VodProcessingJob> findByConvertJobId(String convertJobId);

@Query("""
SELECT vj FROM VodProcessingJob vj
JOIN FETCH vj.fileMetaData fm
WHERE fm.entityId = :entityId
AND fm.domainType = :domainType
""")
Optional<VodProcessingJob> findFirstByFileMetaDataEntityIdAndDomainType(
@Param("entityId") Long entityId,
@Param("domainType") DomainType domainType
);
Comment on lines +16 to +25
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

p2)
해당 쿼리가 의도대로 동작하는지, 테스트 작성해주면 좋을 것 같아요


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package ddingdong.ddingdongBE.domain.vodprocessing.repository;

import ddingdong.ddingdongBE.domain.vodprocessing.entity.VodProcessingNotification;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface VodProcessingNotificationRepository extends JpaRepository<VodProcessingNotification, Long> {

}
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
package ddingdong.ddingdongBE.domain.vodprocessing.service;

import ddingdong.ddingdongBE.domain.feed.entity.Feed;
import ddingdong.ddingdongBE.domain.feed.service.FeedService;
import ddingdong.ddingdongBE.domain.filemetadata.entity.FileMetaData;
import ddingdong.ddingdongBE.domain.filemetadata.service.FileMetaDataService;
import ddingdong.ddingdongBE.domain.vodprocessing.entity.ConvertJobStatus;
import ddingdong.ddingdongBE.domain.vodprocessing.entity.VodProcessingJob;
import ddingdong.ddingdongBE.domain.vodprocessing.entity.VodProcessingNotification;
import ddingdong.ddingdongBE.domain.vodprocessing.service.dto.command.CreatePendingVodProcessingJobCommand;
import ddingdong.ddingdongBE.domain.vodprocessing.service.dto.command.UpdateVodProcessingJobStatusCommand;
import ddingdong.ddingdongBE.sse.service.SseConnectionService;
import ddingdong.ddingdongBE.sse.service.dto.SseEvent;
import java.time.LocalDateTime;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
Expand All @@ -16,19 +24,43 @@ public class FacadeVodProcessingJobServiceImpl implements FacadeVodProcessingJob

private final VodProcessingJobService vodProcessingJobService;
private final FileMetaDataService fileMetaDataService;
private final VodProcessingNotificationService vodProcessingNotificationService;
private final SseConnectionService sseConnectionService;
private final FeedService feedService;

@Override
@Transactional
public Long create(CreatePendingVodProcessingJobCommand command) {
FileMetaData fileMetaData = fileMetaDataService.getById(command.fileId());
return vodProcessingJobService.save(command.toPendingVodProcessingJob(fileMetaData));
VodProcessingNotification pendingNotification =
vodProcessingNotificationService.save(VodProcessingNotification.creatPending());
return vodProcessingJobService.save(command.toPendingVodProcessingJob(pendingNotification, fileMetaData));
}

@Override
@Transactional
public void updateVodProcessingJobStatus(UpdateVodProcessingJobStatusCommand command) {
VodProcessingJob vodProcessingJob = vodProcessingJobService.getByConvertJobId(command.convertJobId());
vodProcessingJob.updateConvertJobStatus(command.status());
checkVodProcessingJobStatus(vodProcessingJob);
}

private void checkVodProcessingJobStatus(VodProcessingJob vodProcessingJob) {
if (vodProcessingJob.isPossibleNotify()) {
checkExistingFeedAndNotify(vodProcessingJob);
}
}

private void checkExistingFeedAndNotify(VodProcessingJob vodProcessingJob) {
Optional<Feed> optionalFeed = feedService.findById(vodProcessingJob.getFileMetaData().getEntityId());
if (optionalFeed.isPresent()) {
SseEvent<ConvertJobStatus> sseEvent = SseEvent.of(
"vod-processing",
vodProcessingJob.getConvertJobStatus(),
LocalDateTime.now()
);
sseConnectionService.sendVodProcessingNotification(vodProcessingJob, sseEvent);
}
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ddingdong.ddingdongBE.domain.vodprocessing.service;

import ddingdong.ddingdongBE.common.exception.PersistenceException.ResourceNotFound;
import ddingdong.ddingdongBE.domain.filemetadata.entity.DomainType;
import ddingdong.ddingdongBE.domain.vodprocessing.entity.VodProcessingJob;
import ddingdong.ddingdongBE.domain.vodprocessing.repository.VodProcessingJobRepository;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -34,4 +35,13 @@ public VodProcessingJob getByConvertJobId(String convertJobId) {
.orElseThrow(() -> new ResourceNotFound(
"VodProcessingJob(convertJobId=" + convertJobId + ")를 찾을 수 없습니다."));
}

@Override
public VodProcessingJob getByVideoFeedId(Long videoFeedId) {
return vodProcessingJobRepository.findFirstByFileMetaDataEntityIdAndDomainType(
videoFeedId,
DomainType.FEED_VIDEO)
.orElseThrow(() -> new ResourceNotFound(
"VodProcessingJob(videoFeedId=" + videoFeedId + ")를 찾을 수 없습니다."));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package ddingdong.ddingdongBE.domain.vodprocessing.service;

import ddingdong.ddingdongBE.domain.vodprocessing.entity.VodProcessingNotification;
import ddingdong.ddingdongBE.domain.vodprocessing.repository.VodProcessingNotificationRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@Transactional(readOnly = true)
@RequiredArgsConstructor
5uhwann marked this conversation as resolved.
Show resolved Hide resolved
public class GeneralVodProcessingNotificationService implements VodProcessingNotificationService {

private final VodProcessingNotificationRepository notificationRepository;

@Override
@Transactional
public VodProcessingNotification save(VodProcessingNotification vodProcessingNotification) {
return notificationRepository.save(vodProcessingNotification);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ public interface VodProcessingJobService {

VodProcessingJob getByConvertJobId(String convertJobId);

VodProcessingJob getByVideoFeedId(Long videoFeedId);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package ddingdong.ddingdongBE.domain.vodprocessing.service;

import ddingdong.ddingdongBE.domain.vodprocessing.entity.VodProcessingNotification;

public interface VodProcessingNotificationService {

VodProcessingNotification save(VodProcessingNotification vodProcessingNotification);

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,21 @@
import ddingdong.ddingdongBE.domain.filemetadata.entity.FileMetaData;
import ddingdong.ddingdongBE.domain.vodprocessing.entity.ConvertJobStatus;
import ddingdong.ddingdongBE.domain.vodprocessing.entity.VodProcessingJob;
import ddingdong.ddingdongBE.domain.vodprocessing.entity.VodProcessingNotification;

public record CreatePendingVodProcessingJobCommand(
String convertJobId,
String userId,
String fileId
) {

public VodProcessingJob toPendingVodProcessingJob(FileMetaData fileMetaData) {
public VodProcessingJob toPendingVodProcessingJob(VodProcessingNotification notification, FileMetaData fileMetaData) {
return VodProcessingJob.builder()
.convertJobId(convertJobId)
.fileMetaData(fileMetaData)
.userId(userId)
.convertJobStatus(ConvertJobStatus.PENDING)
.vodProcessingNotification(notification)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ddingdong.ddingdongBE.sse.repository;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.stereotype.Repository;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
Expand All @@ -16,8 +17,8 @@ public void save(String id, SseEmitter sseEmitter) {
}

@Override
public SseEmitter findById(String id) {
return emitters.get(id);
public Optional<SseEmitter> findById(String id) {
return Optional.ofNullable(emitters.get(id));
}

@Override
Expand Down
Loading
Loading