T - the type of item emitted by this publisher

public interface TransformablePublisher<T> extends Publisher<T>

A wrapper over a Publisher that makes it more convenient to chain transformations of different kinds.
Note that this type implements the publisher interface, so it behaves just like the publisher that it is wrapping with respect to the Publisher.subscribe(org.reactivestreams.Subscriber) method.
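The chaining style described above relies on each transformation returning another publisher that exposes the same methods. The following is a minimal sketch of that idea, not Ratpack's implementation. It assumes `java.util.concurrent.Flow` (Java 9+) as a stand-in for `org.reactivestreams`, and the names `ChainingSketch`, `Chainable`, `publish`, `collect`, and `run` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;
import java.util.function.Predicate;

class ChainingSketch {

  // Hypothetical stand-in for TransformablePublisher: a publisher whose
  // default methods each return a new publisher wrapping this one.
  interface Chainable<T> extends Flow.Publisher<T> {

    default <O> Chainable<O> map(Function<? super T, ? extends O> fn) {
      return down -> subscribe(new Flow.Subscriber<T>() {
        public void onSubscribe(Flow.Subscription s) { down.onSubscribe(s); }
        public void onNext(T item) { down.onNext(fn.apply(item)); }
        public void onError(Throwable t) { down.onError(t); }
        public void onComplete() { down.onComplete(); }
      });
    }

    default Chainable<T> filter(Predicate<? super T> keep) {
      return down -> subscribe(new Flow.Subscriber<T>() {
        Flow.Subscription upstream;
        public void onSubscribe(Flow.Subscription s) { upstream = s; down.onSubscribe(s); }
        public void onNext(T item) {
          if (keep.test(item)) down.onNext(item);
          else upstream.request(1); // replace the demand used up by the dropped item
        }
        public void onError(Throwable t) { down.onError(t); }
        public void onComplete() { down.onComplete(); }
      });
    }
  }

  // Hypothetical source: emits the list's items synchronously, honoring demand.
  static <T> Chainable<T> publish(List<T> items) {
    return down -> down.onSubscribe(new Flow.Subscription() {
      int index; long demand; boolean emitting; boolean done;
      public void request(long n) {
        demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
        if (emitting) return; // re-entrant call: the running loop sees the new demand
        emitting = true;
        while (!done && demand > 0 && index < items.size()) {
          demand--;
          down.onNext(items.get(index++));
        }
        emitting = false;
        if (!done && index == items.size()) { done = true; down.onComplete(); }
      }
      public void cancel() { done = true; }
    });
  }

  // Subscribes with unbounded demand and gathers everything that is emitted.
  static <T> List<T> collect(Flow.Publisher<T> publisher) {
    List<T> out = new ArrayList<>();
    publisher.subscribe(new Flow.Subscriber<T>() {
      public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
      public void onNext(T item) { out.add(item); }
      public void onError(Throwable t) { throw new IllegalStateException(t); }
      public void onComplete() { }
    });
    return out;
  }

  static List<String> run() {
    Chainable<String> chain = publish(List.of(1, 2, 3, 4))
        .filter(n -> n % 2 == 0)   // still a Chainable, so map can follow
        .map(n -> "item-" + n);
    return collect(chain);
  }

  public static void main(String[] args) {
    System.out.println(run()); // [item-2, item-4]
  }
}
```

Because `Chainable` still extends the plain publisher interface, any code that accepts a `Flow.Publisher` can subscribe to the chained result without knowing about the wrapper.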
Modifier and Type | Method and Description |
---|---|
default TransformablePublisher<T> | buffer() |
default TransformablePublisher<T> | filter(Predicate<T> filter) |
default <O> TransformablePublisher<O> | flatMap(Function<? super T,? extends Promise<? extends O>> function) |
default TransformablePublisher<T> | gate(Action<? super Runnable> valveReceiver) |
default <O> TransformablePublisher<O> | map(Function<? super T,? extends O> function) |
default TransformablePublisher<T> | multicast() |
default <O> TransformablePublisher<O> | streamMap(Function<? super WriteStream<O>,? extends WriteStream<T>> function) |
default Promise<List<T>> | toList() - Consumes the given publisher's items to a list. |
default Promise<List<T>> | toList(ExecControl execControl) - See toList(). |
default Promise<T> | toPromise() |
default Promise<T> | toPromise(ExecControl execControl) |
default <O> TransformablePublisher<O> | transform(Function<? super TransformablePublisher<T>,? extends Publisher<O>> transformer) - Convenience method to allow a non Ratpack publisher transform method to be hooked in. |
default TransformablePublisher<T> | wiretap(Action<? super StreamEvent<? super T>> listener) |
default <O> TransformablePublisher<O> map(Function<? super T,? extends O> function)
O - the type of transformed item
function - the transformation

default <O> TransformablePublisher<O> flatMap(Function<? super T,? extends Promise<? extends O>> function)
O - the type of transformed item
function - the transformation

default TransformablePublisher<T> buffer()

default TransformablePublisher<T> gate(Action<? super Runnable> valveReceiver)
valveReceiver - an action that receives a runnable “valve” that when run allows request to start flowing upstream

default TransformablePublisher<T> wiretap(Action<? super StreamEvent<? super T>> listener)
listener - the listener for emitted items

default TransformablePublisher<T> multicast()

default Promise<T> toPromise(ExecControl execControl)
execControl - the exec control to create the promise from

default Promise<List<T>> toList()
Consumes the given publisher's items to a list.
This method can be useful when testing, but should be used with care in production code: every item is held in memory until the stream completes, so a very large stream can exhaust it.
    import ratpack.stream.Streams;
    import ratpack.test.exec.ExecHarness;

    import java.util.Arrays;
    import java.util.List;

    import static org.junit.Assert.*;

    public class Example {
      public static void main(String... args) throws Exception {
        List<String> expected = Arrays.asList("a", "b", "c");
        // Run the stream in a test execution and block until the list is available.
        List<String> result = ExecHarness.yieldSingle(execControl ->
          Streams.publish(expected).toList()
        ).getValue();
        assertEquals(Arrays.asList("a", "b", "c"), result);
      }
    }
If the publisher emits an error, the promise will fail and the collected items will be discarded.
    import ratpack.stream.Streams;
    import ratpack.test.exec.ExecHarness;

    import static org.junit.Assert.*;

    public class Example {
      public static void main(String... args) throws Exception {
        Throwable error = ExecHarness.yieldSingle(execControl ->
          Streams.yield(r -> {
            // Emit one item for the first request, then fail.
            if (r.getRequestNum() < 1) {
              return "a";
            } else {
              throw new RuntimeException("bang!");
            }
          }).toList()
        ).getThrowable();
        // The promise fails with the stream's error; "a" is not delivered.
        assertEquals("bang!", error.getMessage());
      }
    }
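The collect-or-fail behavior described above can also be sketched without Ratpack. This is an illustration under stated assumptions rather than the library's code: `CompletableFuture` stands in for `Promise`, `java.util.concurrent.Flow` (Java 9+) stands in for `org.reactivestreams`, and `ToListSketch`, `source`, and `describe` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Flow;

class ToListSketch {

  // Collects every item into a list; the future stands in for Ratpack's Promise.
  static <T> CompletableFuture<List<T>> toList(Flow.Publisher<T> publisher) {
    CompletableFuture<List<T>> result = new CompletableFuture<>();
    publisher.subscribe(new Flow.Subscriber<T>() {
      final List<T> collected = new ArrayList<>();
      public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
      public void onNext(T item) { collected.add(item); }
      public void onError(Throwable t) {
        collected.clear();               // partial results are discarded
        result.completeExceptionally(t); // the caller sees only the failure
      }
      public void onComplete() { result.complete(collected); }
    });
    return result;
  }

  // Hypothetical source: emits the items, then fails with `error`,
  // or completes normally when `error` is null.
  // Simplification: assumes unbounded demand, which toList above requests.
  static Flow.Publisher<String> source(List<String> items, Throwable error) {
    return down -> down.onSubscribe(new Flow.Subscription() {
      boolean started;
      public void request(long n) {
        if (started) return;
        started = true;
        for (String item : items) down.onNext(item);
        if (error == null) down.onComplete(); else down.onError(error);
      }
      public void cancel() { started = true; }
    });
  }

  // Renders the outcome of a future as text, for demonstration.
  static String describe(CompletableFuture<List<String>> future) {
    try {
      return "items " + future.join();
    } catch (CompletionException e) {
      return "failed: " + e.getCause().getMessage();
    }
  }

  public static void main(String[] args) {
    System.out.println(describe(toList(source(List.of("a", "b"), null))));
    System.out.println(describe(toList(source(List.of("a"), new RuntimeException("bang!")))));
  }
}
```

In the failing case the item "a" reaches the collector but never reaches the caller, which mirrors the second example above.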
default Promise<List<T>> toList(ExecControl execControl)
See toList().
execControl - the exec control to create the promise from

default <O> TransformablePublisher<O> transform(Function<? super TransformablePublisher<T>,? extends Publisher<O>> transformer)
Convenience method to allow a non Ratpack publisher transform method to be hooked in.
This transformable publisher is given to the function, which should return a new publisher. The returned publisher is then wrapped in a transformable wrapper, which is returned by this method.
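The hand-over and re-wrapping steps described above can be sketched as follows. This is a hedged illustration, not Ratpack's code: it assumes `java.util.concurrent.Flow` (Java 9+) in place of `org.reactivestreams` and `java.util.function.Function` in place of Ratpack's `Function`, and the names `TransformSketch`, `Chainable`, `publish`, `mapped`, `upperCase`, `exclaim`, `collect`, and `run` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;

class TransformSketch {

  // Hypothetical stand-in for TransformablePublisher.
  interface Chainable<T> extends Flow.Publisher<T> {
    default <O> Chainable<O> transform(
        Function<? super Chainable<T>, ? extends Flow.Publisher<O>> transformer) {
      Flow.Publisher<O> plain = transformer.apply(this); // hand this publisher to the function
      return plain::subscribe;                           // re-wrap so chaining can continue
    }
  }

  // Hypothetical source: emits the list's items as they are requested.
  static <T> Chainable<T> publish(List<T> items) {
    return down -> down.onSubscribe(new Flow.Subscription() {
      int index; boolean done;
      public void request(long n) {
        for (long i = 0; i < n && !done && index < items.size(); i++) {
          down.onNext(items.get(index++));
        }
        if (!done && index == items.size()) { done = true; down.onComplete(); }
      }
      public void cancel() { done = true; }
    });
  }

  // The operators below stand in for another library: they know only Flow.Publisher.
  static <I, O> Flow.Publisher<O> mapped(Flow.Publisher<I> in, Function<I, O> fn) {
    return down -> in.subscribe(new Flow.Subscriber<I>() {
      public void onSubscribe(Flow.Subscription s) { down.onSubscribe(s); }
      public void onNext(I item) { down.onNext(fn.apply(item)); }
      public void onError(Throwable t) { down.onError(t); }
      public void onComplete() { down.onComplete(); }
    });
  }

  static Flow.Publisher<String> upperCase(Flow.Publisher<String> in) {
    return mapped(in, s -> s.toUpperCase());
  }

  static Flow.Publisher<String> exclaim(Flow.Publisher<String> in) {
    return mapped(in, s -> s + "!");
  }

  // Subscribes with unbounded demand and gathers everything that is emitted.
  static <T> List<T> collect(Flow.Publisher<T> publisher) {
    List<T> out = new ArrayList<>();
    publisher.subscribe(new Flow.Subscriber<T>() {
      public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
      public void onNext(T item) { out.add(item); }
      public void onError(Throwable t) { throw new IllegalStateException(t); }
      public void onComplete() { }
    });
    return out;
  }

  static List<String> run() {
    Chainable<String> chained = publish(List.of("a", "bb"))
        .transform(TransformSketch::upperCase) // plain publisher comes back wrapped
        .transform(TransformSketch::exclaim);  // so a second transform can follow
    return collect(chained);
  }

  public static void main(String[] args) {
    System.out.println(run()); // [A!, BB!]
  }
}
```

The point of the re-wrap is that operators written against the plain publisher type can be dropped into a chain without breaking it.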
O - the type of transformed item
transformer - a publisher transformer

default <O> TransformablePublisher<O> streamMap(Function<? super WriteStream<O>,? extends WriteStream<T>> function)
O - the type of transformed item
function - the transformation

default TransformablePublisher<T> filter(Predicate<T> filter)
filter - the filter