package it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka;

import com.typesafe.config.Config;
import it.agilelab.bigdata.wasp.consumers.spark.utils.AvroSerializerExpression;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.utils.ConfigManager$;
import it.agilelab.bigdata.wasp.datastores.TopicCategory;
import it.agilelab.bigdata.wasp.models.DatastoreModel;
import it.agilelab.bigdata.wasp.models.MultiTopicModel;
import it.agilelab.bigdata.wasp.models.TopicModel;
import it.agilelab.bigdata.wasp.repository.core.bl.TopicBL;
import org.apache.avro.Schema;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;

/* compiled from: KafkaWriters.scala */
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/plugins/kafka/KafkaWriters$.class */
public final class KafkaWriters$ implements Logging {
    // Singleton instance of this Scala object; the private constructor assigns `this` to it.
    // NOTE(review): decompiler output - a final field initialised to null and reassigned later
    // is how the decompiler renders the compiled singleton, not something to copy by hand.
    public static final KafkaWriters$ MODULE$ = null;
    // Logger returned by logger() and written by the Logging trait setter defined in this class.
    private final WaspLogger logger;

    // Creating the one instance at class-load time is what populates MODULE$.
    static {
        new KafkaWriters$();
    }

    /**
     * Accessor for the logger held by this singleton.
     *
     * @return the {@link WaspLogger} currently stored in the {@code logger} field
     */
    public WaspLogger logger() {
        return logger;
    }

    /**
     * Setter generated for the {@code logger} member of the {@code Logging} trait; it stores the
     * supplied logger in this object's {@code logger} field.
     *
     * @param waspLogger the logger instance to keep
     */
    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        logger = waspLogger;
    }

    /**
     * Shapes {@code dataset} for a binary payload: the single value column is projected as
     * {@code value}, preceded by whatever select expressions the three optional field names produce.
     *
     * <p>Preconditions, enforced through {@code Predef.require} (failure messages are built by the
     * accompanying closures): {@code option4} is defined and holds exactly one column name, the schema
     * lookup driven by that name finds a field, and that field's data type equals {@code BinaryType}.
     *
     * @param option     optional field name; when defined it contributes one leading select expression
     * @param option2    optional field name; when defined it contributes the next select expression
     * @param option3    optional field name; when defined it contributes the next select expression
     * @param option4    names of the value columns; must contain exactly one entry
     * @param dataset    frame to project
     * @param topicModel not read by this method
     * @return the result of {@code dataset.selectExpr} over the assembled expressions
     */
    public Dataset<Row> convertDfForBinary(Option<String> option, Option<String> option2, Option<String> option3, Option<Seq<String>> option4, Dataset<Row> dataset, TopicModel topicModel) {
        // Exactly one value column has to be supplied.
        boolean hasSingleValueColumn = option4.isDefined() && ((SeqLike) option4.get()).size() == 1;
        Predef$.MODULE$.require(hasSingleValueColumn, new KafkaWriters$$anonfun$convertDfForBinary$1(option4));
        String valueColumn = (String) ((IterableLike) option4.get()).head();

        // The schema lookup for that column must succeed.
        Option matchingField = dataset.schema().find(new KafkaWriters$$anonfun$1(valueColumn));
        Predef$.MODULE$.require(matchingField.isDefined(), new KafkaWriters$$anonfun$convertDfForBinary$2(dataset, valueColumn));

        // The field found must be of BinaryType (null-safe equality, as in the compiled Scala ==).
        DataType valueType = ((StructField) matchingField.get()).dataType();
        BinaryType$ expectedType = BinaryType$.MODULE$;
        boolean typeMatches;
        if (valueType == null) {
            typeMatches = expectedType == null;
        } else {
            typeMatches = valueType.equals(expectedType);
        }
        Predef$.MODULE$.require(typeMatches, new KafkaWriters$$anonfun$convertDfForBinary$3(valueColumn, valueType));

        // Optional expressions first (in parameter order), then "<column> AS value" last.
        List firstExprs = option.map(new KafkaWriters$$anonfun$2()).toList();
        List secondExprs = option2.map(new KafkaWriters$$anonfun$3()).toList();
        List firstTwo = (List) firstExprs.$plus$plus(secondExprs, List$.MODULE$.canBuildFrom());
        List thirdExprs = option3.map(new KafkaWriters$$anonfun$4()).toList();
        SeqLike optionalExprs = (SeqLike) firstTwo.$plus$plus(thirdExprs, List$.MODULE$.canBuildFrom());
        String valueExpr = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", " AS value"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{valueColumn}));
        List selectExprs = (List) optionalExprs.$colon$plus(valueExpr, List$.MODULE$.canBuildFrom());

        logger().debug(new KafkaWriters$$anonfun$convertDfForBinary$4(selectExprs));
        return dataset.selectExpr(selectExprs);
    }

    /**
     * Shapes {@code dataset} for an Avro payload: the chosen value columns are packed into a struct,
     * passed through an {@code AvroSerializerExpression}, and exposed as the {@code value} column,
     * preceded by the columns derived from the three optional field names.
     *
     * @param option     optional field name; when defined it contributes one leading select column
     * @param option2    optional field name; when defined it contributes the next select column
     * @param option3    optional field name; when defined it contributes the next select column
     *                   (presumably the topic field, since prepareDfToWrite passes here the same option
     *                   it hands to addTopicNameCheckIfNeeded - TODO confirm)
     * @param option4    names of the value columns; when empty a default is computed from {@code dataset}
     * @param dataset    frame to project
     * @param topicModel supplies the topic name, the JSON schema and the schema-manager flag
     * @return the projected frame
     */
    public Dataset<Row> convertDfForAvro(Option<String> option, Option<String> option2, Option<String> option3, Option<Seq<String>> option4, Dataset<Row> dataset, TopicModel topicModel) {
        // Value column names: the supplied list, or whatever the fallback closure derives from the dataset
        // (presumably all of its columns - TODO confirm).
        Seq seq = (Seq) option4.getOrElse(new KafkaWriters$$anonfun$5(dataset));
        // Struct type built by mapping each value column name through a closure that holds the dataset schema.
        StructType apply = StructType$.MODULE$.apply((Seq) seq.map(new KafkaWriters$$anonfun$6(dataset.schema()), Seq$.MODULE$.canBuildFrom()));
        // The schema-manager configuration is only fetched when the topic model asks for the schema manager.
        Some some = topicModel.useAvroSchemaManager() ? new Some(ConfigManager$.MODULE$.getAvroSchemaManagerConfig()) : None$.MODULE$;
        // Single struct column wrapping the value columns; its expression is the serializer's input.
        Column struct = functions$.MODULE$.struct((Seq) seq.map(new KafkaWriters$$anonfun$7(), Seq$.MODULE$.canBuildFrom()));
        String name = topicModel.name();
        // Optional columns (in parameter order) followed by the serialized struct aliased as "value".
        // With the schema manager the JSON schema is parsed into an Avro Schema and passed along with the config;
        // otherwise the raw JSON schema string is passed wrapped in Some. The meaning of the "wasp" literal is not
        // visible here.
        Dataset<Row> select = dataset.select((Seq) ((TraversableOnce) ((TraversableLike) Option$.MODULE$.option2Iterable(option.map(new KafkaWriters$$anonfun$10())).$plus$plus(Option$.MODULE$.option2Iterable(option2.map(new KafkaWriters$$anonfun$11())), Iterable$.MODULE$.canBuildFrom())).$plus$plus(Option$.MODULE$.option2Iterable(option3.map(new KafkaWriters$$anonfun$12())), Iterable$.MODULE$.canBuildFrom())).toSeq().$plus$plus(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Column[]{new Column((AvroSerializerExpression) (topicModel.useAvroSchemaManager() ? new KafkaWriters$$anonfun$8(name, "wasp", new Schema.Parser().parse(topicModel.getJsonSchema()), (Config) some.get()) : new KafkaWriters$$anonfun$9(name, "wasp", new Some(topicModel.getJsonSchema()))).apply(struct.expr(), apply)).as("value")})), Seq$.MODULE$.canBuildFrom()));
        logger().debug(new KafkaWriters$$anonfun$convertDfForAvro$1(select));
        return select;
    }

    /**
     * Shapes {@code dataset} for a JSON payload: the value columns are wrapped in
     * {@code to_json(struct(...))} and exposed as {@code value}, preceded by whatever select
     * expressions the three optional field names produce.
     *
     * @param option     optional field name; when defined it contributes one leading select expression
     * @param option2    optional field name; when defined it contributes the next select expression
     * @param option3    optional field name; when defined it contributes the next select expression
     * @param option4    optional names of the value columns; when empty a fallback closure supplies the
     *                   struct arguments
     * @param dataset    frame to project
     * @param topicModel not read by this method
     * @return the result of {@code dataset.selectExpr} over the assembled expressions
     */
    public Dataset<Row> convertDfForJson(Option<String> option, Option<String> option2, Option<String> option3, Option<Seq<String>> option4, Dataset<Row> dataset, TopicModel topicModel) {
        // Expressions contributed by the optional field names, in parameter order.
        List firstExprs = option.map(new KafkaWriters$$anonfun$15()).toList();
        List secondExprs = option2.map(new KafkaWriters$$anonfun$16()).toList();
        List firstTwo = (List) firstExprs.$plus$plus(secondExprs, List$.MODULE$.canBuildFrom());
        List thirdExprs = option3.map(new KafkaWriters$$anonfun$17()).toList();
        SeqLike optionalExprs = (SeqLike) firstTwo.$plus$plus(thirdExprs, List$.MODULE$.canBuildFrom());

        // The value expression: every struct argument joined with ", " inside to_json(struct(...)).
        StringContext valueTemplate = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"to_json(struct(", ")) AS value"}));
        TraversableOnce structArgs = (TraversableOnce) option4.map(new KafkaWriters$$anonfun$13()).getOrElse(new KafkaWriters$$anonfun$14());
        String joinedArgs = structArgs.mkString(", ");
        String valueExpr = valueTemplate.s(Predef$.MODULE$.genericWrapArray(new Object[]{joinedArgs}));

        List selectExprs = (List) optionalExprs.$colon$plus(valueExpr, List$.MODULE$.canBuildFrom());
        logger().debug(new KafkaWriters$$anonfun$convertDfForJson$1(selectExprs));
        return dataset.selectExpr(selectExprs);
    }

    /**
     * Shapes {@code dataset} for a plaintext payload: the single string value column is projected as
     * {@code value_string}, converted into the {@code value} column by a UDF, and then dropped.
     *
     * <p>Preconditions, enforced through {@code Predef.require}: {@code option4} is defined and holds
     * exactly one column name, the schema lookup driven by that name finds a field, and that field's
     * data type equals {@code StringType}.
     *
     * @param option     optional field name; when defined it contributes one leading select expression
     * @param option2    optional field name; when defined it contributes the next select expression
     * @param option3    optional field name; when defined it contributes the next select expression
     * @param option4    names of the value columns; must contain exactly one entry
     * @param dataset    frame to project
     * @param topicModel not read by this method
     * @return the projected frame with a {@code value} column and without {@code value_string}
     */
    public Dataset<Row> convertDfForPlaintext(Option<String> option, Option<String> option2, Option<String> option3, Option<Seq<String>> option4, Dataset<Row> dataset, TopicModel topicModel) {
        // Exactly one value column has to be supplied.
        Predef$.MODULE$.require(option4.isDefined() && ((SeqLike) option4.get()).size() == 1, new KafkaWriters$$anonfun$convertDfForPlaintext$1(option4));
        String str = (String) ((IterableLike) option4.get()).head();
        // The schema lookup for that column must succeed.
        Option find = dataset.schema().find(new KafkaWriters$$anonfun$18(str));
        Predef$.MODULE$.require(find.isDefined(), new KafkaWriters$$anonfun$convertDfForPlaintext$2(dataset, str));
        DataType dataType = ((StructField) find.get()).dataType();
        Predef$ predef$ = Predef$.MODULE$;
        StringType$ stringType$ = StringType$.MODULE$;
        // Null-safe equality check that the column is of StringType.
        predef$.require(dataType != null ? dataType.equals(stringType$) : stringType$ == null, new KafkaWriters$$anonfun$convertDfForPlaintext$3(str, dataType));
        // Optional expressions first (in parameter order), then "<column> AS value_string" last.
        List list = (List) ((SeqLike) ((List) option.map(new KafkaWriters$$anonfun$19()).toList().$plus$plus(option2.map(new KafkaWriters$$anonfun$20()).toList(), List$.MODULE$.canBuildFrom())).$plus$plus(option3.map(new KafkaWriters$$anonfun$21()).toList(), List$.MODULE$.canBuildFrom())).$colon$plus(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", " AS value_string"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{str})), List$.MODULE$.canBuildFrom());
        logger().debug(new KafkaWriters$$anonfun$convertDfForPlaintext$4(list));
        // Derive "value" by applying the UDF to "value_string", then drop the temporary column.
        // The two type tags passed to udf are reified below; presumably they describe the UDF's
        // result and argument types respectively - TODO confirm against the Spark udf signature.
        return dataset.selectExpr(list).withColumn("value", functions$.MODULE$.udf(new KafkaWriters$$anonfun$22(), package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaWriters$$typecreator1$1
            // Reifies the type scala.Array applied to scala.Byte.
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe = mirror.universe();
                return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticClass("scala.Array"), List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Types.TypeApi[]{mirror.staticClass("scala.Byte").asType().toTypeConstructor()})));
            }
        }), package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaWriters$$typecreator2$1
            // Reifies the type member String of scala.Predef.
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe = mirror.universe();
                return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$);
            }
        })).apply(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col("value_string")}))).drop("value_string");
    }

    /**
     * Rewrites the {@code topic} column through a UDF that is built from the given topic models;
     * does nothing when no topic field name is supplied.
     *
     * <p>Only the emptiness of {@code option} is inspected; the column name used is the literal
     * {@code "topic"}.
     *
     * @param option  optional topic field name; when empty the dataset is returned untouched
     * @param seq     topic models from which the set handed to the UDF closure is derived
     * @param dataset frame to process
     * @return {@code dataset} itself when {@code option} is empty, otherwise the frame with the
     *         {@code topic} column replaced by the UDF result
     */
    public Dataset<Row> addTopicNameCheckIfNeeded(Option<String> option, Seq<TopicModel> seq, Dataset<Row> dataset) {
        if (option.isEmpty()) {
            return dataset;
        }
        // Map every topic model through a closure and collect the results into a set for the UDF
        // (presumably the allowed topic names - TODO confirm).
        KafkaWriters$$anonfun$24 kafkaWriters$$anonfun$24 = new KafkaWriters$$anonfun$24(((TraversableOnce) seq.map(new KafkaWriters$$anonfun$23(), Seq$.MODULE$.canBuildFrom())).toSet());
        // Both type tags passed to udf reify the same type: the String member of scala.Predef.
        return dataset.withColumn("topic", functions$.MODULE$.udf(kafkaWriters$$anonfun$24, package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaWriters$$typecreator3$1
            // Reifies the type member String of scala.Predef.
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe = mirror.universe();
                return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$);
            }
        }), package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka.KafkaWriters$$typecreator4$1
            // Reifies the type member String of scala.Predef.
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe = mirror.universe();
                return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$);
            }
        })).apply(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col("topic")})));
    }

    /**
     * Logs the given topic models at info level, then runs a per-topic action on each of them.
     *
     * <p>What the per-topic action does is defined in its own closure class; going by this method's
     * name it presumably requests that each topic be checked or created - TODO confirm.
     *
     * @param seq the topic models to process
     */
    public void askToCheckOrCreateTopics(Seq<TopicModel> seq) {
        WaspLogger log = logger();
        log.info(new KafkaWriters$$anonfun$askToCheckOrCreateTopics$1(seq));

        KafkaWriters$$anonfun$askToCheckOrCreateTopics$2 perTopicAction = new KafkaWriters$$anonfun$askToCheckOrCreateTopics$2();
        seq.foreach(perTopicAction);
    }

    /**
     * Resolves a datastore model into the pair (optional topic-name field, topic models).
     *
     * <ul>
     *   <li>{@code Some(TopicModel)}: no topic-name field, and a one-element sequence holding that model.</li>
     *   <li>{@code Some(MultiTopicModel)}: the model's {@code topicNameField()} wrapped in {@code Some},
     *       and the sequence obtained by mapping its {@code topicModelNames()} through a closure that
     *       holds {@code topicBL} and flat-mapping the result through a closure that holds {@code str}
     *       (presumably a lookup of each name - TODO confirm).</li>
     * </ul>
     *
     * @param option  the datastore model, if one was found
     * @param topicBL handed to the closure applied to each topic model name
     * @param str     name used in the error message and handed to the flat-map closure
     * @return the optional topic-name field together with the topic models
     * @throws Exception  when {@code option} is {@code None}
     * @throws MatchError when {@code option} matches none of the handled shapes
     */
    public Tuple2<Option<String>, Seq<TopicModel>> retrieveTopicFieldNameAndTopicModels(Option<DatastoreModel<TopicCategory>> option, TopicBL topicBL, String str) {
        if (option instanceof Some) {
            DatastoreModel model = (DatastoreModel) ((Some) option).x();

            // Single topic: nothing to read the topic name from, one model to return.
            if (model instanceof TopicModel) {
                TopicModel singleTopic = (TopicModel) model;
                return new Tuple2<>(None$.MODULE$, Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new TopicModel[]{singleTopic})));
            }

            // Multi topic: expose the topic-name field and resolve every referenced topic model name.
            if (model instanceof MultiTopicModel) {
                MultiTopicModel multiTopic = (MultiTopicModel) model;
                Some topicNameField = new Some(multiTopic.topicNameField());
                TraversableLike perName = (TraversableLike) multiTopic.topicModelNames().map(new KafkaWriters$$anonfun$25(topicBL), Seq$.MODULE$.canBuildFrom());
                Seq topicModels = (Seq) perName.flatMap(new KafkaWriters$$anonfun$26(str), Seq$.MODULE$.canBuildFrom());
                return new Tuple2<>(topicNameField, topicModels);
            }
            // Any other model type falls through to the MatchError below.
        } else if (None$.MODULE$.equals(option)) {
            StringContext message = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Unable to retrieve topic datastore model with name \"", "\""}));
            throw new Exception(message.s(Predef$.MODULE$.genericWrapArray(new Object[]{str})));
        }
        throw new MatchError(option);
    }

    /**
     * Converts {@code dataset} with the converter matching the topic's data type
     * ({@code avro}, {@code json}, {@code plaintext} or {@code binary}) and then applies
     * {@code addTopicNameCheckIfNeeded} to the converted frame.
     *
     * <p>The converters receive {@code option2}, {@code option3} and {@code option} as their first,
     * second and third optional field names, in that order.
     *
     * @param dataset    frame to convert
     * @param option     optional field name passed third to the converter and also used for the
     *                   topic-name check
     * @param seq        topic models used by the topic-name check
     * @param topicModel model whose {@code topicDataType()} selects the converter
     * @param option2    optional field name passed first to the converter
     * @param option3    optional field name passed second to the converter
     * @param option4    optional names of the value columns
     * @return the converted frame after the topic-name check
     * @throws UnsupportedOperationException when the topic data type is none of the four supported values
     */
    public Dataset<Row> prepareDfToWrite(Dataset<Row> dataset, Option<String> option, Seq<TopicModel> seq, TopicModel topicModel, Option<String> option2, Option<String> option3, Option<Seq<String>> option4) {
        String topicDataType = topicModel.topicDataType();

        // Literal-first equals keeps a null data type on the "unknown" path instead of raising an NPE.
        Dataset<Row> converted;
        if ("avro".equals(topicDataType)) {
            converted = convertDfForAvro(option2, option3, option, option4, dataset, topicModel);
        } else if ("json".equals(topicDataType)) {
            converted = convertDfForJson(option2, option3, option, option4, dataset, topicModel);
        } else if ("plaintext".equals(topicDataType)) {
            converted = convertDfForPlaintext(option2, option3, option, option4, dataset, topicModel);
        } else if ("binary".equals(topicDataType)) {
            converted = convertDfForBinary(option2, option3, option, option4, dataset, topicModel);
        } else {
            StringContext message = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Unknown topic data type ", ""}));
            throw new UnsupportedOperationException(message.s(Predef$.MODULE$.genericWrapArray(new Object[]{topicDataType})));
        }

        return addTopicNameCheckIfNeeded(option, seq, converted);
    }

    // Private constructor: the only call site in this class is the static initializer.
    private KafkaWriters$() {
        // Publish the singleton before the trait initializer runs.
        MODULE$ = this;
        // Decompiler rendering of the Logging trait's initializer being run on this instance;
        // presumably this is what ends up calling the logger setter - TODO confirm.
        Logging.class.$init$(this);
    }
}
