Java 9 představila Flow API, které umožňuje reaktivní programování. V tomto příspěvku vytvořím aplikaci, která se bude skládat z jednoho publishera, který vytváří data a dvou subscriberů, kteří data zpracovávají. Jeden ze subscriberů je dostatečně rychlý a zvládá data zpracovávat rychleji, než je publisher produkuje, druhý subscriber je výrazně pomalejší.
Data.class
public class Data { private final int number; public Data(int number) { this.number = number; } @Override public String toString() { return "Data{number = " + number + "}"; } }
MyPublisher.class
import java.util.concurrent.Executor; import java.util.concurrent.SubmissionPublisher; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; public class MyPublisher extends SubmissionPublisher<Data> { private final String name; public MyPublisher(Executor executor, int maxBufferCapacity, String name) { super(executor, maxBufferCapacity); this.name = name; } public void start() { IntStream.range(1, 10).forEach(i -> { Data data = createData(i); System.out.println(name + " created " + data); offer( data, 10, TimeUnit.SECONDS, (subscriber, msg) -> { return false; }); }); System.out.println(name + " finished"); } private Data createData(int number) { try { Thread.sleep(1000); return new Data(number); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
MySubscriber.class
import java.util.concurrent.Flow; public class MySubscriber implements Flow.Subscriber<Data> { private Flow.Subscription subscription; private final long waiting; private final String name; public MySubscriber(String name, long waiting) { this.waiting = waiting; this.name = name; } @Override public void onSubscribe(Flow.Subscription subscription) { System.out.println(name + " onSubscribe"); this.subscription = subscription; subscription.request(1); } @Override public void onNext(Data data) { System.out.println(name + " got: " + data); process(); System.out.println(name + " processed: " + data); subscription.request(1); } @Override public void onError(Throwable throwable) { System.out.println(name + " error"); } @Override public void onComplete() { System.out.println(name + " finished"); } private void process() { try { Thread.sleep(waiting); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
Application.class
import java.util.concurrent.ForkJoinPool; public class Application { public static void main(String[] args) { MyPublisher publisher = new MyPublisher(ForkJoinPool.commonPool(), 10, "Publisher"); MySubscriber sub1 = new MySubscriber("Subscriber 1", 500); MySubscriber sub2 = new MySubscriber("Subscriber 2", 2000); publisher.subscribe(sub1); publisher.subscribe(sub2); publisher.start(); try { // čekání, než se aplikace ukončí Thread.sleep(20_000); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
Výsledek
Subscriber 1 onSubscribe
Subscriber 2 onSubscribe
Publisher created Data{number = 1}
Subscriber 2 got: Data{number = 1}
Subscriber 1 got: Data{number = 1}
Subscriber 1 processed: Data{number = 1}
Publisher created Data{number = 2}
Subscriber 1 got: Data{number = 2}
Subscriber 1 processed: Data{number = 2}
Subscriber 2 processed: Data{number = 1}
Subscriber 2 got: Data{number = 2}
Publisher created Data{number = 3}
Subscriber 1 got: Data{number = 3}
Subscriber 1 processed: Data{number = 3}
Publisher created Data{number = 4}
Subscriber 1 got: Data{number = 4}
Subscriber 1 processed: Data{number = 4}
Subscriber 2 processed: Data{number = 2}
Subscriber 2 got: Data{number = 3}
Publisher created Data{number = 5}
Subscriber 1 got: Data{number = 5}
Subscriber 1 processed: Data{number = 5}
Publisher created Data{number = 6}
Subscriber 1 got: Data{number = 6}
Subscriber 1 processed: Data{number = 6}
Subscriber 2 processed: Data{number = 3}
Subscriber 2 got: Data{number = 4}
Publisher created Data{number = 7}
Subscriber 1 got: Data{number = 7}
Subscriber 1 processed: Data{number = 7}
Publisher created Data{number = 8}
Subscriber 1 got: Data{number = 8}
Subscriber 1 processed: Data{number = 8}
Subscriber 2 processed: Data{number = 4}
Subscriber 2 got: Data{number = 5}
Publisher created Data{number = 9}
Subscriber 1 got: Data{number = 9}
Publisher finished
Subscriber 1 processed: Data{number = 9}
Subscriber 2 processed: Data{number = 5}
Subscriber 2 got: Data{number = 6}
Subscriber 2 processed: Data{number = 6}
Subscriber 2 got: Data{number = 7}
Subscriber 2 processed: Data{number = 7}
Subscriber 2 got: Data{number = 8}
Subscriber 2 processed: Data{number = 8}
Subscriber 2 got: Data{number = 9}
Subscriber 2 processed: Data{number = 9}