본문 바로가기

Reactive Programming

240809금 Spring Data R2DBC

1️⃣ R2DBC란

- Reactive Relational Database Connectivity

- 관계형 데이터베이스에 리액티브 프로그래밍 API를 제공하기 위한 개방형 사양

- 드라이버 벤더가 구현하고 클라이언트가 사용하기 위한 SPI

① Reactive Relational Database Connectivity

✔️ Reactive

- 리액티브 프로그래밍을 통해 데이터를 비동기적으로 처리한다.

- 리액티브 프로그래밍은 데이터 흐름과 변화에 반응하는 방식으로 프로그래밍을 한다.

 

✔️ Relational Databaes

- 전통적인 관계형 데이터베이스

- ex) MySQL, PostgreSQL, SQL Server

 

✔️ Connectivity

- 리액티브 프로그래밍과 관계형 데이터베이스 두 가지를 연결하는 API를 제공한다.

② 관계형 데이터베이스에 리액티브 프로그래밍 API를 제공하기 위한 개방형 사양

✔️ 개방형 사양

- R2DBC는 특정 데이터베이스나 기술에 종속되지 않는 표준화된 사양

- 다양한 데이터베이스에 적용할 수 있는 방식으로 설계되었다.

③ 드라이버 벤더가 구현하고 클라이언트가 사용하기 위한 SPI

✔️ 드라이버 벤더

 - 드라이버 벤더 ?

* 특정 하드웨어나 소프트웨어를 위해 드라이버를 개발하고 제공하는 회사

* 드라이버 : 시스템 간 소통을 가능하게 하는 소프트웨어 구성 요소 (ex. 운영체제와 하드웨어, 소프트웨어와 데이터베이스)

 

- R2DBC에서의 드라이버 벤더

* 특정 데이터베이스를 리액티브 방식으로 사용할 수 있게 해주는 드라이버를 개발하고 제공하는 회사

* 예시:

  # PostgreSQL을 위한 리액티브 드라이버를 개발하는 회사

  # MySQL을 위한 리액티브 드라이버를 개발하는 회사

 

✔️ 클라이언트

- R2DBC에서의 클라이언트

* R2DBC를 사용하는 애플리케이션이나 프로그래머

 

✔️ SPI

- SPI ?

* Service Provider Interface

* 특정 기능이나 서비스를 제공하기 위한 인터페이스를 정의

* 이 인터페이스는 서비스 제공자가 구현해야 할 메서드를 포함한다.

 

- Service Provider ?

* SPI 인터페이스를 구현하는 클래스 또는 라이브러리

* 여러 SP가 동일한 SPI 인터페이스를 구현할 수 있다.

 

- R2DBC에서의 SPI

* 관계형 데이터베이스에 접근하는 기능 및 서비스를 제공하는 인터페이스

 

✔️ 정리

리액티브 프로그래밍에서 관계형 데이터베이스에 접근하기 위한 인터페이스인 SPI가 존재한다.

드라이버 벤더는 이 SPI를 구현한 클래스 또는 라이브러리로 구성된 드라이버를 제공한다.

클라이언트 애플리케이션은 이 드라이버를 사용하여 리액티브 프로그래밍 모델에 따라 관계형 데이터베이스와 비동기 및 논블로킹 방식으로 연결하고 데이터를 처리한다.

2️⃣ R2DBC와 JDBC

R2DBC : 비동기, 논블로킹 방식으로 관계형 데이터베이스에 접근한다.

JDBC : 동기, 블로킹 방식으로 관계형 데이터베이스에 접근한다.

 

예시 1

상황

- A 스레드 : 관계형 데이터베이스 접근을 요구하는 클라이언트 요청을 받았다.

 

R2DBC 예시 (비동기, 논블로킹)

1) A 스레드는 R2DBC를 통해 관계형 데이터베이스에 요청을 보낸다.

2) A 스레드는 요청을 보낸 후, 응답을 기다리지 않고 스레드 풀에 반환되어 다른 작업을 수행할 수 있는 상태가 된다. : 논블로킹

3) 관계형 데이터베이스에서 응답이 준비되면, 스레드 풀의 다른 임의의 스레드가 선택되어 이 응답을 받고, 클라이언트의 요청을 계속 처리한다.

 

JDBC 예제 (동기, 블로킹)

1) A 스레드는 JDBC를 통해 관계형 데이터베이스에 요청을 보낸다.

2) A 스레드는 요청을 보낸 후, 응답이 올 때까지 대기한다 : 블로킹

3) 관계형 데이터베이스에서 응답이 오면, A 스레드는 클라이언트의 요청을 계속 처리한다.

 

예시 2

상황

- 요청 스레드 : 클라이언트 요청을 받아 이를 적절한 워커 스레드에 할당한다. (싱글톤 패턴으로 관리된다)

- 요청 A, B, C : 공유 자원 D에 접근을 요구하는 요청. A, B, C 순서대로 요청 스레드에 들어온다.

- Z 인스턴스 : 직렬화된 큐를 사용하여 공유 자원 D에 대한 접근 요청을 직렬화된 큐(BlockingQueue 혹은 ConcurrentLinkedQueue)에 추가하여 R2DBC에 요청을 보내는 객체 (싱글톤 패턴으로 관리된다)

- D 자원 : 공유 자원. 하나의 요청이 완전히 끝나고 난 후 다른 요청을 처리해야 하는 자원이다.

 

R2DBC 예시 (비동기, 논블로킹)

1) 요청 스레드는 A, B, C 요청을 파싱하고, Z 인스턴스의 직렬화된 큐에 추가한다.

  - 요청 스레드는 A, B, C 요청을 받아 이 요청이 공유 자원 D에 대한 접근임을 파악한다.

  - 요청 스레드는 A, B, C 요청을 Z 인스턴스의 직렬화된 큐에 순서대로 추가한다.

  - 직렬화된 큐 대신 Mono/Flux 체인을 사용하여 순서를 관리할 수 있다.

 

2) 요청 스레드는 응답을 기다리지 않고 바로 다음 클라이언트 요청을 처리한다. : 논블로킹

 

3) Z 인스턴스는 직렬화된 큐에 들어온 요청을 순차적으로 처리한다.

  - Z 인스턴스는 직렬화된 큐에 들어온 요청을 순차적으로 R2DBC에 전송한다.

  - 이 과정에서 Z 인스턴스가 요청을 R2DBC에 보내는 방식은 직렬화된 큐를 사용하여 순서를 보장한다.

 

4) Z 인스턴스는 요청에 대한 응답이 올 때까지 다른 요청을 R2DBC에 보내지 않는다.

 

5) R2DBC는 Z 인스턴스로부터 요청을 받는다.

 

6) R2DBC는 요청 순서와 상관없이 비동기적으로 데이터베이스에 요청을 보낸다.

  - R2DBC는 기본적으로 동시성을 지원한다.

  - R2DBC에 여러 스레드가 할당되어있다면 병렬적으로 요청을 데이터베이스에 보낼 수 있다.

  - 순서를 보장해야하는 요청일 경우 애플리케이션 레벨에서 순서 보장 로직이 필요하다. Z 인스턴스에서 이미 순서가 보장되어있다.

 

7) 데이터베이스에는 무작위 순서로 요청이 들어온다.

 

8) 데이터베이스에서 요청을 동시에 처리한다. 

  - 데이터베이스의 서버의 CPU 코어 수 및 설정에 따라 병렬성 및 동시성 지원 여부가 결정된다.

 

9) 데이터베이스에서의 처리 순서 보장

  - 데이터베이스에서 처리 순서를 보장하려면 트랜잭션 또는 락킹 메커니즘을 사용하여 특정 자원에 대한 접근을 제어해야 한다.

  - 락킹 메커니즘 :

    * 데이터베이스 레벨에서 동시성 문제를 해결하기 위해 제공하는 기능

    * 테이블 행 또는 전체 테이블과 같은 특정 자원에 대해 Lock을 설정하면, Lock이 걸린 자원에 대한 다른 트랜잭션의 접근이 제한된다.

    * 예를 들어 자원 D에 대해 A 트랜ㄹ잭션이 락을 걸고 작업을 수행하고 있을 때, B와 C 트랜잭션은 해당 자원에 접근하지 못하고 대기하게 된다.

  - 트랜잭션 :

    * 데이터베이스 레벨에서 데이터베이스 작업의 일관성과 무결성을 보장하기 위해 제공하는 기능

    * 여러 작업(쿼리)를 하나의 단위로 묶어서 처리하며, 이 작업들이 모두 성공해야만 최종적으로 커밋된다.

 

10) 데이터베이스로부터 응답을 받은 R2DBC는 스레드 풀에서 임의로 선택된 스레드에 해당 응답을 보내고, 이 스레드는 클라이언트 요청을 계속 처리한다.

 

JDBC 예제 (동기, 블로킹)

- JDBC 자체는 동기적, 블로킹 방식으로 동작

- 하나의 커넥션은 동시에 하나의 요청만 처리할 수 있다. 즉 워커 스레드가 하나의 커넥션에 요청을 보내면 다른 스레드는 요청을 보낼 수 없다.

- 하지만 여러 커넥션을 사용하면 여러 스레드에서 각각의 커넥션을 통해 요청을 보낼 수 있다.

 

3️⃣ R2DBC 예제

| Book 테이블 접근 및 서비스

Domain Entity Class : Book.js

@Getter
@AllArgsConstructor
@NoArgsConstructor
@Setter
public class Book {
    @Id
    private long bookId;
    private String titleKorean;
    private String titleEnglish;
    private String description;
    private String author;
    private String isbn;
    private String publishDate;

    @CreatedDate
    private LocalDateTime createdAt;

    @LastModifiedDate
    @Column("last_modified_at")
    private LocalDateTime modifiedAt;
}

 

@Id : BOOK 도메인 엔티티 클래스와 BOOK 테이블을 매핑하기 위해 기본키 필드에 추가해야 하는 애너테이션

@Table : 테이블 이름 지정 가능한 애너테이션, 생략 시 클래스 이름 = 테이블 이름

@CreateDate : 생성된 시점을 자동으로 기록하는 애너테이션

@LastModifiedDate : 마지막으로 수정된 시점을 자동으로 기록하는 애너테이션

@Column : 필드를 테이블의 특정 열과 매핑해주는 애너테이션

Repository : BookRepository.js

@Repository("bookRepositoryV5")
public interface BookRepository extends ReactiveCrudRepository<Book, Long> {
    Mono<Book> findByIsbn(String isbn);
}

- 테이블과 인터랙션하기 위한 Repository

- ReactiveCrudRepository를 상속받는다.

- findByIsbn() : isbn을 조회 조건으로 가지는 쿼리 메서드

- ReactiveCrudRepository에서 상속받은 쿼리 메서드

구분 메서드 설명
CREATE /
UPDATE
<S extends T> Mono<S> save(S entity) 주어진 엔티티를 저장합니다. 이미 존재하는 엔티티라면 업데이트하고, 그렇지 않으면 새 엔티티를 삽입합니다.
CREATE <S extends T> Flux<S> saveAll(Iterable<S> entities) 주어진 엔티티들을 모두 저장합니다. Iterable을 사용해 여러 엔티티를 저장합니다.
  <S extends T> Flux<S> saveAll(Publisher<S> entityStream) Publisher 스트림을 통해 주어진 엔티티들을 모두 저장합니다.
READ Mono<T> findById(ID id) 주어진 ID에 해당하는 엔티티를 찾습니다.
  Mono<T> findById(Publisher<ID> idPublisher) Publisher 스트림을 통해 주어진 ID에 해당하는 엔티티를 찾습니다.
  Mono<Boolean> existsById(ID id) 주어진 ID에 해당하는 엔티티가 존재하는지 확인합니다.
  Mono<Boolean> existsById(Publisher<ID> idPublisher) Publisher 스트림을 통해 주어진 ID에 해당하는 엔티티가 존재하는지 확인합니다.
  Flux<T> findAll() 모든 엔티티를 반환합니다.
  Flux<T> findAllById(Iterable<ID> ids) 주어진 ID들에 해당하는 모든 엔티티를 반환합니다.
  Flux<T> findAllById(Publisher<ID> idStream): Publisher 스트림을 통해 주어진 ID들에 해당하는 모든 엔티티를 반환합니다.
  Mono<Long> count(): 저장된 모든 엔티티의 수를 반환합니다.
DELEATE Mono<Void> deleteById(ID id): 주어진 ID에 해당하는 엔티티를 삭제합니다.
  Mono<Void> deleteById(Publisher<ID> idPublisher): Publisher 스트림을 통해 주어진 ID에 해당하는 엔티티를 삭제합니다.
  Mono<Void> delete(T entity): 주어진 엔티티를 삭제합니다.
  Mono<Void> deleteAllById(Iterable<? extends ID> ids): 주어진 ID들에 해당하는 모든 엔티티를 삭제합니다.
  Mono<Void> deleteAllById(Publisher<? extends ID> idStream): Publisher 스트림을 통해 주어진 ID들에 해당하는 모든 엔티티를 삭제합니다.
  Mono<Void> deleteAll(Iterable<? extends T> entities): 주어진 엔티티들을 모두 삭제합니다.
  Mono<Void> deleteAll(Publisher<? extends T> entityStream): Publisher 스트림을 통해 주어진 엔티티들을 모두 삭제합니다.
  Mono<Void> deleteAll():

모든 엔티티를 삭제합니다.

Service

@Slf4j
@Service("bookServiceV5")
@RequiredArgsConstructor
public class BookService {
    private final @NonNull BookRepository bookRepository;
    private final @NonNull CustomBeanUtils<Book> beanUtils;

    public Mono<Book> saveBook(Book book) {
        return verifyExistIsbn(book.getIsbn())
                .then(bookRepository.save(book));
    }

    public Mono<Book> updateBook(Book book) {
        return findVerifiedBook(book.getBookId())
                .map(findBook -> beanUtils.copyNonNullProperties(book, findBook))
                .flatMap(updatingBook -> bookRepository.save(updatingBook));
    }

    public Mono<Book> findBook(long bookId) {
        return findVerifiedBook(bookId);
    }

    public Mono<List<Book>> findBooks() {
        return bookRepository.findAll().collectList();
    }

    private Mono<Void> verifyExistIsbn(String isbn) {
        return bookRepository.findByIsbn(isbn)
                .flatMap(findBook -> {
                    if (findBook != null) {
                        return Mono.error(new BusinessLogicException(
                                                    ExceptionCode.BOOK_EXISTS));
                    }
                    return Mono.empty();
                });
    }

    private Mono<Book> findVerifiedBook(long bookId) {
        return bookRepository
                .findById(bookId)
                .switchIfEmpty(Mono.error(new BusinessLogicException(
                                                    ExceptionCode.BOOK_NOT_FOUND)));
    }
}

구성요소

- BookRepository : 책에 대한 CRUD 작업을 수행하는 Reactive Repository

- CustomBeanUtils<Boo> : Book 객체 간의 프로퍼티를 복사하는 유틸리티 클래스

  'copyNonNullProperties' : 널이 아닌 프로퍼티만을 기존 객체에 복사

- BusinessLogicException : 비즈니스 로직에서 발생하는 예외를 처리하기 위한 사용자 정의 예외 클래스

- ExceptionCode : 다양한 예외 상황을 식별하는 코드가 정의된 enum 클래스

 

주요 메서드

1. saveBook

2. updateBook

3. findBook

4. findBooks

5. verifyExisIsbn

6. findVerifiedBook

saveBook(Book book) - 새로운 책을 저장하는 메서드
- 저장하려는 책의 ISBN이 존재하는지 확인 후, 중복되지 않은 경우에만 저장
- ISBN이 중복되면 'BusinessLogicException' 발생시켜 저장 중단
updateBook(Book book) - 기존 책 정보 업데이트 메서드
- 업데이트하려는 책의 ID로 조회 후
- 업데이트된 정보를 데이터베이스에 저장
findBook(long bookId) - 특정 책 ID 기반으로 책 정보를 조회하는 메서드
- ID에 해당하는 책이 없으면 'BusinessLogicException' 발생시킴
findBooks() - 모든 책 정보 조회 메서드
- List<Book> 형식으로 변환하여 반환
verifyExisIsbn(String isbn) - 주어진 ISBN이 이미 존재하는지 확인
- 존재하면 'BusinessLogicException' 발생시킴
findVerifiedBook(long bookId) - 특정 책의 ID를 기반으로 책을 조회하는 내부 유틸리티 메서드
- ID에 해당하는 책이 없으면 'BusinessLogicException' 발생시킴

BookHandler

@Slf4j
@Component("BookHandlerV5")
public class BookHandler {
    private final BookMapper mapper;
    private final BookValidator validator;
    private final BookService bookService;

    public BookHandler(BookMapper mapper, BookValidator validator, BookService bookService) {
        this.mapper = mapper;
        this.validator = validator;
        this.bookService = bookService;
    }

    public Mono<ServerResponse> createBook(ServerRequest request) {
        return request.bodyToMono(BookDto.Post.class)
                .doOnNext(post -> validator.validate(post))
                .flatMap(post -> bookService.saveBook(mapper.bookPostToBook(post)))
                .flatMap(book -> ServerResponse
                        .created(URI.create("/v5/books/" + book.getBookId()))
                        .build());
    }

    public Mono<ServerResponse> updateBook(ServerRequest request) {
        final long bookId = Long.valueOf(request.pathVariable("book-id"));
        return request
                .bodyToMono(BookDto.Patch.class)
                .doOnNext(patch -> validator.validate(patch))
                .flatMap(patch -> {
                    patch.setBookId(bookId);
                    return bookService.updateBook(mapper.bookPatchToBook(patch));
                })
                .flatMap(book -> ServerResponse.ok()
                                        .bodyValue(mapper.bookToResponse(book)));
    }

    public Mono<ServerResponse> getBook(ServerRequest request) {
        long bookId = Long.valueOf(request.pathVariable("book-id"));

        return bookService.findBook(bookId)
                        .flatMap(book -> ServerResponse
                                .ok()
                                .bodyValue(mapper.bookToResponse(book)));
    }

    public Mono<ServerResponse> getBooks(ServerRequest request) {
        return bookService.findBooks()
                .flatMap(books -> ServerResponse
                        .ok()
                        .bodyValue(mapper.booksToResponse(books)));
    }
}

구성요소

- BookMapper : 'Book' 객체와 DTO 간 변환을 담당하는 클래스

- BookValidator : 'Book' 객체나 DTO 유효성 검사 클래스

- BookService : 실제 비즈니스 로직을 처리하는 서비스 계층

 

주요 메서드

1. createBook

2. updateBook

3. getBook

4. getBooks

createBook - HTTP POST 요청 처리
- 새로운 책을 생성
- 요청 본문을 BookDto.Post 객체로 변환 후 유효성 검사
- BookMapper로 DTO를 엔티티로 변환
- BookService로 책을 저장
- 저장 성공 시 생성된 URI와 201 Created 응답 반환
updateBook - HTTP PATCH 요청 처리
- 기존의 책 정보 업데이트
- 요청 경로에서 책 ID 추출, 요청 본문을 BookDto.Patch로 변환 후 유효성 검사
- DTO 객체에 책 ID 설정
- BootMapper로 DTO를 엔티티로 변환
- BookService로 업데이트
- 성공 시 업데이트된 책 정보를 포함한 200 OK 응답 반환
getBook - HTTP GET 요청 처리
- 특정 책 정보 조회
- 요청 경로에서 책 ID 추출
- BookService로 책 조회
- 조회된 책 정보를 포함한 200 OK 응답 반환
getBooks - HTTP GET 요청 처리
- 여러 책 정보 조회
- BookService로 모든 책을 조회
- BookMapper로 DTO 변환 후 200 OK 응답 반환