Reactive Streams 에 대해서

태그:

토비의 봄 영상을 통해 Reactive Streams 공부를 시작하였는데 영상 내용을 기반으로 추가적인 조사를 통해 그에 대한 내용과 궁금증을 함께 정리한 내용이다. 본격적으로 Spring WebFlux를 깊이있게 다루기 전에 가장 기본이 되는 내용에 대해서 정리하였다. 이 포스팅에서는 아래와 같은 내용을 다룬다.

  • Iterator 패턴과 Observer 패턴의 비교를 통해서 Reactive Streams를 이해하는 기본에 대해서 다룬다.
  • Reactive Streams이 사용되는 Reactive 시스템 아키텍처 디자인에 대해서 살펴본다.
  • 마지막으로 Reactive Streams 표준에 대해서 살펴본다.

Iterable에 대해서

우리는 보통 여러개의 아이템들이 주어질 때 그것을 하나씩 순회하면서 아이템들을 핸들링하는 대표적인 방법 중 하나가 for-loop를 사용하는 것이다.

Java에서는 여러개의 아이템을 나타내는 방법엔 Java native하게 지원되는 배열 개념과 Collection 인터페이스를 구현한 클래스를 통해서 나타낼 수 있다. 그런데 Collection은 Iterable이란 인터페이스를 상속받고 있는 것을 알 수 있다. 그리고 Iterable 인터페이스를 살펴보면 이를 통해 우리는 for (T t : this) 와 같은 문법을 사용할 수 있게 된다고 설명하고 있다.

/**
 * The root interface in the <i>collection hierarchy</i>.  A collection
 * represents a group of objects, known as its <i>elements</i>. 
 */
public interface Collection<E> extends Iterable<E> {
	...
}
/**
 * Implementing this interface allows an object to be the target of the enhanced
 * {@code for} statement (sometimes called the "for-each loop" statement).
 */
public interface Iterable<T> {
    Iterator<T> iterator();
}

우리는 그렇다면 Collection 인터페이스를 구현할 필요 없이 Iterable 인터페이스만 구현해주면 for (T t : this) 문법은 사용할 수 있는 것이다. 아래의 코드를 살펴보면 단순히 start 변수의 값만 증가시켜주면서 for-loop를 도는 것을 확인할 수 있다.

public static void main(String[] args) {
    Iterable<Integer> iter = () -> new Iterator<Integer>() {
        private int start = 0;
        private final static int MAX = 10;
        @Override
        public boolean hasNext() {
            return start < MAX;
        }
        @Override
        public Integer next() {
            return ++start;
        }
    };
    for (Integer integer : iter) {
        System.out.println(integer);
    }
}

Observable에 대해서

그런데 Iterator와 완전히 달라보이는 Observer 패턴에 대해서 Erik Meijer가 Observer와 Subscriber의 구조가 Iterable 구조와 동일한 기능을 하지만 다르게 표현하고 있다고 말한다.

아이템을 처리하는 입장에서 Iterable과 Observable을 비교해서 보자면, Iterable의 구조는 여러 개의 아이템에 대해서 내가 하나씩 가져오는 메커니즘인 반면에 Observable은 하나씩 나에게 주는 구조라고 볼 수 있다.

GoF Observer 패턴의 목적을 통해서도 그 느낌을 살펴볼 수 있다.

객체 사이에 일 대 다의 의존 관계를 정의해 두어, 어떤 객체의 상태가 변할 때 그 객체에 의존성을 가진 다른 객체들이 그 변화를 통지받고 자동으로 갱신될 수 있게 만듭니다.

/**
 * @deprecated
 * This class and the {@link Observer} interface have been deprecated.
 * The event model supported by {@code Observer} and {@code Observable}
 * is quite limited, the order of notifications delivered by
 * {@code Observable} is unspecified, and state changes are not in
 * one-for-one correspondence with notifications.
 * For a richer event model, consider using the
 * {@link java.beans} package.  For reliable and ordered
 * messaging among threads, consider using one of the concurrent data
 * structures in the {@link java.util.concurrent} package.
 * For reactive streams style programming, see the
 * {@link java.util.concurrent.Flow} API.
 */
@Deprecated(since="9")
public class Observable {
	public synchronized void addObserver(Observer o) { ... }
    public synchronized void deleteObserver(Observer o) { ... }
    public void notifyObservers(Object arg) { ... }
    
    protected synchronized void setChanged() { ... }
    public synchronized boolean hasChanged() { ...}
}

public interface Observer {
	void update(Observable o, Object arg);
}

현재는 deprecated된 클래스이다. 그 이유에 대해서는 주석에 자세히 설명되어있다. 그리고 아래에서 이 이유에 대해서 다시 살펴보자.

클래스의 API에 대해서 정리하면 아래와 같다.

  • Observer는 자신의 받을 아이템의 소스에게 자신을 등록한다. 그래서 Observable과 Observer은 일대다의 관계를 가질 수 있다.
  • 아이템 소스인 Observable은 자신에게 어떤 변화가 생길 때마다 setChanged() 호출하여 Observer들이 변화가 생겼다는 사실을 hasChanged()를 통해서 알 수 있게하고 이후에 notifyObservers()를 통하여 자신에게 등록한 Observer들에게 해당 아이템을 준다.

간단한 예제를 하나 살펴보자.

public class IntegerObservable extends Observable implements Runnable {
	@Override
	public void run() {
		for (int i = 1; i <=10; i++) {
			setChanged();
			notifyObservers(i); // push data to observer
		}
	}
}
public class ObserverApplication {
	public static void main(String[] args) {
        Observer ob = new Observer() {
                @Override
            public void update(Observable o, Object arg) {
                System.out.println(arg);
            }
        };
        IntegerObservable io = new IntegerObservable();
        // register observer to observable
        io.addObserver(ob);

        ExecutorService es = Executors.newSingleThreadExecutor();
        // run observable from another thread then push item to observer
        es.execute(io);
        es.shutdown();
    }
}

Iterator 패턴과 비교해서 살펴보자. 우선 Iterator 패턴과 달리 소스로부터 아이템을 pulling하는 것이 아닌 아이템 소스로부터 해당 아이템을 핸들링하는 Observer에게 아이템을 push하는 방식이기 때문에 아이템 소스가 될 IntegerObservable 클래스를 정의한다.

Observer는  IntegerObservable 로부터 아이템이 오기를 기다려야하므로 두 개의 객체가 동시에 메인 스레드에 뜰 수가 없다. 그래서 IntegerObservable 을 별도의 스레드에서 동작시킨다. Iterable 파트에서는 아이템을 가져와서 핸들링하면되기 때문에 아이템의 소스를 메인 스레드에 정의만 하면 될뿐 별도의 스레드를 띄울 필요는 없었다.

Observer 패턴에서는 for 루프가 Observable에서 정의되어있다. 이것도 Iterable과 아이템을 주는 부분과 핸들링하는 부분으로 나누어서 비교해보면 정반대인 것을 알 수 있다.

Observer 패턴이 가지는 장점

Observer 패턴이 Iterator 패턴과 duality를 가졌다는 사실은 이제 어느 정도 느낌이 온 것 같은데 Observer 패턴이 Iterator 패턴에 비해서 가지는 장점은 어떤 것이 있을까?

둘에 대해서 잘 생각해보면 Iterator 패턴에서 Iterable은 핸들링할 아이템들이 런타임 때 정해진다. 그리고 한 번 Iteratable을 사용하고 다음 코드 플로우로 넘어간다.

반면에 Observable은 어떤가? Iterable에 비해서 훨씬 더 다이나믹하게 아이템의 소스를 구현할 수 있다. 위의 예제에서는 단순히 정해진 사이즈의 for 루프를 한번 돌면 끝이지만 예를 들어 어떤 다른 서버로부터 데이터를 폴링하는 방식으로 Observable이 구현되면 어떨까? 외부 서버의 데이터가 변할 때마다 Observable에 등록된 Observer는 해당 변화를 감지할 수 있다.

또 다른 점은 Iterator 패턴에서 Iterable의 아이템을 수신할 대상이 한 순간에 한 개의 핸들러에서밖에 해당 아이템을 다룰 수 없는 반면에 Observable에서는 미리 여러개의 Observer를 등록해놓고 아이템을 여러 개의 핸들러에 보낼 수 있다는 장점도 있다.

Reactive 시스템의 등장

Reactive Stream에 대해서 알아보기 전에 ‘시스템이 reactive하다.’ 혹은 ‘reactive 시스템이다.’ 라는 것은 어떤 뜻을 가지고 있는지에 대해서 알아보자. Reactive 시스템은 The Reactive Manifesto에 잘 소개되었다. 마지막으로 수정된 날짜는 2014년으로 나와있다.

Reactive 시스템이 등장한 배경에 대해서 간단히 정리하면 다음과 같다.

당시에는 큰 서비스라고하면 몇 십개의 서버로 구성되어있고 몇 초(seconds)의 응답시간을 가져도 괜찮았다. 또한 점검, 배포를 위해 몇 시간 동안 다운되어있어도 괜찮았고 데이터의 사이즈라하면 GB 단위의 규모였다.

하지만 시간이 지나면서 (문서가 작성된 날짜가 2014년인 것을 기억하자.) 어플리케이션들은 모바일 기기에 배포되기도 하고 수천 개의 프로세서를 가진 클라우드 환경에 된다. 그리고 이제 서비스 사용자들은 milliseconds 단위의 응답 시간이 아니면 답답함을 느끼고 하루 종일 서비스를 사용할 수 있기를 기대한다. 또한 이제 서비스가 다루는 데이터의 규모는 PB에 이른다.

이러한 서비스의 규모가 급변하면서 전통적인 소프트웨어 아키텍쳐로는 이러한 요구사항을 충족시키지 못한다고 생각하게 되었다.

그래서 이러한 요구사항을 만족시키기 위해서 reactive 시스템이 등장하게 된다. 그리고 다음과 같은 아키텍쳐 design principle을 가진다.

  • Responsive: responsiveness는 서비스의 사용성의 가장 기본이 된다. 그리고 이 특징은 서비스의 사용성뿐만 아니라 문제가 발생했을 때 특정한 시간 안에 문제를 발견하고 해결할 수 있게 만든다. 그 결과 사용자에게 항상 일정한 수준의 서비스 퀄리티를 보장할 수 있게 한다.
  • Resilient: reactive 시스템은 내부적으로 에러가 발생하더라도 여전히 최종 사용자가 해당 서비스를 사용하는데 문제가 없어야 한다. 이러한 특징은 시스템을 구성하는 컴포넌트 중 하나가 실패했을 때 다른 컴포넌트로 에러를 전파하지 않음으로써 만족시킬 수 있고 실패한 컴포넌트에 대한 복구는 사전에 여러개의 레플리카를 둠으로써 하나가 실패하더라도 전체 시스템에는 영향이 가지않도록 설계한다.
  • Elastic: 전체 시스템은 외부의 다양한 부하에 대해서 유연하게 대처할 수 있어야한다. reactive 시스템은 부하에 따라서 동적으로 서비스에 리소스 할당량을 늘리거나 줄일 수 있다. 그래서 최종 사용자는 항상 일정한 수준의 서비스 수준을 제공받을 수 있다.
  • Message Driven: reactive 시스템은 비동기적으로 message-passing 방식을 통해서 컴포넌트 간의 boundary를 유지하고 결합을 느슨하게 만든다. message-passing 방식을 통해 다음의 목적을 달성할 수 있다: delegate failures as messages, back-pressure, non-blocking communication

Message Driven Reactive System

위 네 가지 특정 중 Message Driven에 대해서 좀 더 살펴보자. Message Driven의 방식을 통해서 delegate failures as messages, back-pressure, non-blocking communication이라는 목적을 달성할 수 있다고 했는데 각각은 어떤 것을 말하는 것일까?

Delegate failures as messages

시스템에서 메세지의 스트림을 처리하다보면 예기치않게 에러가 발생할 수 있다. 이 때 메세지를 처리하는 쪽에서 메세지 스트림 처리를 중단하고 에러를 핸들링하러 플로우가 넘어가는 것은 바람직하지 않은 방식이다.

여기서 말하는 에러 핸들링의 바람직한 방식은 에러가 발생했을 때 메세지 스트림을 멈추는 것이 아니라 에러를 또하나의 메세지로 생각하고 이를 에러 핸들링하는 쪽으로 전달하는 것이다. 그렇게 된다면 나머지 메세지는 계속 처리하고 발생한 에러에 대해서는 에러 핸들링을 하는 쪽에서 알아서 처리하도록 위임하는 것이다.

Back-pressure

Back-pressure는 시스템을 구성하는 컴포넌트들 간에 자신의 상황을 주고받을 수 있는 피드백 시스템으로 생각할 수 있다.

이것이 필요한 상황은 여러가지가 있을 수 있는데 그 중 한 가지는 전체 시스템 중 특정 부분이 부하를 과도하게 받을 때 부하를 받는 쪽에서 상대방에게 나의 상황을 전달하고 반대쪽에서는 그 상황에 맞춰서 더 적은 양의 데이터를 전달해서 부하를 줄일 수 있도록 도와주어야 한다. 반대의 상황도 마찬가지이다. 한쪽에서 자신의 리소스가 여유있는 경우 반대쪽에 더 많은 양의 메세지를 달라고 요청할 수도 있다.

Non-blocking communication

마지막으로 message-passing 방식은 Non-blocking 방식으로 이루어진다. Non-blocking 방식을 이용하여 메세지를 전달하면 보낸 입장에서는 상대방으로부터 응답을 듣지 않아도 되고 상대방은 메세지를 보관하다가 처리할 순서가되면 그 때 해당 메세지를 핸들링하면 되기 때문에 전체 오버헤드가 줄어든다는 장점이 있다.

Reactive Streams

이러한 reactive 시스템을 만들기 위해 Reactive Streams라는 표준 API를 만들게 된다. Reactive Streams에 대한 소개 문서는 여기에서 살펴볼 수 있다. 여기서 명시한 API를 통해서 ReactiveXProject Reactor 그룹들이 표준 API를 준수하면서 구현체를 만들어 제공하고 있다. 

자세한 해당 API의 인터페이스와 스펙들은 여기서 살펴볼 수 있다. 해당 스펙을 다이어그램으로 표현하면 아래와 같다.

Reactive Streams API diagram
Reactive Streams API diagram
  • Subscriber는 Publisher에게 subscribe(Subscriber) 를 통해서 이벤트 구독을 시작한다.
  • Publisher는 Subscriber에게 Subscription 객체를 만들거나 관리하고 있는 Subscription 객체를 onSubscribe(Subscription) 메소드를 통해 전달한다. Subscription을 통해서 Subscriber는 Publisher와 직접적으로 통신할 필요가 없어진다.
    Subscription은 Subscriber로부터 전달받는 피드백을 통해서 Publisher로부터 아이템을 가져오고 그것을 Subscriber에게 전달한다. Publisher와 Subscriber는 둘 사이를 직접적으로 몰라도된다.
  • 정상적인 경우라면 Subscriber의 onNext(Object) 메소드를 호출해서 아이템을 전달한다. 그리고 만약 Publisher 자신이 가지고 있는 아이템을 모두 전달했다면 onComplete() 를 호출해서 Subscriber에게 그 사실을 알려준다.
    마지막으로 에러가 발생했을 경우 onError(Throwable) 를 통해서 문제가 발생했다는 사실을 알려준다.

글로만 보면 잘 이해가 되지 않을 수 있으니 간단한 예제 코드를 살펴보자.

public static void main(String[] args) {
	Publisher pub = new Publisher() {
    	@Override
        public void subscribe(Subscriber subscriber) { ... }        
    }

    Subscriber<Integer> sub = new Subscriber<>() {
    	@Override
        public void onSubscribe(Subscription subscription) { ... }
        @Override
        public void onNext(Integer item) { ... }
        @Override
        public void onError(Throwable throwable) { ... }
        @Override
        public void onComplete() { ... }
    }

    pub.subscribe(sub);
}
  • 전체적인 구조는 위와 같다. 각각 Publisher, Subscriber의 구현체를 구현하고 pub.subscribe(sub) 을 통해 Subscriber가 Publisher가 주는 아이템을 구독하는 형태이다.
  • 이제 각각의 메소드를 살펴보면서 어떻게 위의 다이어그램처럼 동작하는지 비교해보자.
Publisher pub = new Publisher() {
    Iterable<Integer> it = Arrays.asList(1, 2, 3, 4, 5);
    @Override
    public void subscribe(Subscriber subscriber) {
        ExecutorService es = Executors.newSingleThreadScheduledExecutor();
        Iterator<Integer> iterator = it.iterator();

        subscriber.onSubscribe(new Subscription() {
            Future<?> f;
            @Override
            public void request(long n) {
                this.f = es.submit(() -> {
                    long left = n;
                    try {
                        while (left > 0) {
                            if (!iterator.hasNext()) {
                                subscriber.onComplete();
                                es.shutdown();
                                break;
                            }
                            subscriber.onNext(iterator.next());
                            left -= 1;
                        }
                    } catch (Exception e) {
                        subscriber.onError(e);
                    }
                });
            }
            @Override
            public void cancel() {
                f.cancel(true);
            }
        });
    }
};
  • Subscriber가 subscribe() 를 통해 구독을 시작하면 Publisher는 자신과 Subscriber을 연결해줄 수 있는 중간 객체인 Subscription 객체를 만들어서 Subscriber에게 전달한다.
  • 이후에 Subscriber는 Subscription을 통해서 Publisher에게 아이템을 달라고 back-pressure를 할 수 있다.
  • Subscription은 Reactive Streams API를 보면 request(long)cancel() 을 구현해야한다. 이는 둘다 Subscriber를 위한 인터페이스이며 Subscriber는 자신의 필요에 따라 아이템을 요청할 수도 있고 구독을 취소할 수 있다. 이와 같이 Reactive Streams는 마냥 push 모델이 아니라 필요에 따라 pulling 모델도 같이 적용하고 있다.
  • Subscription 내부에서는 요청이 들어오면 Publisher의 아이템을 가져와서 Subscriber의 onNext(), onError() 를 통해 아이템을 넘겨주거나 자신의 상황을 메세지의 형태로 알려준다.
  • 마지막으로 Publisher가 자신이 가진 아이템을 모두 넘겨주었을 때 onComplete() 을 통해서 스트림이 끝났음을 알려준다.
Subscriber<Integer> sub = new Subscriber<>() {
    final static int MAX_BUFFER_SIZE = 2;
    
    Subscription subscription;
    List<Integer> buf = new ArrayList<>();

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(MAX_BUFFER_SIZE);
    }

    @Override
    public void onNext(Integer item) {
        buf.add(item);
        if (buf.size() >= MAX_BUFFER_SIZE) {
            buf = new ArrayList<>();
            this.subscription.request(MAX_BUFFER_SIZE);
        }
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println(throwable);
    }

    @Override
    public void onComplete() {
        buf = new ArrayList<>();
    }
};
  • Subscriber는 구독을 시작하고 onSubscribe() 를 통해서 Subscription 객체를 넘겨 받는다. 이후 Subscription의 request() 를 통해서 아이템을 요청한다.
  • onNext()를 통해서는 자신이 구독한 아이템이 넘어오게되고 자신의 상황에 맞춰서 다음 번에 받을 아이템의 양을 조절한다.

Reactive 시스템과 비교

여기서 설명한 스펙들을 위의 reactive 시스템의 특징과 비교해보면 몇 가지의 궁금증은 풀린다.

  • Delegate failures as messages: 아이템을 공급해주는 Publisher에 문제가 생겼을 경우 자신에게 발생한 예외를 Subscriber에게 onError(Throwable) 이라는 메소드를 통해 그 사실을 알리고 해당 에러를 적절하게 핸들링 하도록 위임하고 있다. 
    또한 중요한 것은 에러 핸들링을 위임하고 Publisher 본인은 스트림의 아이템 전달을 멈추지 않는다는 것이다.
  • Back-pressure: 위의 스펙을 자세히 살펴보면 Subscriber가 Subscription에게 request(long) 메소드를 호출할 수 있다. Reactive Streams의 공식문서를 살펴보면 아래와 같이 설명되어있다.

    While the Subscription is not cancelled, Subscription.request(long n) MUST register the given number of additional elements to be produced to the respective subscriber.

    그러니까 Subscriber가 상황에 따라서 아이템을 더 달라고 요청할 수 있는 일종의 피드백 시스템인 것이다.

Observer 패턴과 비교

ReactiveX의 문서를 살펴보면 자신들은 GoF의 Observer 패턴을 확장하여 RxJava를 만들었다고한다. 위에서 살펴본 스펙과 비교하면 확실히 다르다. 어떤 점이 확실히 다를까?

  • 우선 데이터를 제공해주는 쪽에서 자신이 가지고 있는 아이템을 모두 전달하였을 때 그 사실을 Observer에 전달할 수 있는 방법이 없다.
  • Observable 쪽에서 에러가 발생했을 때 그 사실을 Observer에 전달할 방법이 없다.

위의 두 가지에 대해서는 Observable 쪽에서 Observer들에게 다양한 이벤트를 전달하지 못한다는 문제점을 가진다고 정리할 수 있을 것 같다. 그리고 이러한 문제점은 Java JDK의 Observer, Observable을 deprecate하는 원인이 되기도 한다.

The classes java.util.Observer and java.util.Observable should be deprecated. Applications should no longer use them.

They don’t provide a rich enough event model for applications. For example, they support only the notion that something has changed, but they don’t convey any information about what has changed. There are also some thread-safety and sequencing issues that cannot be fixed compatibly.

출처: https://bugs.openjdk.java.net/browse/JDK-8154801
  • 또한 위에 back-pressure을 설명할 때 잠깐 이야기했었는데 Observer 패턴과 달리 Observable이 마냥 아이템을 push 해주는 것이 아니라 Observable에게 피드백을 해줄 수 있는 채널이 존재한다. Observable은 해당 채널을 통해서 Observer에게 전달하는 아이템을 조절할 수 있다.

Reactive Streams 구현체 – Flow, RxJava, Reactor

마지막으로 Reactive Streams를 구현한 구현체들에 대해서 조금 정리하고 글을 마무리하려고 한다.

Reactive Streams를 발견하고 이제 그것을 실제 코드로 옮기려고 할 때 쓸 수 있는 라이브러리들이 여러 가지가 있다. 대표적인 것들 중에는 Flow, RxJava, Reactor 등이 있다. 세 가지 모두 Reactive Streams에서 명시한 API와 스펙에 맞춰서 구현을 해놓았다. 그렇다면 이것들은 어떤 차이점이 있을까?

시간순으로만 보자면 ReactiveX팀에서 여러 프로그래밍 언어에 대해서 RxJava를 비롯한 RxJs 등이 등장하였다. 그리고 RxJava2를 Reactive Streams API 표준에 맞춰 구현하고 있을 때 Rx 2 Spring 팀에서 Reactor을 구현하고 있었고 백엔드 단에서 더욱 유용하게 쓰일 수 있게 Reactive Streams API를 포함한 여러 API들을 구현하였다.

그리고 비슷한 시점에 JDK 개발자들도 Reactive Stream의 구현체를 개발하여 JDK에 포함시키기로 하였고 이 때 등장한 것이 Flow이다.

RxJava의 경우는 Java8 버전 이전 버전도 지원이 된다. 그래서 안드로이드 개발쪽에서 많이 쓰이고 있고 Reactor의 경우는 Spring WebFlux에서 사용되고 있기 때문에 Spring Cloud와 같은 것을 이용할 때 보통 Reactor를 쓰게 된다.

마치며

이 글은 본격적으로 Spring WebFlux를 깊이있게 다루기 전에 가장 기본이 되는 내용에 대해서 다루었다. Java Reactive에서 공부를 하면서 비슷비슷한 용어들이 우후죽순 나와서 너무나도 헷갈렸다. 이 글을 쓰기 위해서 엄청나게 많은 글을 읽어보았고 그러면서 어느정도 스스로 정리가 된 것 같다.

그리고 사실 여기서 Reactive 시스템의 모든 것을 다룬 것은 아니다. 이 글에서는 Reactive 시스템의 Non-blocking communication에 대해서는 다루지 않았다. 이 부분도 자세히 다루려면 꽤나 챌린징한 일이라서 다른 포스팅에 따로 정리하려고 한다.

몇 가지 섹션을 더 추가하고 싶었는데 그러면 글이 너무 길어질까봐 아래에 섹션에 토픽들과 관련 문서를 나열하고 포스팅을 마무리해야겠다.

레퍼런스

답글 남기기

이메일 주소는 공개되지 않습니다.