package ratpack.stream;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.reactivestreams.Publisher;
import ratpack.exec.ExecController;
import ratpack.exec.Promise;
import ratpack.exec.internal.ExecutionBacking;
import ratpack.func.Action;
import ratpack.func.Function;
import ratpack.func.Predicate;
import ratpack.registry.Registry;
import ratpack.stream.internal.BufferingPublisher;
import ratpack.stream.internal.CollectingSubscriber;
import ratpack.stream.internal.DefaultTransformablePublisher;
import ratpack.stream.internal.FanOutPublisher;
import ratpack.stream.internal.FlatMapPublisher;
import ratpack.stream.internal.FlatYieldingPublisher;
import ratpack.stream.internal.GatedPublisher;
import ratpack.stream.internal.IterablePublisher;
import ratpack.stream.internal.MapPublisher;
import ratpack.stream.internal.MergingPublisher;
import ratpack.stream.internal.MulticastPublisher;
import ratpack.stream.internal.PeriodicPublisher;
import ratpack.stream.internal.SingleElementSubscriber;
import ratpack.stream.internal.StreamMapPublisher;
import ratpack.stream.internal.WiretapPublisher;
import ratpack.stream.internal.YieldingPublisher;
import ratpack.util.Types;

/* loaded from: input_file:ratpack/stream/Streams.class */
public class Streams {
    public static <T> TransformablePublisher<T> transformable(Publisher<T> publisher) {
        return publisher instanceof TransformablePublisher ? (TransformablePublisher) Types.cast(publisher) : new DefaultTransformablePublisher(publisher);
    }

    public static <T> TransformablePublisher<T> publish(Iterable<T> iterable) {
        return new IterablePublisher(iterable);
    }

    public static <T> TransformablePublisher<T> yield(Function<? super YieldRequest, T> function) {
        return new YieldingPublisher(function);
    }

    public static <T> TransformablePublisher<T> flatYield(Function<? super YieldRequest, ? extends Promise<? extends T>> function) {
        return new FlatYieldingPublisher(function);
    }

    public static <T> TransformablePublisher<T> constant(T t) {
        return yield(yieldRequest -> {
            return t;
        });
    }

    public static <I, O> TransformablePublisher<O> map(Publisher<I> publisher, Function<? super I, ? extends O> function) {
        return new MapPublisher(publisher, function);
    }

    public static <T> TransformablePublisher<T> filter(Publisher<T> publisher, Predicate<T> predicate) {
        return streamMap(publisher, writeStream -> {
            return new WriteStream<T>() { // from class: ratpack.stream.Streams.1
                @Override // ratpack.stream.WriteStream
                public void item(T t) {
                    try {
                        if (Predicate.this.apply(t)) {
                            writeStream.item(t);
                        }
                    } catch (Throwable th) {
                        writeStream.error(th);
                    }
                }

                @Override // ratpack.stream.WriteStream
                public void error(Throwable th) {
                    writeStream.error(th);
                }

                @Override // ratpack.stream.WriteStream
                public void complete() {
                    writeStream.complete();
                }
            };
        });
    }

    public static <I, O> TransformablePublisher<O> streamMap(Publisher<I> publisher, Function<? super WriteStream<O>, ? extends WriteStream<I>> function) {
        return new StreamMapPublisher(publisher, function).buffer();
    }

    public static <I, O> TransformablePublisher<O> flatMap(Publisher<I> publisher, Function<? super I, ? extends Promise<? extends O>> function) {
        return new FlatMapPublisher(publisher, function);
    }

    public static <T> TransformablePublisher<T> buffer(Publisher<T> publisher) {
        return new BufferingPublisher(publisher);
    }

    public static <T> TransformablePublisher<T> gate(Publisher<T> publisher, Action<? super Runnable> action) {
        return new GatedPublisher(publisher, action);
    }

    public static <T> TransformablePublisher<T> periodically(ScheduledExecutorService scheduledExecutorService, Duration duration, Function<Integer, T> function) {
        return new PeriodicPublisher(scheduledExecutorService, function, duration).buffer();
    }

    public static <T> TransformablePublisher<T> periodically(Registry registry, Duration duration, Function<Integer, T> function) {
        return new PeriodicPublisher(((ExecController) registry.get(ExecController.class)).getExecutor(), function, duration).buffer();
    }

    public static <T> TransformablePublisher<T> wiretap(Publisher<T> publisher, Action<? super StreamEvent<? super T>> action) {
        return new WiretapPublisher(publisher, action);
    }

    public static <T> TransformablePublisher<T> multicast(Publisher<T> publisher) {
        return new MulticastPublisher(publisher);
    }

    public static <T> TransformablePublisher<T> fanOut(Publisher<? extends Iterable<T>> publisher) {
        return new FanOutPublisher(publisher).buffer();
    }

    @SafeVarargs
    public static <T> TransformablePublisher<T> merge(Publisher<? extends T>... publisherArr) {
        return new MergingPublisher(publisherArr).buffer();
    }

    public static <T> Promise<T> toPromise(Publisher<T> publisher) {
        return Promise.of(downstream -> {
            downstream.getClass();
            publisher.subscribe(SingleElementSubscriber.to(downstream::accept));
        });
    }

    public static <T> Promise<List<T>> toList(Publisher<T> publisher) {
        return Promise.of(downstream -> {
            downstream.getClass();
            publisher.subscribe(new CollectingSubscriber(downstream::accept, subscription -> {
                subscription.request(Long.MAX_VALUE);
            }));
        });
    }

    public static <T> TransformablePublisher<T> bindExec(Publisher<T> publisher) {
        return ExecutionBacking.stream(publisher);
    }
}
