package io.rsocket.rpc.rsocket;

import io.netty.util.ReferenceCountUtil;
import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.ResponderRSocket;
import io.rsocket.rpc.RSocketRpcService;
import io.rsocket.rpc.exception.ServiceNotFound;
import io.rsocket.rpc.frames.Metadata;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/rsocket/rpc/rsocket/RequestHandlingRSocket.class */
public class RequestHandlingRSocket extends AbstractRSocket implements ResponderRSocket {
    private final ConcurrentMap<String, RSocketRpcService> registeredServices = new ConcurrentHashMap();

    public RequestHandlingRSocket(RSocketRpcService... rSocketRpcServiceArr) {
        for (RSocketRpcService rSocketRpcService : rSocketRpcServiceArr) {
            this.registeredServices.put(rSocketRpcService.getService(), rSocketRpcService);
        }
    }

    @Deprecated
    public void addService(RSocketRpcService rSocketRpcService) {
        this.registeredServices.put(rSocketRpcService.getService(), rSocketRpcService);
    }

    public RequestHandlingRSocket withService(RSocketRpcService rSocketRpcService) {
        addService(rSocketRpcService);
        return this;
    }

    public Mono<Void> fireAndForget(Payload payload) {
        try {
            String service = Metadata.getService(payload.sliceMetadata());
            RSocketRpcService rSocketRpcService = this.registeredServices.get(service);
            if (rSocketRpcService != null) {
                return rSocketRpcService.fireAndForget(payload);
            }
            ReferenceCountUtil.safeRelease(payload);
            return Mono.error(new ServiceNotFound(service));
        } catch (Throwable th) {
            ReferenceCountUtil.safeRelease(payload);
            return Mono.error(th);
        }
    }

    public Mono<Payload> requestResponse(Payload payload) {
        try {
            String service = Metadata.getService(payload.sliceMetadata());
            RSocketRpcService rSocketRpcService = this.registeredServices.get(service);
            if (rSocketRpcService != null) {
                return rSocketRpcService.requestResponse(payload);
            }
            ReferenceCountUtil.safeRelease(payload);
            return Mono.error(new ServiceNotFound(service));
        } catch (Throwable th) {
            ReferenceCountUtil.safeRelease(payload);
            return Mono.error(th);
        }
    }

    public Flux<Payload> requestStream(Payload payload) {
        try {
            String service = Metadata.getService(payload.sliceMetadata());
            RSocketRpcService rSocketRpcService = this.registeredServices.get(service);
            if (rSocketRpcService != null) {
                return rSocketRpcService.requestStream(payload);
            }
            ReferenceCountUtil.safeRelease(payload);
            return Flux.error(new ServiceNotFound(service));
        } catch (Throwable th) {
            ReferenceCountUtil.safeRelease(payload);
            return Flux.error(th);
        }
    }

    public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
        return Flux.from(publisher).switchOnFirst((signal, flux) -> {
            if (!signal.hasValue()) {
                return flux;
            }
            Payload payload = (Payload) signal.get();
            try {
                String service = Metadata.getService(payload.sliceMetadata());
                RSocketRpcService rSocketRpcService = this.registeredServices.get(service);
                if (rSocketRpcService != null) {
                    return rSocketRpcService.requestChannel(payload, flux);
                }
                ReferenceCountUtil.safeRelease(payload);
                return Flux.error(new ServiceNotFound(service));
            } catch (Throwable th) {
                ReferenceCountUtil.safeRelease(payload);
                return Flux.error(th);
            }
        });
    }

    public Flux<Payload> requestChannel(Payload payload, Publisher<Payload> publisher) {
        try {
            String service = Metadata.getService(payload.sliceMetadata());
            RSocketRpcService rSocketRpcService = this.registeredServices.get(service);
            if (rSocketRpcService != null) {
                return rSocketRpcService.requestChannel(payload, publisher);
            }
            ReferenceCountUtil.safeRelease(payload);
            return Flux.error(new ServiceNotFound(service));
        } catch (Throwable th) {
            ReferenceCountUtil.safeRelease(payload);
            return Flux.error(th);
        }
    }
}
