/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.embedded;

import io.debezium.annotation.ThreadSafe;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.config.Instantiator;
import io.debezium.embedded.EmbeddedEngineConfig;
import io.debezium.embedded.KafkaConnectUtil;
import io.debezium.embedded.Transformations;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.StopEngineException;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.util.Clock;
import io.debezium.util.DelayStrategy;
import io.debezium.util.VariableLatch;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.AbstractHerder;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceConnectorContext;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public final class EmbeddedEngine
implements DebeziumEngine<SourceRecord>,
EmbeddedEngineConfig {
    /** Logger used by the engine and its nested classes. */
    private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedEngine.class);
    /** Engine configuration handed over by the builder. */
    private final Configuration config;
    /** Time source used for commit timing. */
    private final Clock clock;
    /** Class loader used to load the connector class and custom offset store classes. */
    private final ClassLoader classLoader;
    /** Receives every non-empty batch of (transformed) records. */
    private final DebeziumEngine.ChangeConsumer<SourceRecord> handler;
    /** Notified exactly once at the end of {@code run()}; never null (the constructor installs a logging default). */
    private final DebeziumEngine.CompletionCallback completionCallback;
    /** Optional connector/task lifecycle listener; may be null. */
    private final DebeziumEngine.ConnectorCallback connectorCallback;
    /** Thread currently executing {@code run()}, or null when the engine is not running or a stop was requested. */
    private final AtomicReference<Thread> runningThread = new AtomicReference();
    /** Counted up when {@code run()} starts and down when it ends; awaited by {@code stop()} and {@code await()}. */
    private final VariableLatch latch = new VariableLatch(0);
    /** JSON converter (schemas disabled) passed to the offset reader/writer for keys. */
    private final Converter keyConverter;
    /** JSON converter (schemas disabled) passed to the offset reader/writer for values. */
    private final Converter valueConverter;
    /** Worker configuration derived from the engine configuration; used to configure the offset store. */
    private final WorkerConfig workerConfig;
    /** Holds the outcome (success flag, message, error) that is reported to the completion callback. */
    private final CompletionResult completionResult;
    /** Number of records marked processed since the last successful offset commit. */
    private long recordsSinceLastCommit = 0L;
    /** Clock reading (milliseconds) taken at the last successful offset commit. */
    private long timeOfLastCommitMillis = 0L;
    /** Commit policy; may be null until {@code setOffsetCommitPolicy()} instantiates the configured one. */
    private OffsetCommitPolicy offsetCommitPolicy;
    /** The single source task created by {@code run()}; null before the task has been created. */
    private SourceTask task;
    /** Transformation chain applied to every polled record before it reaches the handler. */
    private final Transformations transformations;

    /**
     * Wraps a per-record {@link Consumer} in a batch-oriented {@link DebeziumEngine.ChangeConsumer}.
     * <p>
     * Each record is passed to the consumer and then marked as processed. When the consumer (or the
     * committer) signals a {@link StopEngineException}, the current record is still marked as processed
     * before the exception is propagated. After the whole batch has been handled the batch is marked finished.
     *
     * @param consumer the function invoked for every record
     * @return a change consumer delegating to {@code consumer}
     */
    private static DebeziumEngine.ChangeConsumer<SourceRecord> buildDefaultChangeConsumer(final Consumer<SourceRecord> consumer) {
        return new DebeziumEngine.ChangeConsumer<SourceRecord>() {

            @Override
            public void handleBatch(List<SourceRecord> records, DebeziumEngine.RecordCommitter<SourceRecord> committer) throws InterruptedException {
                for (final SourceRecord changeEvent : records) {
                    try {
                        consumer.accept(changeEvent);
                        committer.markProcessed(changeEvent);
                    }
                    catch (StopEngineException stopRequest) {
                        // The record that triggered the stop request counts as processed.
                        committer.markProcessed(changeEvent);
                        throw stopRequest;
                    }
                }
                committer.markBatchFinished();
            }
        };
    }

    /**
     * Creates the engine; instances are obtained through {@link EngineBuilder#build()}.
     *
     * @param config the engine and connector configuration
     * @param classLoader the class loader used for the connector and custom offset stores
     * @param clock the time source
     * @param handler the consumer of record batches
     * @param completionCallback invoked when {@code run()} ends; when null, a callback that logs failures is used
     * @param connectorCallback optional lifecycle listener; may be null
     * @param offsetCommitPolicy optional commit policy; when null it is instantiated from the configuration at run time
     */
    private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock clock, DebeziumEngine.ChangeConsumer<SourceRecord> handler, DebeziumEngine.CompletionCallback completionCallback, DebeziumEngine.ConnectorCallback connectorCallback, OffsetCommitPolicy offsetCommitPolicy) {
        this.config = config;
        this.handler = handler;
        this.classLoader = classLoader;
        this.clock = clock;
        if (completionCallback != null) {
            this.completionCallback = completionCallback;
        } else {
            // Default callback: only failures are worth reporting.
            this.completionCallback = (success, msg, error) -> {
                if (!success) {
                    LOGGER.error(msg, error);
                }
            };
        }
        this.connectorCallback = connectorCallback;
        this.completionResult = new CompletionResult();
        this.offsetCommitPolicy = offsetCommitPolicy;
        assert (this.config != null);
        assert (this.handler != null);
        assert (this.classLoader != null);
        assert (this.clock != null);

        // Offsets are serialized as schemaless JSON.
        final String jsonConverterClassName = JsonConverter.class.getName();
        final Map<String, String> offsetConverterSettings = Collections.singletonMap("schemas.enable", "false");
        this.keyConverter = (Converter) Instantiator.getInstance(jsonConverterClassName);
        this.keyConverter.configure(offsetConverterSettings, true);
        this.valueConverter = (Converter) Instantiator.getInstance(jsonConverterClassName);
        this.valueConverter.configure(offsetConverterSettings, false);

        this.transformations = new Transformations(config);

        // The worker configuration requires converter entries, so they are added to the engine settings.
        final Map<String, String> workerSettings = config.asMap(EmbeddedEngineConfig.ALL_FIELDS);
        workerSettings.put("key.converter", jsonConverterClassName);
        workerSettings.put("value.converter", jsonConverterClassName);
        this.workerConfig = new EmbeddedConfig(workerSettings);
    }

    /**
     * Tells whether a thread is currently registered as executing this engine.
     *
     * @return {@code true} while a running thread is registered, {@code false} otherwise
     */
    public boolean isRunning() {
        final Thread engineThread = this.runningThread.get();
        return engineThread != null;
    }

    /**
     * Records a failure that has no associated exception.
     *
     * @param msg the failure message
     */
    private void fail(String msg) {
        fail(msg, null);
    }

    /**
     * Records a failure in the completion result. If an error has already been recorded, the new
     * failure is only logged so that the first error is the one reported.
     *
     * @param msg the failure message
     * @param error the cause; may be null
     */
    private void fail(String msg, Throwable error) {
        if (!this.completionResult.hasError()) {
            this.completionResult.handle(false, msg, error);
        } else {
            LOGGER.error(msg, error);
        }
    }

    /**
     * Records a failure and then aborts the current operation by throwing the engine's sentinel exception.
     *
     * @param msg the failure message
     * @param error the cause; may be null
     * @throws EmbeddedEngineRuntimeException always
     */
    private void failAndThrow(String msg, Throwable error) throws EmbeddedEngineRuntimeException {
        fail(msg, error);
        final EmbeddedEngineRuntimeException abort = new EmbeddedEngineRuntimeException();
        throw abort;
    }

    /**
     * Records a successful completion.
     *
     * @param msg the completion message
     */
    private void succeed(String msg) {
        final Throwable noError = null;
        this.completionResult.handle(true, msg, noError);
    }

    /**
     * Runs the connector on the calling thread until the engine is stopped or fails.
     * <p>
     * Only a caller that manages to register itself as the running thread executes the body; if another
     * thread is already registered the method returns immediately without doing anything. The body validates
     * the configuration, instantiates the connector and the offset store, starts one task and polls it.
     * On every exit path of the body the latch is counted down, the running thread is cleared and the
     * completion callback is invoked with the recorded result.
     */
    public void run() {
        // Atomically claim the engine for the current thread; a second concurrent run() is a no-op.
        if (this.runningThread.compareAndSet(null, Thread.currentThread())) {
            String engineName = this.config.getString(EmbeddedEngineConfig.ENGINE_NAME);
            String connectorClassName = this.config.getString(EmbeddedEngineConfig.CONNECTOR_CLASS);
            Optional<DebeziumEngine.ConnectorCallback> connectorCallback = Optional.ofNullable(this.connectorCallback);
            this.latch.countUp();
            try {
                // Validation problems are logged by the callback; the failure itself is recorded via failAndThrow.
                if (!this.config.validateAndRecord((Iterable)EmbeddedEngineConfig.CONNECTOR_FIELDS, arg_0 -> ((Logger)LOGGER).error(arg_0))) {
                    this.failAndThrow("Failed to start connector with invalid configuration (see logs for actual errors)", null);
                }
                SourceConnector connector = this.instantiateConnector(connectorClassName);
                Map<String, String> connectorConfig = this.getConnectorConfig(connector, connectorClassName);
                OffsetBackingStore offsetStore = this.initializeOffsetStore(connectorConfig);
                this.setOffsetCommitPolicy();
                Duration commitTimeout = Duration.ofMillis(this.config.getLong(EmbeddedEngineConfig.OFFSET_COMMIT_TIMEOUT_MS));
                // Reader and writer share the same store and use the engine name as their namespace.
                OffsetStorageReaderImpl offsetReader = new OffsetStorageReaderImpl(offsetStore, engineName, this.keyConverter, this.valueConverter);
                OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, engineName, this.keyConverter, this.valueConverter);
                this.initializeConnector(connector, (OffsetStorageReader)offsetReader);
                try {
                    connector.start(connectorConfig);
                    connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::connectorStarted);
                    // The embedded engine always asks for exactly one task.
                    List taskConfigs = connector.taskConfigs(1);
                    Class taskClass = connector.taskClass();
                    this.task = this.createSourceTask(connector, taskConfigs, taskClass);
                    try {
                        this.startSourceTask(taskConfigs, (OffsetStorageReader)offsetReader);
                        connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::taskStarted);
                    }
                    catch (Throwable t) {
                        // The task failed to start: stop it and report the configuration with passwords masked.
                        this.stopSourceTask();
                        Configuration config = Configuration.from((Map)((Map)taskConfigs.get(0))).withMaskedPasswords();
                        String msg = "Unable to initialize and start connector's task class '" + taskClass.getName() + "' with config: " + config;
                        this.failAndThrow(msg, t);
                    }
                    this.recordsSinceLastCommit = 0L;
                    // Collects handler and retry errors raised while polling so they can be reported afterwards.
                    HandlerErrors errros = new HandlerErrors(null, null);
                    try {
                        this.timeOfLastCommitMillis = this.clock.currentTimeInMillis();
                        DebeziumEngine.RecordCommitter committer = this.buildRecordCommitter(offsetWriter, this.task, commitTimeout);
                        this.pollRecords(taskConfigs, committer, errros);
                    }
                    finally {
                        // Whatever ended the polling loop: record the outcome, then stop the task and flush offsets.
                        this.setCompletionResult(connectorClassName, errros);
                        this.stopTaskAndCommitOffset(offsetWriter, commitTimeout, connectorCallback);
                    }
                    this.stopOffsetStoreAndConnector(connector, connectorClassName, offsetStore, connectorCallback);
                }
                catch (Throwable t) {
                    try {
                        // The sentinel exception means the failure has already been recorded by failAndThrow.
                        if (!(t instanceof EmbeddedEngineRuntimeException)) {
                            this.fail("Error while trying to run connector class '" + connectorClassName + "'", t);
                        }
                    }
                    catch (Throwable throwable) {
                        throw throwable;
                    }
                    finally {
                        this.stopOffsetStoreAndConnector(connector, connectorClassName, offsetStore, connectorCallback);
                    }
                }
            }
            catch (EmbeddedEngineRuntimeException e) {
                // Raised by the set-up steps above; the actual failure is already stored in completionResult.
                LOGGER.debug("Failed to run EmbeddedEngine.", (Throwable)e);
            }
            finally {
                // Release waiters, mark the engine as not running, then report the recorded outcome.
                this.latch.countDown();
                this.runningThread.set(null);
                this.completionCallback.handle(this.completionResult.success(), this.completionResult.message(), this.completionResult.error());
            }
        }
    }

    /**
     * Loads the connector class with the engine's class loader and creates an instance through its
     * no-argument constructor.
     *
     * @param connectorClassName the fully-qualified connector class name
     * @return the new connector instance
     * @throws EmbeddedEngineRuntimeException if the class cannot be loaded or instantiated (the failure is recorded first)
     */
    private SourceConnector instantiateConnector(String connectorClassName) throws EmbeddedEngineRuntimeException {
        SourceConnector instance = null;
        try {
            final Class<?> loadedClass = this.classLoader.loadClass(connectorClassName);
            instance = (SourceConnector) loadedClass.getDeclaredConstructor().newInstance();
        }
        catch (Throwable cause) {
            // failAndThrow always throws, so the null instance is never returned.
            failAndThrow("Unable to instantiate connector class '" + connectorClassName + "'", cause);
        }
        return instance;
    }

    /**
     * Obtains the connector configuration from the worker configuration and validates it with the connector.
     *
     * @param connector the connector that validates the configuration
     * @param connectorClassName the connector class name, passed on to the result generation
     * @return the connector configuration as string properties
     * @throws EmbeddedEngineRuntimeException if validation reports at least one error (the failure is recorded first)
     */
    private Map<String, String> getConnectorConfig(SourceConnector connector, String connectorClassName) throws EmbeddedEngineRuntimeException {
        final Map<String, String> connectorConfig = this.workerConfig.originalsStrings();
        final Config validated = connector.validate(connectorConfig);
        final ConfigInfos validationResult = AbstractHerder.generateResult(connectorClassName, Collections.emptyMap(), validated.configValues(), connector.config().groups());
        if (validationResult.errorCount() <= 0) {
            return connectorConfig;
        }
        // Concatenate every error message of every configuration value into a single line.
        final String errors = validationResult.values().stream()
                .flatMap(info -> info.configValue().errors().stream())
                .collect(Collectors.joining(" "));
        failAndThrow("Connector configuration is not valid. " + errors, null);
        return connectorConfig;
    }

    /**
     * Creates, configures and starts the offset backing store named by the offset storage setting.
     * <p>
     * The memory, file and Kafka stores are created through {@link KafkaConnectUtil}; any other class name
     * is loaded with the engine's class loader and instantiated through its no-argument constructor.
     *
     * @param connectorConfig the connector configuration, needed by the Kafka-backed store
     * @return the started offset store
     * @throws EmbeddedEngineRuntimeException if the store cannot be instantiated, configured or started
     *         (the failure is recorded first)
     */
    private OffsetBackingStore initializeOffsetStore(Map<String, String> connectorConfig) throws EmbeddedEngineRuntimeException {
        String offsetStoreClassName = this.config.getString(EmbeddedEngineConfig.OFFSET_STORAGE);
        // Declared with the interface type: any of the branches below may supply the implementation.
        OffsetBackingStore offsetStore = null;
        try {
            if (offsetStoreClassName.equals(MemoryOffsetBackingStore.class.getName())) {
                offsetStore = KafkaConnectUtil.memoryOffsetBackingStore();
            } else if (offsetStoreClassName.equals(FileOffsetBackingStore.class.getName())) {
                offsetStore = KafkaConnectUtil.fileOffsetBackingStore();
            } else if (offsetStoreClassName.equals(KafkaOffsetBackingStore.class.getName())) {
                offsetStore = KafkaConnectUtil.kafkaOffsetBackingStore(connectorConfig);
            } else {
                Class<?> offsetStoreClass = this.classLoader.loadClass(offsetStoreClassName);
                offsetStore = (OffsetBackingStore) offsetStoreClass.getDeclaredConstructor().newInstance();
            }
        }
        catch (Throwable t) {
            this.failAndThrow("Unable to instantiate OffsetBackingStore class '" + offsetStoreClassName + "'", t);
        }
        try {
            offsetStore.configure(this.workerConfig);
            offsetStore.start();
        }
        catch (Throwable t) {
            this.fail("Unable to configure and start the '" + offsetStoreClassName + "' offset backing store", t);
            try {
                offsetStore.stop();
            }
            catch (Throwable stopError) {
                // A failing stop() must not replace the sentinel exception the caller relies on.
                LOGGER.warn("Error while stopping the '{}' offset backing store after a failed start", offsetStoreClassName, stopError);
            }
            throw new EmbeddedEngineRuntimeException();
        }
        return offsetStore;
    }

    /**
     * Instantiates the configured offset commit policy unless one was supplied through the builder.
     *
     * @throws EmbeddedEngineRuntimeException if the policy class cannot be instantiated (the failure is recorded first)
     */
    private void setOffsetCommitPolicy() throws EmbeddedEngineRuntimeException {
        if (this.offsetCommitPolicy != null) {
            return;
        }
        // Report the policy class name in the error message, not the offset storage class name.
        String policyClassName = this.config.getString(EmbeddedEngineConfig.OFFSET_COMMIT_POLICY);
        try {
            this.offsetCommitPolicy = (OffsetCommitPolicy) Instantiator.getInstanceWithProperties(policyClassName, this.config.asProperties());
        }
        catch (Throwable t) {
            this.failAndThrow("Unable to instantiate OffsetCommitPolicy class '" + policyClassName + "'", t);
        }
    }

    /**
     * Initializes the connector with a context backed by this engine.
     * <p>
     * The context ignores task reconfiguration requests, records raised errors as engine failures and
     * exposes the given offset reader.
     *
     * @param connector the connector to initialize
     * @param offsetReader the reader handed out by the context
     */
    private void initializeConnector(SourceConnector connector, final OffsetStorageReader offsetReader) {
        final ConnectorContext engineContext = new SourceConnectorContext() {

            @Override
            public OffsetStorageReader offsetStorageReader() {
                return offsetReader;
            }

            @Override
            public void raiseError(Exception e) {
                EmbeddedEngine.this.fail(e.getMessage(), e);
            }

            @Override
            public void requestTaskReconfiguration() {
                // Intentionally a no-op.
            }
        };
        connector.initialize(engineContext);
    }

    /**
     * Instantiates the connector's task class through its no-argument constructor.
     *
     * @param connector the connector owning the task (not used by this method)
     * @param taskConfigs the task configurations produced by the connector; must not be empty
     * @param taskClass the task class to instantiate
     * @return the new task instance
     * @throws EmbeddedEngineRuntimeException if there is no task configuration or the class cannot be
     *         instantiated (the failure is recorded first)
     * @throws NoSuchMethodException if the task class has no no-argument constructor
     * @throws InvocationTargetException if the task constructor throws
     */
    private SourceTask createSourceTask(SourceConnector connector, List<Map<String, String>> taskConfigs, Class<? extends Task> taskClass) throws EmbeddedEngineRuntimeException, NoSuchMethodException, InvocationTargetException {
        final String taskClassName = taskClass.getName();
        if (taskConfigs.isEmpty()) {
            failAndThrow("Unable to start connector's task class '" + taskClassName + "' with no task configuration", null);
        }
        try {
            return (SourceTask) taskClass.getDeclaredConstructor().newInstance();
        }
        catch (IllegalAccessException | InstantiationException reflectionFailure) {
            failAndThrow("Unable to instantiate connector's task class '" + taskClassName + "'", reflectionFailure);
        }
        // Only reached in theory: failAndThrow always throws.
        return null;
    }

    /**
     * Initializes the task with a context backed by this engine and starts it with the first task configuration.
     * <p>
     * The context exposes the given offset reader and, through {@code configs()}, the same configuration
     * the task is started with (instead of {@code null}, which callers of that method do not expect).
     *
     * @param taskConfigs the task configurations produced by the connector; the first entry is used
     * @param offsetReader the reader handed out by the task context
     */
    private void startSourceTask(final List<Map<String, String>> taskConfigs, final OffsetStorageReader offsetReader) {
        SourceTaskContext taskContext = new SourceTaskContext() {

            @Override
            public OffsetStorageReader offsetStorageReader() {
                return offsetReader;
            }

            @Override
            public Map<String, String> configs() {
                // Resolved lazily so that initialization order is the same as before this method was implemented.
                return taskConfigs.get(0);
            }
        };
        this.task.initialize(taskContext);
        this.task.start(taskConfigs.get(0));
    }

    /**
     * Stops the task on a best-effort basis: any error raised by the task is logged (with its stack
     * trace) and otherwise ignored.
     */
    private void stopSourceTask() {
        try {
            LOGGER.debug("Stopping the task");
            this.task.stop();
        }
        catch (Throwable tstop) {
            // Best effort only, but keep the cause visible in the log.
            LOGGER.info("Error while trying to stop the task", tstop);
        }
    }

    /**
     * Tries to restart the task after a retriable exception was raised while polling.
     * <p>
     * With a maximum of {@code 0} retries, or with a (deprecated) value below {@code -1}, no restart is
     * attempted and the original exception is returned. Otherwise the task is stopped and started again,
     * sleeping according to the delay strategy between attempts, until it starts successfully or the number
     * of attempts reaches the configured maximum. A maximum of {@code -1} never matches the attempt counter,
     * so in that case the restart is attempted until it succeeds.
     *
     * @param e the retriable exception that triggered the restart
     * @param taskConfigs the task configurations; the first entry is used to restart the task
     * @return {@code null} if the task was restarted successfully, otherwise the error that ends the retries
     */
    private Throwable handleRetries(RetriableException e, List<Map<String, String>> taskConfigs) {
        int maxRetries = this.getErrorsMaxRetries();
        LOGGER.info("Retriable exception thrown, connector will be restarted; errors.max.retries={}", maxRetries, e);
        if (maxRetries == 0) {
            return e;
        }
        if (maxRetries < -1) {
            String retriesProperty = EmbeddedEngineConfig.ERRORS_MAX_RETRIES.name();
            LOGGER.warn("Setting {}={} is deprecated. To disable retries on connection errors, set {}=0", retriesProperty, maxRetries, retriesProperty);
            return e;
        }

        DelayStrategy delayStrategy = this.delayStrategy(this.config);
        int totalRetries = 0;
        boolean startedSuccessfully = false;
        while (!startedSuccessfully) {
            try {
                totalRetries++;
                LOGGER.info("Starting connector, attempt {}", totalRetries);
                this.task.stop();
                this.task.start(taskConfigs.get(0));
                startedSuccessfully = true;
            }
            catch (Exception ex) {
                if (totalRetries == maxRetries) {
                    // Give up right away: continuing the loop would retry forever and could later
                    // report this stale error even after a successful restart.
                    LOGGER.error("Can't start the connector, max retries to connect exceeded; stopping connector...", ex);
                    return ex;
                }
                LOGGER.error("Can't start the connector, will retry later...", ex);
            }
            delayStrategy.sleepWhen(!startedSuccessfully);
        }
        return null;
    }

    /**
     * Polls the task and forwards transformed records to the handler for as long as a running thread is registered.
     * <p>
     * The loop ends when the engine is stopped, when polling is interrupted, when the handler throws a
     * {@link StopEngineException}, or when transforming/handling a batch fails (the error is then stored in
     * {@code errors.handlerError}). A retriable polling error triggers a task restart; if the restart gives
     * up, the resulting error is stored in {@code errors.retryError} and thrown.
     *
     * @param taskConfigs the task configurations, needed to restart the task
     * @param committer the committer handed to the handler with every batch
     * @param errors the holder receiving handler and retry errors
     * @throws Throwable the retry error, if restarting the task failed
     */
    private void pollRecords(List<Map<String, String>> taskConfigs, DebeziumEngine.RecordCommitter committer, HandlerErrors errors) throws Throwable {
        while (this.runningThread.get() != null) {
            List<SourceRecord> batch = null;
            try {
                LOGGER.debug("Embedded engine is polling task for records on thread {}", this.runningThread.get());
                batch = this.task.poll();
                LOGGER.debug("Embedded engine returned from polling task for records");
            }
            catch (InterruptedException interrupted) {
                LOGGER.debug("Embedded engine interrupted on thread {} while polling the task for records", this.runningThread.get());
                // Still registered as the running thread: the interrupt did not come from stop(), so keep the flag set.
                if (this.runningThread.get() == Thread.currentThread()) {
                    Thread.currentThread().interrupt();
                }
                break;
            }
            catch (RetriableException retriable) {
                errors.retryError = this.handleRetries(retriable, taskConfigs);
                if (errors.retryError != null) {
                    throw errors.retryError;
                }
                // Restart succeeded: fall through with no records for this iteration.
            }

            try {
                if (batch != null && !batch.isEmpty()) {
                    LOGGER.debug("Received {} records from the task", batch.size());
                    // Transformations may drop records by returning null.
                    batch = batch.stream()
                            .map(this.transformations::transform)
                            .filter(Objects::nonNull)
                            .collect(Collectors.toList());
                }
                if (batch == null || batch.isEmpty()) {
                    LOGGER.debug("Received no records from the task");
                    continue;
                }
                LOGGER.debug("Received {} transformed records from the task", batch.size());
                try {
                    this.handler.handleBatch(batch, committer);
                }
                catch (StopEngineException stopRequest) {
                    break;
                }
            }
            catch (Throwable handlerFailure) {
                errors.handlerError = handlerFailure;
                break;
            }
        }
    }

    /**
     * Translates the outcome of the polling loop into the completion result: a handler error takes
     * precedence over a retry error, and without either the run is recorded as successful.
     *
     * @param connectorClassName the connector class name used in the success message
     * @param errors the errors collected while polling
     */
    private void setCompletionResult(String connectorClassName, HandlerErrors errors) {
        final Throwable handlerFailure = errors.handlerError;
        if (handlerFailure != null) {
            fail("Stopping connector after error in the application's handler method: " + handlerFailure.getMessage(), handlerFailure);
            return;
        }
        final Throwable retryFailure = errors.retryError;
        if (retryFailure != null) {
            fail("Stopping connector after retry error: " + retryFailure.getMessage(), retryFailure);
            return;
        }
        succeed("Connector '" + connectorClassName + "' completed normally.");
    }

    /**
     * Stops the task, notifies the connector callback and commits the outstanding offsets.
     * <p>
     * An interruption while committing restores the thread's interrupt flag; any other error is recorded
     * as an engine failure.
     *
     * @param offsetWriter the writer holding the offsets to flush
     * @param commitTimeout the maximum time to wait for the flush
     * @param connectorCallback the optional lifecycle listener
     */
    private void stopTaskAndCommitOffset(OffsetStorageWriter offsetWriter, Duration commitTimeout, Optional<DebeziumEngine.ConnectorCallback> connectorCallback) {
        try {
            LOGGER.info("Stopping the task and engine");
            this.task.stop();
            connectorCallback.ifPresent(callback -> callback.taskStopped());
            commitOffsets(offsetWriter, commitTimeout, this.task);
        }
        catch (InterruptedException interrupted) {
            LOGGER.debug("Interrupted while committing offsets");
            Thread.currentThread().interrupt();
        }
        catch (Throwable failure) {
            fail("Error while trying to stop the task and commit the offsets", failure);
        }
    }

    /**
     * Stops the offset store and then the connector, notifying the connector callback once the connector
     * has stopped. Errors from either step are recorded as engine failures; the connector is stopped even
     * if stopping the offset store fails.
     *
     * @param connector the connector to stop
     * @param connectorClassName the connector class name used in the failure message
     * @param offsetStore the offset store to stop
     * @param connectorCallback the optional lifecycle listener
     */
    private void stopOffsetStoreAndConnector(SourceConnector connector, String connectorClassName, OffsetBackingStore offsetStore, Optional<DebeziumEngine.ConnectorCallback> connectorCallback) {
        try {
            offsetStore.stop();
        }
        catch (Throwable t) {
            this.fail("Error while trying to stop the offset store", t);
        }
        finally {
            // Runs regardless of how stopping the offset store ended.
            try {
                connector.stop();
                connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::connectorStopped);
            }
            catch (Throwable t) {
                this.fail("Error while trying to stop connector class '" + connectorClassName + "'", t);
            }
        }
    }

    /**
     * Reads the configured maximum number of restart attempts after a retriable error.
     *
     * @return the configured value
     */
    private int getErrorsMaxRetries() {
        return this.config.getInteger(EmbeddedEngineConfig.ERRORS_MAX_RETRIES);
    }

    /**
     * Creates the committer through which the handler reports processed records and finished batches.
     * <p>
     * Marking a record as processed commits it with the task, increments the engine's record counter and
     * stages the record's offset in the writer. Marking a batch as finished flushes the staged offsets if
     * the commit policy asks for it. All state-changing methods of the committer are synchronized.
     *
     * @param offsetWriter the writer in which offsets are staged
     * @param task the task that is told about committed records
     * @param commitTimeout the maximum time to wait for an offset flush
     * @return the record committer
     */
    protected DebeziumEngine.RecordCommitter buildRecordCommitter(final OffsetStorageWriter offsetWriter, final SourceTask task, final Duration commitTimeout) {
        return new DebeziumEngine.RecordCommitter<SourceRecord>() {

            @Override
            public DebeziumEngine.Offsets buildOffsets() {
                return new SourceRecordOffsets();
            }

            @Override
            public synchronized void markProcessed(SourceRecord record) throws InterruptedException {
                task.commitRecord(record);
                EmbeddedEngine.this.recordsSinceLastCommit++;
                offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
            }

            @Override
            public synchronized void markProcessed(SourceRecord record, DebeziumEngine.Offsets sourceOffsets) throws InterruptedException {
                // Same record, but carrying the offsets supplied by the caller instead of its own.
                final SourceRecordOffsets replacement = (SourceRecordOffsets) sourceOffsets;
                final SourceRecord rewritten = new SourceRecord(
                        record.sourcePartition(),
                        replacement.getOffsets(),
                        record.topic(),
                        record.kafkaPartition(),
                        record.keySchema(),
                        record.key(),
                        record.valueSchema(),
                        record.value(),
                        record.timestamp(),
                        record.headers());
                markProcessed(rewritten);
            }

            @Override
            public synchronized void markBatchFinished() throws InterruptedException {
                EmbeddedEngine.this.maybeFlush(offsetWriter, EmbeddedEngine.this.offsetCommitPolicy, commitTimeout, task);
            }
        };
    }

    /**
     * Commits the staged offsets if the policy decides that a commit is due, based on the number of
     * records and the time elapsed since the last commit.
     *
     * @param offsetWriter the writer holding the staged offsets
     * @param policy the policy deciding whether to commit
     * @param commitTimeout the maximum time to wait for the flush
     * @param task the task to notify after a successful flush
     * @throws InterruptedException if the commit is interrupted on the engine's running thread
     */
    protected void maybeFlush(OffsetStorageWriter offsetWriter, OffsetCommitPolicy policy, Duration commitTimeout, SourceTask task) throws InterruptedException {
        final long elapsedMillis = this.clock.currentTimeInMillis() - this.timeOfLastCommitMillis;
        final boolean commitDue = policy.performCommit(this.recordsSinceLastCommit, Duration.ofMillis(elapsedMillis));
        if (!commitDue) {
            return;
        }
        commitOffsets(offsetWriter, commitTimeout, task);
    }

    /**
     * Flushes the staged offsets and, once the flush has completed in time, commits the task and resets
     * the commit bookkeeping.
     * <p>
     * Nothing happens if the writer has nothing to flush. When the flush is interrupted, fails or times
     * out, it is cancelled; an interruption is propagated only if the current thread is still the engine's
     * running thread.
     *
     * @param offsetWriter the writer holding the staged offsets
     * @param commitTimeout the maximum time to wait, measured from the start of this method
     * @param task the task to commit after a successful flush
     * @throws InterruptedException if interrupted while the current thread is the engine's running thread
     */
    protected void commitOffsets(OffsetStorageWriter offsetWriter, Duration commitTimeout, SourceTask task) throws InterruptedException {
        final long deadlineMillis = this.clock.currentTimeInMillis() + commitTimeout.toMillis();
        if (!offsetWriter.beginFlush()) {
            return;
        }
        final Future<Void> pendingFlush = offsetWriter.doFlush(this::completedFlush);
        if (pendingFlush == null) {
            return;
        }
        try {
            final long remainingMillis = Math.max(deadlineMillis - this.clock.currentTimeInMillis(), 0L);
            pendingFlush.get(remainingMillis, TimeUnit.MILLISECONDS);
            task.commit();
            this.recordsSinceLastCommit = 0L;
            this.timeOfLastCommitMillis = this.clock.currentTimeInMillis();
        }
        catch (InterruptedException interrupted) {
            LOGGER.warn("Flush of {} offsets interrupted, cancelling", this);
            offsetWriter.cancelFlush();
            // Propagate only when the interrupt was not caused by stop() clearing the running thread.
            if (this.runningThread.get() == Thread.currentThread()) {
                Thread.currentThread().interrupt();
                throw interrupted;
            }
        }
        catch (ExecutionException flushFailure) {
            LOGGER.error("Flush of {} offsets threw an unexpected exception: ", this, flushFailure);
            offsetWriter.cancelFlush();
        }
        catch (TimeoutException timedOut) {
            LOGGER.error("Timed out waiting to flush {} offsets to storage", this);
            offsetWriter.cancelFlush();
        }
    }

    /**
     * Completion callback of an offset flush; logs the outcome.
     *
     * @param error the flush error, or null if the flush succeeded
     * @param result unused
     */
    protected void completedFlush(Throwable error, Void result) {
        if (error == null) {
            LOGGER.trace("Finished flushing {} offsets to storage", this);
            return;
        }
        LOGGER.error("Failed to flush {} offsets to storage: ", this, error);
    }

    /**
     * Requests the engine to stop.
     * <p>
     * The running thread registration is cleared, which ends the polling loop at its next check. The
     * method then waits up to the configured time for the engine to complete and finally interrupts the
     * engine thread. If the caller is interrupted while waiting, the wait is abandoned, the caller's
     * interrupt status is restored and the engine thread is still interrupted.
     *
     * @return {@code true} if the engine was running and has been asked to stop, {@code false} if it was not running
     */
    public boolean stop() {
        LOGGER.info("Stopping the embedded engine");
        Thread thread = this.runningThread.getAndSet(null);
        if (thread != null) {
            try {
                Duration timeout = Duration.ofMillis(this.config.getLong(EmbeddedEngineConfig.WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_MS));
                LOGGER.info("Waiting for {} for connector to stop", timeout);
                this.latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException interrupted) {
                // Do not lose the caller's interruption; it can react once stop() has returned.
                Thread.currentThread().interrupt();
            }
            LOGGER.debug("Interrupting the embedded engine's thread {} (already interrupted: {})", thread, thread.isInterrupted());
            thread.interrupt();
            return true;
        }
        return false;
    }

    /**
     * Closes the engine by requesting it to stop; the result of the stop request is ignored.
     *
     * @throws IOException declared by the signature; this implementation does not throw it itself
     */
    public void close() throws IOException {
        stop();
    }

    /**
     * Waits on the engine's latch, which is counted up when {@code run()} starts and down when it ends.
     *
     * @param timeout the maximum time to wait
     * @param unit the unit of {@code timeout}
     * @return the result of the latch's timed wait
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        final boolean completed = this.latch.await(timeout, unit);
        return completed;
    }

    /**
     * Describes the engine by its configured engine name.
     *
     * @return a string of the form {@code EmbeddedEngine{id=<name>}}
     */
    @Override
    public String toString() {
        final String engineId = this.config.getString(EmbeddedEngineConfig.ENGINE_NAME);
        return "EmbeddedEngine{id=" + engineId + "}";
    }

    /**
     * Passes the engine's current task to the given consumer. The task is null if no task has been
     * created yet.
     *
     * @param consumer the function receiving the task
     */
    public void runWithTask(Consumer<SourceTask> consumer) {
        final SourceTask currentTask = this.task;
        consumer.accept(currentTask);
    }

    /**
     * Builds the exponential delay strategy used between task restart attempts.
     *
     * @param config the configuration providing the initial and the maximum retry delay in milliseconds
     * @return the delay strategy
     */
    private DelayStrategy delayStrategy(Configuration config) {
        final Duration initialDelay = Duration.ofMillis(config.getInteger(EmbeddedEngineConfig.ERRORS_RETRY_DELAY_INITIAL_MS));
        final Duration maxDelay = Duration.ofMillis(config.getInteger(EmbeddedEngineConfig.ERRORS_RETRY_DELAY_MAX_MS));
        return DelayStrategy.exponential(initialDelay, maxDelay);
    }

    public static class CompletionResult
    implements DebeziumEngine.CompletionCallback {
        private final DebeziumEngine.CompletionCallback delegate;
        private final CountDownLatch completed = new CountDownLatch(1);
        private boolean success;
        private String message;
        private Throwable error;

        public CompletionResult() {
            this(null);
        }

        public CompletionResult(DebeziumEngine.CompletionCallback delegate) {
            this.delegate = delegate;
        }

        public void handle(boolean success, String message, Throwable error) {
            this.success = success;
            this.message = message;
            this.error = error;
            this.completed.countDown();
            if (this.delegate != null) {
                this.delegate.handle(success, message, error);
            }
        }

        public void await() throws InterruptedException {
            this.completed.await();
        }

        public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
            return this.completed.await(timeout, unit);
        }

        public boolean hasCompleted() {
            return this.completed.getCount() == 0L;
        }

        public boolean success() {
            return this.success;
        }

        public String message() {
            return this.message;
        }

        public Throwable error() {
            return this.error;
        }

        public boolean hasError() {
            return this.error != null;
        }
    }

    /**
     * Worker configuration whose definition is the base worker definition extended with the
     * offset storage fields of the embedded engine.
     */
    protected static class EmbeddedConfig
    extends WorkerConfig {
        private static final ConfigDef CONFIG = buildConfigDef();

        /**
         * @param props the configuration properties
         */
        protected EmbeddedConfig(Map<String, String> props) {
            super(CONFIG, props);
        }

        /** Adds the file and Kafka offset storage fields, each in its own call, to the base definition. */
        private static ConfigDef buildConfigDef() {
            final ConfigDef definition = EmbeddedConfig.baseConfigDef();
            Field.group(definition, "file", EmbeddedEngineConfig.OFFSET_STORAGE_FILE_FILENAME);
            Field.group(definition, "kafka", EmbeddedEngineConfig.OFFSET_STORAGE_KAFKA_TOPIC);
            Field.group(definition, "kafka", EmbeddedEngineConfig.OFFSET_STORAGE_KAFKA_PARTITIONS);
            Field.group(definition, "kafka", EmbeddedEngineConfig.OFFSET_STORAGE_KAFKA_REPLICATION_FACTOR);
            return definition;
        }
    }

    /**
     * Sentinel exception used inside the engine to abort the run after a failure has already been
     * recorded; it carries neither a message nor a cause.
     */
    private static class EmbeddedEngineRuntimeException
    extends RuntimeException {
        EmbeddedEngineRuntimeException() {
            super();
        }
    }

    private class HandlerErrors {
        private Throwable handlerError;
        private Throwable retryError;

        HandlerErrors(Throwable handlerError, Throwable retryError) {
            this.handlerError = handlerError;
            this.retryError = retryError;
        }
    }

    protected class SourceRecordOffsets
    implements DebeziumEngine.Offsets {
        private HashMap<String, Object> offsets = new HashMap();

        protected SourceRecordOffsets() {
        }

        public void set(String key, Object value) {
            this.offsets.put(key, value);
        }

        protected HashMap<String, Object> getOffsets() {
            return this.offsets;
        }
    }

    public static final class EngineBuilder
    implements DebeziumEngine.Builder<SourceRecord> {
        private Configuration config;
        private DebeziumEngine.ChangeConsumer<SourceRecord> handler;
        private ClassLoader classLoader;
        private Clock clock;
        private DebeziumEngine.CompletionCallback completionCallback;
        private DebeziumEngine.ConnectorCallback connectorCallback;
        private OffsetCommitPolicy offsetCommitPolicy = null;

        public DebeziumEngine.Builder using(Properties config) {
            this.config = Configuration.from((Properties)config);
            return this;
        }

        public DebeziumEngine.Builder using(ClassLoader classLoader) {
            this.classLoader = classLoader;
            return this;
        }

        public DebeziumEngine.Builder using(DebeziumEngine.CompletionCallback completionCallback) {
            this.completionCallback = completionCallback;
            return this;
        }

        public DebeziumEngine.Builder using(DebeziumEngine.ConnectorCallback connectorCallback) {
            this.connectorCallback = connectorCallback;
            return this;
        }

        public DebeziumEngine.Builder using(OffsetCommitPolicy offsetCommitPolicy) {
            this.offsetCommitPolicy = offsetCommitPolicy;
            return this;
        }

        public DebeziumEngine.Builder notifying(Consumer<SourceRecord> consumer) {
            this.handler = EmbeddedEngine.buildDefaultChangeConsumer(consumer);
            return this;
        }

        public DebeziumEngine.Builder notifying(DebeziumEngine.ChangeConsumer<SourceRecord> handler) {
            this.handler = handler;
            if (!this.config.hasKey(CommonConnectorConfig.TOMBSTONES_ON_DELETE.name()) && !handler.supportsTombstoneEvents()) {
                LOGGER.info("Consumer doesn't support tombstone events, setting '{}' to false.", (Object)CommonConnectorConfig.TOMBSTONES_ON_DELETE.name());
                this.config = ((Configuration.Builder)this.config.edit().with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)).build();
            }
            return this;
        }

        public DebeziumEngine.Builder using(final java.time.Clock clock) {
            this.clock = new Clock(){

                public long currentTimeInMillis() {
                    return clock.millis();
                }
            };
            return this;
        }

        public EmbeddedEngine build() {
            if (this.classLoader == null) {
                this.classLoader = Instantiator.getClassLoader();
            }
            if (this.clock == null) {
                this.clock = Clock.system();
            }
            Objects.requireNonNull(this.config, "A connector configuration must be specified.");
            Objects.requireNonNull(this.handler, "A connector consumer or changeHandler must be specified.");
            return new EmbeddedEngine(this.config, this.classLoader, this.clock, this.handler, this.completionCallback, this.connectorCallback, this.offsetCommitPolicy);
        }
    }
}

