Java 9 představila Flow API, které umožňuje reaktivní programování. V tomto příspěvku vytvořím aplikaci, která se bude skládat z jednoho publishera, který vytváří data, a dvou subscriberů, kteří data zpracovávají. Jeden ze subscriberů je dostatečně rychlý a zvládá data zpracovávat rychleji, než je publisher produkuje; druhý subscriber je výrazně pomalejší.
Data.java
/**
 * Immutable value object holding a single integer.
 */
public class Data {

    /** The wrapped value; assigned once in the constructor. */
    private final int number;

    /**
     * Creates an instance wrapping the given value.
     *
     * @param number the value to hold
     */
    public Data(int number) {
        this.number = number;
    }

    /**
     * Renders this object as {@code Data{number = N}}, where {@code N} is the wrapped value.
     *
     * @return the textual representation
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Data{number = ");
        text.append(number);
        text.append('}');
        return text.toString();
    }
}
MyPublisher.java
import java.util.concurrent.Executor;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
/**
 * A {@link SubmissionPublisher} of {@link Data} that produces a fixed series of items
 * and offers each one to its current subscribers.
 */
public class MyPublisher extends SubmissionPublisher<Data> {

    /** Label prefixed to every console message printed by this publisher. */
    private final String name;

    /**
     * Creates the publisher.
     *
     * @param executor          executor handed to {@link SubmissionPublisher} for delivery
     * @param maxBufferCapacity maximum buffer capacity handed to {@link SubmissionPublisher}
     * @param name              label used in console output
     */
    public MyPublisher(Executor executor, int maxBufferCapacity, String name) {
        super(executor, maxBufferCapacity);
        this.name = name;
    }

    /**
     * Produces items numbered 1 through 9 (the upper bound of the range is exclusive),
     * printing a message for each and offering it to the subscribers. Runs synchronously
     * on the calling thread and prints a final message once all items were offered.
     */
    public void start() {
        IntStream.range(1, 10).forEach(i -> {
            Data data = createData(i);
            System.out.println(name + " created " + data);
            // Wait up to 10 seconds for buffer space; the drop handler returns false,
            // meaning a dropped item is not retried.
            offer(data, 10, TimeUnit.SECONDS, (subscriber, msg) -> false);
        });
        System.out.println(name + " finished");
    }

    /**
     * Simulates one second of work and then creates the item.
     *
     * @param number value to wrap in the new item
     * @return the newly created item
     * @throws RuntimeException if the thread is interrupted while sleeping; the
     *                          interrupt flag is restored before throwing
     */
    private Data createData(int number) {
        try {
            Thread.sleep(1000);
            return new Data(number);
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
MySubscriber.java
import java.util.concurrent.Flow;
/**
 * A {@link Flow.Subscriber} of {@link Data} that requests one item at a time and
 * simulates processing by sleeping for a configurable duration.
 */
public class MySubscriber implements Flow.Subscriber<Data> {

    /** Subscription received in {@link #onSubscribe}; used to request further items. */
    private Flow.Subscription subscription;

    /** Simulated processing time per item, in milliseconds. */
    private final long waiting;

    /** Label prefixed to every console message printed by this subscriber. */
    private final String name;

    /**
     * Creates the subscriber.
     *
     * @param name    label used in console output
     * @param waiting simulated processing time per item, in milliseconds
     */
    public MySubscriber(String name, long waiting) {
        this.waiting = waiting;
        this.name = name;
    }

    /**
     * Stores the subscription and requests the first item.
     *
     * @param subscription the subscription to request items from
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println(name + " onSubscribe");
        this.subscription = subscription;
        subscription.request(1);
    }

    /**
     * Processes one item and only then requests the next one, so at most one item
     * is requested at a time.
     *
     * @param data the received item
     */
    @Override
    public void onNext(Data data) {
        System.out.println(name + " got: " + data);
        process();
        System.out.println(name + " processed: " + data);
        subscription.request(1);
    }

    /**
     * Reports a failure, including the cause so it is not silently lost.
     *
     * @param throwable the reported error
     */
    @Override
    public void onError(Throwable throwable) {
        System.out.println(name + " error: " + throwable);
    }

    /** Reports that no further items will arrive. */
    @Override
    public void onComplete() {
        System.out.println(name + " finished");
    }

    /**
     * Simulates processing by sleeping for {@code waiting} milliseconds.
     *
     * @throws RuntimeException if the thread is interrupted while sleeping; the
     *                          interrupt flag is restored before throwing
     */
    private void process() {
        try {
            Thread.sleep(waiting);
        } catch (InterruptedException e) {
            // Restore the interrupt status so the executing thread's owner can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
Application.java
import java.util.concurrent.ForkJoinPool;
/**
 * Entry point of the demo: wires one publisher to a fast and a slow subscriber.
 */
public class Application {

    /**
     * Creates the publisher and both subscribers, runs the publisher on the main
     * thread, and then keeps the JVM alive for a fixed time so the subscribers can
     * work through the remaining items.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        MyPublisher publisher = new MyPublisher(ForkJoinPool.commonPool(), 10, "Publisher");
        // Subscriber 1 needs 500 ms per item, subscriber 2 needs 2000 ms per item.
        MySubscriber sub1 = new MySubscriber("Subscriber 1", 500);
        MySubscriber sub2 = new MySubscriber("Subscriber 2", 2000);
        publisher.subscribe(sub1);
        publisher.subscribe(sub2);
        publisher.start();
        // NOTE(review): the publisher is never closed here, so the subscribers
        // presumably never receive onComplete - confirm whether close() is wanted.
        try {
            // Wait before the application exits so the subscribers can finish.
            Thread.sleep(20_000);
        } catch (InterruptedException e) {
            // Restore the interrupt status before propagating the failure.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
Výsledek
Subscriber 1 onSubscribe
Subscriber 2 onSubscribe
Publisher created Data{number = 1}
Subscriber 2 got: Data{number = 1}
Subscriber 1 got: Data{number = 1}
Subscriber 1 processed: Data{number = 1}
Publisher created Data{number = 2}
Subscriber 1 got: Data{number = 2}
Subscriber 1 processed: Data{number = 2}
Subscriber 2 processed: Data{number = 1}
Subscriber 2 got: Data{number = 2}
Publisher created Data{number = 3}
Subscriber 1 got: Data{number = 3}
Subscriber 1 processed: Data{number = 3}
Publisher created Data{number = 4}
Subscriber 1 got: Data{number = 4}
Subscriber 1 processed: Data{number = 4}
Subscriber 2 processed: Data{number = 2}
Subscriber 2 got: Data{number = 3}
Publisher created Data{number = 5}
Subscriber 1 got: Data{number = 5}
Subscriber 1 processed: Data{number = 5}
Publisher created Data{number = 6}
Subscriber 1 got: Data{number = 6}
Subscriber 1 processed: Data{number = 6}
Subscriber 2 processed: Data{number = 3}
Subscriber 2 got: Data{number = 4}
Publisher created Data{number = 7}
Subscriber 1 got: Data{number = 7}
Subscriber 1 processed: Data{number = 7}
Publisher created Data{number = 8}
Subscriber 1 got: Data{number = 8}
Subscriber 1 processed: Data{number = 8}
Subscriber 2 processed: Data{number = 4}
Subscriber 2 got: Data{number = 5}
Publisher created Data{number = 9}
Subscriber 1 got: Data{number = 9}
Publisher finished
Subscriber 1 processed: Data{number = 9}
Subscriber 2 processed: Data{number = 5}
Subscriber 2 got: Data{number = 6}
Subscriber 2 processed: Data{number = 6}
Subscriber 2 got: Data{number = 7}
Subscriber 2 processed: Data{number = 7}
Subscriber 2 got: Data{number = 8}
Subscriber 2 processed: Data{number = 8}
Subscriber 2 got: Data{number = 9}
Subscriber 2 processed: Data{number = 9}