/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.execution.streaming.state;

import java.io.Serializable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.execution.streaming.CheckpointFileManager;
import org.apache.spark.sql.execution.streaming.CheckpointFileManager$;
import org.apache.spark.sql.execution.streaming.state.SchemaHelper;
import org.apache.spark.sql.execution.streaming.state.SchemaHelper$SchemaReader$;
import org.apache.spark.sql.execution.streaming.state.SchemaHelper$SchemaWriter$;
import org.apache.spark.sql.execution.streaming.state.StateSchemaCompatibilityChecker$;
import org.apache.spark.sql.execution.streaming.state.StateSchemaNotCompatible;
import org.apache.spark.sql.execution.streaming.state.StateStoreProviderId;
import org.apache.spark.sql.internal.SQLConf$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataType$;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.Tuple2;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001\u00055c\u0001B\f\u0019\u0001\u001dB\u0001\u0002\u000e\u0001\u0003\u0002\u0003\u0006I!\u000e\u0005\ts\u0001\u0011\t\u0011)A\u0005u!)!\t\u0001C\u0001\u0007\"9q\t\u0001b\u0001\n\u0013A\u0005BB(\u0001A\u0003%\u0011\nC\u0004Q\u0001\t\u0007I\u0011B)\t\rY\u0003\u0001\u0015!\u0003S\u0011\u001d9\u0006A1A\u0005\n!Ca\u0001\u0017\u0001!\u0002\u0013I\u0005bB-\u0001\u0005\u0004%IA\u0017\u0005\u0007g\u0002\u0001\u000b\u0011B.\t\u000bQ\u0004A\u0011A;\t\u000f\u0005\u001d\u0001\u0001\"\u0003\u0002\n!A\u0011\u0011\u0004\u0001\u0005\u0002y\tY\u0002C\u0004\u0002$\u0001!I!!\n\t\u0011\u0005\r\u0002\u0001\"\u0001\u001f\u0003WAq!a\r\u0001\t\u0013\t)dB\u0004\u0002:aA\t!a\u000f\u0007\r]A\u0002\u0012AA\u001f\u0011\u0019\u00115\u0003\"\u0001\u0002@!I\u0011\u0011I\nC\u0002\u0013\u0005\u00111\t\u0005\t\u0003\u0017\u001a\u0002\u0015!\u0003\u0002F\ty2\u000b^1uKN\u001b\u0007.Z7b\u0007>l\u0007/\u0019;jE&d\u0017\u000e^=DQ\u0016\u001c7.\u001a:\u000b\u0005eQ\u0012!B:uCR,'BA\u000e\u001d\u0003%\u0019HO]3b[&twM\u0003\u0002\u001e=\u0005IQ\r_3dkRLwN\u001c\u0006\u0003?\u0001\n1a]9m\u0015\t\t#%A\u0003ta\u0006\u00148N\u0003\u0002$I\u00051\u0011\r]1dQ\u0016T\u0011!J\u0001\u0004_J<7\u0001A\n\u0004\u0001!r\u0003CA\u0015-\u001b\u0005Q#\"A\u0016\u0002\u000bM\u001c\u0017\r\\1\n\u00055R#AB!osJ+g\r\u0005\u00020e5\t\u0001G\u0003\u00022A\u0005A\u0011N\u001c;fe:\fG.\u0003\u00024a\t9Aj\\4hS:<\u0017A\u00039s_ZLG-\u001a:JIB\u0011agN\u0007\u00021%\u0011\u0001\b\u0007\u0002\u0015'R\fG/Z*u_J,\u0007K]8wS\u0012,'/\u00133\u0002\u0015!\fGm\\8q\u0007>tg\r\u0005\u0002<\u00016\tAH\u0003\u0002>}\u0005!1m\u001c8g\u0015\ty$%\u0001\u0004iC\u0012|w\u000e]\u0005\u0003\u0003r\u0012QbQ8oM&<WO]1uS>t\u0017A\u0002\u001fj]&$h\bF\u0002E\u000b\u001a\u0003\"A\u000e\u0001\t\u000bQ\u001a\u0001\u0019A\u001b\t\u000be\u001a\u0001\u0019\u0001\u001e\u0002\u001fM$xN]3Da2{7-\u0019;j_:,\u0012!\u0013\t\u0003\u00156k\u0011a\u0013\u0006\u0003\u0019z\n!AZ:\n\u00059[%\u0001\u0002)bi\"\f\u0001c\u001d;pe\u0016\u001c\u0005\u000fT8d
CRLwN\u001c\u0011\u0002\u0005\u0019lW#\u0001*\u0011\u0005M#V\"\u0001\u000e\n\u0005US\"!F\"iK\u000e\\\u0007o\\5oi\u001aKG.Z'b]\u0006<WM]\u0001\u0004M6\u0004\u0013AE:dQ\u0016l\u0017MR5mK2{7-\u0019;j_:\f1c]2iK6\fg)\u001b7f\u0019>\u001c\u0017\r^5p]\u0002\nAb]2iK6\fwK]5uKJ,\u0012a\u0017\t\u00039Bt!!\u00188\u000f\u0005ykgBA0m\u001d\t\u00017N\u0004\u0002bU:\u0011!-\u001b\b\u0003G\"t!\u0001Z4\u000e\u0003\u0015T!A\u001a\u0014\u0002\rq\u0012xn\u001c;?\u0013\u0005)\u0013BA\u0012%\u0013\t\t#%\u0003\u0002 A%\u0011QDH\u0005\u00037qI!!\u0007\u000e\n\u0005=D\u0012\u0001D*dQ\u0016l\u0017\rS3ma\u0016\u0014\u0018BA9s\u00051\u00196\r[3nC^\u0013\u0018\u000e^3s\u0015\ty\u0007$A\u0007tG\",W.Y,sSR,'\u000fI\u0001\u0006G\",7m\u001b\u000b\u0005mf\f\u0019\u0001\u0005\u0002*o&\u0011\u0001P\u000b\u0002\u0005+:LG\u000fC\u0003{\u0019\u0001\u000710A\u0005lKf\u001c6\r[3nCB\u0011Ap`\u0007\u0002{*\u0011aPH\u0001\u0006if\u0004Xm]\u0005\u0004\u0003\u0003i(AC*ueV\u001cG\u000fV=qK\"1\u0011Q\u0001\u0007A\u0002m\f1B^1mk\u0016\u001c6\r[3nC\u0006\t2o\u00195f[\u0006\u001c8i\\7qCRL'\r\\3\u0015\r\u0005-\u0011\u0011CA\u000b!\rI\u0013QB\u0005\u0004\u0003\u001fQ#a\u0002\"p_2,\u0017M\u001c\u0005\u0007\u0003'i\u0001\u0019A>\u0002\u0019M$xN]3e'\u000eDW-\\1\t\r\u0005]Q\u00021\u0001|\u0003\u0019\u00198\r[3nC\u0006q!/Z1e'\u000eDW-\\1GS2,GCAA\u000f!\u0015I\u0013qD>|\u0013\r\t\tC\u000b\u0002\u0007)V\u0004H.\u001a\u001a\u0002!\r\u0014X-\u0019;f'\u000eDW-\\1GS2,G#\u0002<\u0002(\u0005%\u0002\"\u0002>\u0010\u0001\u0004Y\bBBA\u0003\u001f\u0001\u00071\u0010F\u0004w\u0003[\ty#!\r\t\u000bi\u0004\u0002\u0019A>\t\r\u0005\u0015\u0001\u00031\u0001|\u0011\u0015I\u0006\u00031\u0001\\\u0003)\u00198\r[3nC\u001aKG.\u001a\u000b\u0004\u0013\u0006]\u0002\"B$\u0012\u0001\u0004I\u0015aH*uCR,7k\u00195f[\u0006\u001cu.\u001c9bi&\u0014\u0017\u000e\\5us\u000eCWmY6feB\u0011agE\n\u0003'!\"\"!a\u000f\u0002\u000fY+%kU%P\u001dV\u0011\u0011Q\t\t\u0004S\u0005\u001d\u0013bAA%U\t\u0019\u0011J\u001c;\u0002\u0011Y+%kU%P\u001d\u0002\u0002")
/*
 * Checks the key/value schema of a streaming state store against the schema
 * recorded in a schema file located at "<store checkpoint location>/_metadata/schema".
 *
 * When no schema file exists yet, check(...) writes one from the provided
 * schemas. When one exists, the stored schemas are compared with the provided
 * ones and a StateSchemaNotCompatible is thrown if they are neither equal nor
 * compatible (see schemasCompatible).
 *
 * This source is decompiled from a Scala class; the Logging forwarders and the
 * mangled org$apache$spark$internal$Logging$$log_ members are the compiler's
 * encoding of the mixed-in Logging trait and must keep their exact names.
 */
public class StateSchemaCompatibilityChecker
implements Logging {
    private final StateStoreProviderId providerId;
    private final Path storeCpLocation;
    private final CheckpointFileManager fm;
    private final Path schemaFileLocation;
    private final SchemaHelper.SchemaWriter schemaWriter;
    // Backing field for the Logging trait's logger slot; read and written only
    // through the two mangled accessors below.
    private transient Logger org$apache$spark$internal$Logging$$log_;

    /**
     * Returns the schema file format version exposed by the companion object.
     *
     * @return the value of {@code StateSchemaCompatibilityChecker$.MODULE$.VERSION()}
     */
    public static int VERSION() {
        return StateSchemaCompatibilityChecker$.MODULE$.VERSION();
    }

    // ---------------------------------------------------------------------
    // Logging trait forwarders: each delegates to the matching static
    // implementation on the Logging interface, passing this instance.
    // ---------------------------------------------------------------------

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$(this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$(this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$(this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$(this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$(this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$(this, msg, throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$(this, msg, throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$(this, msg, throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$(this, msg, throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$(this, msg, throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$(this, isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$(this, isInterpreter, silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$(this, isInterpreter, silent);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    // ---------------------------------------------------------------------
    // Private accessors for the immutable fields set in the constructor.
    // ---------------------------------------------------------------------

    private Path storeCpLocation() {
        return this.storeCpLocation;
    }

    private CheckpointFileManager fm() {
        return this.fm;
    }

    private Path schemaFileLocation() {
        return this.schemaFileLocation;
    }

    private SchemaHelper.SchemaWriter schemaWriter() {
        return this.schemaWriter;
    }

    /**
     * Validates the provided schemas against the stored schema file, creating
     * the file when it does not exist yet.
     *
     * @param keySchema   schema of the state store keys supplied by the caller
     * @param valueSchema schema of the state store values supplied by the caller
     * @throws StateSchemaNotCompatible if a schema file exists and either stored
     *         schema is neither equal to nor compatible with the provided one
     */
    public void check(StructType keySchema, StructType valueSchema) {
        if (!this.fm().exists(this.schemaFileLocation())) {
            // First run for this provider: record the schemas for later checks.
            this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(54).append("Schema file for provider ").append(this.providerId).append(" doesn't exist. Creating one.").toString());
            this.createSchemaFile(keySchema, valueSchema);
            return;
        }

        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(65).append("Schema file for provider ").append(this.providerId).append(" exists. Comparing with provided schema.").toString());
        Tuple2<StructType, StructType> stored = this.readSchemaFile();
        if (stored == null) {
            // Mirrors the Scala pattern-match failure on a null tuple.
            throw new MatchError(stored);
        }
        StructType storedKeySchema = stored._1();
        StructType storedValueSchema = stored._2();

        // Identical schemas need no further work and no log output.
        if (storedKeySchema.equals(keySchema) && storedValueSchema.equals(valueSchema)) {
            return;
        }

        if (!this.schemasCompatible(storedKeySchema, keySchema) || !this.schemasCompatible(storedValueSchema, valueSchema)) {
            String errorMsg = new StringBuilder(442).append("Provided schema doesn't match to the schema for existing state! Please note that Spark allow difference of field name: check count of fields and data type of each field.\n").append("- Provided key schema: ").append(keySchema).append("\n").append("- Provided value schema: ").append(valueSchema).append("\n").append("- Existing key schema: ").append(storedKeySchema).append("\n").append("- Existing value schema: ").append(storedValueSchema).append("\n").append("If you want to force running query without schema validation, please set ").append(SQLConf$.MODULE$.STATE_SCHEMA_CHECK_ENABLED().key()).append(" to false.\n").append("Please note running query with incompatible schema could cause indeterministic").append(" behavior.").toString();
            this.logError((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> errorMsg);
            throw new StateSchemaNotCompatible(errorMsg);
        }
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Detected schema change which is compatible. Allowing to put rows.");
    }

    /**
     * Decides whether a provided schema may be used with a stored schema by
     * delegating to {@code DataType.equalsIgnoreNameAndCompatibleNullability}.
     *
     * @param storedSchema schema read from the schema file
     * @param schema       schema provided by the caller
     * @return the result of the delegated comparison
     */
    private boolean schemasCompatible(StructType storedSchema, StructType schema) {
        return DataType$.MODULE$.equalsIgnoreNameAndCompatibleNullability(storedSchema, schema);
    }

    /**
     * Reads the schema file: first a UTF version string, which selects the
     * {@code SchemaReader}, then the schemas themselves via that reader.
     * The input stream is closed on every exit path.
     *
     * @return the tuple produced by the selected schema reader
     *         (unpacked by {@link #check} as stored key schema, stored value schema)
     */
    public Tuple2<StructType, StructType> readSchemaFile() {
        Tuple2<StructType, StructType> schemas;
        try (FSDataInputStream inStream = this.fm().open(this.schemaFileLocation());){
            try {
                String versionStr = inStream.readUTF();
                SchemaHelper.SchemaReader schemaReader = SchemaHelper$SchemaReader$.MODULE$.createSchemaReader(versionStr);
                schemas = schemaReader.read(inStream);
            }
            catch (Throwable e) {
                // Log with the file location for diagnosis, then propagate unchanged.
                this.logError((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(30).append("Fail to read schema file from ").append(this.schemaFileLocation()).toString(), e);
                throw e;
            }
        }
        return schemas;
    }

    /**
     * Writes the schema file using this instance's default schema writer.
     *
     * @param keySchema   key schema to record
     * @param valueSchema value schema to record
     */
    private void createSchemaFile(StructType keySchema, StructType valueSchema) {
        this.createSchemaFile(keySchema, valueSchema, this.schemaWriter());
    }

    /**
     * Writes the schema file with the given writer through an atomic,
     * cancellable output stream. On any failure the error is logged, the
     * stream is cancelled and the original throwable is rethrown.
     *
     * @param keySchema    key schema to record
     * @param valueSchema  value schema to record
     * @param schemaWriter writer that serializes both schemas to the stream
     */
    public void createSchemaFile(StructType keySchema, StructType valueSchema, SchemaHelper.SchemaWriter schemaWriter) {
        // NOTE(review): the `false` argument presumably disables overwriting an
        // existing file - confirm against CheckpointFileManager.createAtomic.
        CheckpointFileManager.CancellableFSDataOutputStream outStream = this.fm().createAtomic(this.schemaFileLocation(), false);
        try {
            schemaWriter.write(keySchema, valueSchema, outStream);
            outStream.close();
        }
        catch (Throwable e) {
            this.logError((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(29).append("Fail to write schema file to ").append(this.schemaFileLocation()).toString(), e);
            outStream.cancel();
            throw e;
        }
    }

    /**
     * Builds the schema file path {@code <storeCpLocation>/_metadata/schema}.
     *
     * @param storeCpLocation checkpoint location of the state store
     * @return path of the schema file under that location
     */
    private Path schemaFile(Path storeCpLocation) {
        return new Path(new Path(storeCpLocation, "_metadata"), "schema");
    }

    /**
     * Creates a checker for the given state store provider. Besides resolving
     * the checkpoint location, file manager, schema file path and schema
     * writer, construction also calls {@code mkdirs} on the schema file's
     * parent directory ({@code <storeCpLocation>/_metadata}).
     *
     * @param providerId identifies the state store whose checkpoint location is used
     * @param hadoopConf Hadoop configuration passed to the checkpoint file manager factory
     */
    public StateSchemaCompatibilityChecker(StateStoreProviderId providerId, Configuration hadoopConf) {
        this.providerId = providerId;
        Logging.$init$(this);
        this.storeCpLocation = providerId.storeId().storeCheckpointLocation();
        this.fm = CheckpointFileManager$.MODULE$.create(this.storeCpLocation(), hadoopConf);
        this.schemaFileLocation = this.schemaFile(this.storeCpLocation());
        this.schemaWriter = SchemaHelper$SchemaWriter$.MODULE$.createSchemaWriter(StateSchemaCompatibilityChecker$.MODULE$.VERSION());
        this.fm().mkdirs(this.schemaFileLocation().getParent());
    }
}

