package ratpack.rx2;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.SingleOnSubscribe;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.plugins.RxJavaPlugins;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import ratpack.exec.ExecController;
import ratpack.exec.ExecStarter;
import ratpack.exec.Execution;
import ratpack.exec.Operation;
import ratpack.exec.Promise;
import ratpack.exec.UnmanagedThreadException;
import ratpack.func.Action;
import ratpack.registry.RegistrySpec;
import ratpack.rx2.internal.DefaultSchedulers;
import ratpack.rx2.internal.ErrorHandler;
import ratpack.rx2.internal.ExecControllerBackedScheduler;
import ratpack.rx2.internal.ExecutionBackedObserver;
import ratpack.rx2.internal.ExecutionBackedSubscriber;
import ratpack.stream.Streams;
import ratpack.stream.TransformablePublisher;
import ratpack.util.Exceptions;

/* loaded from: input_file:ratpack/rx2/RxRatpack.class */
public abstract class RxRatpack {
    private RxRatpack() {
    }

    public static void initialize() {
        RxJavaPlugins.setErrorHandler(new ErrorHandler());
        RxJavaPlugins.setInitComputationSchedulerHandler(callable -> {
            return DefaultSchedulers.getComputationScheduler();
        });
        RxJavaPlugins.setInitIoSchedulerHandler(callable2 -> {
            return DefaultSchedulers.getIoScheduler();
        });
        RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> {
            return new ExecutionBackedObserver(observer);
        });
        RxJavaPlugins.setOnFlowableSubscribe((flowable, subscriber) -> {
            return new ExecutionBackedSubscriber(subscriber);
        });
    }

    public static <T> Single<T> single(Promise<T> promise) {
        return Single.create(singleEmitter -> {
            Objects.requireNonNull(singleEmitter);
            Promise onError = promise.onError(singleEmitter::onError);
            Objects.requireNonNull(singleEmitter);
            onError.then(singleEmitter::onSuccess);
        });
    }

    public static Completable complete(Operation operation) {
        return Completable.create(completableEmitter -> {
            Objects.requireNonNull(completableEmitter);
            Operation onError = operation.onError(completableEmitter::onError);
            Objects.requireNonNull(completableEmitter);
            onError.then(completableEmitter::onComplete);
        });
    }

    public static <T, I extends Iterable<T>> Observable<T> observe(Promise<I> promise) {
        return Observable.merge(single(promise).toObservable().map(Observable::fromIterable));
    }

    public static <T> Promise<List<T>> promiseAll(Observable<T> observable) throws UnmanagedThreadException {
        return Promise.async(downstream -> {
            Single list = observable.toList();
            Objects.requireNonNull(downstream);
            Consumer consumer = (v1) -> {
                r1.success(v1);
            };
            Objects.requireNonNull(downstream);
            list.subscribe(consumer, downstream::error);
        });
    }

    public static <T> Promise<List<T>> promiseAll(ObservableOnSubscribe<T> observableOnSubscribe) throws UnmanagedThreadException {
        return promiseAll(Observable.create(observableOnSubscribe));
    }

    public static <T> Promise<T> promise(Single<T> single) throws UnmanagedThreadException {
        return Promise.async(downstream -> {
            Objects.requireNonNull(downstream);
            Consumer consumer = downstream::success;
            Objects.requireNonNull(downstream);
            single.subscribe(consumer, downstream::error);
        });
    }

    public static <T> Promise<T> promise(SingleOnSubscribe<T> singleOnSubscribe) throws UnmanagedThreadException {
        return promise(Single.create(singleOnSubscribe));
    }

    public static <T> TransformablePublisher<T> publisher(Observable<T> observable, BackpressureStrategy backpressureStrategy) {
        return Streams.transformable(observable.toFlowable(backpressureStrategy));
    }

    public static <T> TransformablePublisher<T> publisher(ObservableOnSubscribe<T> observableOnSubscribe, BackpressureStrategy backpressureStrategy) {
        return publisher(Observable.create(observableOnSubscribe), backpressureStrategy);
    }

    public static <T> Observable<T> bindExec(Observable<T> observable) {
        return (Observable) Exceptions.uncheck(() -> {
            return (Observable) promiseAll(observable).to(RxRatpack::observe);
        });
    }

    public static <T> Observable<T> fork(Observable<T> observable) {
        return observe(promiseAll(observable).fork());
    }

    public static <T> Observable<T> fork(Observable<T> observable, Action<? super RegistrySpec> action) throws Exception {
        return observe(promiseAll(observable).fork(execSpec -> {
            execSpec.register(action);
        }));
    }

    public static <T> Observable<T> forkEach(Observable<T> observable) {
        return forkEach(observable, Action.noop());
    }

    public static <T> Observable<T> forkEach(Observable<T> observable, Action<? super RegistrySpec> action) {
        return observable.lift(observer -> {
            return new Observer<T>() { // from class: ratpack.rx2.RxRatpack.1
                private final AtomicInteger wip = new AtomicInteger(1);
                private final AtomicBoolean closed = new AtomicBoolean();
                private Disposable disposable;

                public void onSubscribe(Disposable disposable) {
                    this.disposable = disposable;
                    observer.onSubscribe(disposable);
                }

                public void onComplete() {
                    maybeDone();
                }

                public void onError(Throwable th) {
                    Observer observer = observer;
                    terminate(() -> {
                        observer.onError(th);
                    });
                }

                private void maybeDone() {
                    if (this.wip.decrementAndGet() == 0) {
                        Observer observer = observer;
                        Objects.requireNonNull(observer);
                        terminate(observer::onComplete);
                    }
                }

                private void terminate(Runnable runnable) {
                    if (this.closed.compareAndSet(false, true)) {
                        runnable.run();
                    }
                }

                public void onNext(T t) {
                    if (this.disposable.isDisposed() || this.closed.get()) {
                        return;
                    }
                    this.wip.incrementAndGet();
                    ExecStarter onError = Execution.fork().register(action).onComplete(execution -> {
                        maybeDone();
                    }).onError(this::onError);
                    Observer observer = observer;
                    onError.start(execution2 -> {
                        if (this.closed.get()) {
                            return;
                        }
                        observer.onNext(t);
                    });
                }
            };
        });
    }

    public static Scheduler scheduler(ExecController execController) {
        return new ExecControllerBackedScheduler(execController);
    }

    public static Scheduler scheduler() {
        return scheduler(ExecController.require());
    }
}
