package ratpack.exec.util.internal;

import io.netty.util.concurrent.ScheduledFuture;
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import ratpack.exec.Downstream;
import ratpack.exec.Promise;
import ratpack.exec.Upstream;
import ratpack.exec.internal.Continuation;
import ratpack.exec.internal.DefaultExecution;
import ratpack.exec.util.ReadWriteAccess;

/* loaded from: input_file:ratpack/exec/util/internal/DefaultReadWriteAccess.class */
/**
 * Queue-based {@link ReadWriteAccess} implementation.
 *
 * <p>Each wrapped promise becomes an {@link Access} that is appended to a shared queue and admitted
 * by {@link #drain()} in arrival order. Read accesses are admitted back to back; a write access is
 * admitted only once the active-reader count is zero, and no further queued access is admitted
 * until that writer relinquishes.
 *
 * <p>Coordination is done with atomics rather than locks; the order of the atomic operations in
 * {@link #drain()} and {@code Access.relinquish} is significant and should not be rearranged.
 */
public class DefaultReadWriteAccess implements ReadWriteAccess {
    /** Timeout applied by the single-argument {@code read}/{@code write} overloads. */
    private final Duration defaultTimeout;

    /** Accesses waiting to be admitted, in arrival order. */
    private final Queue<Access<?>> queue = new ConcurrentLinkedQueue<>();

    /**
     * {@code true} while a drain pass is running. It is deliberately left {@code true} when a drain
     * pass hands over to a writer, and is reset by that writer's {@code relinquish}.
     */
    private final AtomicBoolean draining = new AtomicBoolean();

    /** Number of read accesses that have been admitted and have not yet relinquished. */
    private final AtomicInteger activeReaders = new AtomicInteger();

    /** A writer that was polled from the queue but is waiting for the active readers to finish. */
    private final AtomicReference<Access<?>> pendingWriteRef = new AtomicReference<>();

    /**
     * A single queued request for read or write access around one upstream.
     *
     * @param <T> the type of value produced by the guarded upstream
     */
    public class Access<T> {
        private final boolean read;
        private final Upstream<? extends T> upstream;
        private final Duration timeout;
        private final Downstream<? super T> downstream;
        private final DefaultExecution execution;

        /**
         * One-shot latch deciding which of "admitted", "timed out" or "errored" gets to signal.
         * Atomic because the timeout task runs on this execution's event loop while
         * {@link #access()} is invoked from whichever thread happens to be draining the queue.
         */
        private final AtomicBoolean fired = new AtomicBoolean();

        // Both fields below are assigned inside the delimited segment, before this access is queued.
        private Continuation continuation;
        private ScheduledFuture<?> timeoutFuture;

        /**
         * Creates the access and registers it with the current execution.
         *
         * <p>The registered segment schedules the timeout task (unless the timeout is zero), stores
         * the continuation, enqueues this access and triggers a drain. The registered error handler
         * relinquishes and, if nothing has signalled yet, forwards the error downstream.
         *
         * @param z {@code true} for read access, {@code false} for write access
         * @param upstream the upstream to connect once access is granted
         * @param duration how long to wait for access; zero disables the timeout
         * @param downstream the consumer of the upstream's outcome, or of the timeout/error
         * @throws IllegalArgumentException if {@code duration} is negative
         */
        private Access(boolean z, Upstream<? extends T> upstream, Duration duration, Downstream<? super T> downstream) {
            if (duration.isNegative()) {
                throw new IllegalArgumentException("Timeout value must not be negative");
            }
            this.read = z;
            this.upstream = upstream;
            this.timeout = duration;
            this.downstream = downstream;
            this.execution = DefaultExecution.get();
            this.execution.delimit(error -> {
                relinquish(false);
                if (fire()) {
                    downstream.error(error);
                }
            }, segmentContinuation -> {
                if (!duration.isZero()) {
                    this.timeoutFuture = this.execution.getEventLoop().schedule(this::timeout, duration.toMillis(), TimeUnit.MILLISECONDS);
                }
                this.continuation = segmentContinuation;
                // Enqueue only after the continuation is stored so that a draining thread can resume it.
                DefaultReadWriteAccess.this.queue.add(this);
                DefaultReadWriteAccess.this.drain();
            });
        }

        /**
         * Claims the right to signal for this access.
         *
         * @return {@code true} for the first caller only, {@code false} for every later caller
         */
        private boolean fire() {
            return this.fired.compareAndSet(false, true);
        }

        /**
         * Timeout task: if nothing has signalled yet, resumes the execution with a
         * {@link ReadWriteAccess.TimeoutException}. The access itself stays in the queue and is
         * discarded by {@link #access()} when it is eventually polled.
         */
        private void timeout() {
            if (fire()) {
                this.continuation.resume(() -> {
                    DefaultReadWriteAccess.this.drain();
                    this.downstream.error(new ReadWriteAccess.TimeoutException("Could not acquire " + (this.read ? "read" : "write") + " access within " + this.timeout));
                });
            }
        }

        /**
         * Grants this access. A read is counted as active first; if the access has already
         * signalled (timed out or errored) it is relinquished again immediately, otherwise the
         * timeout task is cancelled and the upstream is connected with a downstream that
         * relinquishes before forwarding each outcome.
         */
        public void access() {
            if (this.read) {
                DefaultReadWriteAccess.this.activeReaders.incrementAndGet();
            }
            if (!fire()) {
                // Already signalled elsewhere: undo the admission so the queue keeps moving.
                relinquish(true);
                return;
            }
            if (this.timeoutFuture != null) {
                this.timeoutFuture.cancel(false);
            }
            this.continuation.resume(() -> {
                this.upstream.connect(new Downstream<T>() {
                    @Override // ratpack.exec.Downstream
                    public void success(T value) {
                        Access.this.relinquish(false);
                        Access.this.downstream.success(value);
                    }

                    @Override // ratpack.exec.Downstream
                    public void error(Throwable throwable) {
                        Access.this.relinquish(false);
                        Access.this.downstream.error(throwable);
                    }

                    @Override // ratpack.exec.Downstream
                    public void complete() {
                        Access.this.relinquish(false);
                        Access.this.downstream.complete();
                    }
                });
            });
        }

        /**
         * Gives up this access and lets the queue advance.
         *
         * <p>A writer clears the draining flag and drains. A reader decrements the active-reader
         * count; the reader that brings it to zero hands over directly to the pending writer if
         * there is one, otherwise it drains.
         *
         * @param timedOut not read by this method; kept so existing call sites remain valid
         */
        public void relinquish(boolean timedOut) {
            if (!this.read) {
                DefaultReadWriteAccess.this.draining.set(false);
            } else if (DefaultReadWriteAccess.this.activeReaders.decrementAndGet() == 0) {
                Access<?> pendingWrite = DefaultReadWriteAccess.this.pendingWriteRef.getAndSet(null);
                if (pendingWrite != null) {
                    pendingWrite.access();
                    return;
                }
            }
            DefaultReadWriteAccess.this.drain();
        }
    }

    /**
     * Creates an instance with the given default timeout.
     *
     * @param duration the timeout used when none is supplied; zero disables the timeout
     * @throws IllegalArgumentException if {@code duration} is negative
     */
    public DefaultReadWriteAccess(Duration duration) {
        if (duration.isNegative()) {
            throw new IllegalArgumentException("defaultTimeout must not be negative");
        }
        this.defaultTimeout = duration;
    }

    @Override // ratpack.exec.util.ReadWriteAccess
    public Duration getDefaultTimeout() {
        return this.defaultTimeout;
    }

    /** Wraps {@code promise} so that it runs with read access, waiting at most the default timeout. */
    @Override // ratpack.exec.util.ReadWriteAccess
    public <T> Promise<T> read(Promise<T> promise) {
        return guard(true, promise, this.defaultTimeout);
    }

    /** Wraps {@code promise} so that it runs with read access, waiting at most {@code duration}. */
    @Override // ratpack.exec.util.ReadWriteAccess
    public <T> Promise<T> read(Promise<T> promise, Duration duration) {
        return guard(true, promise, duration);
    }

    /** Wraps {@code promise} so that it runs with write access, waiting at most the default timeout. */
    @Override // ratpack.exec.util.ReadWriteAccess
    public <T> Promise<T> write(Promise<T> promise) {
        return guard(false, promise, this.defaultTimeout);
    }

    /** Wraps {@code promise} so that it runs with write access, waiting at most {@code duration}. */
    @Override // ratpack.exec.util.ReadWriteAccess
    public <T> Promise<T> write(Promise<T> promise, Duration duration) {
        return guard(false, promise, duration);
    }

    /**
     * Shared implementation of the {@code read}/{@code write} overloads: transforms the promise so
     * that connecting it creates an {@link Access}. The timeout is validated when the access is
     * created (i.e. when the returned promise is connected), not when this method is called.
     */
    private <T> Promise<T> guard(boolean read, Promise<T> promise, Duration timeout) {
        return promise.transform(upstream -> downstream -> {
            new Access<T>(read, upstream, timeout, downstream);
        });
    }

    /**
     * Admits queued accesses in order until the queue is empty or a writer is reached.
     *
     * <p>Only one thread drains at a time; a caller that loses the race on the draining flag
     * returns immediately. Readers are admitted one after another. On reaching a writer the pass
     * stops with the draining flag still set: the writer is admitted at once if there are no
     * active readers, otherwise it is parked in {@code pendingWriteRef} for the last reader to
     * pick up.
     */
    public void drain() {
        if (!this.draining.compareAndSet(false, true)) {
            return;
        }
        Access<?> next;
        while ((next = this.queue.poll()) != null) {
            if (next.read) {
                next.access();
                continue;
            }
            if (this.activeReaders.get() == 0) {
                next.access();
                return;
            }
            this.pendingWriteRef.set(next);
            // Re-check: the last reader may have relinquished between the test above and the
            // set, in which case nobody else will pick the writer up and it must be admitted here.
            if (this.activeReaders.get() == 0) {
                Access<?> pendingWrite = this.pendingWriteRef.getAndSet(null);
                if (pendingWrite != null) {
                    pendingWrite.access();
                }
            }
            return;
        }
        this.draining.set(false);
        // Something may have been enqueued after the final poll but before the flag was cleared.
        if (!this.queue.isEmpty()) {
            drain();
        }
    }
}
