Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.firstticket.queueservice.programmeta.domain;

import com.firstticket.queueservice.programmeta.domain.exception.ProgramNotActiveException;
import com.firstticket.queueservice.programmeta.domain.vo.ProgramId;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
Expand Down Expand Up @@ -79,6 +80,21 @@ public boolean isActiveAt(LocalDateTime now) {
return !openAt.isAfter(now) && !closeAt.isBefore(now);
}

/**
* 현재 시점에 입장 가능한지 보장. 그렇지 않으면 도메인 예외를 던진다.
*
* <p>대기열 진입 시점처럼 "활성 본질을 강제" 해야 하는 곳에서 사용한다.
* 호출 측은 분기 없이 메소드만 호출하면 된다 (Tell, Don't Ask).</p>
*
* @throws ProgramNotActiveException CANCELLED 이거나 openAt 전 / closeAt 후
*/
public void ensureActiveAt(LocalDateTime now) {
Objects.requireNonNull(now, "now는 null일 수 없습니다.");
if (!isActiveAt(now)) {
throw new ProgramNotActiveException();
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

/**
* ProgramMeta 영속화 인터페이스.
Expand All @@ -19,6 +20,10 @@ public interface ProgramMetaRepository {

Optional<ProgramMeta> findById(ProgramId programId);

default Optional<ProgramMeta> findById(UUID programId) {
return findById(ProgramId.of(programId));
}

List<ProgramMeta> findAll();

void deleteById(ProgramId programId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.firstticket.queueservice.programmeta.domain.exception;

import com.firstticket.common.exception.BusinessException;
import com.firstticket.queueservice.queuetoken.domain.exception.QueueErrorCode;

/**
* 현재 시점이 프로그램의 입장 가능 시간이 아닐 때.
*
* <p>본질:
* <ul>
* <li>openAt 전</li>
* <li>closeAt 후</li>
* <li>CANCELLED 상태</li>
* </ul>
*/
public class ProgramNotActiveException extends BusinessException {

public ProgramNotActiveException() {
super(QueueErrorCode.PROGRAM_NOT_ACTIVE);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.firstticket.queueservice.queuetoken.application;

import com.firstticket.queueservice.programmeta.domain.ProgramMeta;
import com.firstticket.queueservice.programmeta.domain.ProgramMetaRepository;
import com.firstticket.queueservice.queuetoken.application.dto.CancelQueueTokenCommand;
import com.firstticket.queueservice.queuetoken.application.dto.GetQueueTokenQuery;
import com.firstticket.queueservice.queuetoken.application.dto.IssueQueueTokenCommand;
Expand All @@ -8,12 +10,15 @@
import com.firstticket.queueservice.queuetoken.domain.QueueTokenRepository;
import com.firstticket.queueservice.queuetoken.domain.exception.DuplicateTokenException;
import com.firstticket.queueservice.queuetoken.domain.exception.InvalidTokenStateException;
import com.firstticket.queueservice.queuetoken.domain.exception.ProgramNotFoundException;
import com.firstticket.queueservice.queuetoken.domain.exception.TokenNotFoundException;
import com.firstticket.queueservice.queuetoken.domain.vo.ProgramId;
import com.firstticket.queueservice.queuetoken.domain.vo.UserId;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;

/**
* 대기열 진입 / 조회 / 취소를 처리하는 서비스.
*/
Expand All @@ -22,6 +27,7 @@
public class QueueTokenService {

private final QueueTokenRepository queueTokenRepository;
private final ProgramMetaRepository programMetaRepository;

/**
* 대기열에 진입한다.
Expand All @@ -36,7 +42,10 @@ public QueueTokenResult issueToken(IssueQueueTokenCommand command) {
UserId userId = command.userId();
ProgramId programId = command.programId();

// 같은 user+program 토큰이 있으면 폐기 후 새로 발급
// 1. 활성 프로그램 검증
validateProgramActive(programId);

// 2. 같은 user+program 토큰이 있으면 폐기 후 새로 발급
queueTokenRepository.findByUserIdAndProgramId(userId, programId)
.ifPresent(queueTokenRepository::delete);

Expand Down Expand Up @@ -87,4 +96,16 @@ public void cancelToken(CancelQueueTokenCommand command) {

queueTokenRepository.delete(token);
}

/**
* ProgramMeta 로 활성 프로그램 검증.
*
* <p>존재 여부 + 시간 + status 를 확인한다.</p>
*/
private void validateProgramActive(ProgramId programId) {
ProgramMeta programMeta = programMetaRepository.findById(programId.id())
.orElseThrow(ProgramNotFoundException::new);

programMeta.ensureActiveAt(LocalDateTime.now());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ public interface QueueTokenRepository {
*/
void admit(QueueToken token);

/**
* 현재 큐가 존재하는 모든 프로그램 ID 를 조회한다.
*/
List<ProgramId> findActiveProgramIds();

/**
* 특정 프로그램의 모든 대기 / 입장 토큰을 삭제한다.
* Program 이 취소되었을 때 호출.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.firstticket.queueservice.queuetoken.domain.exception;

import com.firstticket.common.exception.BusinessException;

/**
* 입장하려는 프로그램이 ProgramMeta 캐시에 존재하지 않을 때.
*
* <p>가능한 본질:
* <ul>
* <li>program.created 이벤트 도착 전</li>
* <li>존재하지 않는 programId 로 입장 시도</li>
* </ul>
*/
public class ProgramNotFoundException extends BusinessException {

public ProgramNotFoundException() {
super(QueueErrorCode.PROGRAM_NOT_FOUND);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
@Getter
@RequiredArgsConstructor
public enum QueueErrorCode implements ErrorCode {
PROGRAM_NOT_FOUND(HttpStatus.NOT_FOUND, "존재하지 않는 프로그램입니다"),
PROGRAM_NOT_ACTIVE(HttpStatus.UNPROCESSABLE_ENTITY, "현재 입장 가능 시간이 아닙니다"),
INVALID_TOKEN_STATE(HttpStatus.BAD_REQUEST, "대기 토큰 상태 전이 규칙을 위반했습니다"),
DUPLICATE_TOKEN(HttpStatus.CONFLICT, "이미 대기 중인 토큰이 있습니다"),
TOKEN_NOT_FOUND(HttpStatus.NOT_FOUND, "토큰을 찾을 수 없습니다");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,55 +339,57 @@ public List<Object> execute(RedisOperations operations) {
});
}

/**
* Redis 기반 findActiveProgramIds 구현.
*
* <p>{@code queue:program:*} 패턴으로 SCAN 하여 활성 프로그램 ID 목록을 반환한다.
*
* <p>SCAN 사용 이유: KEYS 명령은 production 에서 블로킹 발생 위험. SCAN 은 점진적.
*
* <h3>Future Work</h3>
* 본 메서드는 MVP 의 가정 (큐 존재 = 활성 프로그램) 을 따른다.
* <p>program-service 와 Kafka 이벤트 통합 후엔:
* <ul>
* <li>{@code program.opened} 이벤트 → Redis Set 의 활성 프로그램 추가</li>
* <li>{@code program.closed} 이벤트 → Set 에서 제거</li>
* <li>본 메서드는 Redis Set 직접 조회 (SCAN 불필요)</li>
* </ul>
*/
public List<ProgramId> findActiveProgramIds() {
String pattern = QUEUE_KEY_PREFIX + PROGRAM_KEY_PREFIX + "*";

// 1. SCAN 으로 모든 큐 키 수집
Set<String> keys = redisTemplate.execute((RedisCallback<Set<String>>) connection -> {
Set<String> result = new HashSet<>();
ScanOptions options = ScanOptions.scanOptions()
.match(pattern) // queue:program:* 매칭
.count(100) // 한 번에 100 개씩 점진적 조회
.build();

// try-with-resources 로 cursor 자동 정리
try (Cursor<byte[]> cursor = connection.scan(options)) {
while (cursor.hasNext()) {
// Redis 는 byte[] 반환 → UTF-8 문자열로 변환
result.add(new String(cursor.next(), StandardCharsets.UTF_8));
}
}
return result;
});

if (keys == null || keys.isEmpty()) {
return List.of();
}

// 2. 키에서 UUID 추출하여 ProgramId 로 변환
// 예: "queue:program:abc-123" → "abc-123" → ProgramId.of("abc-123")
String prefix = QUEUE_KEY_PREFIX + PROGRAM_KEY_PREFIX;
return keys.stream()
.map(key -> key.substring(prefix.length()))
.map(ProgramId::fromString)
.toList();
}
// program-service 와 kafka 이벤트 통합 후 교체 완료
//
// /**
// * Redis 기반 findActiveProgramIds 구현.
// *
// * <p>{@code queue:program:*} 패턴으로 SCAN 하여 활성 프로그램 ID 목록을 반환한다.
// *
// * <p>SCAN 사용 이유: KEYS 명령은 production 에서 블로킹 발생 위험. SCAN 은 점진적.
// *
// * <h3>Future Work</h3>
// * 본 메서드는 MVP 의 가정 (큐 존재 = 활성 프로그램) 을 따른다.
// * <p>program-service 와 Kafka 이벤트 통합 후엔:
// * <ul>
// * <li>{@code program.opened} 이벤트 → Redis Set 의 활성 프로그램 추가</li>
// * <li>{@code program.closed} 이벤트 → Set 에서 제거</li>
// * <li>본 메서드는 Redis Set 직접 조회 (SCAN 불필요)</li>
// * </ul>
// */
// public List<ProgramId> findActiveProgramIds() {
// String pattern = QUEUE_KEY_PREFIX + PROGRAM_KEY_PREFIX + "*";
//
// // 1. SCAN 으로 모든 큐 키 수집
// Set<String> keys = redisTemplate.execute((RedisCallback<Set<String>>) connection -> {
// Set<String> result = new HashSet<>();
// ScanOptions options = ScanOptions.scanOptions()
// .match(pattern) // queue:program:* 매칭
// .count(100) // 한 번에 100 개씩 점진적 조회
// .build();
//
// // try-with-resources 로 cursor 자동 정리
// try (Cursor<byte[]> cursor = connection.scan(options)) {
// while (cursor.hasNext()) {
// // Redis 는 byte[] 반환 → UTF-8 문자열로 변환
// result.add(new String(cursor.next(), StandardCharsets.UTF_8));
// }
// }
// return result;
// });
//
// if (keys == null || keys.isEmpty()) {
// return List.of();
// }
//
// // 2. 키에서 UUID 추출하여 ProgramId 로 변환
// // 예: "queue:program:abc-123" → "abc-123" → ProgramId.of("abc-123")
// String prefix = QUEUE_KEY_PREFIX + PROGRAM_KEY_PREFIX;
// return keys.stream()
// .map(key -> key.substring(prefix.length()))
// .map(ProgramId::fromString)
// .toList();
// }

/**
* Redis 기반 deleteAllByProgramId 구현.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.firstticket.queueservice.queuetoken.infrastructure.scheduler;

import com.firstticket.queueservice.programmeta.domain.ProgramMetaRepository;
import com.firstticket.queueservice.queuetoken.config.QueueProperties;
import com.firstticket.queueservice.queuetoken.domain.QueueToken;
import com.firstticket.queueservice.queuetoken.domain.QueueTokenRepository;
Expand All @@ -10,6 +11,7 @@
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.util.List;

/**
Expand All @@ -20,15 +22,15 @@
*
* <p>흐름:
* <ol>
* <li>Redis SCAN 으로 활성 프로그램 (큐가 존재하는 프로그램) 발견</li>
* <li>programmeta Aggregate 에서 활성 프로그램 조회
* (CANCELLED 아니고 openAt ~ closeAt 사이)</li>
* <li>각 프로그램의 큐 앞에서 batchSize 명 조회 (Sorted Set ZRANGE)</li>
* <li>각 토큰에 대해 JWT 입장 토큰 발급 + 도메인 상태 전이 + Redis 영속성</li>
* </ol>
*
* <p>MVP 단계 한계:
* <ul>
* <li>단일 인스턴스 가정 — 여러 인스턴스에서 동시 실행 시 race 가능 (v0.2.0 별도 이슈)</li>
* <li>활성 프로그램 발견을 Redis SCAN 에 의존 — program-service 통합 후 변경</li>
* </ul>
*/
@Slf4j
Expand All @@ -37,6 +39,7 @@
public class AdmissionScheduler {

private final QueueTokenRepository queueTokenRepository;
private final ProgramMetaRepository programMetaRepository;
private final EntryTokenIssuer entryTokenIssuer;
private final QueueProperties queueProperties;

Expand All @@ -47,7 +50,12 @@ public class AdmissionScheduler {
*/
@Scheduled(fixedRate = 5000)
public void admit() {
List<ProgramId> activePrograms = queueTokenRepository.findActiveProgramIds();
// programmeta Aggregate 에서 활성 프로그램 조회 → queuetoken 의 ProgramId 로 변환
List<ProgramId> activePrograms = programMetaRepository
.findActiveProgramIds(LocalDateTime.now())
.stream()
.map(metaId -> ProgramId.of(metaId.id()))
.toList();

if (activePrograms.isEmpty()) {
return;
Expand Down
Loading