public interface ReadWriteAccess
Provides read/write serialization, analogous to ReadWriteLock.
Can be used whenever a “resource” has usages that are safe to run concurrently and usages that must be mutually exclusive, such as updating a file.
The read(Promise) and write(Promise) methods decorate promises with serialization.
Read serialized promises may execute concurrently with other read serialized promises, but not with write serialized promises.
Write serialized promises may not execute concurrently with read or write serialized promises.
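The sharing rules above match those of the JDK's ReadWriteLock. As a rough illustration only, the sketch below probes a fair java.util.concurrent.locks.ReentrantReadWriteLock from a second thread. The class ReadWriteRulesSketch and its probe() method are invented for this example and are not part of Ratpack. Note two differences: the JDK lock blocks threads instead of decorating promises, and it is reentrant.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical illustration built on the JDK lock; not Ratpack's implementation.
final class ReadWriteRulesSketch {

  // Returns { second reader admitted, writer admitted during read, writer admitted after read }.
  static boolean[] probe() {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); // fair mode
    ExecutorService other = Executors.newSingleThreadExecutor();

    // Tries to read from the other thread without waiting.
    Callable<Boolean> reader = () -> {
      boolean admitted = lock.readLock().tryLock();
      if (admitted) {
        lock.readLock().unlock();
      }
      return admitted;
    };
    // Tries to write from the other thread, giving up after 50 ms.
    Callable<Boolean> writer = () -> {
      boolean admitted = lock.writeLock().tryLock(50, TimeUnit.MILLISECONDS);
      if (admitted) {
        lock.writeLock().unlock();
      }
      return admitted;
    };

    try {
      lock.readLock().lock(); // the calling thread acts as an active reader
      boolean secondReader = other.submit(reader).get();     // readers share access
      boolean writerDuringRead = other.submit(writer).get(); // writer is kept out
      lock.readLock().unlock();
      boolean writerAfterRead = other.submit(writer).get();  // writer now gets in
      return new boolean[] {secondReader, writerDuringRead, writerAfterRead};
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      other.shutdownNow();
    }
  }

  public static void main(String... args) {
    boolean[] r = probe();
    System.out.println("second reader admitted: " + r[0]);
    System.out.println("writer admitted during read: " + r[1]);
    System.out.println("writer admitted after read: " + r[2]);
  }
}
```

Here probe() reports true, false, true: a second reader is admitted alongside the first, a writer is refused while a reader is active, and the writer is admitted once the reader has finished.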
Access is generally fair. That is, access is granted in the order that promises execute (n.b. not in the order they are decorated).
Access is not reentrant. Deadlocks are not detected or prevented.
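The fairness note above distinguishes the order in which promises are decorated from the order in which they execute. The following sketch models that distinction with plain JDK types. It is an analogy under stated assumptions: DecorationOrderSketch, its write(String, Supplier) method, and the labels are invented here, a Supplier stands in for a promise that does no work until it is executed, and a fair single-permit Semaphore stands in for write access.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical stand-in for illustration; not Ratpack's implementation.
final class DecorationOrderSketch {
  private final Semaphore permit = new Semaphore(1, true); // fair, single permit
  private final List<String> grants = new ArrayList<>();

  // Decorating only wraps the work; nothing queues for access at this point.
  <T> Supplier<T> write(String label, Supplier<T> work) {
    return () -> {
      permit.acquireUninterruptibly(); // access is requested when the work runs
      try {
        grants.add(label); // record the order in which access was granted
        return work.get();
      } finally {
        permit.release();
      }
    };
  }

  List<String> grants() {
    return grants;
  }

  public static void main(String... args) {
    DecorationOrderSketch access = new DecorationOrderSketch();
    Supplier<String> first = access.write("decorated first", () -> "a");
    Supplier<String> second = access.write("decorated second", () -> "b");
    second.get(); // executed first, so granted access first
    first.get();
    System.out.println(access.grants()); // [decorated second, decorated first]
  }
}
```

In this model, decorating early does not reserve a place in line; only execution does.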
import com.google.common.io.Files;
import io.netty.buffer.ByteBufAllocator;
import ratpack.exec.Promise;
import ratpack.exec.util.ParallelBatch;
import ratpack.exec.util.ReadWriteAccess;
import ratpack.file.FileIo;
import ratpack.test.embed.EmbeddedApp;
import ratpack.test.embed.EphemeralBaseDir;
import ratpack.test.exec.ExecHarness;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.time.Duration;
import static java.nio.file.StandardOpenOption.*;
import static org.junit.Assert.assertEquals;
public class Example {
  public static void main(String... args) throws Exception {
    EphemeralBaseDir.tmpDir().use(baseDir -> {
      ReadWriteAccess access = ReadWriteAccess.create(Duration.ofSeconds(5));
      Path f = baseDir.write("f", "foo");
      EmbeddedApp.of(a -> a
        .serverConfig(c -> c.baseDir(baseDir.getRoot()))
        .handlers(c -> {
          ByteBufAllocator allocator = c.getRegistry().get(ByteBufAllocator.class);
          c.path(ctx ->
            ctx.byMethod(m -> m
              .get(() ->
                FileIo.read(FileIo.open(f, READ, CREATE), allocator, 8192)
                  .apply(access::read)
                  .map(b -> b.toString(Charset.defaultCharset()))
                  .then(ctx::render)
              )
              .post(() ->
                FileIo.write(ctx.getRequest().getBodyStream(), FileIo.open(f, WRITE, CREATE, TRUNCATE_EXISTING))
                  .apply(access::write)
                  .then(written -> ctx.render(written.toString()))
              )
            )
          );
        })
      ).test(httpClient -> {
        // Create a bunch of reads and writes
        List<Promise<String>> requests = new ArrayList<>();
        for (int i = 0; i < 200; ++i) {
          requests.add(Promise.sync(httpClient::getText));
        }
        for (int i = 0; i < 200; ++i) {
          requests.add(Promise.sync(() ->
            httpClient.request(r -> r
              .post().getBody().text("foo")
            ).getBody().getText()
          ));
        }
        // Interleave
        Collections.shuffle(requests);
        // Execute them in parallel
        List<String> results = ExecHarness.yieldSingle(r ->
          ParallelBatch.of(requests).yield()
        ).getValueOrThrow();
        assertEquals("foo", Files.asCharSource(f.toFile(), Charset.defaultCharset()).read());
        assertEquals(400, results.size());
      });
    });
  }
}
Modifier and Type | Interface and Description |
---|---|
static class | ReadWriteAccess.TimeoutException: Thrown if access could not be acquired within the given timeout value. |
Modifier and Type | Method and Description |
---|---|
static ReadWriteAccess | create(java.time.Duration defaultTimeout): Create a new read/write access object with the given default timeout. |
java.time.Duration | getDefaultTimeout(): The default timeout value. |
<T> Promise<T> | read(Promise<T> promise): Decorates the given promise with read serialization. |
<T> Promise<T> | read(Promise<T> promise, java.time.Duration timeout): Decorates the given promise with read serialization and the given timeout. |
<T> Promise<T> | write(Promise<T> promise): Decorates the given promise with write serialization. |
<T> Promise<T> | write(Promise<T> promise, java.time.Duration timeout): Decorates the given promise with write serialization and the given timeout. |
static ReadWriteAccess create(java.time.Duration defaultTimeout)
defaultTimeout - the default maximum amount of time to wait for access (must not be negative, 0 == infinite)
java.time.Duration getDefaultTimeout()
<T> Promise<T> read(Promise<T> promise)
Read serialized promises may execute concurrently with other read serialized promises, but not with write serialized promises.
If access is not granted within the default timeout, the promise will fail with ReadWriteAccess.TimeoutException.
T - the type of promised value
promise - the promise to decorate
<T> Promise<T> read(Promise<T> promise, java.time.Duration timeout)
Read serialized promises may execute concurrently with other read serialized promises, but not with write serialized promises.
If access is not granted within the given timeout, the promise will fail with ReadWriteAccess.TimeoutException.
T - the type of promised value
promise - the promise to decorate
timeout - the maximum amount of time to wait for access (must not be negative, 0 == infinite)
<T> Promise<T> write(Promise<T> promise)
Write serialized promises may not execute concurrently with read or write serialized promises.
If access is not granted within the default timeout, the promise will fail with ReadWriteAccess.TimeoutException.
T - the type of promised value
promise - the promise to decorate
<T> Promise<T> write(Promise<T> promise, java.time.Duration timeout)
Write serialized promises may not execute concurrently with read or write serialized promises.
If access is not granted within the given timeout, the promise will fail with ReadWriteAccess.TimeoutException.
T - the type of promised value
promise - the promise to decorate
timeout - the maximum amount of time to wait for access (must not be negative, 0 == infinite)
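The timeout contract described above (a zero timeout waits indefinitely, any other timeout bounds the wait) can be sketched with JDK types. This is a minimal model, not Ratpack's implementation: TimedAccessSketch, acquire, release, and AccessTimeoutException are invented for illustration, a single-permit Semaphore stands in for write access, and rejecting negative values with IllegalArgumentException is an assumption of this sketch. Because a semaphore permit is not reentrant, the sketch also shows why a nested request for access that is already held can only end in a timeout.

```java
import java.time.Duration;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of the timeout contract; not Ratpack's implementation.
final class TimedAccessSketch {

  // Invented for this sketch; stands in for ReadWriteAccess.TimeoutException.
  static final class AccessTimeoutException extends RuntimeException {
    AccessTimeoutException(Duration timeout) {
      super("access not granted within " + timeout);
    }
  }

  // A single non-reentrant permit models exclusive (write) access.
  private final Semaphore permit = new Semaphore(1, true);

  void acquire(Duration timeout) {
    if (timeout.isNegative()) {
      // Assumption of this sketch: negative timeouts are rejected up front.
      throw new IllegalArgumentException("timeout must not be negative");
    }
    try {
      if (timeout.isZero()) {
        permit.acquire(); // 0 == infinite: wait for as long as it takes
      } else if (!permit.tryAcquire(timeout.toNanos(), TimeUnit.NANOSECONDS)) {
        throw new AccessTimeoutException(timeout);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for access", e);
    }
  }

  void release() {
    permit.release();
  }

  public static void main(String... args) {
    TimedAccessSketch access = new TimedAccessSketch();
    access.acquire(Duration.ofMillis(100)); // granted immediately
    try {
      access.acquire(Duration.ofMillis(100)); // nested request: not reentrant
    } catch (AccessTimeoutException e) {
      System.out.println("nested request timed out: " + e.getMessage());
    } finally {
      access.release();
    }
  }
}
```

With a zero timeout, the nested request in main would wait forever, which is the kind of deadlock that is neither detected nor prevented.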