Java 9 Flow.Subscriber and Flow.Publisher

Java 9 představila Flow API, které umožňuje reaktivní programování. V tomto příspěvku vytvořím aplikaci, která se bude skládat z jednoho publishera, který vytváří data a dvou subscriberů, kteří data zpracovávají. Jeden ze subscriberů je dostatečně rychlý a zvládá data zpracovávat rychleji, než je publisher produkuje, druhý subscriber je výrazně pomalejší.

Data.class

public class Data {

    private final int number;

    public Data(int number) {
        this.number = number;
    }

    @Override
    public String toString() {
        return "Data{number = " + number + "}";
    }
}

MyPublisher.class

import java.util.concurrent.Executor;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class MyPublisher extends SubmissionPublisher<Data> {

    private final String name;

    public MyPublisher(Executor executor, int maxBufferCapacity, String name) {
        super(executor, maxBufferCapacity);
        this.name = name;
    }

    public void start() {
        IntStream.range(1, 10).forEach(i -> {
            Data data = createData(i);
            System.out.println(name + " created " + data);
            offer(
                    data,
                    10,
                    TimeUnit.SECONDS,
                    (subscriber, msg) -> {
                        return false;
            });

        });
        System.out.println(name + " finished");
    }

    private Data createData(int number) {
        try {
            Thread.sleep(1000);
            return new Data(number);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

MySubscriber.class

import java.util.concurrent.Flow;

public class MySubscriber implements Flow.Subscriber<Data> {

    private Flow.Subscription subscription;
    private final long waiting;
    private final String name;

    public MySubscriber(String name, long waiting) {
        this.waiting = waiting;
        this.name = name;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println(name + " onSubscribe");
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(Data data) {
        System.out.println(name + " got: " + data);
        process();
        System.out.println(name + " processed: " + data);
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println(name + " error");
    }

    @Override
    public void onComplete() {
        System.out.println(name + " finished");
    }

    private void process() {
        try {
            Thread.sleep(waiting);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

Application.class

import java.util.concurrent.ForkJoinPool;

public class Application {

    public static void main(String[] args) {
        MyPublisher publisher = new MyPublisher(ForkJoinPool.commonPool(), 10, "Publisher");
        MySubscriber sub1 = new MySubscriber("Subscriber 1", 500);
        MySubscriber sub2 = new MySubscriber("Subscriber 2", 2000);
        publisher.subscribe(sub1);
        publisher.subscribe(sub2);

        publisher.start();

        try {
            // čekání, než se aplikace ukončí
            Thread.sleep(20_000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

Výsledek

Subscriber 1 onSubscribe
Subscriber 2 onSubscribe
Publisher created Data{number = 1}
Subscriber 2 got: Data{number = 1}
Subscriber 1 got: Data{number = 1}
Subscriber 1 processed: Data{number = 1}
Publisher created Data{number = 2}
Subscriber 1 got: Data{number = 2}
Subscriber 1 processed: Data{number = 2}
Subscriber 2 processed: Data{number = 1}
Subscriber 2 got: Data{number = 2}
Publisher created Data{number = 3}
Subscriber 1 got: Data{number = 3}
Subscriber 1 processed: Data{number = 3}
Publisher created Data{number = 4}
Subscriber 1 got: Data{number = 4}
Subscriber 1 processed: Data{number = 4}
Subscriber 2 processed: Data{number = 2}
Subscriber 2 got: Data{number = 3}
Publisher created Data{number = 5}
Subscriber 1 got: Data{number = 5}
Subscriber 1 processed: Data{number = 5}
Publisher created Data{number = 6}
Subscriber 1 got: Data{number = 6}
Subscriber 1 processed: Data{number = 6}
Subscriber 2 processed: Data{number = 3}
Subscriber 2 got: Data{number = 4}
Publisher created Data{number = 7}
Subscriber 1 got: Data{number = 7}
Subscriber 1 processed: Data{number = 7}
Publisher created Data{number = 8}
Subscriber 1 got: Data{number = 8}
Subscriber 1 processed: Data{number = 8}
Subscriber 2 processed: Data{number = 4}
Subscriber 2 got: Data{number = 5}
Publisher created Data{number = 9}
Subscriber 1 got: Data{number = 9}
Publisher finished
Subscriber 1 processed: Data{number = 9}
Subscriber 2 processed: Data{number = 5}
Subscriber 2 got: Data{number = 6}
Subscriber 2 processed: Data{number = 6}
Subscriber 2 got: Data{number = 7}
Subscriber 2 processed: Data{number = 7}
Subscriber 2 got: Data{number = 8}
Subscriber 2 processed: Data{number = 8}
Subscriber 2 got: Data{number = 9}
Subscriber 2 processed: Data{number = 9}

Napsat komentář