Interface ReadWriteAccess


  • public interface ReadWriteAccess
    Provides read/write serialization, analogous to ReadWriteLock.

    Useful whenever a “resource” supports both safe concurrent usages and mutually exclusive usages, such as reading and updating a file.

    The read(Promise) and write(Promise) methods decorate promises with serialization. Read serialized promises may execute concurrently with other read serialized promises, but not with write serialized promises. Write serialized promises may not execute concurrently with read or write serialized promises.

    Access is generally fair. That is, access is granted in the order that promises execute (n.b. not in the order they are decorated).

    Access is not reentrant. Deadlocks are not detected or prevented.

    
     import com.google.common.io.Files;
     import io.netty.buffer.ByteBufAllocator;
     import ratpack.exec.Promise;
     import ratpack.exec.util.ParallelBatch;
     import ratpack.exec.util.ReadWriteAccess;
     import ratpack.file.FileIo;
     import ratpack.test.embed.EmbeddedApp;
     import ratpack.test.embed.EphemeralBaseDir;
     import ratpack.test.exec.ExecHarness;
    
     import java.nio.charset.Charset;
     import java.nio.file.Path;
     import java.util.ArrayList;
     import java.util.Collections;
     import java.util.List;
     import java.time.Duration;
    
     import static java.nio.file.StandardOpenOption.*;
     import static org.junit.Assert.assertEquals;
    
     public class Example {
    
       public static void main(String... args) throws Exception {
         EphemeralBaseDir.tmpDir().use(baseDir -> {
           ReadWriteAccess access = ReadWriteAccess.create(Duration.ofSeconds(5));
           Path f = baseDir.write("f", "foo");
    
           EmbeddedApp.of(a -> a
             .serverConfig(c -> c.baseDir(baseDir.getRoot()))
             .handlers(c -> {
               ByteBufAllocator allocator = c.getRegistry().get(ByteBufAllocator.class);
    
               c.path(ctx ->
                 ctx.byMethod(m -> m
                   .get(() ->
                     FileIo.read(FileIo.open(f, READ, CREATE), allocator, 8192)
                       .apply(access::read)
                       .map(b -> { try { return b.toString(Charset.defaultCharset()); } finally { b.release(); } })
                       .then(ctx::render)
                   )
                   .post(() ->
                     FileIo.write(ctx.getRequest().getBodyStream(), FileIo.open(f, WRITE, CREATE, TRUNCATE_EXISTING))
                       .apply(access::write)
                       .then(written -> ctx.render(written.toString()))
                   )
                 )
               );
             })
           ).test(httpClient -> {
    
             // Create a bunch of reads and writes
             List<Promise<String>> requests = new ArrayList<>();
             for (int i = 0; i < 200; ++i) {
               requests.add(Promise.sync(httpClient::getText));
             }
             for (int i = 0; i < 200; ++i) {
               requests.add(Promise.sync(() ->
                 httpClient.request(r -> r
                   .post().getBody().text("foo")
                 ).getBody().getText()
               ));
             }
    
             // Interleave
             Collections.shuffle(requests);
    
             // Execute them in parallel
             List<String> results = ExecHarness.yieldSingle(r ->
               ParallelBatch.of(requests).yield()
             ).getValueOrThrow();
    
             assertEquals("foo", Files.asCharSource(f.toFile(), Charset.defaultCharset()).read());
             assertEquals(400, results.size());
           });
         });
       }
     }
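
The read/write rules described above can also be seen in isolation with a JDK-only analogy. The sketch below uses java.util.concurrent.locks.StampedLock as a stand-in, chosen because it is likewise not reentrant. It is an illustration of the semantics only, not how ReadWriteAccess is implemented, and the class and method names are hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;

// JDK-only analogy: StampedLock stands in for the access object.
// This is NOT Ratpack's implementation; all names here are hypothetical.
public class ReadWriteSemanticsSketch {

  // Two readers may hold access at the same time.
  static boolean readersOverlap() {
    StampedLock lock = new StampedLock();
    long first = lock.tryReadLock();   // a non-zero stamp means access was granted
    long second = lock.tryReadLock();  // requested while the first is still held
    boolean bothGranted = first != 0L && second != 0L;
    if (second != 0L) lock.unlockRead(second);
    if (first != 0L) lock.unlockRead(first);
    return bothGranted;
  }

  // A writer cannot proceed while a reader holds access; the timed attempt gives up.
  static boolean writerTimesOutBehindReader() {
    StampedLock lock = new StampedLock();
    long read = lock.readLock();
    long write;
    try {
      write = lock.tryWriteLock(50, TimeUnit.MILLISECONDS); // 0 means the wait timed out
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      write = 0L;
    }
    boolean timedOut = write == 0L;
    if (write != 0L) lock.unlockWrite(write);
    lock.unlockRead(read);
    return timedOut;
  }

  // A reader cannot proceed while a writer holds access.
  static boolean readerBlockedByWriter() {
    StampedLock lock = new StampedLock();
    long write = lock.writeLock();
    long read = lock.tryReadLock(); // 0 means access is not immediately available
    boolean blocked = read == 0L;
    if (read != 0L) lock.unlockRead(read);
    lock.unlockWrite(write);
    return blocked;
  }

  public static void main(String[] args) {
    System.out.println("readers overlap: " + readersOverlap());
    System.out.println("writer times out behind reader: " + writerTimesOutBehindReader());
    System.out.println("reader blocked by writer: " + readerBlockedByWriter());
  }
}
```

With ReadWriteAccess the same outcomes apply to promises rather than threads: a decorated promise that cannot obtain access in time fails with ReadWriteAccess.TimeoutException instead of returning a zero stamp.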
    
     
    Since:
    1.5
    • Nested Class Summary

      Nested Classes 
      Modifier and Type Interface Description
      static class  ReadWriteAccess.TimeoutException
      Thrown if access could not be acquired within the given timeout value.
    • Method Summary

      Modifier and Type Method Description
      static ReadWriteAccess create(java.time.Duration defaultTimeout)
      Create a new read/write access object with the given default timeout.
      java.time.Duration getDefaultTimeout()
      The default timeout value.
      <T> Promise<T> read(Promise<T> promise)
      Decorates the given promise with read serialization.
      <T> Promise<T> read(Promise<T> promise, java.time.Duration timeout)
      Decorates the given promise with read serialization and the given timeout.
      <T> Promise<T> write(Promise<T> promise)
      Decorates the given promise with write serialization.
      <T> Promise<T> write(Promise<T> promise, java.time.Duration timeout)
      Decorates the given promise with write serialization and the given timeout.
    • Method Detail

      • create

        static ReadWriteAccess create(java.time.Duration defaultTimeout)
        Create a new read/write access object with the given default timeout.
        Parameters:
        defaultTimeout - the default maximum amount of time to wait for access (must not be negative, 0 == infinite)
        Returns:
        a new read/write access object
      • getDefaultTimeout

        java.time.Duration getDefaultTimeout()
        The default timeout value.
        Returns:
        the default timeout value
      • read

        <T> Promise<T> read(Promise<T> promise)
        Decorates the given promise with read serialization.

        Read serialized promises may execute concurrently with other read serialized promises, but not with write serialized promises.

        If access is not granted within the default timeout, the promise will fail with ReadWriteAccess.TimeoutException.

        Type Parameters:
        T - the type of promised value
        Parameters:
        promise - the promise to decorate
        Returns:
        a decorated promise
      • read

        <T> Promise<T> read(Promise<T> promise,
                            java.time.Duration timeout)
        Decorates the given promise with read serialization and the given timeout.

        Read serialized promises may execute concurrently with other read serialized promises, but not with write serialized promises.

        If access is not granted within the given timeout, the promise will fail with ReadWriteAccess.TimeoutException.

        Type Parameters:
        T - the type of promised value
        Parameters:
        promise - the promise to decorate
        timeout - the maximum amount of time to wait for access (must not be negative, 0 == infinite)
        Returns:
        a decorated promise
      • write

        <T> Promise<T> write(Promise<T> promise)
        Decorates the given promise with write serialization.

        Write serialized promises may not execute concurrently with read or write serialized promises.

        If access is not granted within the default timeout, the promise will fail with ReadWriteAccess.TimeoutException.

        Type Parameters:
        T - the type of promised value
        Parameters:
        promise - the promise to decorate
        Returns:
        a decorated promise
      • write

        <T> Promise<T> write(Promise<T> promise,
                             java.time.Duration timeout)
        Decorates the given promise with write serialization and the given timeout.

        Write serialized promises may not execute concurrently with read or write serialized promises.

        If access is not granted within the given timeout, the promise will fail with ReadWriteAccess.TimeoutException.

        Type Parameters:
        T - the type of promised value
        Parameters:
        promise - the promise to decorate
        timeout - the maximum amount of time to wait for access (must not be negative, 0 == infinite)
        Returns:
        a decorated promise
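
The timeout parameters above all follow the same convention: a negative duration is not allowed, and a zero duration means waiting indefinitely. A minimal sketch of that rule follows, using a hypothetical helper that is not part of Ratpack. The exception type used for negative values is an assumption, since the documentation above only states that the value must not be negative.

```java
import java.time.Duration;

// Hypothetical helper illustrating the documented timeout convention.
// It is not part of Ratpack; the exception type is an assumption.
public class TimeoutConventionSketch {

  // Returns true when the timeout means "wait indefinitely" (zero),
  // false for a finite wait, and rejects negative durations.
  static boolean waitsIndefinitely(Duration timeout) {
    if (timeout.isNegative()) {
      throw new IllegalArgumentException("timeout must not be negative: " + timeout);
    }
    return timeout.isZero();
  }

  public static void main(String[] args) {
    System.out.println(waitsIndefinitely(Duration.ZERO));         // zero: no upper bound on the wait
    System.out.println(waitsIndefinitely(Duration.ofSeconds(5))); // finite: give up after five seconds
  }
}
```

In practice this means that passing Duration.ZERO to create, read, or write removes the upper bound on waiting, so a deadlock (which is neither detected nor prevented) would never surface as a ReadWriteAccess.TimeoutException.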