package ratpack.exec.internal;

import io.netty.util.internal.PlatformDependent;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import ratpack.exec.Downstream;
import ratpack.exec.ExecResult;
import ratpack.exec.Promise;
import ratpack.exec.Result;
import ratpack.exec.Upstream;
import ratpack.func.Function;

/* loaded from: input_file:ratpack/exec/internal/CachingUpstream.class */
public class CachingUpstream<T> implements Upstream<T> {
    private Upstream<? extends T> upstream;
    private final Clock clock;
    private final AtomicReference<CachingUpstream<T>.Loading> loadingRef;
    private final Function<? super ExecResult<T>, Duration> ttlFunc;
    private final AtomicBoolean draining;
    private final Queue<Downstream<? super T>> waiting;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:ratpack/exec/internal/CachingUpstream$Cached.class */
    public static class Cached<T> {
        final ExecResult<T> result;
        final Instant expireAt;

        Cached(ExecResult<T> execResult, Instant instant) {
            this.result = execResult;
            this.expireAt = instant;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:ratpack/exec/internal/CachingUpstream$Loading.class */
    public class Loading {
        volatile Cached<T> cached;
        final AtomicBoolean pending;

        private Loading() {
            this.pending = new AtomicBoolean();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public LoadingState getState() {
            return this.cached == null ? this.pending.get() ? LoadingState.PENDING : LoadingState.INIT : (this.cached.expireAt == null || this.cached.expireAt.isAfter(CachingUpstream.this.clock.instant())) ? LoadingState.LOADED : LoadingState.EXPIRED;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:ratpack/exec/internal/CachingUpstream$LoadingState.class */
    public enum LoadingState {
        INIT,
        PENDING,
        EXPIRED,
        LOADED
    }

    public CachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> function) {
        this(upstream, function, Clock.systemUTC());
    }

    private CachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> function, Clock clock) {
        this.loadingRef = new AtomicReference<>(new Loading());
        this.draining = new AtomicBoolean();
        this.waiting = PlatformDependent.newMpscQueue();
        this.upstream = upstream;
        this.ttlFunc = function;
        this.clock = clock;
    }

    private void tryDrain() {
        if (this.draining.compareAndSet(false, true)) {
            try {
                if (!this.waiting.isEmpty()) {
                    CachingUpstream<T>.Loading loading = this.loadingRef.get();
                    LoadingState state = loading.getState();
                    if (state == LoadingState.INIT) {
                        startLoad(loading);
                    } else if (state == LoadingState.EXPIRED) {
                        this.loadingRef.compareAndSet(loading, new Loading());
                        startLoad(this.loadingRef.get());
                    } else if (state == LoadingState.LOADED) {
                        Downstream<? super T> poll = this.waiting.poll();
                        while (poll != null) {
                            poll.accept((ExecResult<? extends Object>) loading.cached.result);
                            poll = this.waiting.poll();
                        }
                    }
                }
            } finally {
                this.draining.set(false);
            }
        }
        if (this.waiting.isEmpty() || this.loadingRef.get().getState() == LoadingState.PENDING) {
            return;
        }
        tryDrain();
    }

    private void startLoad(CachingUpstream<T>.Loading loading) {
        if (loading.pending.compareAndSet(false, true)) {
            Downstream<? super T> poll = this.waiting.poll();
            try {
                yield(loading, poll);
            } catch (Throwable th) {
                receiveResult(loading, poll, ExecResult.of(Result.error(th)));
            }
        }
    }

    private void yield(final CachingUpstream<T>.Loading loading, final Downstream<? super T> downstream) throws Exception {
        this.upstream.connect(new Downstream<T>() { // from class: ratpack.exec.internal.CachingUpstream.1
            @Override // ratpack.exec.Downstream
            public void error(Throwable th) {
                CachingUpstream.this.receiveResult(loading, downstream, ExecResult.of(Result.error(th)));
            }

            @Override // ratpack.exec.Downstream
            public void success(T t) {
                CachingUpstream.this.receiveResult(loading, downstream, ExecResult.of(Result.success(t)));
            }

            @Override // ratpack.exec.Downstream
            public void complete() {
                CachingUpstream.this.receiveResult(loading, downstream, CompleteExecResult.get());
            }
        });
    }

    @Override // ratpack.exec.Upstream
    public void connect(Downstream<? super T> downstream) {
        CachingUpstream<T>.Loading loading = this.loadingRef.get();
        if (loading.getState() == LoadingState.LOADED) {
            downstream.accept((ExecResult<? extends Object>) loading.cached.result);
            return;
        }
        Promise async = Promise.async(downstream2 -> {
            this.waiting.add(downstream2);
            tryDrain();
        });
        downstream.getClass();
        async.result(downstream::accept);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Multi-variable type inference failed */
    public void receiveResult(CachingUpstream<T>.Loading loading, Downstream<? super T> downstream, ExecResult<T> execResult) {
        ExecResult<? extends Object> execResult2;
        Instant minus;
        Duration ofSeconds = Duration.ofSeconds(0L);
        try {
            ofSeconds = this.ttlFunc.apply(execResult);
            execResult2 = execResult;
        } catch (Throwable th) {
            if (execResult.isError()) {
                execResult.getThrowable().addSuppressed(th);
                execResult2 = execResult;
            } else {
                execResult2 = (ExecResult<T>) ExecResult.of(Result.error(th));
            }
        }
        if (ofSeconds.isNegative()) {
            minus = null;
            this.upstream = null;
        } else {
            minus = ofSeconds.isZero() ? this.clock.instant().minus((TemporalAmount) Duration.ofSeconds(1L)) : this.clock.instant().plus((TemporalAmount) ofSeconds);
        }
        loading.cached = new Cached<>(execResult2, minus);
        downstream.accept(execResult2);
        tryDrain();
    }
}
