1. Controller

스프링 웹 플럭스는 컨트롤러에 대해서 두 가지 등록 방식을 제공한다.

1.1. functional endpoint

@Component
public class SampleHandler {

    public Mono<ServerResponse> getString(ServerRequest request) {
        return ServerResponse.ok().bodyValue("hello, functional endpoint");
    }
}
@RequiredArgsConstructor
@Configuration
public class RouteConfig {

    private final SampleHandler sampleHandler;

    @Bean
    public RouterFunction<ServerResponse> route() {
        return RouterFunctions.route()
                .GET("/hello-functional", sampleHandler::getString)
                .build();
    }
}

1.2. annotaion endpoint

reactor라고 한다면 publisher <--> subscriber 서로 데이터를 주고 받아야 한다. subscriber가 구독을 하지 않으면 동작하지 않는 구조를 갖고 있는데, 여기서는 구독 자체가 보이질 않는다. 이는 스프링 웹 플럭스가 리턴되는 값들에 대해서 별도로 구독을 하고 있다고 보면 된다.

@RestController
public class SampleController {

    @GetMapping("/sample/hello")
    public Mono<String> getHello() {
        return Mono.just("hello rest controller with webflux");
    }

}

@RequiredArgsConstructor
@RestController
@RequestMapping("/users")
public class UserController {

    private final UserService userService;

    @PostMapping("")
    public Mono<UserResponse> createUser(@RequestBody UserCreateRequest request) {
        return userService.create(request.getName(), request.getEmail())
                .map(UserResponse::of);
    }

    @GetMapping("")
    public Flux<UserResponse> findAllUsers() {
        return userService.findAll()
                .map(UserResponse::of);
    }

    @GetMapping("/{id}")
    public Mono<ResponseEntity<UserResponse>> findUser(@PathVariable Long id) {
        return userService.findById(id)
                .map(u -> ResponseEntity.ok(UserResponse.of(u)))
                .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
    }

    @DeleteMapping("/{id}")
    public Mono<ResponseEntity<?>> deleteUser(@PathVariable Long id) {
        return userService.deleteById(id).then(
                Mono.just(ResponseEntity.noContent().build()) // 응답 시 no content(204)
        );
    }

    @PutMapping("/{id}")
    public Mono<ResponseEntity<UserResponse>> updateUser(@PathVariable Long id, @RequestBody UserUpdateRequest request) {
        // user(x) -> 404 Not Fount, user(o) -> 200 ok
        return userService.update(id, request.getName(), request.getEmail())
                .map(u -> ResponseEntity.ok(UserResponse.of(u)))
                .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
    }
}

2. Service

@RequiredArgsConstructor
@Service
public class UserService {

    private final UserRepository userRepository;

    public Mono<User> create(String name, String email) {
        return userRepository.save(User.builder().name(name).email(email).build());
    }

    public Flux<User> findAll() {
        return userRepository.findAll();
    }

    public Mono<User> findById(Long id) {
        return userRepository.findById(id);
    }

    public Mono<Integer> deleteById(Long id) {
        return userRepository.deleteById(id);
    }

    public Mono<User> update(Long id, String name, String email) {
// 1.해당 사용자를 조회 2.데이터를 변경하고 저장
return userRepository.findById(id)
                .flatMap(u -> {
                    u.setName(name);
                    u.setEmail(email);
                    return userRepository.save(u);
                });

// map을 하지 않은 이유: map은 값을 리턴해야 하는데,이때 flatMap에서 전달되는 값이 Mono이기 때문에
//        return userRepository.findById(id)
//                .map(u -> {
//                    u.setName(name);
//                    u.setEmail(email);
//                    return userRepository.save(u);
//                )
//                //위에서 리턴한 값이 Mono<Mono<User>>형태가 된다.
//                .flatMap(u -> {
//                    u.setName(name);
//                    u.setEmail(email);
//                    return userRepository.save(u);
//                });
//                }
}
}

3. Repository

@Repository
public class UserRepositoryImpl implements UserRepository{

    private final ConcurrentHashMap<Long, User> userHashMap = new ConcurrentHashMap<>();
    private AtomicLong sequence = new AtomicLong(1L);

    // create, update
    @Override
    public Mono<User> save(User user) {
        LocalDateTime now = LocalDateTime.now();

        if(user.getId() == null) {
            user.setId(sequence.getAndIncrement());
            user.setCreateAt(now);
        }

        user.setUpdatedAt(now);
        userHashMap.put(user.getId(), user);
        return Mono.just(user);
    }

    @Override
    public Flux<User> findAll() {
        return Flux.fromIterable(userHashMap.values());
    }

    @Override
    public Mono<User> findById(Long id) {
        return Mono.justOrEmpty(userHashMap.getOrDefault(id, null));
    }

    @Override
    public Mono<Integer> deleteById(Long id) {
        User user = userHashMap.getOrDefault(id, null);

        if(user == null) {
            return Mono.just(0);
        }

        userHashMap.remove(id);
        return Mono.just(1);
    }
}

비동기의 장점을 생각해 보면, 웹 플럭스에서는 웹 클라이언트를 이용해서 다수의 서버에 동시에 요청을 하고 이를 aggregation 하여 응답할 수 있는 것 또한 가능하다.

Untitled