package it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka;

import it.agilelab.bigdata.wasp.consumers.spark.readers.StructuredStreamingReader;
import it.agilelab.bigdata.wasp.core.WaspSystem$;
import it.agilelab.bigdata.wasp.core.kafka.CheckOrCreateTopic;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.models.TopicModel;
import it.agilelab.bigdata.wasp.core.models.configuration.KafkaConfigModel;
import it.agilelab.bigdata.wasp.core.utils.AvroToRow;
import it.agilelab.bigdata.wasp.core.utils.ConfigManager$;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder$;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.functions$;
import scala.Predef$;
import scala.StringContext;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaReader.scala */
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/plugins/kafka/KafkaStructuredReader$.class */
/**
 * Structured Streaming reader that builds a Spark {@code Dataset<Row>} from a Kafka topic
 * described by a {@link TopicModel}.
 *
 * <p>This is the decompiled form of a Scala singleton object: the single instance is created
 * by the static initializer and published through {@link #MODULE$}. Several constructs below
 * (synthetic {@code $$anonfun$N} closures, reified {@code TypeCreator}s, mangled logger-thunk
 * class names) are compiler/decompiler output rather than hand-written Java.
 */
public final class KafkaStructuredReader$ implements StructuredStreamingReader, Logging {
    // Singleton instance. Declared "= null" in this rendering, but actually assigned in the
    // private constructor (MODULE$ = this), which the static initializer below invokes.
    public static final KafkaStructuredReader$ MODULE$ = null;
    // Logger returned by logger(); written through the Logging setter method further down.
    private final WaspLogger logger;

    // Instantiates the singleton at class-load time; the constructor stores it in MODULE$.
    static {
        new KafkaStructuredReader$();
    }

    /**
     * Returns the logger held by this object.
     *
     * @return the {@link WaspLogger} stored in the {@code logger} field
     */
    public WaspLogger logger() {
        return this.logger;
    }

    /**
     * Setter for the {@code logger} field, named after the {@code Logging} trait member.
     * NOTE(review): presumably invoked by the {@code Logging} trait initializer called from
     * the constructor - the call site is not visible in this file; confirm.
     *
     * @param waspLogger the logger instance to store
     */
    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    /**
     * Verifies the topic through the Kafka admin actor, opens a Kafka streaming source
     * subscribed to it, and converts the records into rows according to
     * {@code topicModel.topicDataType()}.
     *
     * <ul>
     *   <li>{@code "avro"}: selects the {@code value} column and maps each row with a closure
     *       built around an {@link AvroToRow} created from {@code topicModel.getJsonSchema()};
     *       rows are encoded with the schema from {@code avroToRow.getSchemaSpark()}.</li>
     *   <li>{@code "json"}: applies a UDF to {@code value}, parses the result with
     *       {@code from_json} using {@code topicModel.getDataType()}, and expands the parsed
     *       struct into top-level columns.</li>
     * </ul>
     *
     * @param str          not referenced in this method body
     * @param str2         not referenced in this method body
     * @param topicModel   topic description: name, partitions, replicas, data type and schema
     * @param sparkSession session used to create the streaming source
     * @return the streaming dataset of decoded rows
     * @throws Exception if the admin-actor check yields {@code false}
     *                   ("Topic not found on Kafka: ...") or if the topic data type is neither
     *                   {@code "avro"} nor {@code "json"} ("No such topic data type ...")
     */
    @Override // it.agilelab.bigdata.wasp.consumers.spark.readers.StructuredStreamingReader
    public Dataset<Row> createStructuredStream(String str, String str2, TopicModel topicModel, SparkSession sparkSession) {
        Dataset<Row> select;
        KafkaConfigModel kafkaConfig = ConfigManager$.MODULE$.getKafkaConfig();
        // Send CheckOrCreateTopic(name, partitions, replicas) to the Kafka admin actor via
        // WaspSystem's "??" operator (third argument left at its default) and read the reply
        // as a boolean; a false reply aborts with an error log and an exception.
        if (!BoxesRunTime.unboxToBoolean(WaspSystem$.MODULE$.$qmark$qmark(WaspSystem$.MODULE$.kafkaAdminActor(), new CheckOrCreateTopic(topicModel.name(), topicModel.partitions(), topicModel.replicas()), WaspSystem$.MODULE$.$qmark$qmark$default$3()))) {
            // Scala s-interpolation: "Topic not found on Kafka: " followed by topicModel.
            String s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Topic not found on Kafka: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{topicModel}));
            logger().error(new KafkaStructu$$$$d8e6deafeeec8e8e243839cc33a87bb$$$$uredStream$4(s));
            throw new Exception(s);
        }
        // Log-message thunk built from topicModel and kafkaConfig (thunk body not visible here).
        logger().info(new KafkaStructu$$$$f0555bf77af3b93139b4b19a6b3e0$$$$uredStream$1(topicModel, kafkaConfig));
        // Kafka streaming source:
        //   subscribe                   -> topicModel.name()
        //   kafka.bootstrap.servers     -> kafkaConfig.connections() mapped by $$anonfun$2, joined with ","
        //   kafkaConsumer.pollTimeoutMs -> kafkaConfig.ingestRateToMills()
        //   additional options          -> kafkaConfig.others() mapped by $$anonfun$3 and turned into a Map
        // NOTE(review): $$anonfun$2 / $$anonfun$3 are defined elsewhere; their exact output
        // format cannot be confirmed from this file.
        Dataset load = sparkSession.readStream().format("kafka").option("subscribe", topicModel.name()).option("kafka.bootstrap.servers", ((TraversableOnce) kafkaConfig.connections().map(new KafkaStructuredReader$$anonfun$2(), Seq$.MODULE$.canBuildFrom())).mkString(",")).option("kafkaConsumer.pollTimeoutMs", kafkaConfig.ingestRateToMills()).options(((TraversableOnce) kafkaConfig.others().map(new KafkaStructuredReader$$anonfun$3(), Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms())).load();
        // UDF wrapping $$anonfun$4. The two reified TypeTags describe scala.Predef.String
        // (typecreator1) and scala.Array[scala.Byte] (typecreator2).
        // NOTE(review): presumably Array[Byte] => String, following the (return type, argument
        // type) TypeTag order of functions.udf - confirm against $$anonfun$4.
        UserDefinedFunction udf = functions$.MODULE$.udf(new KafkaStructuredReader$$anonfun$4(), package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaStructuredReader$$typecreator1$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe = mirror.universe();
                // Type reference to the "String" type member selected from scala.Predef.
                return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$);
            }
        }), package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaStructuredReader$$typecreator2$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe = mirror.universe();
                // Type reference to scala.Array applied to the single type argument scala.Byte.
                return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticClass("scala.Array"), List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Types.TypeApi[]{mirror.staticClass("scala.Byte").asType().toTypeConstructor()})));
            }
        }));
        // Dispatch on the declared payload format of the topic.
        String str3 = topicModel.topicDataType();
        if ("avro".equals(str3)) {
            AvroToRow avroToRow = new AvroToRow(topicModel.getJsonSchema());
            // Keep only the "value" column and map every row through $$anonfun$5(avroToRow),
            // encoding the result with the Spark schema exposed by avroToRow.
            select = load.select("value", Predef$.MODULE$.wrapRefArray(new String[0])).map(new KafkaStructuredReader$$anonfun$5(avroToRow), RowEncoder$.MODULE$.apply(avroToRow.getSchemaSpark()));
        } else {
            if (!"json".equals(str3)) {
                // Scala s-interpolation: "No such topic data type " followed by the data type.
                throw new Exception(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"No such topic data type ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{topicModel.topicDataType()})));
            }
            // value -> udf(value) as "value_parsed"; drop the original "value";
            // from_json(value_parsed, topicModel.getDataType()) aliased back to "value";
            // finally select "value.*" so the parsed fields become top-level columns.
            select = load.withColumn("value_parsed", udf.apply(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col("value")}))).drop("value").select(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.from_json(functions$.MODULE$.col("value_parsed"), topicModel.getDataType()).alias("value")})).select(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col("value.*")}));
        }
        Dataset<Row> dataset = select;
        // Debug-log thunks built from the topic model and the resulting dataset respectively
        // (thunk bodies not visible in this file).
        logger().debug(new KafkaStructu$$$$3fe6d80e751b621786f3fcfb9e8f$$$$uredStream$2(topicModel));
        logger().debug(new KafkaStructu$$$$19bc27274af50b83ce2439194c722ae$$$$uredStream$3(dataset));
        return dataset;
    }

    /**
     * Private singleton constructor: publishes this instance as {@link #MODULE$} and then
     * runs the {@code Logging} trait initializer on it.
     * NOTE(review): the trait initializer presumably sets {@code logger} through the setter
     * above - confirm against the {@code Logging} trait.
     */
    private KafkaStructuredReader$() {
        MODULE$ = this;
        Logging.class.$init$(this);
    }
}
