T
- the type of item emitted by this publisherpublic interface TransformablePublisher<T> extends Publisher<T>
Publisher
that makes it more convenient to chain transformations of different kinds.
Note that this type implements the publisher interface,
so behaves just like the publisher that it is wrapping with respect to the
Publisher.subscribe(Subscriber)
method.
Modifier and Type | Method and Description |
---|---|
default TransformablePublisher<T> |
batch(int batchSize,
Action<? super T> disposer)
|
default TransformablePublisher<T> |
bindExec()
|
default TransformablePublisher<T> |
bindExec(Action<? super T> disposer)
|
default TransformablePublisher<T> |
buffer()
|
default TransformablePublisher<T> |
filter(Predicate<? super T> filter)
|
default <O> TransformablePublisher<O> |
flatMap(Function<? super T,? extends Promise<? extends O>> function)
|
default TransformablePublisher<T> |
fork()
Consumes the given publisher eagerly in a forked execution, buffering results until ready to be consumed by this execution.
|
default TransformablePublisher<T> |
fork(Action<? super ExecSpec> execConfig,
Action<? super T> disposer)
Consumes the given publisher eagerly in a forked execution, buffering results until ready to be consumed by this execution.
|
default TransformablePublisher<T> |
gate(Action<? super java.lang.Runnable> valveReceiver)
|
default <O> TransformablePublisher<O> |
map(Function<? super T,? extends O> function)
|
default TransformablePublisher<T> |
multicast()
|
default <R> Promise<R> |
reduce(R seed,
BiFunction<? super R,? super T,? extends R> reducer)
Reduces the stream to a single value, by applying the given function successively.
|
default <O> TransformablePublisher<O> |
streamMap(Function<? super WriteStream<O>,? extends WriteStream<? super T>> function)
Deprecated.
since 1.4, use
streamMap(StreamMapper) |
default <O> TransformablePublisher<O> |
streamMap(StreamMapper<? super T,O> mapper)
|
default TransformablePublisher<T> |
take(long count)
|
default Promise<java.util.List<T>> |
toList()
Consumes the given publisher's items to a list.
|
default Promise<T> |
toPromise()
|
default <O> TransformablePublisher<O> |
transform(java.util.function.Function<? super TransformablePublisher<? extends T>,? extends Publisher<O>> transformer)
Convenience method to allow a non Ratpack publisher transform method to be hooked in.
|
default TransformablePublisher<T> |
wiretap(Action<? super StreamEvent<T>> listener)
|
default <O> TransformablePublisher<O> map(Function<? super T,? extends O> function)
O
- the type of transformed itemfunction
- the transformationdefault <O> TransformablePublisher<O> flatMap(Function<? super T,? extends Promise<? extends O>> function)
O
- the type of transformed itemfunction
- the transformationdefault TransformablePublisher<T> buffer()
default TransformablePublisher<T> gate(Action<? super java.lang.Runnable> valveReceiver)
valveReceiver
- an action that receives a runnable “valve” that when run allows request to start flowing upstreamdefault TransformablePublisher<T> wiretap(Action<? super StreamEvent<T>> listener)
listener
- the listener for emitted itemsdefault TransformablePublisher<T> multicast()
default Promise<java.util.List<T>> toList()
This method can be useful when testing, but should be uses with care in production code as it will exhaust memory if the stream is very large.
import org.reactivestreams.Publisher;
import ratpack.stream.Streams;
import ratpack.test.exec.ExecHarness;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.*;
public class Example {
public static void main(String... args) throws Exception {
List<String> expected = Arrays.asList("a", "b", "c");
List<String> result = ExecHarness.yieldSingle(execControl ->
Streams.publish(expected).toList()
).getValue();
assertEquals(Arrays.asList("a", "b", "c"), result);
}
}
If the publisher emits an error, the promise will fail and the collected items will be discarded.
import org.reactivestreams.Publisher;
import ratpack.stream.Streams;
import ratpack.test.exec.ExecHarness;
import static org.junit.Assert.*;
public class Example {
public static void main(String... args) throws Exception {
Throwable error = ExecHarness.yieldSingle(execControl ->
Streams.yield(r -> {
if (r.getRequestNum() < 1) {
return "a";
} else {
throw new RuntimeException("bang!");
}
}).toList()
).getThrowable();
assertEquals("bang!", error.getMessage());
}
}
default <O> TransformablePublisher<O> transform(java.util.function.Function<? super TransformablePublisher<? extends T>,? extends Publisher<O>> transformer)
This transformable publisher will be given to the function, that should return a new publisher. The returned publisher will then be wrapped in a transformable wrapper which will be returned by this method.
O
- the type of transformed itemtransformer
- a publisher transformerdefault <O> TransformablePublisher<O> streamMap(StreamMapper<? super T,O> mapper)
O
- the type of transformed itemmapper
- the transformation@Deprecated default <O> TransformablePublisher<O> streamMap(Function<? super WriteStream<O>,? extends WriteStream<? super T>> function)
streamMap(StreamMapper)
default TransformablePublisher<T> filter(Predicate<? super T> filter)
filter
- the filterdefault TransformablePublisher<T> bindExec()
default TransformablePublisher<T> bindExec(Action<? super T> disposer)
disposer
- the disposer of unhandled itemsdefault <R> Promise<R> reduce(R seed, BiFunction<? super R,? super T,? extends R> reducer)
R
- the type of resultseed
- the initial valuereducer
- the reducing functiondefault TransformablePublisher<T> fork(Action<? super ExecSpec> execConfig, Action<? super T> disposer)
execConfig
- the configuration for the forked executiondisposer
- the disposer for any buffered items when the stream errors or is cancelledStreams.fork(Publisher, Action, Action)
default TransformablePublisher<T> fork()
This method is identical to fork(Action, Action)
, but uses Action.noop()
for both arguments.
Streams.fork(Publisher, Action, Action)
default TransformablePublisher<T> take(long count)
n
elementsdefault TransformablePublisher<T> batch(int batchSize, Action<? super T> disposer)