package it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka;

import it.agilelab.bigdata.wasp.consumers.spark.readers.StreamingReader;
import it.agilelab.bigdata.wasp.core.WaspSystem$;
import it.agilelab.bigdata.wasp.core.kafka.CheckOrCreateTopic;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.models.TopicModel;
import it.agilelab.bigdata.wasp.core.models.configuration.KafkaConfigModel;
import it.agilelab.bigdata.wasp.core.utils.ConfigManager$;
import it.agilelab.bigdata.wasp.core.utils.JsonConverter$;
import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;
import org.apache.spark.storage.StorageLevel$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.dstream.InputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils$;
import scala.MatchError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;

/* compiled from: KafkaReader.scala */
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/plugins/kafka/KafkaReader$.class */
public final class KafkaReader$ implements StreamingReader, Logging {
    public static final KafkaReader$ MODULE$ = null;
    private final WaspLogger logger;

    static {
        new KafkaReader$();
    }

    public WaspLogger logger() {
        return this.logger;
    }

    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    @Override // it.agilelab.bigdata.wasp.consumers.spark.readers.StreamingReader
    public DStream<String> createStream(String str, String str2, TopicModel topicModel, StreamingContext streamingContext) {
        InputDStream createStream;
        KafkaConfigModel kafkaConfig = ConfigManager$.MODULE$.getKafkaConfig();
        Map map = ((TraversableOnce) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("zookeeper.connect"), kafkaConfig.zookeeperConnections().toString()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("zookeeper.connection.timeout.ms"), kafkaConfig.zookeeperConnections().connections().headOption().flatMap(new KafkaReader$$anonfun$6()).getOrElse(new KafkaReader$$anonfun$1()).toString())})).$plus$plus((GenTraversableOnce) kafkaConfig.others().map(new KafkaReader$$anonfun$7(), Seq$.MODULE$.canBuildFrom()), Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        if (!BoxesRunTime.unboxToBoolean(WaspSystem$.MODULE$.$qmark$qmark(WaspSystem$.MODULE$.kafkaAdminActor(), new CheckOrCreateTopic(topicModel.name(), topicModel.partitions(), topicModel.replicas()), WaspSystem$.MODULE$.$qmark$qmark$default$3()))) {
            String s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Topic not found on Kafka: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{topicModel}));
            logger().error(new KafkaReader$$anonfun$createStream$7(s));
            throw new Exception(s);
        }
        if ("direct".equals(str2)) {
            createStream = KafkaUtils$.MODULE$.createDirectStream(streamingContext, map.$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("group.id"), str)).$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("metadata.broker.list"), kafkaConfig.connections().mkString(","))), Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{topicModel.name()})), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(Byte.TYPE)), ClassTag$.MODULE$.apply(StringDecoder.class), ClassTag$.MODULE$.apply(DefaultDecoder.class));
        } else {
            if (!("receiver-based".equals(str2) ? true : true)) {
                throw new MatchError(str2);
            }
            createStream = KafkaUtils$.MODULE$.createStream(streamingContext, map.$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("group.id"), str)), Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicModel.name()), BoxesRunTime.boxToInteger(3))})), StorageLevel$.MODULE$.MEMORY_AND_DISK_2(), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(Byte.TYPE)), ClassTag$.MODULE$.apply(StringDecoder.class), ClassTag$.MODULE$.apply(DefaultDecoder.class));
        }
        InputDStream inputDStream = createStream;
        String jsonConverter$ = JsonConverter$.MODULE$.toString(topicModel.schema().asDocument());
        String str3 = topicModel.topicDataType();
        return "avro".equals(str3) ? inputDStream.map(new KafkaReader$$anonfun$createStream$1(jsonConverter$), ClassTag$.MODULE$.apply(Tuple2.class)).map(new KafkaReader$$anonfun$createStream$2(), ClassTag$.MODULE$.apply(String.class)) : "json".equals(str3) ? inputDStream.map(new KafkaReader$$anonfun$createStream$3(), ClassTag$.MODULE$.apply(Tuple2.class)).map(new KafkaReader$$anonfun$createStream$4(), ClassTag$.MODULE$.apply(String.class)) : inputDStream.map(new KafkaReader$$anonfun$createStream$5(jsonConverter$), ClassTag$.MODULE$.apply(Tuple2.class)).map(new KafkaReader$$anonfun$createStream$6(), ClassTag$.MODULE$.apply(String.class));
    }

    private KafkaReader$() {
        MODULE$ = this;
        Logging.class.$init$(this);
    }
}
