package ratpack.stream.internal;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.func.Function;
import ratpack.stream.TransformablePublisher;
import ratpack.stream.WriteStream;

/* loaded from: input_file:ratpack/stream/internal/StreamMapPublisher.class */
/**
 * A publisher that transforms an upstream publisher by routing its signals through a
 * user supplied {@link WriteStream}.
 * <p>
 * For every downstream subscription, the mapper function is given a {@link WriteStream} that
 * writes to the downstream subscriber, and returns the {@link WriteStream} that upstream signals
 * are written to. All per-subscription state lives in the upstream subscriber created by
 * {@link #subscribe(Subscriber)}, so subscribing more than once does not let one subscription
 * overwrite the stream used by another.
 *
 * @param <T> the type of item emitted by the upstream publisher
 * @param <O> the type of item emitted to downstream subscribers
 */
public class StreamMapPublisher<T, O> implements TransformablePublisher<O> {
    private final Publisher<? extends T> upstream;
    private final Function<? super WriteStream<O>, ? extends WriteStream<? super T>> mapper;

    /**
     * Creates the publisher.
     *
     * @param publisher the upstream publisher whose signals are fed to the mapped stream
     * @param function given the downstream-facing stream, returns the stream that receives upstream signals
     */
    public StreamMapPublisher(Publisher<? extends T> publisher, Function<? super WriteStream<O>, ? extends WriteStream<? super T>> function) {
        this.upstream = publisher;
        this.mapper = function;
    }

    /**
     * Subscribes to the upstream publisher and wires its signals, via the mapped stream, to the given subscriber.
     * <p>
     * If the mapper throws or returns {@code null}, the upstream subscription is cancelled and the
     * failure is passed to {@code subscriber.onError}.
     *
     * @param subscriber the downstream subscriber
     */
    @Override
    public void subscribe(final Subscriber<? super O> subscriber) {
        this.upstream.subscribe(new Subscriber<T>() {
            // Held per subscription rather than on the publisher, so that concurrent or repeated
            // subscriptions each talk to their own mapped stream. Null until setup succeeds.
            private WriteStream<? super T> input;

            @Override
            public void onSubscribe(final Subscription subscription) {
                try {
                    WriteStream<? super T> mapped = StreamMapPublisher.this.mapper.apply(new WriteStream<O>() {
                        @Override
                        public void item(O o) {
                            subscriber.onNext(o);
                        }

                        @Override
                        public void error(Throwable th) {
                            subscription.cancel();
                            subscriber.onError(th);
                        }

                        @Override
                        public void complete() {
                            subscription.cancel();
                            subscriber.onComplete();
                        }
                    });
                    if (mapped == null) {
                        throw new NullPointerException("mapper returned a null WriteStream");
                    }
                    // Must be assigned before onSubscribe: the subscriber may request synchronously,
                    // which can cause the upstream to call onNext before onSubscribe returns.
                    input = mapped;
                    subscriber.onSubscribe(new Subscription() {
                        @Override
                        public void request(long j) {
                            subscription.request(j);
                        }

                        @Override
                        public void cancel() {
                            subscription.cancel();
                        }
                    });
                } catch (Exception e) {
                    // Drop the stream so that any signal still in flight from upstream is ignored.
                    input = null;
                    subscription.cancel();
                    // NOTE(review): if the mapper itself failed, the subscriber receives onError without
                    // a preceding onSubscribe - confirm downstream subscribers tolerate this.
                    subscriber.onError(e);
                }
            }

            @Override
            public void onNext(T t) {
                WriteStream<? super T> stream = input;
                if (stream != null) {
                    stream.item(t);
                }
            }

            @Override
            public void onError(Throwable th) {
                WriteStream<? super T> stream = input;
                if (stream != null) {
                    stream.error(th);
                }
            }

            @Override
            public void onComplete() {
                WriteStream<? super T> stream = input;
                if (stream != null) {
                    stream.complete();
                }
            }
        });
    }
}
