package ratpack.exec.util.internal;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Subscription;
import ratpack.exec.ExecResult;
import ratpack.exec.ExecStarter;
import ratpack.exec.Execution;
import ratpack.exec.Operation;
import ratpack.exec.Promise;
import ratpack.exec.util.ParallelBatch;
import ratpack.func.Action;
import ratpack.func.BiAction;
import ratpack.stream.TransformablePublisher;
import ratpack.stream.internal.BufferedWriteStream;
import ratpack.stream.internal.BufferingPublisher;
import ratpack.util.Types;

/* loaded from: input_file:ratpack/exec/util/internal/DefaultParallelBatch.class */
public class DefaultParallelBatch<T> implements ParallelBatch<T> {
    private final Iterable<? extends Promise<T>> promises;
    private final Action<? super Execution> execInit;

    public DefaultParallelBatch(Iterable<? extends Promise<? extends T>> iterable, Action<? super Execution> action) {
        this.promises = (Iterable) Types.cast(iterable);
        this.execInit = action;
    }

    @Override // ratpack.exec.util.ParallelBatch
    public ParallelBatch<T> execInit(Action<? super Execution> action) {
        return new DefaultParallelBatch(this.promises, action);
    }

    @Override // ratpack.exec.util.ParallelBatch, ratpack.exec.util.Batch
    public Promise<List<? extends ExecResult<T>>> yieldAll() {
        ArrayList newArrayList = Lists.newArrayList(this.promises);
        if (newArrayList.isEmpty()) {
            return Promise.value(Collections.emptyList());
        }
        List list = (List) Types.cast(newArrayList);
        AtomicInteger atomicInteger = new AtomicInteger(newArrayList.size());
        return Promise.async(downstream -> {
            for (int i = 0; i < newArrayList.size(); i++) {
                int i2 = i;
                Execution.fork().onStart(this.execInit).onComplete(execution -> {
                    if (atomicInteger.decrementAndGet() == 0) {
                        downstream.success(list);
                    }
                }).start(execution2 -> {
                    ((Promise) newArrayList.get(i2)).result(execResult -> {
                        list.set(i2, execResult);
                    });
                });
            }
        });
    }

    @Override // ratpack.exec.util.ParallelBatch, ratpack.exec.util.Batch
    public Promise<List<T>> yield() {
        ArrayList newArrayList = Lists.newArrayList(this.promises);
        if (newArrayList.isEmpty()) {
            return Promise.value(Collections.emptyList());
        }
        List list = (List) Types.cast(newArrayList);
        return Promise.async(downstream -> {
            Objects.requireNonNull(list);
            Operation forEach = forEach((v1, v2) -> {
                r1.set(v1, v2);
            });
            Objects.requireNonNull(downstream);
            forEach.onError(downstream::error).then(() -> {
                downstream.success(list);
            });
        });
    }

    @Override // ratpack.exec.util.ParallelBatch, ratpack.exec.util.Batch
    public Operation forEach(BiAction<? super Integer, ? super T> biAction) {
        AtomicReference atomicReference = new AtomicReference();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        AtomicInteger atomicInteger = new AtomicInteger();
        return Promise.async(downstream -> {
            int i = 0;
            Iterator<? extends Promise<T>> it = this.promises.iterator();
            while (it.hasNext()) {
                Promise<T> next = it.next();
                int i2 = i;
                i++;
                atomicInteger.incrementAndGet();
                if (!it.hasNext()) {
                    atomicBoolean.set(true);
                }
                Execution.fork().onStart(this.execInit).onComplete(execution -> {
                    if (atomicInteger.decrementAndGet() == 0 && atomicBoolean.get()) {
                        Throwable th = (Throwable) atomicReference.get();
                        if (th == null) {
                            downstream.success(null);
                        } else {
                            downstream.error(th);
                        }
                    }
                }).start(execution2 -> {
                    if (atomicReference.get() == null) {
                        next.result(execResult -> {
                            Throwable th;
                            if (!execResult.isError()) {
                                biAction.execute(Integer.valueOf(i2), execResult.getValue());
                                return;
                            }
                            Throwable throwable = execResult.getThrowable();
                            if (atomicReference.compareAndSet(null, throwable) || (th = (Throwable) atomicReference.get()) == throwable) {
                                return;
                            }
                            th.addSuppressed(throwable);
                        });
                    }
                });
            }
            if (i == 0) {
                downstream.success(null);
            }
        }).operation();
    }

    @Override // ratpack.exec.util.ParallelBatch, ratpack.exec.util.Batch
    public TransformablePublisher<T> publisher() {
        Iterator<? extends Promise<T>> it = this.promises.iterator();
        return new BufferingPublisher(Action.noop(), bufferedWriteStream -> {
            return new Subscription() { // from class: ratpack.exec.util.internal.DefaultParallelBatch.1
                volatile boolean cancelled;
                volatile boolean complete;
                final AtomicLong finished = new AtomicLong();
                volatile long started;

                public void request(long j) {
                    while (true) {
                        long j2 = j;
                        j = j2 - 1;
                        if (j2 <= 0 || this.cancelled) {
                            return;
                        }
                        if (!it.hasNext()) {
                            if (this.started == 0) {
                                bufferedWriteStream.complete();
                                return;
                            }
                            return;
                        }
                        this.started++;
                        Promise promise = (Promise) it.next();
                        if (!it.hasNext()) {
                            this.complete = true;
                        }
                        ExecStarter onStart = Execution.fork().onStart(DefaultParallelBatch.this.execInit);
                        BufferedWriteStream bufferedWriteStream = bufferedWriteStream;
                        ExecStarter onComplete = onStart.onComplete(execution -> {
                            if (this.finished.incrementAndGet() == this.started && this.complete && !this.cancelled) {
                                bufferedWriteStream.complete();
                            }
                        });
                        BufferedWriteStream bufferedWriteStream2 = bufferedWriteStream;
                        onComplete.start(execution2 -> {
                            Objects.requireNonNull(bufferedWriteStream2);
                            Promise<T> onError = promise.onError(bufferedWriteStream2::error);
                            Objects.requireNonNull(bufferedWriteStream2);
                            onError.then(bufferedWriteStream2::item);
                        });
                    }
                }

                public void cancel() {
                    this.cancelled = true;
                }
            };
        });
    }
}
