package ratpack.stream.internal;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.exec.ExecSpec;
import ratpack.exec.ExecStarter;
import ratpack.exec.Execution;
import ratpack.func.Action;

/* loaded from: input_file:ratpack/stream/internal/ForkingSubscription.class */
public class ForkingSubscription<T> implements Subscription {
    private final Action<? super ExecSpec> execConfig;
    private final Publisher<T> publisher;
    private final BufferedWriteStream<T> write;
    private volatile boolean started;
    private volatile Subscription upstream;
    private volatile boolean cancelled;

    public ForkingSubscription(Action<? super ExecSpec> action, Publisher<T> publisher, BufferedWriteStream<T> bufferedWriteStream) {
        this.execConfig = action;
        this.publisher = publisher;
        this.write = bufferedWriteStream;
    }

    public void request(long j) {
        if (this.started) {
            return;
        }
        this.started = true;
        try {
            ((ExecStarter) this.execConfig.with(Execution.fork())).start(execution -> {
                this.publisher.subscribe(new Subscriber<T>() { // from class: ratpack.stream.internal.ForkingSubscription.1
                    public void onSubscribe(Subscription subscription) {
                        ForkingSubscription.this.upstream = subscription;
                        if (ForkingSubscription.this.cancelled) {
                            ForkingSubscription.this.upstream.cancel();
                        } else {
                            subscription.request(Long.MAX_VALUE);
                        }
                    }

                    public void onNext(T t) {
                        ForkingSubscription.this.write.item(t);
                    }

                    public void onError(Throwable th) {
                        ForkingSubscription.this.write.error(th);
                    }

                    public void onComplete() {
                        ForkingSubscription.this.write.complete();
                    }
                });
            });
        } catch (Exception e) {
            this.write.error(e);
        }
    }

    public void cancel() {
        this.cancelled = true;
        if (this.upstream != null) {
            this.upstream.cancel();
        }
    }
}
