1️⃣ R2DBC란
- Reactive Relational Database Connectivity
- 관계형 데이터베이스에 리액티브 프로그래밍 API를 제공하기 위한 개방형 사양
- 드라이버 벤더가 구현하고 클라이언트가 사용하기 위한 SPI
① Reactive Relational Database Connectivity
✔️ Reactive
- 리액티브 프로그래밍을 통해 데이터를 비동기적으로 처리한다.
- 리액티브 프로그래밍은 데이터 흐름과 변화에 반응하는 방식으로 프로그래밍을 한다.
✔️ Relational Databaes
- 전통적인 관계형 데이터베이스
- ex) MySQL, PostgreSQL, SQL Server
✔️ Connectivity
- 리액티브 프로그래밍과 관계형 데이터베이스 두 가지를 연결하는 API를 제공한다.
② 관계형 데이터베이스에 리액티브 프로그래밍 API를 제공하기 위한 개방형 사양
✔️ 개방형 사양
- R2DBC는 특정 데이터베이스나 기술에 종속되지 않는 표준화된 사양
- 다양한 데이터베이스에 적용할 수 있는 방식으로 설계되었다.
③ 드라이버 벤더가 구현하고 클라이언트가 사용하기 위한 SPI
✔️ 드라이버 벤더
- 드라이버 벤더 ?
* 특정 하드웨어나 소프트웨어를 위해 드라이버를 개발하고 제공하는 회사
* 드라이버 : 시스템 간 소통을 가능하게 하는 소프트웨어 구성 요소 (ex. 운영체제와 하드웨어, 소프트웨어와 데이터베이스)
- R2DBC에서의 드라이버 벤더
* 특정 데이터베이스를 리액티브 방식으로 사용할 수 있게 해주는 드라이버를 개발하고 제공하는 회사
* 예시:
# PostgreSQL을 위한 리액티브 드라이버를 개발하는 회사
# MySQL을 위한 리액티브 드라이버를 개발하는 회사
✔️ 클라이언트
- R2DBC에서의 클라이언트
* R2DBC를 사용하는 애플리케이션이나 프로그래머
✔️ SPI
- SPI ?
* Service Provider Interface
* 특정 기능이나 서비스를 제공하기 위한 인터페이스를 정의
* 이 인터페이스는 서비스 제공자가 구현해야 할 메서드를 포함한다.
- Service Provider ?
* SPI 인터페이스를 구현하는 클래스 또는 라이브러리
* 여러 SP가 동일한 SPI 인터페이스를 구현할 수 있다.
- R2DBC에서의 SPI
* 관계형 데이터베이스에 접근하는 기능 및 서비스를 제공하는 인터페이스
✔️ 정리
리액티브 프로그래밍에서 관계형 데이터베이스에 접근하기 위한 인터페이스인 SPI가 존재한다.
드라이버 벤더는 이 SPI를 구현한 클래스 또는 라이브러리로 구성된 드라이버를 제공한다.
클라이언트 애플리케이션은 이 드라이버를 사용하여 리액티브 프로그래밍 모델에 따라 관계형 데이터베이스와 비동기 및 논블로킹 방식으로 연결하고 데이터를 처리한다.
2️⃣ R2DBC와 JDBC
R2DBC : 비동기, 논블로킹 방식으로 관계형 데이터베이스에 접근한다.
JDBC : 동기, 블로킹 방식으로 관계형 데이터베이스에 접근한다.
예시 1
상황
- A 스레드 : 관계형 데이터베이스 접근을 요구하는 클라이언트 요청을 받았다.
R2DBC 예시 (비동기, 논블로킹)
1) A 스레드는 R2DBC를 통해 관계형 데이터베이스에 요청을 보낸다.
2) A 스레드는 요청을 보낸 후, 응답을 기다리지 않고 스레드 풀에 반환되어 다른 작업을 수행할 수 있는 상태가 된다. : 논블로킹
3) 관계형 데이터베이스에서 응답이 준비되면, 스레드 풀의 다른 임의의 스레드가 선택되어 이 응답을 받고, 클라이언트의 요청을 계속 처리한다.
JDBC 예제 (동기, 블로킹)
1) A 스레드는 JDBC를 통해 관계형 데이터베이스에 요청을 보낸다.
2) A 스레드는 요청을 보낸 후, 응답이 올 때까지 대기한다 : 블로킹
3) 관계형 데이터베이스에서 응답이 오면, A 스레드는 클라이언트의 요청을 계속 처리한다.
예시 2
상황
- 요청 스레드 : 클라이언트 요청을 받아 이를 적절한 워커 스레드에 할당한다. (싱글톤 패턴으로 관리된다)
- 요청 A, B, C : 공유 자원 D에 접근을 요구하는 요청. A, B, C 순서대로 요청 스레드에 들어온다.
- Z 인스턴스 : 직렬화된 큐를 사용하여 공유 자원 D에 대한 접근 요청을 직렬화된 큐(BlockingQueue 혹은 ConcurrentLinkedQueue)에 추가하여 R2DBC에 요청을 보내는 객체 (싱글톤 패턴으로 관리된다)
- D 자원 : 공유 자원. 하나의 요청이 완전히 끝나고 난 후 다른 요청을 처리해야 하는 자원이다.
R2DBC 예시 (비동기, 논블로킹)
1) 요청 스레드는 A, B, C 요청을 파싱하고, Z 인스턴스의 직렬화된 큐에 추가한다.
- 요청 스레드는 A, B, C 요청을 받아 이 요청이 공유 자원 D에 대한 접근임을 파악한다.
- 요청 스레드는 A, B, C 요청을 Z 인스턴스의 직렬화된 큐에 순서대로 추가한다.
- 직렬화된 큐 대신 Mono/Flux 체인을 사용하여 순서를 관리할 수 있다.
2) 요청 스레드는 응답을 기다리지 않고 바로 다음 클라이언트 요청을 처리한다. : 논블로킹
3) Z 인스턴스는 직렬화된 큐에 들어온 요청을 순차적으로 처리한다.
- Z 인스턴스는 직렬화된 큐에 들어온 요청을 순차적으로 R2DBC에 전송한다.
- 이 과정에서 Z 인스턴스가 요청을 R2DBC에 보내는 방식은 직렬화된 큐를 사용하여 순서를 보장한다.
4) Z 인스턴스는 요청에 대한 응답이 올 때까지 다른 요청을 R2DBC에 보내지 않는다.
5) R2DBC는 Z 인스턴스로부터 요청을 받는다.
6) R2DBC는 요청 순서와 상관없이 비동기적으로 데이터베이스에 요청을 보낸다.
- R2DBC는 기본적으로 동시성을 지원한다.
- R2DBC에 여러 스레드가 할당되어있다면 병렬적으로 요청을 데이터베이스에 보낼 수 있다.
- 순서를 보장해야하는 요청일 경우 애플리케이션 레벨에서 순서 보장 로직이 필요하다. Z 인스턴스에서 이미 순서가 보장되어있다.
7) 데이터베이스에는 무작위 순서로 요청이 들어온다.
8) 데이터베이스에서 요청을 동시에 처리한다.
- 데이터베이스의 서버의 CPU 코어 수 및 설정에 따라 병렬성 및 동시성 지원 여부가 결정된다.
9) 데이터베이스에서의 처리 순서 보장
- 데이터베이스에서 처리 순서를 보장하려면 트랜잭션 또는 락킹 메커니즘을 사용하여 특정 자원에 대한 접근을 제어해야 한다.
- 락킹 메커니즘 :
* 데이터베이스 레벨에서 동시성 문제를 해결하기 위해 제공하는 기능
* 테이블 행 또는 전체 테이블과 같은 특정 자원에 대해 Lock을 설정하면, Lock이 걸린 자원에 대한 다른 트랜잭션의 접근이 제한된다.
* 예를 들어 자원 D에 대해 A 트랜ㄹ잭션이 락을 걸고 작업을 수행하고 있을 때, B와 C 트랜잭션은 해당 자원에 접근하지 못하고 대기하게 된다.
- 트랜잭션 :
* 데이터베이스 레벨에서 데이터베이스 작업의 일관성과 무결성을 보장하기 위해 제공하는 기능
* 여러 작업(쿼리)를 하나의 단위로 묶어서 처리하며, 이 작업들이 모두 성공해야만 최종적으로 커밋된다.
10) 데이터베이스로부터 응답을 받은 R2DBC는 스레드 풀에서 임의로 선택된 스레드에 해당 응답을 보내고, 이 스레드는 클라이언트 요청을 계속 처리한다.
JDBC 예제 (동기, 블로킹)
- JDBC 자체는 동기적, 블로킹 방식으로 동작
- 하나의 커넥션은 동시에 하나의 요청만 처리할 수 있다. 즉 워커 스레드가 하나의 커넥션에 요청을 보내면 다른 스레드는 요청을 보낼 수 없다.
- 하지만 여러 커넥션을 사용하면 여러 스레드에서 각각의 커넥션을 통해 요청을 보낼 수 있다.
3️⃣ R2DBC 예제
| Book 테이블 접근 및 서비스
Domain Entity Class : Book.js
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Setter
public class Book {
@Id
private long bookId;
private String titleKorean;
private String titleEnglish;
private String description;
private String author;
private String isbn;
private String publishDate;
@CreatedDate
private LocalDateTime createdAt;
@LastModifiedDate
@Column("last_modified_at")
private LocalDateTime modifiedAt;
}
@Id : BOOK 도메인 엔티티 클래스와 BOOK 테이블을 매핑하기 위해 기본키 필드에 추가해야 하는 애너테이션
@Table : 테이블 이름 지정 가능한 애너테이션, 생략 시 클래스 이름 = 테이블 이름
@CreateDate : 생성된 시점을 자동으로 기록하는 애너테이션
@LastModifiedDate : 마지막으로 수정된 시점을 자동으로 기록하는 애너테이션
@Column : 필드를 테이블의 특정 열과 매핑해주는 애너테이션
Repository : BookRepository.js
@Repository("bookRepositoryV5")
public interface BookRepository extends ReactiveCrudRepository<Book, Long> {
Mono<Book> findByIsbn(String isbn);
}
- 테이블과 인터랙션하기 위한 Repository
- ReactiveCrudRepository를 상속받는다.
- findByIsbn() : isbn을 조회 조건으로 가지는 쿼리 메서드
- ReactiveCrudRepository에서 상속받은 쿼리 메서드
구분 | 메서드 | 설명 |
CREATE / UPDATE |
<S extends T> Mono<S> save(S entity) | 주어진 엔티티를 저장합니다. 이미 존재하는 엔티티라면 업데이트하고, 그렇지 않으면 새 엔티티를 삽입합니다. |
CREATE | <S extends T> Flux<S> saveAll(Iterable<S> entities) | 주어진 엔티티들을 모두 저장합니다. Iterable을 사용해 여러 엔티티를 저장합니다. |
<S extends T> Flux<S> saveAll(Publisher<S> entityStream) | Publisher 스트림을 통해 주어진 엔티티들을 모두 저장합니다. | |
READ | Mono<T> findById(ID id) | 주어진 ID에 해당하는 엔티티를 찾습니다. |
Mono<T> findById(Publisher<ID> idPublisher) | Publisher 스트림을 통해 주어진 ID에 해당하는 엔티티를 찾습니다. | |
Mono<Boolean> existsById(ID id) | 주어진 ID에 해당하는 엔티티가 존재하는지 확인합니다. | |
Mono<Boolean> existsById(Publisher<ID> idPublisher) | Publisher 스트림을 통해 주어진 ID에 해당하는 엔티티가 존재하는지 확인합니다. | |
Flux<T> findAll() | 모든 엔티티를 반환합니다. | |
Flux<T> findAllById(Iterable<ID> ids) | 주어진 ID들에 해당하는 모든 엔티티를 반환합니다. | |
Flux<T> findAllById(Publisher<ID> idStream): | Publisher 스트림을 통해 주어진 ID들에 해당하는 모든 엔티티를 반환합니다. | |
Mono<Long> count(): | 저장된 모든 엔티티의 수를 반환합니다. | |
DELEATE | Mono<Void> deleteById(ID id): | 주어진 ID에 해당하는 엔티티를 삭제합니다. |
Mono<Void> deleteById(Publisher<ID> idPublisher): | Publisher 스트림을 통해 주어진 ID에 해당하는 엔티티를 삭제합니다. | |
Mono<Void> delete(T entity): | 주어진 엔티티를 삭제합니다. | |
Mono<Void> deleteAllById(Iterable<? extends ID> ids): | 주어진 ID들에 해당하는 모든 엔티티를 삭제합니다. | |
Mono<Void> deleteAllById(Publisher<? extends ID> idStream): | Publisher 스트림을 통해 주어진 ID들에 해당하는 모든 엔티티를 삭제합니다. | |
Mono<Void> deleteAll(Iterable<? extends T> entities): | 주어진 엔티티들을 모두 삭제합니다. | |
Mono<Void> deleteAll(Publisher<? extends T> entityStream): | Publisher 스트림을 통해 주어진 엔티티들을 모두 삭제합니다. | |
Mono<Void> deleteAll(): |
모든 엔티티를 삭제합니다. |
Service
@Slf4j
@Service("bookServiceV5")
@RequiredArgsConstructor
public class BookService {
private final @NonNull BookRepository bookRepository;
private final @NonNull CustomBeanUtils<Book> beanUtils;
public Mono<Book> saveBook(Book book) {
return verifyExistIsbn(book.getIsbn())
.then(bookRepository.save(book));
}
public Mono<Book> updateBook(Book book) {
return findVerifiedBook(book.getBookId())
.map(findBook -> beanUtils.copyNonNullProperties(book, findBook))
.flatMap(updatingBook -> bookRepository.save(updatingBook));
}
public Mono<Book> findBook(long bookId) {
return findVerifiedBook(bookId);
}
public Mono<List<Book>> findBooks() {
return bookRepository.findAll().collectList();
}
private Mono<Void> verifyExistIsbn(String isbn) {
return bookRepository.findByIsbn(isbn)
.flatMap(findBook -> {
if (findBook != null) {
return Mono.error(new BusinessLogicException(
ExceptionCode.BOOK_EXISTS));
}
return Mono.empty();
});
}
private Mono<Book> findVerifiedBook(long bookId) {
return bookRepository
.findById(bookId)
.switchIfEmpty(Mono.error(new BusinessLogicException(
ExceptionCode.BOOK_NOT_FOUND)));
}
}
구성요소
- BookRepository : 책에 대한 CRUD 작업을 수행하는 Reactive Repository
- CustomBeanUtils<Boo> : Book 객체 간의 프로퍼티를 복사하는 유틸리티 클래스
'copyNonNullProperties' : 널이 아닌 프로퍼티만을 기존 객체에 복사
- BusinessLogicException : 비즈니스 로직에서 발생하는 예외를 처리하기 위한 사용자 정의 예외 클래스
- ExceptionCode : 다양한 예외 상황을 식별하는 코드가 정의된 enum 클래스
주요 메서드
1. saveBook
2. updateBook
3. findBook
4. findBooks
5. verifyExisIsbn
6. findVerifiedBook
saveBook(Book book) | - 새로운 책을 저장하는 메서드 - 저장하려는 책의 ISBN이 존재하는지 확인 후, 중복되지 않은 경우에만 저장 - ISBN이 중복되면 'BusinessLogicException' 발생시켜 저장 중단 |
updateBook(Book book) | - 기존 책 정보 업데이트 메서드 - 업데이트하려는 책의 ID로 조회 후 - 업데이트된 정보를 데이터베이스에 저장 |
findBook(long bookId) | - 특정 책 ID 기반으로 책 정보를 조회하는 메서드 - ID에 해당하는 책이 없으면 'BusinessLogicException' 발생시킴 |
findBooks() | - 모든 책 정보 조회 메서드 - List<Book> 형식으로 변환하여 반환 |
verifyExisIsbn(String isbn) | - 주어진 ISBN이 이미 존재하는지 확인 - 존재하면 'BusinessLogicException' 발생시킴 |
findVerifiedBook(long bookId) | - 특정 책의 ID를 기반으로 책을 조회하는 내부 유틸리티 메서드 - ID에 해당하는 책이 없으면 'BusinessLogicException' 발생시킴 |
BookHandler
@Slf4j
@Component("BookHandlerV5")
public class BookHandler {
private final BookMapper mapper;
private final BookValidator validator;
private final BookService bookService;
public BookHandler(BookMapper mapper, BookValidator validator, BookService bookService) {
this.mapper = mapper;
this.validator = validator;
this.bookService = bookService;
}
public Mono<ServerResponse> createBook(ServerRequest request) {
return request.bodyToMono(BookDto.Post.class)
.doOnNext(post -> validator.validate(post))
.flatMap(post -> bookService.saveBook(mapper.bookPostToBook(post)))
.flatMap(book -> ServerResponse
.created(URI.create("/v5/books/" + book.getBookId()))
.build());
}
public Mono<ServerResponse> updateBook(ServerRequest request) {
final long bookId = Long.valueOf(request.pathVariable("book-id"));
return request
.bodyToMono(BookDto.Patch.class)
.doOnNext(patch -> validator.validate(patch))
.flatMap(patch -> {
patch.setBookId(bookId);
return bookService.updateBook(mapper.bookPatchToBook(patch));
})
.flatMap(book -> ServerResponse.ok()
.bodyValue(mapper.bookToResponse(book)));
}
public Mono<ServerResponse> getBook(ServerRequest request) {
long bookId = Long.valueOf(request.pathVariable("book-id"));
return bookService.findBook(bookId)
.flatMap(book -> ServerResponse
.ok()
.bodyValue(mapper.bookToResponse(book)));
}
public Mono<ServerResponse> getBooks(ServerRequest request) {
return bookService.findBooks()
.flatMap(books -> ServerResponse
.ok()
.bodyValue(mapper.booksToResponse(books)));
}
}
구성요소
- BookMapper : 'Book' 객체와 DTO 간 변환을 담당하는 클래스
- BookValidator : 'Book' 객체나 DTO 유효성 검사 클래스
- BookService : 실제 비즈니스 로직을 처리하는 서비스 계층
주요 메서드
1. createBook
2. updateBook
3. getBook
4. getBooks
createBook | - HTTP POST 요청 처리 - 새로운 책을 생성 - 요청 본문을 BookDto.Post 객체로 변환 후 유효성 검사 - BookMapper로 DTO를 엔티티로 변환 - BookService로 책을 저장 - 저장 성공 시 생성된 URI와 201 Created 응답 반환 |
updateBook | - HTTP PATCH 요청 처리 - 기존의 책 정보 업데이트 - 요청 경로에서 책 ID 추출, 요청 본문을 BookDto.Patch로 변환 후 유효성 검사 - DTO 객체에 책 ID 설정 - BootMapper로 DTO를 엔티티로 변환 - BookService로 업데이트 - 성공 시 업데이트된 책 정보를 포함한 200 OK 응답 반환 |
getBook | - HTTP GET 요청 처리 - 특정 책 정보 조회 - 요청 경로에서 책 ID 추출 - BookService로 책 조회 - 조회된 책 정보를 포함한 200 OK 응답 반환 |
getBooks | - HTTP GET 요청 처리 - 여러 책 정보 조회 - BookService로 모든 책을 조회 - BookMapper로 DTO 변환 후 200 OK 응답 반환 |
'Reactive Programming' 카테고리의 다른 글
240815 WebClient (0) | 2024.08.15 |
---|---|
240815 Reactive Streaming 데이터 처리 (0) | 2024.08.15 |
240804 Spring MVC vs Spring WebFlux (0) | 2024.08.04 |
240804 Operators :: Sequence 내부 동작 확인 (0) | 2024.08.04 |
240804 Operators :: Sequence 변환 (0) | 2024.08.04 |