/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.factories;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.TestManagedTableSource;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.table.factories.TestCatalogFactory;
import org.apache.flink.table.factories.TestConflictingDynamicTableFactory1;
import org.apache.flink.table.factories.TestConflictingDynamicTableFactory2;
import org.apache.flink.table.factories.TestDynamicTableFactory;
import org.apache.flink.table.factories.TestFormatFactory;
import org.apache.flink.table.factories.utils.FactoryMocks;
import org.apache.flink.testutils.ClassLoaderUtils;
import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.MutableURLClassLoader;
import org.assertj.core.api.AbstractListAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class FactoryUtilTest {
    /** Explicit no-argument constructor with an empty body. */
    FactoryUtilTest() {
    }

    /**
     * Creates a table source from the full option set minus {@code connector} and expects the
     * result to be exactly a {@link TestManagedTableSource}.
     */
    @Test
    void testManagedConnector() {
        final Map<String, String> withoutConnector = createAllOptions();
        withoutConnector.remove("connector");

        final DynamicTableSource source =
                FactoryMocks.createTableSource(FactoryMocks.SCHEMA, withoutConnector);

        Assertions.assertThat(source).isExactlyInstanceOf(TestManagedTableSource.class);
    }

    /**
     * Sets {@code connector} to the unknown identifier {@code FAIL} and expects a validation error
     * that lists the available factory identifiers.
     */
    @Test
    void testInvalidConnector() {
        final String expectedMessage =
                "Could not find any factory for identifier 'FAIL' that implements '"
                        + DynamicTableFactory.class.getName()
                        + "' in the classpath.\n\n"
                        + "Available factory identifiers are:\n\n"
                        + "conflicting\n"
                        + "sink-only\n"
                        + "source-only\n"
                        + "test\n"
                        + "test-connector";

        assertCreateTableSourceWithOptionModifier(
                options -> options.put("connector", "FAIL"), expectedMessage);
    }

    /**
     * Sets {@code connector} to {@code conflicting} and expects a validation error naming both
     * ambiguous factory classes.
     */
    @Test
    void testConflictingConnector() {
        final String expectedMessage =
                "Multiple factories for identifier 'conflicting' that implement '"
                        + DynamicTableFactory.class.getName()
                        + "' found in the classpath.\n\n"
                        + "Ambiguous factory classes are:\n\n"
                        + TestConflictingDynamicTableFactory1.class.getName()
                        + "\n"
                        + TestConflictingDynamicTableFactory2.class.getName();

        assertCreateTableSourceWithOptionModifier(
                options -> options.put("connector", "conflicting"), expectedMessage);
    }

    /**
     * Removes the {@code target} option and expects a validation error reporting it as a missing
     * required option.
     */
    @Test
    void testMissingConnectorOption() {
        assertCreateTableSourceWithOptionModifier(
                // The removed value is irrelevant; only the absence of the key matters.
                options -> options.remove("target"),
                "One or more required options are missing.\n\n"
                        + "Missing required options are:\n\n"
                        + "target");
    }

    /**
     * Sets {@code buffer-size} to the string {@code FAIL} and expects an invalid-value validation
     * error for that option.
     */
    @Test
    void testInvalidConnectorOption() {
        final String expectedMessage = "Invalid value for option 'buffer-size'.";

        assertCreateTableSourceWithOptionModifier(
                options -> options.put("buffer-size", "FAIL"), expectedMessage);
    }

    /**
     * Removes the {@code value.format} option and expects a validation error about the missing
     * required scan format.
     */
    @Test
    void testMissingFormat() {
        assertCreateTableSourceWithOptionModifier(
                // The removed value is irrelevant; only the absence of the key matters.
                options -> options.remove("value.format"),
                "Could not find required scan format 'value.format'.");
    }

    /**
     * Sets {@code value.format} to the unknown identifier {@code FAIL} and expects a validation
     * error that lists the available format factory identifiers.
     */
    @Test
    void testInvalidFormat() {
        final String expectedMessage =
                "Could not find any factory for identifier 'FAIL' that implements '"
                        + DeserializationFormatFactory.class.getName()
                        + "' in the classpath.\n\n"
                        + "Available factory identifiers are:\n\n"
                        + "test-format";

        assertCreateTableSourceWithOptionModifier(
                options -> options.put("value.format", "FAIL"), expectedMessage);
    }

    /**
     * Removes {@code key.test-format.delimiter} and expects two messages in the cause chain: the
     * missing required option and the failing format with its option space.
     */
    @Test
    void testMissingFormatOption() {
        assertCreateTableSourceWithOptionModifier(
                // The removed value is irrelevant; only the absence of the key matters.
                options -> options.remove("key.test-format.delimiter"),
                "One or more required options are missing.\n\n"
                        + "Missing required options are:\n\n"
                        + "delimiter",
                "Error creating scan format 'test-format' in option space 'key.test-format.'.");
    }

    /**
     * Sets {@code key.test-format.fail-on-missing} to the string {@code FAIL} and expects an
     * invalid-value validation error for {@code fail-on-missing}.
     */
    @Test
    void testInvalidFormatOption() {
        final String expectedMessage = "Invalid value for option 'fail-on-missing'.";

        assertCreateTableSourceWithOptionModifier(
                options -> options.put("key.test-format.fail-on-missing", "FAIL"),
                expectedMessage);
    }

    /**
     * Removes {@code target}, adds a {@code password} option, and expects the error message to
     * list the table options with the password value shown as {@code ******}.
     */
    @Test
    void testSecretOption() {
        final String expectedMessage =
                "Table options are:\n\n"
                        + "'buffer-size'='1000'\n"
                        + "'connector'='test-connector'\n"
                        + "'key.format'='test-format'\n"
                        + "'key.test-format.delimiter'=','\n"
                        + "'password'='******'\n"
                        + "'property-version'='1'\n"
                        + "'value.format'='test-format'\n"
                        + "'value.test-format.delimiter'='|'\n"
                        + "'value.test-format.fail-on-missing'='true'";

        assertCreateTableSourceWithOptionModifier(
                options -> {
                    options.remove("target");
                    options.put("password", "123");
                },
                expectedMessage);
    }

    /**
     * Adds two options the connector does not declare and expects a validation error listing
     * them as unsupported, followed by the full list of supported options.
     */
    @Test
    void testUnconsumedOption() {
        final String expectedMessage =
                "Unsupported options found for 'test-connector'.\n\n"
                        + "Unsupported options:\n\n"
                        + "this-is-also-not-consumed\n"
                        + "this-is-not-consumed\n\n"
                        + "Supported options:\n\n"
                        + "buffer-size\n"
                        + "connector\n"
                        + "deprecated-target (deprecated)\n"
                        + "fallback-buffer-size\n"
                        + "format\n"
                        + "key.format\n"
                        + "key.test-format.changelog-mode\n"
                        + "key.test-format.delimiter\n"
                        + "key.test-format.deprecated-delimiter (deprecated)\n"
                        + "key.test-format.fail-on-missing\n"
                        + "key.test-format.fallback-fail-on-missing\n"
                        + "key.test-format.readable-metadata\n"
                        + "password\n"
                        + "property-version\n"
                        + "target\n"
                        + "value.format\n"
                        + "value.test-format.changelog-mode\n"
                        + "value.test-format.delimiter\n"
                        + "value.test-format.deprecated-delimiter (deprecated)\n"
                        + "value.test-format.fail-on-missing\n"
                        + "value.test-format.fallback-fail-on-missing\n"
                        + "value.test-format.readable-metadata";

        assertCreateTableSourceWithOptionModifier(
                options -> {
                    options.put("this-is-not-consumed", "42");
                    options.put("this-is-also-not-consumed", "true");
                },
                expectedMessage);
    }

    /**
     * Creates a source and a sink from the full option set and compares each against a
     * hand-built mock carrying the expected target, buffer size and key/value formats.
     */
    @Test
    void testAllOptions() {
        final Map<String, String> allOptions = createAllOptions();

        // Source side: the expected mock carries no buffer size (null).
        final DynamicTableSource source =
                FactoryMocks.createTableSource(FactoryMocks.SCHEMA, allOptions);
        final TestDynamicTableFactory.DynamicTableSourceMock wantedSource =
                new TestDynamicTableFactory.DynamicTableSourceMock(
                        "MyTarget",
                        null,
                        new TestFormatFactory.DecodingFormatMock(",", false),
                        new TestFormatFactory.DecodingFormatMock("|", true));
        Assertions.assertThat(source).isEqualTo(wantedSource);

        // Sink side: the expected mock carries the configured buffer size.
        final DynamicTableSink sink =
                FactoryMocks.createTableSink(FactoryMocks.SCHEMA, allOptions);
        final TestDynamicTableFactory.DynamicTableSinkMock wantedSink =
                new TestDynamicTableFactory.DynamicTableSinkMock(
                        "MyTarget",
                        1000L,
                        new TestFormatFactory.EncodingFormatMock(","),
                        new TestFormatFactory.EncodingFormatMock("|"));
        Assertions.assertThat(sink).isEqualTo(wantedSink);
    }

    /**
     * Switches the {@code connector} option to {@code test} and expects the created source and
     * sink to equal the same mocks as for the default option set.
     */
    @Test
    void testDiscoveryForSeparateSourceSinkFactory() {
        final Map<String, String> testConnectorOptions = createAllOptions();
        testConnectorOptions.put("connector", "test");

        // Source side.
        final DynamicTableSource source =
                FactoryMocks.createTableSource(FactoryMocks.SCHEMA, testConnectorOptions);
        final TestDynamicTableFactory.DynamicTableSourceMock wantedSource =
                new TestDynamicTableFactory.DynamicTableSourceMock(
                        "MyTarget",
                        null,
                        new TestFormatFactory.DecodingFormatMock(",", false),
                        new TestFormatFactory.DecodingFormatMock("|", true));
        Assertions.assertThat(source).isEqualTo(wantedSource);

        // Sink side.
        final DynamicTableSink sink =
                FactoryMocks.createTableSink(FactoryMocks.SCHEMA, testConnectorOptions);
        final TestDynamicTableFactory.DynamicTableSinkMock wantedSink =
                new TestDynamicTableFactory.DynamicTableSinkMock(
                        "MyTarget",
                        1000L,
                        new TestFormatFactory.EncodingFormatMock(","),
                        new TestFormatFactory.EncodingFormatMock("|"));
        Assertions.assertThat(sink).isEqualTo(wantedSink);
    }

    /**
     * Drops the key format options and expects the created source and sink to carry a
     * {@code null} key format while the value format is unchanged.
     */
    @Test
    void testOptionalFormat() {
        final Map<String, String> withoutKeyFormat = createAllOptions();
        withoutKeyFormat.remove("key.format");
        withoutKeyFormat.remove("key.test-format.delimiter");

        // Source side.
        final DynamicTableSource source =
                FactoryMocks.createTableSource(FactoryMocks.SCHEMA, withoutKeyFormat);
        final TestDynamicTableFactory.DynamicTableSourceMock wantedSource =
                new TestDynamicTableFactory.DynamicTableSourceMock(
                        "MyTarget",
                        null,
                        null,
                        new TestFormatFactory.DecodingFormatMock("|", true));
        Assertions.assertThat(source).isEqualTo(wantedSource);

        // Sink side.
        final DynamicTableSink sink =
                FactoryMocks.createTableSink(FactoryMocks.SCHEMA, withoutKeyFormat);
        final TestDynamicTableFactory.DynamicTableSinkMock wantedSink =
                new TestDynamicTableFactory.DynamicTableSinkMock(
                        "MyTarget",
                        1000L,
                        null,
                        new TestFormatFactory.EncodingFormatMock("|"));
        Assertions.assertThat(sink).isEqualTo(wantedSink);
    }

    /**
     * Replaces the {@code value.}-prefixed format options with un-prefixed {@code format} /
     * {@code test-format.*} options and expects the value format of the created source and sink
     * to use the {@code ;} delimiter.
     */
    @Test
    void testAlternativeValueFormat() {
        final Map<String, String> alternativeOptions = createAllOptions();

        // Drop the "value."-prefixed variant ...
        alternativeOptions.remove("value.format");
        alternativeOptions.remove("value.test-format.delimiter");
        alternativeOptions.remove("value.test-format.fail-on-missing");
        // ... and configure the format through the un-prefixed keys instead.
        alternativeOptions.put("format", "test-format");
        alternativeOptions.put("test-format.delimiter", ";");
        alternativeOptions.put("test-format.fail-on-missing", "true");

        // Source side.
        final DynamicTableSource source =
                FactoryMocks.createTableSource(FactoryMocks.SCHEMA, alternativeOptions);
        final TestDynamicTableFactory.DynamicTableSourceMock wantedSource =
                new TestDynamicTableFactory.DynamicTableSourceMock(
                        "MyTarget",
                        null,
                        new TestFormatFactory.DecodingFormatMock(",", false),
                        new TestFormatFactory.DecodingFormatMock(";", true));
        Assertions.assertThat(source).isEqualTo(wantedSource);

        // Sink side.
        final DynamicTableSink sink =
                FactoryMocks.createTableSink(FactoryMocks.SCHEMA, alternativeOptions);
        final TestDynamicTableFactory.DynamicTableSinkMock wantedSink =
                new TestDynamicTableFactory.DynamicTableSinkMock(
                        "MyTarget",
                        1000L,
                        new TestFormatFactory.EncodingFormatMock(","),
                        new TestFormatFactory.EncodingFormatMock(";"));
        Assertions.assertThat(sink).isEqualTo(wantedSink);
    }

    /**
     * Uses the {@code sink-only} connector as a source and the {@code source-only} connector as
     * a sink, and expects a {@link ValidationException} in the cause chain that explains the
     * wrong usage in each case.
     */
    @Test
    void testConnectorErrorHint() {
        Assertions.assertThatThrownBy(
                        () ->
                                FactoryMocks.createTableSource(
                                        FactoryMocks.SCHEMA,
                                        Collections.singletonMap("connector", "sink-only")))
                .satisfies(
                        FlinkAssertions.anyCauseMatches(
                                ValidationException.class,
                                "Connector 'sink-only' can only be used as a sink. It cannot be used as a source."));

        Assertions.assertThatThrownBy(
                        () ->
                                FactoryMocks.createTableSink(
                                        FactoryMocks.SCHEMA,
                                        Collections.singletonMap("connector", "source-only")))
                .satisfies(
                        FlinkAssertions.anyCauseMatches(
                                ValidationException.class,
                                "Connector 'source-only' can only be used as a source. It cannot be used as a sink."));
    }

    /**
     * Validates two required options against an empty {@link Configuration}: one whose key
     * contains the {@code #} placeholder and one whose fallback key contains it. The test has no
     * explicit assertion; it passes when validation does not throw.
     */
    @Test
    void testRequiredPlaceholderOption() {
        final Set<ConfigOption<?>> requiredOptions = new HashSet<>();
        requiredOptions.add(ConfigOptions.key("fields.#.min").intType().noDefaultValue());
        requiredOptions.add(
                ConfigOptions.key("no.placeholder.anymore")
                        .intType()
                        .noDefaultValue()
                        .withFallbackKeys("old.fields.#.min"));

        FactoryUtil.validateFactoryOptions(requiredOptions, new HashSet<>(), new Configuration());
    }

    /**
     * Creates a catalog of type {@code test-catalog} through {@link FactoryUtil#createCatalog}
     * and checks that the resulting {@link TestCatalogFactory.TestCatalog} carries the requested
     * name and default database option.
     */
    @Test
    void testCreateCatalog() {
        final Map<String, String> options = new HashMap<>();
        options.put(CommonCatalogOptions.CATALOG_TYPE.key(), "test-catalog");
        options.put(TestCatalogFactory.DEFAULT_DATABASE.key(), "my-database");

        final Catalog catalog =
                FactoryUtil.createCatalog(
                        "my-catalog",
                        options,
                        null,
                        Thread.currentThread().getContextClassLoader());
        Assertions.assertThat(catalog).isInstanceOf(TestCatalogFactory.TestCatalog.class);

        final TestCatalogFactory.TestCatalog testCatalog = (TestCatalogFactory.TestCatalog) catalog;
        // The value under test goes into assertThat(...), the expectation into isEqualTo(...),
        // so that a failure reports "expected"/"actual" the right way round.
        Assertions.assertThat(testCatalog.getName()).isEqualTo("my-catalog");
        Assertions.assertThat(
                        testCatalog.getOptions().get(TestCatalogFactory.DEFAULT_DATABASE.key()))
                .isEqualTo("my-database");
    }

    /**
     * Checks {@link FactoryUtil.CatalogFactoryHelper#validate()}: it is expected to succeed with
     * an empty option map and to fail with a {@link ValidationException} when the undeclared
     * option {@code x} is present.
     */
    @Test
    void testCatalogFactoryHelper() {
        // No options at all: validation must not throw.
        final FactoryUtil.CatalogFactoryHelper helper1 =
                FactoryUtil.createCatalogFactoryHelper(
                        new TestCatalogFactory(),
                        new FactoryUtil.DefaultCatalogContext(
                                "test",
                                Collections.emptyMap(),
                                null,
                                Thread.currentThread().getContextClassLoader()));
        helper1.validate();

        // One option the factory does not declare: validation must reject it.
        final FactoryUtil.CatalogFactoryHelper helper2 =
                FactoryUtil.createCatalogFactoryHelper(
                        new TestCatalogFactory(),
                        new FactoryUtil.DefaultCatalogContext(
                                "test",
                                Collections.singletonMap("x", "y"),
                                null,
                                Thread.currentThread().getContextClassLoader()));
        Assertions.assertThatThrownBy(() -> helper2.validate())
                .satisfies(
                        FlinkAssertions.anyCauseMatches(
                                ValidationException.class,
                                "Unsupported options found for 'test-catalog'"));
    }

    /**
     * Builds a table factory helper from options that only use deprecated and fallback keys,
     * discovers the value decoding format, and validates. The test has no explicit assertion; it
     * passes when none of these calls throws.
     */
    @Test
    void testFactoryHelperWithDeprecatedOptions() {
        final Map<String, String> legacyOptions = new HashMap<>();
        legacyOptions.put("deprecated-target", "MyTarget");
        legacyOptions.put("fallback-buffer-size", "1000");
        legacyOptions.put("value.format", "test-format");
        legacyOptions.put("value.test-format.deprecated-delimiter", "|");
        legacyOptions.put("value.test-format.fallback-fail-on-missing", "true");

        final FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(
                        new TestDynamicTableFactory(),
                        FactoryMocks.createTableContext(FactoryMocks.SCHEMA, legacyOptions));

        helper.discoverDecodingFormat(
                DeserializationFormatFactory.class, TestDynamicTableFactory.VALUE_FORMAT);
        helper.validate();
    }

    /**
     * Creates a helper from plan options plus enrichment options that both define {@code target}
     * and {@code buffer-size}. After validation, {@code target} is expected to keep the plan
     * value {@code abc} while {@code buffer-size} is expected to be the enrichment value
     * {@code 2000}.
     */
    @Test
    void testFactoryHelperWithEnrichmentOptions() {
        final Map<String, String> planOptions = new HashMap<>();
        planOptions.put(TestDynamicTableFactory.TARGET.key(), "abc");
        planOptions.put(TestDynamicTableFactory.BUFFER_SIZE.key(), "1000");

        final Map<String, String> enrichmentOptions = new HashMap<>();
        enrichmentOptions.put(TestDynamicTableFactory.TARGET.key(), "xyz");
        enrichmentOptions.put(TestDynamicTableFactory.BUFFER_SIZE.key(), "2000");

        final FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(
                        new TestDynamicTableFactory(),
                        FactoryMocks.createTableContext(
                                FactoryMocks.SCHEMA, planOptions, enrichmentOptions));
        helper.validate();

        Assertions.assertThat(helper.getOptions().get(TestDynamicTableFactory.TARGET))
                .isEqualTo("abc");
        Assertions.assertThat(helper.getOptions().get(TestDynamicTableFactory.BUFFER_SIZE))
                .isEqualTo(2000L);
    }

    /**
     * Like the plain enrichment test, but with key and value formats configured in both the plan
     * options and the enrichment options. Expects {@code target} = {@code abc},
     * {@code buffer-size} = {@code 2000}, a key delimiter of {@code ,} (the enrichment value), a
     * value delimiter of {@code |}, and {@code failOnMissing} set for both formats.
     */
    @Test
    void testFactoryHelperWithEnrichmentOptionsAndFormat() {
        final String keyPrefix =
                FactoryUtil.getFormatPrefix(TestDynamicTableFactory.KEY_FORMAT, "test-format");
        final String valuePrefix =
                FactoryUtil.getFormatPrefix(TestDynamicTableFactory.VALUE_FORMAT, "test-format");

        // Options as stored in the plan.
        final Map<String, String> planOptions = new HashMap<>();
        planOptions.put(TestDynamicTableFactory.TARGET.key(), "abc");
        planOptions.put(TestDynamicTableFactory.BUFFER_SIZE.key(), "1000");
        planOptions.put(TestDynamicTableFactory.KEY_FORMAT.key(), "test-format");
        planOptions.put(keyPrefix + TestFormatFactory.DELIMITER.key(), "|");
        planOptions.put(keyPrefix + TestFormatFactory.FAIL_ON_MISSING.key(), "true");
        planOptions.put(TestDynamicTableFactory.VALUE_FORMAT.key(), "test-format");
        planOptions.put(valuePrefix + TestFormatFactory.DELIMITER.key(), "|");
        planOptions.put(valuePrefix + TestFormatFactory.FAIL_ON_MISSING.key(), "true");

        // Enrichment options; note the differing key delimiter and the absent
        // value fail-on-missing entry.
        final Map<String, String> enrichmentOptions = new HashMap<>();
        enrichmentOptions.put(TestDynamicTableFactory.TARGET.key(), "xyz");
        enrichmentOptions.put(TestDynamicTableFactory.BUFFER_SIZE.key(), "2000");
        enrichmentOptions.put(TestDynamicTableFactory.KEY_FORMAT.key(), "test-format");
        enrichmentOptions.put(keyPrefix + TestFormatFactory.DELIMITER.key(), ",");
        enrichmentOptions.put(keyPrefix + TestFormatFactory.FAIL_ON_MISSING.key(), "true");
        enrichmentOptions.put(TestDynamicTableFactory.VALUE_FORMAT.key(), "test-format");
        enrichmentOptions.put(valuePrefix + TestFormatFactory.DELIMITER.key(), "|");

        final FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(
                        new TestDynamicTableFactory(),
                        FactoryMocks.createTableContext(
                                FactoryMocks.SCHEMA, planOptions, enrichmentOptions));

        final TestFormatFactory.DecodingFormatMock keyFormat =
                (TestFormatFactory.DecodingFormatMock)
                        helper.discoverDecodingFormat(
                                DeserializationFormatFactory.class,
                                TestDynamicTableFactory.KEY_FORMAT);
        final TestFormatFactory.DecodingFormatMock valueFormat =
                (TestFormatFactory.DecodingFormatMock)
                        helper.discoverDecodingFormat(
                                DeserializationFormatFactory.class,
                                TestDynamicTableFactory.VALUE_FORMAT);
        helper.validate();

        Assertions.assertThat(helper.getOptions().get(TestDynamicTableFactory.TARGET))
                .isEqualTo("abc");
        Assertions.assertThat(helper.getOptions().get(TestDynamicTableFactory.BUFFER_SIZE))
                .isEqualTo(2000L);
        Assertions.assertThat(keyFormat.delimiter).isEqualTo(",");
        Assertions.assertThat(keyFormat.failOnMissing).isTrue();
        Assertions.assertThat(valueFormat.delimiter).isEqualTo("|");
        Assertions.assertThat(valueFormat.failOnMissing).isTrue();
    }

    /**
     * The plan options define no key format while the enrichment options do. Discovering the key
     * decoding format is expected to fail with a {@link ValidationException} that names the
     * format option and the enrichment value.
     */
    @Test
    void testFactoryHelperWithEnrichmentOptionsMissingFormatIdentifier() {
        final Map<String, String> options = new HashMap<>();
        options.put(TestDynamicTableFactory.TARGET.key(), "abc");

        final Map<String, String> enrichment = new HashMap<>();
        enrichment.put(TestDynamicTableFactory.TARGET.key(), "xyz");
        enrichment.put(TestDynamicTableFactory.KEY_FORMAT.key(), "test-format");
        enrichment.put(
                FactoryUtil.getFormatPrefix(TestDynamicTableFactory.KEY_FORMAT, "test-format")
                        + TestFormatFactory.DELIMITER.key(),
                ",");

        final FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(
                        new TestDynamicTableFactory(),
                        FactoryMocks.createTableContext(FactoryMocks.SCHEMA, options, enrichment));

        Assertions.assertThatThrownBy(
                        () ->
                                helper.discoverDecodingFormat(
                                        DeserializationFormatFactory.class,
                                        TestDynamicTableFactory.KEY_FORMAT))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        String.format(
                                "The persisted plan has no format option '%s' specified, while the catalog table has it with value '%s'",
                                TestDynamicTableFactory.KEY_FORMAT, "test-format"));
    }

    /**
     * The plan options and the enrichment options name different key formats
     * ({@code test-format} vs. {@code another-format}). Discovering the key decoding format is
     * expected to fail with a {@link ValidationException} that reports the mismatch.
     */
    @Test
    void testFactoryHelperWithEnrichmentOptionsFormatMismatch() {
        final String keyFormatPrefix =
                FactoryUtil.getFormatPrefix(TestDynamicTableFactory.KEY_FORMAT, "test-format");

        final Map<String, String> options = new HashMap<>();
        options.put(TestDynamicTableFactory.TARGET.key(), "abc");
        options.put(TestDynamicTableFactory.KEY_FORMAT.key(), "test-format");
        options.put(keyFormatPrefix + TestFormatFactory.DELIMITER.key(), "|");
        options.put(keyFormatPrefix + TestFormatFactory.FAIL_ON_MISSING.key(), "true");

        final Map<String, String> enrichment = new HashMap<>();
        enrichment.put(TestDynamicTableFactory.TARGET.key(), "xyz");
        enrichment.put(TestDynamicTableFactory.BUFFER_SIZE.key(), "2000");
        enrichment.put(TestDynamicTableFactory.KEY_FORMAT.key(), "another-format");

        final FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(
                        new TestDynamicTableFactory(),
                        FactoryMocks.createTableContext(FactoryMocks.SCHEMA, options, enrichment));

        Assertions.assertThatThrownBy(
                        () ->
                                helper.discoverDecodingFormat(
                                        DeserializationFormatFactory.class,
                                        TestDynamicTableFactory.KEY_FORMAT))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        String.format(
                                "Both persisted plan table and catalog table define the format option '%s', but they mismatch: '%s' != '%s'",
                                TestDynamicTableFactory.KEY_FORMAT,
                                "test-format",
                                "another-format"));
    }

    /**
     * Creates a helper without any enrichment options and expects the configured
     * {@code target} and {@code buffer-size} values to be returned unchanged after validation.
     */
    @Test
    void testFactoryHelperWithEmptyEnrichmentOptions() {
        final Map<String, String> planOptions = new HashMap<>();
        planOptions.put(TestDynamicTableFactory.TARGET.key(), "abc");
        planOptions.put(TestDynamicTableFactory.BUFFER_SIZE.key(), "1000");

        final FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(
                        new TestDynamicTableFactory(),
                        FactoryMocks.createTableContext(FactoryMocks.SCHEMA, planOptions));
        helper.validate();

        Assertions.assertThat(helper.getOptions().get(TestDynamicTableFactory.TARGET))
                .isEqualTo("abc");
        Assertions.assertThat(helper.getOptions().get(TestDynamicTableFactory.BUFFER_SIZE))
                .isEqualTo(1000L);
    }

    /**
     * Passes two {@code properties.*} entries to a helper built on {@code TestFactoryWithMap}
     * and validates. The test has no explicit assertion; it passes when validation does not
     * throw.
     */
    @Test
    void testFactoryHelperWithMapOption() {
        final Map<String, String> mapOptions = new HashMap<>();
        mapOptions.put("properties.prop-1", "value-1");
        mapOptions.put("properties.prop-2", "value-2");

        final FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(
                        new TestFactoryWithMap(),
                        FactoryMocks.createTableContext(FactoryMocks.SCHEMA, mapOptions));

        helper.validate();
    }

    /**
     * Adds an {@code unknown} option next to two {@code properties.*} entries and expects
     * validation to fail with a {@link ValidationException} that lists {@code unknown} as
     * unsupported and the map entries among the supported options.
     */
    @Test
    void testInvalidFactoryHelperWithMapOption() {
        final Map<String, String> options = new HashMap<>();
        options.put("properties.prop-1", "value-1");
        options.put("properties.prop-2", "value-2");
        options.put("unknown", "value-3");

        final FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(
                        new TestFactoryWithMap(),
                        FactoryMocks.createTableContext(FactoryMocks.SCHEMA, options));

        Assertions.assertThatThrownBy(() -> helper.validate())
                .satisfies(
                        FlinkAssertions.anyCauseMatches(
                                ValidationException.class,
                                "Unsupported options found for 'test-factory-with-map'.\n\n"
                                        + "Unsupported options:\n\n"
                                        + "unknown\n\n"
                                        + "Supported options:\n\n"
                                        + "connector\n"
                                        + "properties\n"
                                        + "properties.prop-1\n"
                                        + "properties.prop-2\n"
                                        + "property-version"));
    }

    @Test
    void testDiscoverFactoryBadClass(@TempDir Path tempDir) throws IOException {
        String subInterfaceName = "MyFancySerializationSchemaFormat";
        String subInterfaceImplementationName = "MyFancySerializationSchemaFormatImpl";
        String serializationSchemaImplementationName = "AnotherSerializationSchema";
        URLClassLoader classLoaderIncludingTheInterface = ClassLoaderUtils.withRoot((File)tempDir.toFile()).addClass("MyFancySerializationSchemaFormat", "public interface MyFancySerializationSchemaFormat extends " + SerializationFormatFactory.class.getName() + " {}").addClass("MyFancySerializationSchemaFormatImpl", "import org.apache.flink.api.common.serialization.SerializationSchema;import org.apache.flink.configuration.ConfigOption;import org.apache.flink.configuration.ReadableConfig;import org.apache.flink.table.connector.format.EncodingFormat;import org.apache.flink.table.data.RowData;import org.apache.flink.table.factories.DynamicTableFactory;import org.apache.flink.table.factories.SerializationFormatFactory;import java.util.Set;public class MyFancySerializationSchemaFormatImpl implements MyFancySerializationSchemaFormat {@Override public String factoryIdentifier() { return null; }@Override public Set<ConfigOption<?>> requiredOptions() { return null; }@Override public Set<ConfigOption<?>> optionalOptions() { return null; }@Override public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions) { return null; }}").addClass("AnotherSerializationSchema", "import org.apache.flink.api.common.serialization.SerializationSchema;import org.apache.flink.configuration.ConfigOption;import org.apache.flink.configuration.ReadableConfig;import org.apache.flink.table.connector.format.EncodingFormat;import org.apache.flink.table.data.RowData;import org.apache.flink.table.factories.DynamicTableFactory;import org.apache.flink.table.factories.SerializationFormatFactory;import java.util.Set;public class AnotherSerializationSchema implements " + SerializationFormatFactory.class.getName() + " {@Override public String factoryIdentifier() { return null; }@Override public Set<ConfigOption<?>> requiredOptions() { return null; }@Override public Set<ConfigOption<?>> optionalOptions() { return null; }@Override 
public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions) { return null; }}").addService(Factory.class.getName(), "MyFancySerializationSchemaFormatImpl").addService(Factory.class.getName(), "AnotherSerializationSchema").build();
        Files.delete(tempDir.resolve("MyFancySerializationSchemaFormat.class"));
        ((AbstractListAssert)Assertions.assertThat((List)FactoryUtil.discoverFactories((ClassLoader)classLoaderIncludingTheInterface)).map(f -> f.getClass().getName()).doesNotContain((Object[])new String[]{"MyFancySerializationSchemaFormatImpl"})).contains((Object[])new String[]{"AnotherSerializationSchema"});
    }

    /**
     * Creates a user-code class loader, closes it, and expects factory discovery through it to
     * fail with an {@link IllegalStateException} in the cause chain that explains the closed
     * class loader.
     *
     * @throws Exception if closing the class loader fails
     */
    @Test
    void testDiscoverFactoryFromClosedClassLoader() throws Exception {
        final MutableURLClassLoader classLoader =
                FlinkUserCodeClassLoaders.create(
                        new URL[0], FactoryUtilTest.class.getClassLoader(), new Configuration());
        classLoader.close();

        Assertions.assertThatThrownBy(
                        () -> FactoryUtil.discoverFactory(classLoader, Factory.class, "test"))
                .satisfies(
                        FlinkAssertions.anyCauseMatches(
                                IllegalStateException.class,
                                "Trying to access closed classloader. Please check if you store classloaders directly "
                                        + "or indirectly in static fields. If the stacktrace suggests that the leak occurs "
                                        + "in a third party library and cannot be fixed immediately, you can disable this "
                                        + "check with the configuration 'classloader.check-leaked-classloader'"));
    }

    /**
     * Applies {@code optionModifier} to a fresh copy of the full option set, attempts to create a
     * table source from it, and asserts that the attempt throws. For every entry of
     * {@code messages}, a {@link ValidationException} matching that message must be found in the
     * cause chain of the thrown exception.
     *
     * @param optionModifier mutation applied to the option map before the source is created
     * @param messages expected messages; each one is checked independently
     */
    private static void assertCreateTableSourceWithOptionModifier(
            Consumer<Map<String, String>> optionModifier, String... messages) {
        final AbstractThrowableAssert<?, ? extends Throwable> assertion =
                Assertions.assertThatThrownBy(
                        () -> {
                            Map<String, String> options = createAllOptions();
                            optionModifier.accept(options);
                            FactoryMocks.createTableSource(FactoryMocks.SCHEMA, options);
                        });

        for (String message : messages) {
            assertion.satisfies(FlinkAssertions.anyCauseMatches(ValidationException.class, message));
        }
    }

    /**
     * Builds the baseline option set shared by the tests in this class.
     *
     * @return a new, mutable map holding the nine baseline options
     */
    private static Map<String, String> createAllOptions() {
        // Key/value pairs, listed in the order in which they are inserted.
        final String[][] baseline = {
            {"property-version", "1"},
            {"connector", "test-connector"},
            {"target", "MyTarget"},
            {"buffer-size", "1000"},
            {"key.format", "test-format"},
            {"key.test-format.delimiter", ","},
            {"value.format", "test-format"},
            {"value.test-format.delimiter", "|"},
            {"value.test-format.fail-on-missing", "true"},
        };

        final HashMap<String, String> result = new HashMap<>();
        for (final String[] entry : baseline) {
            result.put(entry[0], entry[1]);
        }
        return result;
    }

    private static class TestFactoryWithMap
    implements DynamicTableFactory {
        public static final ConfigOption<Map<String, String>> PROPERTIES = ConfigOptions.key((String)"properties").mapType().noDefaultValue();

        private TestFactoryWithMap() {
        }

        public String factoryIdentifier() {
            return "test-factory-with-map";
        }

        public Set<ConfigOption<?>> requiredOptions() {
            return Collections.emptySet();
        }

        public Set<ConfigOption<?>> optionalOptions() {
            return Collections.singleton(PROPERTIES);
        }
    }
}

