package it.agilelab.bigdata.wasp.consumers.spark.streaming;

import akka.actor.Actor;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.SupervisorStrategy;
import akka.actor.package$;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import it.agilelab.bigdata.wasp.consumers.spark.MlModels.MlModelsDB;
import it.agilelab.bigdata.wasp.consumers.spark.SparkSingletons$;
import it.agilelab.bigdata.wasp.consumers.spark.plugins.WaspConsumersSparkPlugin;
import it.agilelab.bigdata.wasp.consumers.spark.readers.SparkReader;
import it.agilelab.bigdata.wasp.consumers.spark.readers.StreamingReader;
import it.agilelab.bigdata.wasp.consumers.spark.strategies.ReaderKey;
import it.agilelab.bigdata.wasp.consumers.spark.strategies.Strategy;
import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkLegacyStreamingWriter;
import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkWriterFactory;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.models.LegacyStreamingETLModel;
import it.agilelab.bigdata.wasp.core.models.PipegraphModel;
import it.agilelab.bigdata.wasp.core.models.ReaderModel;
import it.agilelab.bigdata.wasp.core.models.StrategyModel;
import it.agilelab.bigdata.wasp.core.models.TopicModel;
import it.agilelab.bigdata.wasp.core.models.WriterType;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.dstream.DStream;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.PartialFunction;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: LegacyStreamingETLActor.scala */
@ScalaSignature(bytes = "\u0006\u0001\t-b\u0001B\u0001\u0003\u0001E\u0011q\u0003T3hC\u000eL8\u000b\u001e:fC6LgnZ#U\u0019\u0006\u001bGo\u001c:\u000b\u0005\r!\u0011!C:ue\u0016\fW.\u001b8h\u0015\t)a!A\u0003ta\u0006\u00148N\u0003\u0002\b\u0011\u0005I1m\u001c8tk6,'o\u001d\u0006\u0003\u0013)\tAa^1ta*\u00111\u0002D\u0001\bE&<G-\u0019;b\u0015\tia\"\u0001\u0005bO&dW\r\\1c\u0015\u0005y\u0011AA5u\u0007\u0001\u0019B\u0001\u0001\n\u0019AA\u00111CF\u0007\u0002))\tQ#A\u0003tG\u0006d\u0017-\u0003\u0002\u0018)\t1\u0011I\\=SK\u001a\u0004\"!\u0007\u0010\u000e\u0003iQ!a\u0007\u000f\u0002\u000b\u0005\u001cGo\u001c:\u000b\u0003u\tA!Y6lC&\u0011qD\u0007\u0002\u0006\u0003\u000e$xN\u001d\t\u0003C\u0019j\u0011A\t\u0006\u0003G\u0011\nq\u0001\\8hO&twM\u0003\u0002&\u0011\u0005!1m\u001c:f\u0013\t9#EA\u0004M_\u001e<\u0017N\\4\t\u0011%\u0002!\u0011!Q\u0001\n)\n1!\u001a8w%\tY#C\u0002\u0003-\u0001\u0001Q#\u0001\u0004\u001fsK\u001aLg.Z7f]Rt\u0004b\u0002\u0018,\u0005\u00045\taL\u0001\bi>\u0004\u0018n\u0019\"M+\u0005\u0001\u0004CA\u00195\u001b\u0005\u0011$BA\u001a%\u0003\t\u0011G.\u0003\u00026e\t9Ak\u001c9jG\nc\u0005bB\u001c,\u0005\u00045\t\u0001O\u0001\bS:$W\r\u001f\"M+\u0005I\u0004CA\u0019;\u0013\tY$GA\u0004J]\u0012,\u0007P\u0011'\t\u000fuZ#\u0019!D\u0001}\u0005)!/Y<C\u0019V\tq\b\u0005\u00022\u0001&\u0011\u0011I\r\u0002\u0006%\u0006<(\t\u0014\u0005\b\u0007.\u0012\rQ\"\u0001E\u0003)YW-\u001f,bYV,'\tT\u000b\u0002\u000bB\u0011\u0011GR\u0005\u0003\u000fJ\u0012!bS3z-\u0006dW/\u001a\"M\u0011\u001dI5F1A\u0007\u0002)\u000b\u0011\"\u001c7N_\u0012,GN\u0011'\u0016\u0003-\u0003\"!\r'\n\u00055\u0013$!C'm\u001b>$W\r\u001c\"M\u0011!y\u0005A!A!\u0002\u0013\u0001\u0016AE:qCJ\\wK]5uKJ4\u0015m\u0019;pef\u0004\"!\u0015+\u000e\u0003IS!a\u0015\u0003\u0002\u000f]\u0014\u0018\u000e^3sg&\u0011QK\u0015\u0002\u0013'B\f'o[,sSR,'OR1di>\u0014\u0018\u0010\u0003\u0005X\u0001\t\u0005\t\u0015!\u0003Y\u0003=\u0019HO]3b[&twMU3bI\u0016\u0014\bCA-]\u001b\u0005Q&BA.\u0005\u0003\u001d\u0011X-\u00193feNL!!\u0018.\u0003\u001fM#(/Z1nS:<'+Z1eKJD\u0001b\u0018\u0001\u0003\u0002\u0003\u0006I\u0001Y\u0001\u0004gN\u001c\u0007CA1i\u001b\u0005\u0011'BA\u0002d\u0015\t)AM\u0003\u0002fM\u00061\u0011\r]1dQ\u0016T\u0011aZ\u0001\u0004_J<\u0017BA5c\u0005A\u0019FO]3b[&twmQ8oi\u0016DH\u000f\u0003\u0005l\u0001\t\u0005\t\u0015!\u0003m\u0003%\u0001\u0018\u000e]3he\u0006\u0004\b\u000e\u0005\u0002na6\taN\u0003\u0002pI\u00051Qn\u001c3fYNL!!\u001d8\u0003\u001dAK\u0007/Z4sCBDWj\u001c3fY\"A1\u000f\u0001B\u0001B\u0003%A/\u0001\nmK\u001e\f7-_*ue\u0016\fW.\u001b8h\u000bRc\u0005CA7v\u0013\t1hNA\fMK\u001e\f7-_*ue\u0016\fW.\u001b8h\u000bRcUj\u001c3fY\"A\u0001\u0010\u0001B\u0001B\u0003%\u00110\u0001\u0005mSN$XM\\3s!\tI\"0\u0003\u0002|5\tA\u0011i\u0019;peJ+g\r\u0003\u0005~\u0001\t\u0005\t\u0015!\u0003\u007f\u0003\u001d\u0001H.^4j]N\u0004ra`A\u0003\u0003\u0017\t\tBD\u0002\u0014\u0003\u0003I1!a\u0001\u0015\u0003\u0019\u0001&/\u001a3fM&!\u0011qAA\u0005\u0005\ri\u0015\r\u001d\u0006\u0004\u0003\u0007!\u0002cA@\u0002\u000e%!\u0011qBA\u0005\u0005\u0019\u0019FO]5oOB!\u00111CA\f\u001b\t\t)B\u0003\u0002~\t%!\u0011\u0011DA\u000b\u0005a9\u0016m\u001d9D_:\u001cX/\\3sgN\u0003\u0018M]6QYV<\u0017N\u001c\u0005\b\u0003;\u0001A\u0011AA\u0010\u0003\u0019a\u0014N\\5u}Q\u0011\u0012\u0011EA\u0013\u0003k\t9$!\u000f\u0002<\u0005u\u0012qHA!!\r\t\u0019\u0003A\u0007\u0002\u0005!9\u0011&a\u0007A\u0002\u0005\u001d\"cAA\u0015%\u0019)A\u0006\u0001\u0001\u0002(!Aa&!\u000bC\u0002\u001b\u0005q\u0006\u0003\u00058\u0003S\u0011\rQ\"\u00019\u0011!i\u0014\u0011\u0006b\u0001\u000e\u0003q\u0004\u0002C\"\u0002*\t\u0007i\u0011\u0001#\t\u0011%\u000bIC1A\u0007\u0002)CaaTA\u000e\u0001\u0004\u0001\u0006BB,\u0002\u001c\u0001\u0007\u0001\f\u0003\u0004`\u00037\u0001\r\u0001\u0019\u0005\u0007W\u0006m\u0001\u0019\u00017\t\rM\fY\u00021\u0001u\u0011\u0019A\u00181\u0004a\u0001s\"1Q0a\u0007A\u0002yDq!!\u0012\u0001\t\u0003\n9%A\u0004sK\u000e,\u0017N^3\u0016\u0005\u0005%\u0003\u0003BA&\u0003#r1!GA'\u0013\r\tyEG\u0001\u0006\u0003\u000e$xN]\u0005\u0005\u0003'\n)FA\u0004SK\u000e,\u0017N^3\u000b\u0007\u0005=#\u0004C\u0004\u0002Z\u0001!\t%a\u0017\u0002\u0011A\u0014Xm\u0015;beR$\"!!\u0018\u0011\u0007M\ty&C\u0002\u0002bQ\u0011A!\u00168ji\"Q\u0011Q\r\u0001\t\u0006\u0004%I!a\u001a\u0002\u001d\r\u0014X-\u0019;f'R\u0014\u0018\r^3hsV\u0011\u0011\u0011\u000e\t\u0006'\u0005-\u0014qN\u0005\u0004\u0003[\"\"AB(qi&|g\u000e\u0005\u0003\u0002r\u0005]TBAA:\u0015\r\t)\bB\u0001\u000bgR\u0014\u0018\r^3hS\u0016\u001c\u0018\u0002BA=\u0003g\u0012\u0001b\u0015;sCR,w-\u001f\u0005\u000b\u0003{\u0002\u0001\u0012!Q!\n\u0005%\u0014aD2sK\u0006$Xm\u0015;sCR,w-\u001f\u0011\t\u000f\u0005\u0005\u0005\u0001\"\u0003\u0002\u0004\u0006\u0001\u0012\r\u001c7Ti\u0006$\u0018n\u0019*fC\u0012,'o\u001d\u000b\u0005\u0003\u000b\u000b\u0019\u000b\u0005\u0004\u0002\b\u0006]\u0015Q\u0014\b\u0005\u0003\u0013\u000b\u0019J\u0004\u0003\u0002\f\u0006EUBAAG\u0015\r\ty\tE\u0001\u0007yI|w\u000e\u001e \n\u0003UI1!!&\u0015\u0003\u001d\u0001\u0018mY6bO\u0016LA!!'\u0002\u001c\n!A*[:u\u0015\r\t)\n\u0006\t\u00043\u0006}\u0015bAAQ5\nY1\u000b]1sWJ+\u0017\rZ3s\u0011!\t)+a A\u0002\u0005\u001d\u0016AE:uCRL7MU3bI\u0016\u0014Xj\u001c3fYN\u0004b!a\"\u0002\u0018\u0006%\u0006cA7\u0002,&\u0019\u0011Q\u00168\u0003\u0017I+\u0017\rZ3s\u001b>$W\r\u001c\u0005\b\u0003c\u0003A\u0011BAZ\u0003U\u0011X\r\u001e:jKZ,7\u000b^1uS\u000e\u0014V-\u00193feN$B!!\"\u00026\"A\u0011QUAX\u0001\u0004\t9\u000bC\u0004\u0002:\u0002!I!a/\u0002\u0017Q|\u0007/[2N_\u0012,Gn\u001d\u000b\u0003\u0003{\u0003b!a\"\u0002\u0018\u0006}\u0006#B\n\u0002l\u0005\u0005\u0007cA7\u0002D&\u0019\u0011Q\u00198\u0003\u0015Q{\u0007/[2N_\u0012,G\u000eC\u0004\u0002J\u0002!I!a\u0017\u0002\u001dY\fG.\u001b3bi&|g\u000eV1tW\"9\u0011Q\u001a\u0001\u0005\u0002\u0005m\u0013\u0001C7bS:$\u0016m]6\t\u000f\u0005E\u0007\u0001\"\u0003\u0002T\u0006Y!/\u001a;sS\u00164X\r\u0012$t)\u0011\t).!@\u0011\u000f}\f)!a6\u0002^B!\u0011\u0011OAm\u0013\u0011\tY.a\u001d\u0003\u0013I+\u0017\rZ3s\u0017\u0016L\b\u0003BAp\u0003otA!!9\u0002t:!\u00111]Ax\u001d\u0011\t)/!<\u000f\t\u0005\u001d\u00181\u001e\b\u0005\u0003\u0017\u000bI/C\u0001h\u0013\t)g-\u0003\u0002\u0006I&\u0019\u0011\u0011_2\u0002\u0007M\fH.\u0003\u0003\u0002\u0016\u0006U(bAAyG&!\u0011\u0011`A~\u0005%!\u0015\r^1Ge\u0006lWM\u0003\u0003\u0002\u0016\u0006U\b\u0002CAS\u0003\u001f\u0004\r!a*\t\u000f\t\u0005\u0001\u0001\"\u0003\u0003\u0004\u0005IAO]1og\u001a|'/\u001c\u000b\r\u0005\u000b\u0011\tB!\u0006\u0003\u001a\tu!\u0011\u0005\t\u0007\u0005\u000f\u0011i!a\u0003\u000e\u0005\t%!b\u0001B\u0006E\u00069Am\u001d;sK\u0006l\u0017\u0002\u0002B\b\u0005\u0013\u0011q\u0001R*ue\u0016\fW\u000e\u0003\u0005\u0003\u0014\u0005}\b\u0019AAl\u0003%\u0011X-\u00193fe.+\u0017\u0010\u0003\u0005\u0003\u0018\u0005}\b\u0019\u0001B\u0003\u0003\u0019\u0019HO]3b[\"A!1DA��\u0001\u0004\t).\u0001\u0007eCR\f7\u000b^8sK\u001235\u000f\u0003\u0005\u0003 \u0005}\b\u0019AA8\u0003!\u0019HO]1uK\u001eL\b\u0002\u0003B\u0012\u0003\u007f\u0004\rA!\n\u0002\u0015]\u0014\u0018\u000e^3s)f\u0004X\rE\u0002n\u0005OI1A!\u000bo\u0005)9&/\u001b;feRK\b/\u001a")
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/streaming/LegacyStreamingETLActor.class */
public class LegacyStreamingETLActor implements Actor, Logging {
    public final Object it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$env;
    private final SparkWriterFactory sparkWriterFactory;
    public final StreamingReader it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$streamingReader;
    public final StreamingContext it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$ssc;
    private final PipegraphModel pipegraph;
    public final LegacyStreamingETLModel it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$legacyStreamingETL;
    private final ActorRef listener;
    public final Map<String, WaspConsumersSparkPlugin> it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$plugins;
    private Option<Strategy> createStrategy;
    private final WaspLogger logger;
    private final ActorContext context;
    private final ActorRef self;
    private volatile boolean bitmap$0;

    /* JADX WARN: Multi-variable type inference failed */
    private Option createStrategy$lzycompute() {
        Config config;
        None$ some;
        synchronized (this) {
            if (!this.bitmap$0) {
                Some strategy = this.it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$legacyStreamingETL.strategy();
                if (None$.MODULE$.equals(strategy)) {
                    some = None$.MODULE$;
                } else {
                    if (!(strategy instanceof Some)) {
                        throw new MatchError(strategy);
                    }
                    StrategyModel strategyModel = (StrategyModel) strategy.x();
                    Strategy strategy2 = (Strategy) Class.forName(strategyModel.className()).newInstance();
                    Some configurationConfig = strategyModel.configurationConfig();
                    if (None$.MODULE$.equals(configurationConfig)) {
                        config = ConfigFactory.empty();
                    } else {
                        if (!(configurationConfig instanceof Some)) {
                            throw new MatchError(configurationConfig);
                        }
                        config = (Config) configurationConfig.x();
                    }
                    strategy2.configuration_$eq(config);
                    logger().info(new LegacyStream$$$$b4a3cf26bbe1409d5ed427b59f9854a$$$$teStrategy$1(this, strategy2));
                    some = new Some(strategy2);
                }
                this.createStrategy = some;
                this.bitmap$0 = true;
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }
        return this.createStrategy;
    }

    public WaspLogger logger() {
        return this.logger;
    }

    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    public ActorContext context() {
        return this.context;
    }

    public final ActorRef self() {
        return this.self;
    }

    public void akka$actor$Actor$_setter_$context_$eq(ActorContext actorContext) {
        this.context = actorContext;
    }

    public final void akka$actor$Actor$_setter_$self_$eq(ActorRef actorRef) {
        this.self = actorRef;
    }

    public final ActorRef sender() {
        return Actor.class.sender(this);
    }

    public void aroundReceive(PartialFunction<Object, BoxedUnit> partialFunction, Object obj) {
        Actor.class.aroundReceive(this, partialFunction, obj);
    }

    public void aroundPreStart() {
        Actor.class.aroundPreStart(this);
    }

    public void aroundPostStop() {
        Actor.class.aroundPostStop(this);
    }

    public void aroundPreRestart(Throwable th, Option<Object> option) {
        Actor.class.aroundPreRestart(this, th, option);
    }

    public void aroundPostRestart(Throwable th) {
        Actor.class.aroundPostRestart(this, th);
    }

    public SupervisorStrategy supervisorStrategy() {
        return Actor.class.supervisorStrategy(this);
    }

    public void postStop() throws Exception {
        Actor.class.postStop(this);
    }

    public void preRestart(Throwable th, Option<Object> option) throws Exception {
        Actor.class.preRestart(this, th, option);
    }

    public void postRestart(Throwable th) throws Exception {
        Actor.class.postRestart(this, th);
    }

    public void unhandled(Object obj) {
        Actor.class.unhandled(this, obj);
    }

    public PartialFunction<Object, BoxedUnit> receive() {
        return new LegacyStreamingETLActor$$anonfun$receive$1(this);
    }

    public void preStart() {
        logger().info(new LegacyStreamingETLActor$$anonfun$preStart$1(this));
        try {
            validationTask();
            mainTask();
        } catch (Error e) {
            String s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Pipegraph '", "' - LegacyStreamingETLActor '", "': Error: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.pipegraph.name(), this.it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$legacyStreamingETL.name(), e.getMessage()}));
            logger().error(new LegacyStreamingETLActor$$anonfun$preStart$3(this, s), e);
            package$.MODULE$.actorRef2Scala(this.listener).$bang(scala.package$.MODULE$.Left().apply(s), self());
        } catch (Exception e2) {
            String s2 = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Pipegraph '", "' - LegacyStreamingETLActor '", "': Exception: ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.pipegraph.name(), this.it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$legacyStreamingETL.name(), e2.getMessage()}));
            logger().error(new LegacyStreamingETLActor$$anonfun$preStart$2(this, s2), e2);
            package$.MODULE$.actorRef2Scala(this.listener).$bang(scala.package$.MODULE$.Left().apply(s2), self());
        }
    }

    private Option<Strategy> createStrategy() {
        return this.bitmap$0 ? this.createStrategy : createStrategy$lzycompute();
    }

    private List<SparkReader> allStaticReaders(List<ReaderModel> list) {
        return (List) list.flatMap(new LegacyStream$$$$1c9c2e7438ed81faa713f74237662a4$$$$ticReaders$1(this), List$.MODULE$.canBuildFrom());
    }

    private List<SparkReader> retrieveStaticReaders(List<ReaderModel> list) {
        return allStaticReaders(list);
    }

    private List<Option<TopicModel>> topicModels() {
        return (List) this.it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$legacyStreamingETL.inputs().flatMap(new LegacyStreamingETLActor$$anonfun$topicModels$1(this), List$.MODULE$.canBuildFrom());
    }

    private void validationTask() {
        this.it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$legacyStreamingETL.inputs().foreach(new LegacyStream$$$$c88c1bd337be641c69e96a5a544d9f7$$$$dationTask$1(this));
        int count = this.it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$legacyStreamingETL.inputs().count(new LegacyStreamingETLActor$$anonfun$1(this));
        if (count == 0) {
            throw new Exception(new StringBuilder().append("There is NO topic to read data, inputs: ").append(this.it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$legacyStreamingETL.inputs()).toString());
        }
        if (count != 1) {
            throw new Exception(new StringBuilder().append("MUST be only ONE topic, inputs: ").append(this.it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$legacyStreamingETL.inputs()).toString());
        }
    }

    public void mainTask() {
        DStream<String> dStream;
        List list = (List) topicModels().map(new LegacyStreamingETLActor$$anonfun$2(this), List$.MODULE$.canBuildFrom());
        Predef$.MODULE$.assert(list.nonEmpty());
        Predef$.MODULE$.assert(list.size() == 1);
        Tuple2 tuple2 = (Tuple2) list.head();
        if (createStrategy().isDefined()) {
            Strategy strategy = (Strategy) createStrategy().get();
            List<ReaderModel> list2 = (List) this.it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$legacyStreamingETL.inputs().filterNot(new LegacyStreamingETLActor$$anonfun$3(this));
            Map<ReaderKey, Dataset<Row>> empty = list2.isEmpty() ? Predef$.MODULE$.Map().empty() : retrieveDFs(list2);
            int size = list2.size();
            int size2 = empty.size();
            if (size2 != size) {
                logger().error(new LegacyStreamingETLActor$$anonfun$4(this, new StringBuilder().append("DFs not retrieved successfully!\n").append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", " DFs required - ", " DFs retrieved!\\n"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(size), BoxesRunTime.boxToInteger(size2)}))).append(empty.toString()).toString()));
                throw new Exception(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"DFs not retrieved successful - ", " DFs required - ", " DFs retrieved!"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(size), BoxesRunTime.boxToInteger(size2)})));
            }
            if (!empty.isEmpty()) {
                logger().info(new LegacyStreamingETLActor$$anonfun$5(this));
            }
            strategy.mlModelsBroadcast_$eq(new MlModelsDB(this.it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$env).createModelsBroadcast(this.it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$legacyStreamingETL.mlModels(), this.it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$ssc.sparkContext()));
            dStream = transform((ReaderKey) tuple2._1(), (DStream) tuple2._2(), empty, strategy, this.it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$legacyStreamingETL.output().writerType());
        } else {
            dStream = (DStream) tuple2._2();
        }
        DStream<String> dStream2 = dStream;
        Some createSparkWriterStreaming = this.sparkWriterFactory.createSparkWriterStreaming(this.it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$env, this.it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$ssc, this.it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$legacyStreamingETL.output());
        if (!(createSparkWriterStreaming instanceof Some)) {
            if (!None$.MODULE$.equals(createSparkWriterStreaming)) {
                throw new MatchError(createSparkWriterStreaming);
            }
            throw new Exception(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"No Spark Streaming writer available for writer ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$legacyStreamingETL.output()})));
        }
        ((SparkLegacyStreamingWriter) createSparkWriterStreaming.x()).write(dStream2);
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
        package$.MODULE$.actorRef2Scala(this.listener).$bang(scala.package$.MODULE$.Right().apply(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Pipegraph '", "' - LegacyStreamingETLActor '", "' started"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.pipegraph.name(), this.it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$legacyStreamingETL.name()}))), self());
    }

    private Map<ReaderKey, Dataset<Row>> retrieveDFs(List<ReaderModel> list) {
        return ((TraversableOnce) retrieveStaticReaders(list).flatMap(new LegacyStreamingETLActor$$anonfun$retrieveDFs$1(this), List$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
    }

    private DStream<String> transform(ReaderKey readerKey, DStream<String> dStream, Map<ReaderKey, Dataset<Row>> map, Strategy strategy, WriterType writerType) {
        SQLContext sQLContext = SparkSingletons$.MODULE$.getSQLContext();
        Broadcast broadcast = this.it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$ssc.sparkContext().broadcast(strategy, ClassTag$.MODULE$.apply(Strategy.class));
        String name = this.it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$legacyStreamingETL.name();
        logger().debug(new LegacyStreamingETLActor$$anonfun$transform$1(this, readerKey));
        return dStream.transform(new LegacyStreamingETLActor$$anonfun$transform$2(this, readerKey, map, writerType, sQLContext, broadcast, name), ClassTag$.MODULE$.apply(String.class));
    }

    public LegacyStreamingETLActor(Object obj, SparkWriterFactory sparkWriterFactory, StreamingReader streamingReader, StreamingContext streamingContext, PipegraphModel pipegraphModel, LegacyStreamingETLModel legacyStreamingETLModel, ActorRef actorRef, Map<String, WaspConsumersSparkPlugin> map) {
        this.it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$env = obj;
        this.sparkWriterFactory = sparkWriterFactory;
        this.it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$streamingReader = streamingReader;
        this.it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$ssc = streamingContext;
        this.pipegraph = pipegraphModel;
        this.it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$legacyStreamingETL = legacyStreamingETLModel;
        this.listener = actorRef;
        this.it$agilelab$bigdata$wasp$consumers$spark$streaming$LegacyStreamingETLActor$$plugins = map;
        Actor.class.$init$(this);
        Logging.class.$init$(this);
    }
}
