/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.spark.sql.streaming;

import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.TaskContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.SQLExecution$;
import org.apache.spark.sql.execution.streaming.MetadataLog;
import org.apache.spark.sql.execution.streaming.Sink;
import org.apache.spark.sql.types.StructType;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.spark.sql.streaming.EsCommitProtocol;
import org.elasticsearch.spark.sql.streaming.EsSinkMetadataLog;
import org.elasticsearch.spark.sql.streaming.EsSinkStatus;
import org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink$;
import org.elasticsearch.spark.sql.streaming.EsStreamQueryWriter;
import org.elasticsearch.spark.sql.streaming.JobState;
import org.elasticsearch.spark.sql.streaming.NullMetadataLog;
import org.elasticsearch.spark.sql.streaming.SparkSqlStreamingConfigs$;
import org.elasticsearch.spark.sql.streaming.TaskCommit;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.Seq;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001U4A!\u0001\u0002\u0001\u001b\t9Ri]*qCJ\\7+\u001d7TiJ,\u0017-\\5oONKgn\u001b\u0006\u0003\u0007\u0011\t\u0011b\u001d;sK\u0006l\u0017N\\4\u000b\u0005\u00151\u0011aA:rY*\u0011q\u0001C\u0001\u0006gB\f'o\u001b\u0006\u0003\u0013)\tQ\"\u001a7bgRL7m]3be\u000eD'\"A\u0006\u0002\u0007=\u0014xm\u0001\u0001\u0014\u0007\u0001qa\u0003\u0005\u0002\u0010)5\t\u0001C\u0003\u0002\u0012%\u0005!A.\u00198h\u0015\u0005\u0019\u0012\u0001\u00026bm\u0006L!!\u0006\t\u0003\r=\u0013'.Z2u!\t9r$D\u0001\u0019\u0015\t\u0019\u0011D\u0003\u0002\u001b7\u0005IQ\r_3dkRLwN\u001c\u0006\u0003\u000bqQ!aB\u000f\u000b\u0005yQ\u0011AB1qC\u000eDW-\u0003\u0002!1\t!1+\u001b8l\u0011!\u0011\u0003A!A!\u0002\u0013\u0019\u0013\u0001D:qCJ\\7+Z:tS>t\u0007C\u0001\u0013&\u001b\u0005Y\u0012B\u0001\u0014\u001c\u00051\u0019\u0006/\u0019:l'\u0016\u001c8/[8o\u0011!A\u0003A!A!\u0002\u0013I\u0013\u0001C:fiRLgnZ:\u0011\u0005)zS\"A\u0016\u000b\u00051j\u0013aA2gO*\u0011a\u0006C\u0001\u0007Q\u0006$wn\u001c9\n\u0005AZ#\u0001C*fiRLgnZ:\t\u000bI\u0002A\u0011A\u001a\u0002\rqJg.\u001b;?)\r!dg\u000e\t\u0003k\u0001i\u0011A\u0001\u0005\u0006EE\u0002\ra\t\u0005\u0006QE\u0002\r!\u000b\u0005\bs\u0001\u0011\r\u0011\"\u0003;\u0003\u0019awnZ4feV\t1\b\u0005\u0002=\u00036\tQH\u0003\u0002?\u007f\u00059An\\4hS:<'B\u0001!\u001e\u0003\u001d\u0019w.\\7p]NL!AQ\u001f\u0003\u00071{w\r\u0003\u0004E\u0001\u0001\u0006IaO\u0001\bY><w-\u001a:!\u0011\u001d1\u0005A1A\u0005\n\u001d\u000b\u0001b\u001e:ji\u0016dunZ\u000b\u0002\u0011B\u0019q#S&\n\u0005)C\"aC'fi\u0006$\u0017\r^1M_\u001e\u00042\u0001T(R\u001b\u0005i%\"\u0001(\u0002\u000bM\u001c\u0017\r\\1\n\u0005Ak%!B!se\u0006L\bCA\u001bS\u0013\t\u0019&A\u0001\u0007FgNKgn[*uCR,8\u000f\u0003\u0004V\u0001\u0001\u0006I\u0001S\u0001\noJLG/\u001a'pO\u0002BQa\u0016\u0001\u0005Ba\u000b\u0001\"\u00193e\u0005\u0006$8\r\u001b\u000b\u00043r\u000b\u0007C\u0001'[\u0013\tYVJ\u0001\u0003V]&$\b\"B/W\u0001\u0004q\u0016a\u00022bi\u000eD\u0017\n\u001a\t\u0003\u0019~K!\u0001Y'\u0003\t1{gn\u001a\u0005\u0006EZ\u0003\raY\u0001\u0005I\u0006$\u0018\r\u0005\u0002ee:\u0011Q\r\u001d\b\u0003M>t!a\u001a8\u000f\u0005!lgBA5m\u001b\u0005Q'BA6\r\u0003\u0019a$o\\8u}%\t1\"\u0003\u0002\u001f\u0015%\u0011q!H\u0005\u0003\u000bqI!!]\u000e\u0002\u000fA\f7m[1hK&\u00111\u000f\u001e\u0002\n\t\u0006$\u0018M\u0012:b[\u0016T!!]\u000e")
public class EsSparkSqlStreamingSink
implements Sink {
    public final SparkSession org$elasticsearch$spark$sql$streaming$EsSparkSqlStreamingSink$$sparkSession;
    public final Settings org$elasticsearch$spark$sql$streaming$EsSparkSqlStreamingSink$$settings;
    private final Log logger;
    private final MetadataLog<EsSinkStatus[]> writeLog;

    private Log logger() {
        return this.logger;
    }

    private MetadataLog<EsSinkStatus[]> writeLog() {
        return this.writeLog;
    }

    public void addBatch(long batchId, Dataset<Row> data) {
        if (batchId <= BoxesRunTime.unboxToLong((Object)this.writeLog().getLatest().map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final long apply(Tuple2<Object, EsSinkStatus[]> x$1) {
                return x$1._1$mcJ$sp();
            }
        }).getOrElse((Function0)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final long apply() {
                return this.apply$mcJ$sp();
            }

            public long apply$mcJ$sp() {
                return -1L;
            }
        }))) {
            this.logger().info((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Skipping already committed batch [", "]"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)batchId)})));
        } else {
            EsCommitProtocol commitProtocol = new EsCommitProtocol(this.writeLog());
            QueryExecution queryExecution = data.queryExecution();
            StructType schema2 = data.schema();
            SQLExecution$.MODULE$.withNewExecutionId(this.org$elasticsearch$spark$sql$streaming$EsSparkSqlStreamingSink$$sparkSession, queryExecution, (Function0)new Serializable(this, batchId, commitProtocol, queryExecution, schema2){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ EsSparkSqlStreamingSink $outer;
                private final long batchId$1;
                public final EsCommitProtocol commitProtocol$1;
                private final QueryExecution queryExecution$1;
                public final StructType schema$1;

                public final void apply() {
                    this.apply$mcV$sp();
                }

                public void apply$mcV$sp() {
                    String queryName = (String)SparkSqlStreamingConfigs$.MODULE$.getQueryName(this.$outer.org$elasticsearch$spark$sql$streaming$EsSparkSqlStreamingSink$$settings).getOrElse((Function0)new Serializable(this){
                        public static final long serialVersionUID = 0L;

                        public final String apply() {
                            return UUID.randomUUID().toString();
                        }
                    });
                    JobState jobState = new JobState(queryName, this.batchId$1);
                    this.commitProtocol$1.initJob(jobState);
                    try {
                        String serializedSettings = this.$outer.org$elasticsearch$spark$sql$streaming$EsSparkSqlStreamingSink$$settings.save();
                        TaskCommit[] taskCommits = (TaskCommit[])this.$outer.org$elasticsearch$spark$sql$streaming$EsSparkSqlStreamingSink$$sparkSession.sparkContext().runJob(this.queryExecution$1.toRdd(), (Function2)new Serializable(this, serializedSettings){
                            public static final long serialVersionUID = 0L;
                            private final /* synthetic */ $anonfun$addBatch$2 $outer;
                            private final String serializedSettings$1;

                            public final TaskCommit apply(TaskContext taskContext, Iterator<InternalRow> iter) {
                                return new EsStreamQueryWriter(this.serializedSettings$1, this.$outer.schema$1, this.$outer.commitProtocol$1).run(taskContext, iter);
                            }
                            {
                                if ($outer == null) {
                                    throw null;
                                }
                                this.$outer = $outer;
                                this.serializedSettings$1 = serializedSettings$1;
                            }
                        }, ClassTag$.MODULE$.apply(TaskCommit.class));
                        this.commitProtocol$1.commitJob(jobState, (Seq<TaskCommit>)Predef$.MODULE$.wrapRefArray((Object[])taskCommits));
                        return;
                    }
                    catch (Throwable throwable) {
                        this.commitProtocol$1.abortJob(jobState);
                        throw throwable;
                    }
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                    this.batchId$1 = batchId$1;
                    this.commitProtocol$1 = commitProtocol$1;
                    this.queryExecution$1 = queryExecution$1;
                    this.schema$1 = schema$1;
                }
            });
        }
    }

    public EsSparkSqlStreamingSink(SparkSession sparkSession, Settings settings) {
        Object object;
        this.org$elasticsearch$spark$sql$streaming$EsSparkSqlStreamingSink$$sparkSession = sparkSession;
        this.org$elasticsearch$spark$sql$streaming$EsSparkSqlStreamingSink$$settings = settings;
        this.logger = LogFactory.getLog(EsSparkSqlStreamingSink.class);
        if (SparkSqlStreamingConfigs$.MODULE$.getSinkLogEnabled(settings)) {
            String logPath = SparkSqlStreamingConfigs$.MODULE$.constructCommitLogPath(settings);
            this.logger().info((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Using log path of [", "]"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{logPath})));
            object = new EsSinkMetadataLog(settings, sparkSession, logPath);
        } else {
            this.logger().warn((Object)"EsSparkSqlStreamingSink is continuing without write commit log. Be advised that data may be duplicated!");
            object = new NullMetadataLog<EsSinkStatus[]>();
        }
        this.writeLog = object;
    }
}

