public class Streams extends Object
Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure on the JVM.
Ratpack uses the Reactive Streams API when consuming streams of data (e.g Response.sendStream(Publisher)
).
This class provides some useful reactive utilities that integrate other parts of the Ratpack API with Reactive Stream types. It is not designed to be a fully featured reactive toolkit. If you require more features than provided here, consider using Ratpack's RxJava or Reactor integration.
The methods in this class are available as Groovy Extensions. When using Groovy, applications can utilize the static methods provided in this class as instance-level methods against the first argument in their variable argument list.
Constructor and Description |
---|
Streams() |
Modifier and Type | Method and Description |
---|---|
static <T> TransformablePublisher<T> |
bindExec(Publisher<T> publisher) |
static <T> TransformablePublisher<T> |
buffer(Publisher<T> publisher)
Returns a publisher that allows the given publisher to without respecting demand.
|
static <T> TransformablePublisher<T> |
constant(T item)
Creates a new publisher, that indefinitely streams the given object to all subscribers.
|
static <T> TransformablePublisher<T> |
fanOut(Publisher<? extends Iterable<? extends T>> publisher)
Returns a publisher that publishes each element from Collections that are produced from the given input publisher.
|
static <T> TransformablePublisher<T> |
filter(Publisher<T> input,
Predicate<? super T> filter)
Returns a publisher that filters items from the given input stream by applying the given filter predicate.
|
static <I,O> TransformablePublisher<O> |
flatMap(Publisher<I> input,
Function<? super I,? extends Promise<? extends O>> function)
Returns a publisher that publishes items from the given input publisher after transforming each item via the given, promise returning, function.
|
static <T> TransformablePublisher<T> |
flatYield(Function<? super YieldRequest,? extends Promise<T>> producer)
Creates a new publisher, backed by the given asynchronous data producing function.
|
static <T> TransformablePublisher<T> |
gate(Publisher<T> publisher,
Action<? super Runnable> valveReceiver)
Allows requests from the subscriber of the return publisher to be withheld from the given publisher until an externally defined moment.
|
static <I,O> TransformablePublisher<O> |
map(Publisher<I> input,
Function<? super I,? extends O> function)
Returns a publisher that publishes items from the given input publisher after transforming each item via the given function.
|
static <T> TransformablePublisher<T> |
merge(Publisher<? extends T>... publishers)
Returns a publisher that merges the given input publishers into a single stream of elements.
|
static <T> TransformablePublisher<T> |
multicast(Publisher<T> publisher)
Returns a publisher that will stream events emitted from the given publisher to all of its subscribers.
|
static <T> TransformablePublisher<T> |
periodically(Registry registry,
Duration duration,
Function<? super Integer,? extends T> producer) |
static <T> TransformablePublisher<T> |
periodically(ScheduledExecutorService executorService,
Duration duration,
Function<? super Integer,? extends T> producer)
Executes the given function periodically, publishing the return value to the subscriber.
|
static <T> TransformablePublisher<T> |
publish(Iterable<T> iterable)
Converts an iterable to a publishable.
|
static <T> TransformablePublisher<T> |
publish(Promise<? extends Iterable<T>> promise)
Converts a
Promise for an iterable into a publishable. |
static <T,O> TransformablePublisher<O> |
streamMap(Publisher<T> input,
Function<? super WriteStream<O>,? extends WriteStream<? super T>> mapper)
Allows transforming a stream into an entirely different stream.
|
static <T> Promise<List<T>> |
toList(Publisher<T> publisher)
Creates a promise for the given publisher's items as a List.
|
static <T> Promise<T> |
toPromise(Publisher<T> publisher)
Creates a promise for the given publisher's single item.
|
static <T> TransformablePublisher<T> |
transformable(Publisher<T> publisher)
Wraps the publisher in Ratpack's
TransformablePublisher to make composing a pipeline easier. |
static <T> TransformablePublisher<T> |
wiretap(Publisher<T> publisher,
Action<? super StreamEvent<? super T>> listener)
Allows listening to the events of the given publisher as they flow to subscribers.
|
static <T> TransformablePublisher<T> |
yield(Function<? super YieldRequest,? extends T> producer)
Creates a new publisher, backed by the given data producing function.
|
public static <T> TransformablePublisher<T> transformable(Publisher<T> publisher)
TransformablePublisher
to make composing a pipeline easier.
The return publisher is effectively the same publisher in terms of the Publisher.subscribe(org.reactivestreams.Subscriber)
method.
T
- the type of item the publisher emitspublisher
- the publisher to wrappublic static <T> TransformablePublisher<T> publish(Iterable<T> iterable)
Upon subscription, a new iterator will be created from the iterable. Values are pulled from the iterator in accordance with the requests from the subscriber.
Any exception thrown by the iterable/iterator will be forwarded to the subscriber.
T
- the type of item emittediterable
- the data sourcepublic static <T> TransformablePublisher<T> publish(Promise<? extends Iterable<T>> promise)
Promise
for an iterable into a publishable.
Upon subscription the promise will be consumed and the promised iterable will be emitted to the subscriber one element at a time.
Any exception thrown by the the promise will be forwarded to the subscriber.
T
- the element type of the promised iterablepromise
- the promisepublic static <T> TransformablePublisher<T> yield(Function<? super YieldRequest,? extends T> producer)
As subscribers request data of the returned stream, the given function is invoked.
The function returns the item to send downstream.
If the function returns null
, the stream is terminated.
If the function throws an exception, the stream is terminated and the error is sent downstream.
import ratpack.stream.Streams;
import ratpack.test.exec.ExecHarness;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.*;
public class Example {
public static void main(String... args) throws Exception {
List<String> strings = ExecHarness.yieldSingle(execControl ->
Streams.yield(r -> {
if (r.getRequestNum() < 2) {
return Long.toString(r.getRequestNum());
} else {
return null;
}
}).toList()
).getValue();
assertEquals(Arrays.asList("0", "1"), strings);
}
}
If the value producing function is asynchronous, use flatYield(Function)
.
T
- the type of item emittedproducer
- the data sourceflatYield(ratpack.func.Function<? super ratpack.stream.YieldRequest, ? extends ratpack.exec.Promise<T>>)
public static <T> TransformablePublisher<T> flatYield(Function<? super YieldRequest,? extends Promise<T>> producer)
As subscribers request data of the returned stream, the given function is invoked.
The function returns a promise for the item to send downstream.
If the promise provides a value of null
, the stream is terminated.
If the promise produces an error, the stream is terminated and the error is sent downstream.
If the promise producing function throws an exception, the stream is terminated and the error is sent downstream.
import ratpack.stream.Streams;
import ratpack.exec.Promise;
import ratpack.test.exec.ExecHarness;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.*;
public class Example {
public static void main(String... args) throws Exception {
List<String> strings = ExecHarness.yieldSingle(execControl ->
Streams.flatYield(r -> {
if (r.getRequestNum() < 2) {
return Promise.value(Long.toString(r.getRequestNum()));
} else {
return Promise.value(null);
}
}).toList()
).getValue();
assertEquals(Arrays.asList("0", "1"), strings);
}
}
If the value producing function is not asynchronous, use yield(Function)
.
T
- the type of item emittedproducer
- the data sourceyield(ratpack.func.Function<? super ratpack.stream.YieldRequest, ? extends T>)
public static <T> TransformablePublisher<T> constant(T item)
This is rarely useful for anything other than testing.
T
- the type of item emitteditem
- the item to indefinitely streampublic static <I,O> TransformablePublisher<O> map(Publisher<I> input, Function<? super I,? extends O> function)
The returned publisher does not perform any flow control on the data stream.
If the given transformation errors, the exception will be forwarded to the subscriber and the subscription to the input stream will be cancelled.
I
- the type of input itemO
- the type of output iteminput
- the stream of input datafunction
- the transformationpublic static <T> TransformablePublisher<T> filter(Publisher<T> input, Predicate<? super T> filter)
The returned stream is buffered
, which means that if the downstream requests, say 5 items, which is filtered into only 3 items
the publisher will ask for more from the upstream to meet the downstream demand.
import org.reactivestreams.Publisher;
import ratpack.stream.Streams;
import ratpack.stream.TransformablePublisher;
import ratpack.test.exec.ExecHarness;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.*;
public class Example {
public static void main(String... args) throws Exception {
List<Integer> result = ExecHarness.yieldSingle(execControl -> {
TransformablePublisher<Integer> evens = Streams.publish(Arrays.asList(1, 2, 3, 4, 5, 6)).filter(i -> i % 2 == 0);
return evens.toList();
}).getValue();
assertEquals(Arrays.asList(2, 4, 6), result);
}
}
T
- the type of item emittedinput
- the stream to filterfilter
- the filter predicatepublic static <T,O> TransformablePublisher<O> streamMap(Publisher<T> input, Function<? super WriteStream<O>,? extends WriteStream<? super T>> mapper)
While the map(Publisher, Function)
method support transforming individual items, this method supports transforming the stream as a whole.
This is necessary when the transformation causes a different number of items to be emitted than the original stream.
import org.reactivestreams.Publisher;
import ratpack.stream.Streams;
import ratpack.stream.TransformablePublisher;
import ratpack.stream.WriteStream;
import ratpack.test.exec.ExecHarness;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.*;
public class Example {
public static void main(String... args) throws Exception {
List<String> result = ExecHarness.yieldSingle(execControl -> {
Publisher<String> chars = Streams.publish(Arrays.asList("a", "b", "c"));
TransformablePublisher<String> mapped = Streams.streamMap(chars, out ->
new WriteStream<String>() {
public void item(String item) {
out.item(item);
out.item(item.toUpperCase());
}
public void error(Throwable error) {
out.error(error);
}
public void complete() {
out.complete();
}
}
);
return mapped.toList();
}).getValue();
assertEquals(Arrays.asList("a", "A", "b", "B", "c", "C"), result);
}
}
The mapper
function receives a WriteStream
for emitting items and returns a WriteStream
that will be written to by the upstream publisher.
Calling WriteStream.complete()
or WriteStream.error(Throwable)
on the received write stream will cancel
the upstream subscription and it is guaranteed that no more items will be emitted from the upstream.
If the complete/error signals from upstream don't need to be intercepted, the WriteStream.itemMap(Action)
can be used on the write stream given to the mapping function to create the return write stream.
The returned stream is buffered
, which allows the stream transformation to emit more items downstream than what was received from the upstream.
Currently, the upstream subscription will always receive a single request
for Long.MAX_VALUE
, effectively asking upstream to emit as fast as it can.
Future versions may propagate backpressure more intelligently.
T
- the type of item receivedO
- the type of item producedinput
- the stream to mapmapper
- the mapping functionpublic static <I,O> TransformablePublisher<O> flatMap(Publisher<I> input, Function<? super I,? extends Promise<? extends O>> function)
The returned publisher does not perform any flow control on the data stream.
If the given transformation errors, or if the returned promise fails, the exception will be forwarded to the subscriber and the subscription to the input stream will be cancelled.
I
- the type of input itemO
- the type of output iteminput
- the stream of input datafunction
- the transformationpublic static <T> TransformablePublisher<T> buffer(Publisher<T> publisher)
The given publisher may violate the Reactive Streams contract in that it may emit more items than have been requested. Any excess will be buffered until there is more demand. All requests for items from the subscriber will be satisfied from the buffer first. If a request is made at any time for more items than are currently in the buffer, a request for the unmet demand will be made of the given publisher.
If the given producer emits far faster than the downstream subscriber requests, the intermediate queue will grow large and consume memory.
T
- the type of itempublisher
- a data sourcepublic static <T> TransformablePublisher<T> gate(Publisher<T> publisher, Action<? super Runnable> valveReceiver)
When the return publisher is subscribed to, the given publisher will be subscribed to. All requests made by the subscriber of the return publisher will not be forwarded to the subscription of the given publisher until the runnable given to the given action is run. Once the runnable is run, all requests are directly forwarded to the subscription of the given publisher.
The return publisher supports multi subscription, creating a new subscription to the given publisher each time. The given action will be invoke each time the return publisher is subscribed to with a distinct runnable for releasing the gate for that subscription.
The given action will be invoked immediately upon subscription of the return publisher. The runnable given to this action may be invoked any time (i.e. it does not need to be run during the action). If the action errors, the given publisher will not be subscribed to and the error will be sent to the return publisher subscriber.
T
- the type of item emittedpublisher
- the data sourcevalveReceiver
- an action that receives a runnable “valve” that when run allows request to start flowing upstreampublic static <T> TransformablePublisher<T> periodically(ScheduledExecutorService executorService, Duration duration, Function<? super Integer,? extends T> producer)
When the return publisher is subscribed to, the given function is executed immediately (via the executor) with 0
as the input.
The function will then be repeatedly executed again after the given delay (with an incrementing input) until the function returns null
.
That is, a return value from the function of null
signals that the data stream has finished.
The function will not be executed again after returning null
.
Each new subscription to the publisher will cause the function to be scheduled again. Due to this, it is generally desirable to wrap the return publisher in a multicasting publisher.
If the function throws an exception, the error will be sent to the subscribers and no more invocations of the function will occur.
The returned publisher is implicitly buffered to respect back pressure via buffer(Publisher)
.
T
- the type of itemexecutorService
- the executor service that will periodically execute the functionduration
- the duration of the delay (Duration.ofSeconds(1) - delay will be 1 second)producer
- a function that produces values to emitpublic static <T> TransformablePublisher<T> periodically(Registry registry, Duration duration, Function<? super Integer,? extends T> producer)
public static <T> TransformablePublisher<T> wiretap(Publisher<T> publisher, Action<? super StreamEvent<? super T>> listener)
When the return publisher is subscribed to, the given publisher will be subscribed to. All events (incl. data, error and completion) emitted by the given publisher will be forwarded to the given listener before being forward to the subscriber of the return publisher.
If the listener errors, the upstream subscription will be cancelled (if appropriate) and the error sent downstream.
If the listener errors while listening to an error event, the listener error will be added as a surpressed exception
to the original exception which will then be sent downstream.
T
- the type of item emittedpublisher
- the data sourcelistener
- the listener for emitted itemspublic static <T> TransformablePublisher<T> multicast(Publisher<T> publisher)
The return publisher allows the given publisher to emit as fast as it can, while applying flow control downstream to multiple subscribers. Each subscriber can signal its own demand. If the given publisher emits far faster than the downstream subscribers request, the intermediate queue of each subscriber will grow large and consume substantial memory. However, given this publisher is likely to be used with a periodic publisher or a regular indefinite stream it is unlikely to be a problem.
When a subscriber subscribes to the return publisher then it will not receive any events that have been emitted before it subscribed.
T
- the type of itempublisher
- a data sourcepublic static <T> TransformablePublisher<T> fanOut(Publisher<? extends Iterable<? extends T>> publisher)
For each item the return publisher receives from the given input publisher, the return publisher will iterate over its elements and publish a new item for each element to its downstream subscriber e.g. if the return publisher receives a Collection with 10 elements then the downstream subscriber will receive 10 calls to its onNext method.
The returned publisher is implicitly buffered to respect back pressure via buffer(Publisher)
.
T
- the type of item emittedpublisher
- the data source@SafeVarargs public static <T> TransformablePublisher<T> merge(Publisher<? extends T>... publishers)
The returned publisher obeys the following rules:
The returned publisher is implicitly buffered to respect back pressure via buffer(org.reactivestreams.Publisher)
.
T
- the type of item emittedpublishers
- the data sources to mergepublic static <T> Promise<T> toPromise(Publisher<T> publisher)
The given publisher is expected to produce zero or one items.
If it produces zero, the promised value will be null
.
The it produces exactly one item, the promised value will be that item.
If the stream produces more than one item, the promise will fail with an IllegalStateException
.
As soon as a second item is received, the subscription to the given publisher will be cancelled.
The single item is not provided to the promise subscriber until the stream completes, to ensure that it is indeed a one element stream. If the stream errors before sending a second item, the promise will fail with that error. If it fails after sending a second item, that error will be ignored.
T
- the type of promised valuepublisher
- the publiser the convert to a promisepublic static <T> Promise<List<T>> toList(Publisher<T> publisher)
T
- the type of item in the streampublisher
- the stream to collect to a listpublic static <T> TransformablePublisher<T> bindExec(Publisher<T> publisher)