SSE 방식을 이용한 알림 구현

프로젝트에서 사용했던 SSE 방식에 대해

개요

알림 구현에는 크게 4가지 방식이 있다.

폴링(Polling)

  • 일정 시간마다 Client에서 Server로 요청을 보내고 응답을 받는 방식
  • 응답해줄 데이터가 없어도 응답을 받음
  • 지속적으로 request를 날림으로써 비용 부담 및 서버에 부하를 줄 수 있음

롱 폴링(Long Polling)

  • 긴 connection을 열어두고, 이벤트가 발생했을때 Request를 보냄
  • 응답해줄 데이터가 없으면 데이터가 생길때까지 기다림
  • connection 간격이 좁으면 Polling 방식과 큰 차이가 없음

웹소켓(WebSocket)

  • 양방향 통신으로써 Client, Server 간 Handshaking 방식으로 접속 후 통신
  • 연결 후 계속 connection을 유지하므로 불필요한 비용 발생할 수 있음

SSE(Server-Sent-Events)

  • Client가 Server에 구독 요청을 보내면, Server에서 이벤트가 발생할 때마다 Response를 보냄
  • Server에서 Client로만 이벤트를 보내는 단방향 통신

우리는 사용자 알림에 대해서 기능을 구현할 예정이었기 때문에, SSE 방식을 통해서 알림을 구현하기로 했다.

구현

  • Emitter란? SseEmitter 라는 SSE 통신을 위한 구현체를 제공받기 위해 사용

  • React(Client)에서 SSE응답을 받기 위한 방법 token을 사용하지 않을 때: EventSource token을 사용할 때: EventSourcePolyfill

우리는 사용자 인증/인가를 JWT를 사용하여 구현하였기 때문에 EventSourcePolyfill을 사용하게 되었다.

1. SseEmitters 클래스

@Slf4j
@Getter
@Component
@RequiredArgsConstructor
public class SseEmitters {
    private final ConcurrentHashMap<String, SseEmitter> emitters = new ConcurrentHashMap<>();

    public SseEmitter add(String id, SseEmitter emitter) {
        emitters.put(id, emitter);
        log.info("new emitter added: {}", emitter);
        log.info("emitter list size: {}", emitters.size());

        return emitter;
    }

    public Map<String, SseEmitter> findEmitter(String id){
        return emitters.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(id))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public void delete(String id) {
        emitters.remove(id);
    }
}
  • 이벤트를 관리하는 SseEmitter, ConcurrentHashMap을 통해서 여러 스레드에서 동시에 접근해도 데이터를 처리할 수 있도록 함

2. NotificationController 클래스

@Slf4j
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/notification")
public class NotificationController {
    private final JWTUtil jwtUtil;
    private final NotificationService notificationService;

    // sse 연결
    @GetMapping(value = "/subscribe/{userId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public ResponseEntity<SseEmitter> subscribe(@PathVariable Long userId,
                                                @RequestHeader(value = "lastEventId", required = false, defaultValue = "") String lastEventId,
                                                HttpServletResponse response){
        log.info(lastEventId);
        return new ResponseEntity<>(notificationService.subscribe(userId, response), HttpStatus.OK);
    }

    // 유저 별 알림 조회
    @GetMapping("/{userId}")
    public ResponseEntity<List<Notification>> getUserNotifications(@PathVariable Long userId){
        List<Notification> notifications = notificationService.getUserNotifications(userId);
        return new ResponseEntity<> (notifications, HttpStatus.OK);
    }
  • 클라이언트 Header에 알림 모달이 위치해있고, 유저가 로그인해서 메인페이지에 접속하게 되면 Header를 통해서 /api/notification/subscribe/{userId}에 구독 요청을 보낸다.
  • 알림에는 아이디가 존재하는데, 통신 간에 누락 된 알림을 테스트하기 위해 lastEventId를 사용했다.
  • MediaType.TEXT_EVENT_STREAM_VALUE는 SSE를 지원하는 텍스트 형식을 나타낸다.

3. NotificationService 클래스

@Slf4j
@Service
@RequiredArgsConstructor
public class NotificationService {
    private final SseEmitters sseEmitters;
    private final NotificationRepository notificationRepository;
    // timeout 시간 설정
    private static final long TIMEOUT = 60 * 1000L;

    public SseEmitter subscribe(Long userId, HttpServletResponse response) {
        // 기존의 연결 종료
        String existingId = userId + "_";
        Map<String, SseEmitter> existingEmitters = sseEmitters.findEmitter(existingId);
        existingEmitters.forEach((key, emitter) -> {
            emitter.complete();
            sseEmitters.delete(key);
        });

        // 새 연결 생성
        SseEmitter emitter = new SseEmitter(TIMEOUT);
        String id = userId + "_" + System.currentTimeMillis();
        sseEmitters.add(id, emitter);

        // NGINX PROXY 에서의 필요 설정 불필요한 버퍼링방지
        response.setHeader("X-Accel-Buffering", "no");

        Map<String, Object> testContent = new HashMap<>();
        testContent.put("content", "connected!");
        sendToClient(emitter, "test", id, testContent);

        // 타임아웃 시 emitter 만료
        emitter.onTimeout(() -> {
            log.info("onTimeout callback");
            emitter.complete();
            sseEmitters.delete(id);
        });

		// 에러 발생
        emitter.onError(throwable -> {
            log.error("[sse] SseEmitters 파일 add 메서드 : {}", throwable.getMessage());
            emitter.complete();
            sseEmitters.delete(id);
        });

		//클라이언트와의 연결이 끊어졌을 때
        emitter.onCompletion(() -> {
            log.info("onCompletion callback");
            sseEmitters.delete(id);
        });

        return emitter;
    }


    private void sendToClient(SseEmitter emitter, String name, String id, Object data) {
        try {
            emitter.send(SseEmitter.event()
                    .id(id)
                    .name(name)
                    .data(data));
        } catch (IOException exception) {
            sseEmitters.delete(id);
            throw new RuntimeException("연결 오류!");
        }
    }

    @Transactional
    public void send(SendNotificationEvent noti) {
        Notification notification = notificationRepository.save(Notification.create(noti));
        log.info("저장됨");

        String receiverId = noti.getReceiver() + "_";
        log.info(receiverId);

        // 해당 회원의 emitter 모두 찾아서 이벤트 전송
        Map<String, SseEmitter> emitters = sseEmitters.findEmitter(receiverId);
        log.info(emitters.entrySet().toString());

        emitters.forEach(
            (key, emitter) -> {
                sendToClient(emitter, noti.getName(), noti.getEventId(), notification);
                log.info("알림 전송 완료");
            }
        );
    }

    // 유저 별 알림 조회
    public List<Notification> getUserNotifications(Long userId) {
        return notificationRepository.findByReceiverOrderByNotificationCreatedDateDesc(userId);
    }
  • 구독 요청이 들어왔을 때, 새로운 Emitter를 생성하고 더미 데이터를 보낸다. 기존 연결이 있다면 제거하고 새로운 Emitter를 생성한다.

추가적으로, 읽지 않은 알림 및 전체 알림 조회/삭제 기능을 위해서 알림에 대한 Entity와 Repository를 생성해줬다.

(+) 클라이언트에서 구독 요청 & 응답 읽기

// sse 연결 선언
    useEffect(() => {
        // 마운트 시 로그인 상태 + sse 연결이 안 된 상태면 연결
        if (token && !eventSource) {
            subscribe();
        }

        // 언마운트 시 sse 연결 종료
        return () => {
            if (eventSource) {
                eventSource.close();
                console.log("연결 종료");
                setEventSource(null);
            }
        }
    },[eventSource, token])
    
    // sse 연결 시작
    const subscribe = async () => {
        console.log("연결 시작");
        const source = new EventSourcePolyfill(
            `${notifyApi}/subscribe/` + userId,
            {
            headers: {
                Authorization: `Bearer ${token}`,
                lastEventId: lastEventId,
            },
            heartbeatTimeout: 600000,
            }
        );
        
        // mileage 관련 알림 받아옴
        source.addEventListener("mileage", (e) => {
            setLastEventId(e.lastEventId);
            console.log(lastEventId);
            const data = JSON.parse(e.data);
            console.log(data);
            toast(data.content);
            setNotifications((prevNotifications) => [data, ...prevNotifications]);
            console.log(notifications);
            setUnreadCount((prevCount) => prevCount + 1);
        });
	}

연결 이펙트에 대한 함수를 선언하고, Server에서 보내는 알림에 대해 setNotifications useState에 인자로 넣어준다.

Trouble Shooting

  • 프로젝트에서 WebServer로 nginx를 사용했는데, nginx는 WAS로 HTTP/1.0을 사용하고 Connection: close 헤더를 사용하기 때문에 지속적으로 연결이 안돼서 SSE가 작동하지 않는다. 이에 대한 설정으로 nginx.conf에 추가해준다.
proxy_set_header Connection '';
proxy_http_version 1.1;
  • open-in-view를 false로 설정하여 요청이 트랜잭션이 처리되는 동안에만 데이터베이스 연결을 열어준다.