T
- the type of promised valuepublic interface Promise<T>
A promise is a representation of a value which will become available later.
Methods such as map(Function)
, flatMap(Function)
, cache()
etc.) allow a pipeline of “operations” to be specified,
that the value will travel through as it becomes available.
Such operations are implemented via the transform(Function)
method.
Each operation returns a new promise object, not the original promise object.
To create a promise, use the of(Upstream)
method (or one of the variants such as ofLazy(Factory)
.
To test code that uses promises, use the ExecHarness
.
The promise is not “activated” until the then(Action)
method is called.
This method terminates the pipeline, and receives the final value.
Promise objects are multi use.
Every promise pipeline has a value producing function at its start.
Activating a promise (i.e. calling then(Action)
) invokes the function.
The cache()
operation can be used to change this behaviour.
Modifier and Type | Method and Description |
---|---|
default <O> Promise<O> |
apply(Function<? super Promise<T>,? extends Promise<O>> function)
Applies the custom operation function to this promise.
|
default <O> Promise<O> |
blockingMap(Function<? super T,? extends O> transformer)
Like
map(Function) , but performs the transformation on a blocking thread. |
default Promise<T> |
blockingOp(Action<? super T> action)
Executes the given action with the promise value, on a blocking thread.
|
default Promise<T> |
cache()
Caches the promised value (or error) and returns it to all subscribers.
|
void |
connect(Downstream<T> downstream)
A low level hook for consuming the promised value.
|
default Promise<T> |
defer(Action<? super Runnable> releaser)
Allows the execution of the promise to be deferred to a later time.
|
static <T> Promise<T> |
error(Throwable t)
Creates a failed promise with the given error.
|
default <O> Promise<O> |
flatMap(Function<? super T,? extends Promise<O>> transformer)
Transforms the promised value by applying the given function to it that returns a promise for the transformed value.
|
default <O> Promise<Pair<O,T>> |
left(Promise<O> left) |
default <O> Promise<O> |
map(Function<? super T,? extends O> transformer)
Transforms the promised value by applying the given function to it.
|
default Promise<T> |
mapError(Function<? super Throwable,? extends T> transformer)
Transforms the promise failure (potentially into a value) by applying the given function to it.
|
default Promise<T> |
next(Action<? super T> action)
Executes the provided, potentially asynchronous,
Action with the promised value as input. |
default <O> Promise<O> |
next(Promise<O> next)
Deprecated.
replaced by
replace(Promise) as of 1.1.0 |
default Promise<T> |
nextOp(Function<? super T,? extends Operation> function)
Executes the operation returned by the given function.
|
static <T> Promise<T> |
of(Upstream<T> upstream)
Creates a promise for value that will be available later.
|
static <T> Promise<T> |
ofLazy(Factory<T> factory)
Creates a promise for value produced by the given factory.
|
default Promise<T> |
onError(Action<? super Throwable> errorHandler)
Specifies the action to take if the an error occurs trying to produce the promised value.
|
default <E extends Throwable> |
onError(Class<E> errorType,
Action<? super E> errorHandler)
Specifies the action to take if the an error of the given type occurs trying to produce the promised value.
|
default Promise<T> |
onError(Predicate<? super Throwable> predicate,
Action<? super Throwable> errorHandler)
Specifies the action to take if the an error occurs trying to produce the promised value, that the given predicate applies to.
|
default Promise<T> |
onNull(Block action)
A convenience shorthand for
routing null values. |
default Promise<T> |
onYield(Runnable onYield)
Registers a listener that is invoked when
this promise is initiated. |
default Operation |
operation() |
default Operation |
operation(Action<? super T> action) |
default <O> Promise<O> |
replace(Promise<O> next)
Replaces
this promise with the provided promise for downstream subscribers. |
default void |
result(Action<? super Result<T>> resultHandler)
Consume the promised value as a
Result . |
default <O> Promise<Pair<T,O>> |
right(Promise<O> right) |
default Promise<T> |
route(Predicate<? super T> predicate,
Action<? super T> action)
Allows the promised value to be handled specially if it meets the given predicate, instead of being handled by the promise subscriber.
|
void |
then(Action<? super T> then)
Specifies what should be done with the promised object when it becomes available.
|
default Promise<T> |
throttled(Throttle throttle)
Throttles
this promise, using the given throttle . |
default <O> O |
to(Function<? super Promise<T>,? extends O> function)
Applies the given function to
this and returns the result. |
<O> Promise<O> |
transform(Function<? super Upstream<? extends T>,? extends Upstream<O>> upstreamTransformer)
Apply a custom transform to this promise.
|
static <T> Promise<T> |
value(T t)
Creates a promise for the given value.
|
default Promise<T> |
wiretap(Action<? super Result<T>> listener)
Registers a listener for the promise outcome.
|
static <T> Promise<T> |
wrap(Factory<? extends Promise<T>> factory) |
static <T> Promise<T> of(Upstream<T> upstream)
This method can be used to integrate with APIs that produce values asynchronously.
The Upstream.connect(Downstream)
method will be invoked every time the value is requested.
This method should propagate the value (or error) to the given downstream object when it is available.
T
- the type of promised valueupstream
- the producer of the valuestatic <T> Promise<T> value(T t)
This method can be used when a promise is called for, but the value is immediately available.
T
- the type of promised valuet
- the promised valuestatic <T> Promise<T> ofLazy(Factory<T> factory)
This method can be used when a promise is called for, and the value is available synchronously as the result of a function.
T
- the type of promised valuefactory
- the producer of the valuestatic <T> Promise<T> error(Throwable t)
This method can be used when a promise is called for, but the failure is immediately available.
T
- the type of promised valuet
- the errorvoid then(Action<? super T> then)
Important: this method can only be used from a Ratpack managed compute thread.
If it is called on a non Ratpack managed compute thread it will immediately throw an ExecutionException
.
then
- the receiver of the promised valueExecutionException
- if not called on a Ratpack managed compute threadvoid connect(Downstream<T> downstream)
It is generally preferable to use then(Action)
over this method.
downstream
- the downstream consumer<O> Promise<O> transform(Function<? super Upstream<? extends T>,? extends Upstream<O>> upstreamTransformer)
This method is the basis for the standard operations of this interface, such as map(Function)
.
The following is a non generic implementation of a map that converts the value to upper case.
import ratpack.test.exec.ExecHarness;
import ratpack.exec.ExecResult;
import ratpack.exec.Promise;
import static org.junit.Assert.assertEquals;
public class Example {
public static void main(String... args) throws Exception {
ExecResult<String> result = ExecHarness.yieldSingle(c ->
Promise.value("foo")
.transform(up -> down ->
up.connect(down.<String>onSuccess(value -> {
try {
down.success(value.toUpperCase());
} catch (Throwable e) {
down.error(e);
}
}))
)
);
assertEquals("FOO", result.getValue());
}
}
The “upstreamTransformer” function takes an upstream data source, and returns another upstream that wraps it.
It is typical for the returned upstream to invoke the Upstream.connect(Downstream)
method of the given upstream during its connect method.
For more examples of transform implementations, please see the implementations of the methods of this interface.
O
- the type of item emitted by the transformed upstreamupstreamTransformer
- a function that returns a new upstream, typically wrapping the given upstream argumentdefault Promise<T> onError(Predicate<? super Throwable> predicate, Action<? super Throwable> errorHandler)
If the given action throws an exception, the original exception will be rethrown with the exception thrown by the action added to the suppressed exceptions list.
predicate
- the predicate to test against the errorerrorHandler
- the action to take if an error occursdefault <E extends Throwable> Promise<T> onError(Class<E> errorType, Action<? super E> errorHandler)
import ratpack.http.TypedData;
import ratpack.test.embed.EmbeddedApp;
import static org.junit.Assert.assertEquals;
public class Example {
public static void main(String... args) throws Exception {
EmbeddedApp.fromHandler(ctx ->
ctx.getRequest().getBody()
.map(TypedData::getText)
.map(t -> {
if (t.equals("1")) {
throw new IllegalArgumentException("validation error!");
} else {
throw new RuntimeException("some other error!");
}
})
.onError(IllegalArgumentException.class, e -> ctx.render("the value is invalid"))
.onError(e -> ctx.render("unknown error: " + e.getMessage()))
.then(t -> ctx.render("ok"))
).test(httpClient -> {
assertEquals(httpClient.requestSpec(r -> r.getBody().text("0")).postText(), "unknown error: some other error!");
assertEquals(httpClient.requestSpec(r -> r.getBody().text("1")).postText(), "the value is invalid");
});
}
}
If the given action throws an exception, the original exception will be rethrown with the exception thrown by the action added to the suppressed exceptions list.
E
- the type of exception to handle with the given actionerrorType
- the type of exception to handle with the given actionerrorHandler
- the action to take if an error occursdefault Promise<T> onError(Action<? super Throwable> errorHandler)
If the given action throws an exception, the original exception will be rethrown with the exception thrown by the action added to the suppressed exceptions list.
errorHandler
- the action to take if an error occursdefault void result(Action<? super Result<T>> resultHandler)
Result
.
This method is an alternative to then(Action)
and onError(Action)
.
resultHandler
- the consumer of the resultdefault <O> Promise<O> map(Function<? super T,? extends O> transformer)
import ratpack.test.exec.ExecHarness;
import ratpack.exec.ExecResult;
import ratpack.exec.Promise;
import static org.junit.Assert.assertEquals;
public class Example {
public static void main(String... args) throws Exception {
ExecResult<String> result = ExecHarness.yieldSingle(c ->
Promise.value("foo")
.map(String::toUpperCase)
.map(s -> s + "-BAR")
);
assertEquals("FOO-BAR", result.getValue());
}
}
O
- the type of the transformed objecttransformer
- the transformation to apply to the promised valuedefault <O> Promise<O> blockingMap(Function<? super T,? extends O> transformer)
map(Function)
, but performs the transformation on a blocking thread.
This is simply a more convenient form of using Blocking.get(Factory)
and flatMap(Function)
.
O
- the type of the transformed objecttransformer
- the transformation to apply to the promised value, on a blocking threaddefault Promise<T> blockingOp(Action<? super T> action)
Similar to blockingMap(Function)
, but does not provide a new value.
This can be used to do something with the value, without terminating the promise.
action
- the action to to perform with the value, on a blocking thread@Deprecated default <O> Promise<O> next(Promise<O> next)
replace(Promise)
as of 1.1.0
Use replace(Promise)
.
O
- the type of value provided by the replacement promisenext
- the promise to replace this
withdefault Promise<T> next(@NonBlocking Action<? super T> action)
Action
with the promised value as input.
This method can be used when needing to perform an action with the promised value, without substituting the promised value. That is, the exact same object provided to the given action will be propagated downstream.
The given action is executed within an Operation
, allowing it to perform asynchronous work.
import ratpack.test.exec.ExecHarness;
import ratpack.exec.ExecResult;
import ratpack.exec.Promise;
import com.google.common.collect.Lists;
import java.util.concurrent.TimeUnit;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class Example {
public static void main(String... args) throws Exception {
List<String> events = Lists.newLinkedList();
ExecHarness.runSingle(c ->
Promise.value("foo")
.next(v ->
Promise.value(v) // may be async
.map(String::toUpperCase)
.then(events::add)
)
.then(events::add)
);
assertEquals(Arrays.asList("FOO", "foo"), events);
}
}
action
- the action to execute with the promised valuenextOp(Function)
default Promise<T> nextOp(Function<? super T,? extends Operation> function)
This method can be used when needing to perform an operation returned by another object, based on the promised value.
import ratpack.test.exec.ExecHarness;
import ratpack.exec.ExecResult;
import ratpack.exec.Promise;
import ratpack.exec.Operation;
import com.google.common.collect.Lists;
import java.util.concurrent.TimeUnit;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class Example {
public static class CaseService {
public Operation toUpper(String value, List<String> values) {
return Operation.of(() -> values.add(value.toUpperCase()));
}
}
public static void main(String... args) throws Exception {
CaseService service = new CaseService();
List<String> events = Lists.newLinkedList();
ExecHarness.runSingle(c ->
Promise.value("foo")
.nextOp(v -> service.toUpper(v, events))
.then(events::add)
);
assertEquals(Arrays.asList("FOO", "foo"), events);
}
}
function
- a function that returns an operation that acts on the promised valuenext(Action)
default <O> Promise<O> replace(Promise<O> next)
this
promise with the provided promise for downstream subscribers.
This is simply a more convenient form of flatMap(Function)
, where the given promise is returned.
This method can be used when a subsequent operation on a promise isn't dependent on the actual promised value.
If the upstream promise fails, its error will propagate downstream and the given promise will never be subscribed to.
import ratpack.test.exec.ExecHarness;
import ratpack.exec.ExecResult;
import ratpack.exec.Promise;
import static org.junit.Assert.assertEquals;
public class Example {
private static String value;
public static void main(String... args) throws Exception {
ExecResult<String> result = ExecHarness.yieldSingle(c ->
Promise.value("foo")
.next(v -> value = v)
.replace(Promise.value("bar"))
);
assertEquals("bar", result.getValue());
assertEquals("foo", value);
}
}
O
- the type of the value of the replacement promisenext
- the promise to replace this
withdefault Operation operation()
default Promise<T> mapError(Function<? super Throwable,? extends T> transformer)
If the function returns a value, the promise will now be considered successful.
import ratpack.test.exec.ExecHarness;
import ratpack.exec.ExecResult;
import ratpack.exec.Promise;
import static org.junit.Assert.assertEquals;
public class Example {
public static void main(String... args) throws Exception {
ExecResult<String> result = ExecHarness.yieldSingle(c ->
Promise.<String>error(new Exception("!"))
.mapError(e -> "value")
);
assertEquals("value", result.getValue());
}
}
If the function throws an exception, that exception will now represent the promise failure.
import ratpack.test.exec.ExecHarness;
import ratpack.exec.ExecResult;
import ratpack.exec.Promise;
import static org.junit.Assert.assertEquals;
public class Example {
public static void main(String... args) throws Exception {
ExecResult<String> result = ExecHarness.yieldSingle(c ->
Promise.<String>error(new Exception("!"))
.mapError(e -> { throw new RuntimeException("mapped", e); })
);
assertEquals("mapped", result.getThrowable().getMessage());
}
}
The function will not be called if the promise is successful.
transformer
- the transformation to apply to the promise failuredefault <O> Promise<O> apply(Function<? super Promise<T>,? extends Promise<O>> function)
This method can be used to apply custom operations without breaking the “code flow”. It works particularly well with method references.
import ratpack.exec.Promise;
import ratpack.test.exec.ExecHarness;
import static org.junit.Assert.assertEquals;
public class Example {
public static void main(String... args) throws Exception {
Integer value = ExecHarness.yieldSingle(e ->
Promise.value(1)
.apply(Example::dubble)
.apply(Example::triple)
).getValue();
assertEquals(Integer.valueOf(6), value);
}
public static Promise<Integer> dubble(Promise<Integer> input) {
return input.map(i -> i * 2);
}
public static Promise<Integer> triple(Promise<Integer> input) {
return input.map(i -> i * 3);
}
}
If the apply function throws an exception, the returned promise will fail.
import ratpack.exec.Promise;
import ratpack.test.exec.ExecHarness;
import static org.junit.Assert.assertEquals;
public class Example {
public static void main(String... args) throws Exception {
Throwable error = ExecHarness.yieldSingle(e ->
Promise.value(1)
.apply(Example::explode)
).getThrowable();
assertEquals("bang!", error.getMessage());
}
public static Promise<Integer> explode(Promise<Integer> input) throws Exception {
throw new Exception("bang!");
}
}
If the promise having the operation applied to fails, the operation will not be applied.
import ratpack.exec.Promise;
import ratpack.test.exec.ExecHarness;
import static org.junit.Assert.assertEquals;
public class Example {
public static void main(String... args) throws Exception {
Throwable error = ExecHarness.yieldSingle(e ->
Promise.<Integer>error(new Exception("bang!"))
.apply(Example::dubble)
).getThrowable();
assertEquals("bang!", error.getMessage());
}
public static Promise<Integer> dubble(Promise<Integer> input) {
return input.map(i -> i * 2);
}
}
O
- the type of promised object after the operationfunction
- the operation implementationdefault <O> O to(Function<? super Promise<T>,? extends O> function) throws Exception
this
and returns the result.
This method can be useful when needing to convert a promise to another type as it facilitates doing so without breaking the “code flow”. For example, this can be used when integrating with RxJava.
import ratpack.rx.RxRatpack;
import ratpack.exec.Promise;
import ratpack.test.exec.ExecHarness;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class Example {
private static final List<String> LOG = new LinkedList<>();
public static void main(String... args) throws Exception {
ExecHarness.runSingle(e ->
Promise.value("foo")
.to(RxRatpack::observe)
.doOnNext(i -> LOG.add("doOnNext"))
.subscribe(LOG::add)
);
assertEquals(Arrays.asList("doOnNext", "foo"), LOG);
}
}
The given function is executed immediately.
This method should only be used when converting a promise to another type.
See apply(Function)
for applying custom promise operators.
O
- the type the promise will be converted tofunction
- the promise conversion functionException
- any thrown by the given functiondefault <O> Promise<O> flatMap(Function<? super T,? extends Promise<O>> transformer)
This is useful when the transformation involves an asynchronous operation.
import ratpack.test.exec.ExecHarness;
import ratpack.exec.ExecResult;
import ratpack.exec.Promise;
import ratpack.exec.Blocking;
import static org.junit.Assert.assertEquals;
public class Example {
public static void main(String[] args) throws Exception {
ExecResult<String> result = ExecHarness.yieldSingle(c ->
Promise.value("foo")
.flatMap(s -> Blocking.get(s::toUpperCase))
.map(s -> s + "-BAR")
);
assertEquals("FOO-BAR", result.getValue());
}
}
O
- the type of the transformed objecttransformer
- the transformation to apply to the promised valuedefault Promise<T> route(Predicate<? super T> predicate, Action<? super T> action)
This is typically used for validating values, centrally.
import com.google.common.collect.Lists;
import ratpack.test.exec.ExecHarness;
import ratpack.exec.ExecResult;
import ratpack.exec.Promise;
import java.util.List;
import static org.junit.Assert.*;
public class Example {
public static ExecResult<Integer> yield(int i, List<Integer> collector) throws Exception {
return ExecHarness.yieldSingle(c ->
Promise.value(i)
.route(v -> v > 5, collector::add)
);
}
public static void main(String... args) throws Exception {
List<Integer> routed = Lists.newLinkedList();
ExecResult<Integer> result1 = yield(1, routed);
assertEquals(new Integer(1), result1.getValue());
assertFalse(result1.isComplete()); // false because promise returned a value before the execution completed
assertTrue(routed.isEmpty());
ExecResult<Integer> result10 = yield(10, routed);
assertNull(result10.getValue());
assertTrue(result10.isComplete()); // true because the execution completed before the promised value was returned (i.e. it was routed)
assertTrue(routed.contains(10));
}
}
Be careful about using this where the eventual promise subscriber is unlikely to know that the promise will routed as it can be surprising when neither the promised value nor an error appears.
It can be useful at the handler layer to provide common validation.
import ratpack.exec.Promise;
import ratpack.handling.Context;
import ratpack.test.embed.EmbeddedApp;
import static org.junit.Assert.assertEquals;
public class Example {
public static Promise<Integer> getAge(Context ctx) {
return Promise.value(10)
.route(
i -> i < 21,
i -> ctx.render(i + " is too young to be here!")
);
}
public static void main(String... args) throws Exception {
EmbeddedApp.fromHandler(ctx ->
getAge(ctx).then(age -> ctx.render("welcome!"))
).test(httpClient -> {
assertEquals("10 is too young to be here!", httpClient.getText());
});
}
}
If the routed-to action throws an exception, it will be forwarded down the promise chain.
predicate
- the condition under which the value should be routedaction
- the terminal action for the valuedefault Promise<T> onNull(Block action)
routing
null
values.
If the promised value is null
, the given action will be called.
action
- the action to route to if the promised value is nulldefault Promise<T> cache()
import ratpack.exec.Promise;
import ratpack.test.exec.ExecHarness;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.assertEquals;
public class Example {
public static void main(String... args) throws Exception {
ExecHarness.runSingle(c -> {
AtomicLong counter = new AtomicLong();
Promise<Long> uncached = Promise.of(f -> f.success(counter.getAndIncrement()));
uncached.then(i -> assertEquals(0l, i.longValue()));
uncached.then(i -> assertEquals(1l, i.longValue()));
uncached.then(i -> assertEquals(2l, i.longValue()));
Promise<Long> cached = uncached.cache();
cached.then(i -> assertEquals(3l, i.longValue()));
cached.then(i -> assertEquals(3l, i.longValue()));
uncached.then(i -> assertEquals(4l, i.longValue()));
cached.then(i -> assertEquals(3l, i.longValue()));
});
}
}
If the cached promise fails, the same exception will be returned every time.
import ratpack.exec.Promise;
import ratpack.test.exec.ExecHarness;
import static org.junit.Assert.assertTrue;
public class Example {
public static void main(String... args) throws Exception {
ExecHarness.runSingle(c -> {
Throwable error = new Exception("bang!");
Promise<Object> cached = Promise.of(f -> f.error(error)).cache();
cached.onError(t -> assertTrue(t == error)).then(i -> assertTrue("not called", false));
cached.onError(t -> assertTrue(t == error)).then(i -> assertTrue("not called", false));
cached.onError(t -> assertTrue(t == error)).then(i -> assertTrue("not called", false));
});
}
}
default Promise<T> defer(Action<? super Runnable> releaser)
When the returned promise is subscribed to, the given releaser
action will be invoked.
The execution of this
promise is deferred until the runnable given to the releaser
is run.
It is generally more convenient to use throttled(Throttle)
or onYield(Runnable)
than this operation.
releaser
- the action that will initiate the execution some time laterdefault Promise<T> onYield(Runnable onYield)
this
promise is initiated.
import com.google.common.collect.Lists;
import ratpack.test.exec.ExecHarness;
import ratpack.exec.Promise;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class Example {
public static void main(String... args) throws Exception {
List<String> events = Lists.newLinkedList();
ExecHarness.runSingle(c ->
Promise.<String>of(f -> {
events.add("promise");
f.success("foo");
})
.onYield(() -> events.add("onYield"))
.then(v -> events.add("then"))
);
assertEquals(Arrays.asList("onYield", "promise", "then"), events);
}
}
onYield
- the action to take when the promise is initiatedthis
promisedefault Promise<T> wiretap(Action<? super Result<T>> listener)
import com.google.common.collect.Lists;
import ratpack.test.exec.ExecHarness;
import ratpack.exec.Promise;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class Example {
public static void main(String... args) throws Exception {
List<String> events = Lists.newLinkedList();
ExecHarness.runSingle(c ->
Promise.<String>of(f -> {
events.add("promise");
f.success("foo");
})
.wiretap(r -> events.add("wiretap: " + r.getValue()))
.then(v -> events.add("then"))
);
assertEquals(Arrays.asList("promise", "wiretap: foo", "then"), events);
}
}
listener
- the result listenerthis
promisedefault Promise<T> throttled(Throttle throttle)
this
promise, using the given throttle
.
Throttling can be used to limit concurrency. Typically to limit concurrent use of an external resource, such as a HTTP API.
Note that the Throttle
instance given defines the actual throttling semantics.
import ratpack.exec.Throttle;
import ratpack.exec.Promise;
import ratpack.exec.Execution;
import ratpack.test.exec.ExecHarness;
import ratpack.exec.ExecResult;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertTrue;
public class Example {
public static void main(String... args) throws Exception {
int numJobs = 1000;
int maxAtOnce = 10;
ExecResult<Integer> result = ExecHarness.yieldSingle(exec -> {
AtomicInteger maxConcurrent = new AtomicInteger();
AtomicInteger active = new AtomicInteger();
AtomicInteger done = new AtomicInteger();
Throttle throttle = Throttle.ofSize(maxAtOnce);
// Launch numJobs forked executions, and return the maximum number that were executing at any given time
return Promise.of(outerFulfiller -> {
for (int i = 0; i < numJobs; i++) {
Execution.fork().start(forkedExec ->
Promise.<Integer>of(innerFulfiller -> {
int activeNow = active.incrementAndGet();
int maxConcurrentVal = maxConcurrent.updateAndGet(m -> Math.max(m, activeNow));
active.decrementAndGet();
innerFulfiller.success(maxConcurrentVal);
})
.throttled(throttle) // limit concurrency
.then(max -> {
if (done.incrementAndGet() == numJobs) {
outerFulfiller.success(max);
}
})
);
}
});
});
assertTrue(result.getValue() <= maxAtOnce);
}
}
throttle
- the particular throttle to use to throttle the operation