/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.gateway.service;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedCatalogView;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.FunctionIdentifier;
import org.apache.flink.table.functions.FunctionKind;
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationStatus;
import org.apache.flink.table.gateway.api.results.FunctionInfo;
import org.apache.flink.table.gateway.api.results.OperationInfo;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.results.TableInfo;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl;
import org.apache.flink.table.gateway.service.operation.OperationManager;
import org.apache.flink.table.gateway.service.session.SessionManager;
import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions;
import org.apache.flink.table.planner.runtime.batch.sql.TestModule;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
import org.apache.flink.table.planner.utils.TableFunc0;
import org.apache.flink.table.types.DataType;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.function.RunnableWithException;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AssertionsForClassTypes;
import org.assertj.core.api.Condition;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Integration tests that exercise {@link SqlGatewayServiceImpl} through the service instance
 * provided by {@link SqlGatewayServiceExtension}.
 *
 * <p>The service and the session manager are static and shared by all tests, so every test that
 * parks an operation on a {@link CountDownLatch} releases that latch in a {@code finally} block;
 * otherwise a failed assertion would leave blocked operations behind in the shared service.
 */
public class SqlGatewayServiceITCase extends AbstractTestBase {

    @RegisterExtension
    public static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
            new SqlGatewayServiceExtension();

    private static SessionManager sessionManager;
    private static SqlGatewayServiceImpl service;

    /** Minimal session environment used by tests that do not need catalogs or modules. */
    private final SessionEnvironment defaultSessionEnvironment =
            SessionEnvironment.newBuilder()
                    .setSessionEndpointVersion(MockedEndpointVersion.V1)
                    .build();

    /** Creates the helper threads that issue concurrent requests against the service. */
    private final ThreadFactory threadFactory =
            new ExecutorThreadFactory(
                    "SqlGatewayService Test Pool", IgnoreExceptionHandler.INSTANCE);

    /** Caches the session manager and the service exposed by the registered extension. */
    @BeforeAll
    public static void setUp() {
        sessionManager = SQL_GATEWAY_SERVICE_EXTENSION.getSessionManager();
        service = (SqlGatewayServiceImpl) SQL_GATEWAY_SERVICE_EXTENSION.getService();
    }

    /** Options passed when opening a session must be visible in the session config. */
    @Test
    public void testOpenSessionWithConfig() {
        Map<String, String> options = new HashMap<>();
        options.put("key1", "val1");
        options.put("key2", "val2");
        SessionEnvironment environment =
                SessionEnvironment.newBuilder()
                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
                        .addSessionConfig(options)
                        .build();

        SessionHandle sessionHandle = service.openSession(environment);
        Map<String, String> actualConfig = service.getSessionConfig(sessionHandle);

        Assertions.assertThat(actualConfig).containsAllEntriesOf(options);
    }

    /** Catalog, database and module registered in the environment must reach the table env. */
    @Test
    public void testOpenSessionWithEnvironment() throws Exception {
        String catalogName = "default";
        String databaseName = "testDb";
        String moduleName = "testModule";
        GenericInMemoryCatalog defaultCatalog =
                new GenericInMemoryCatalog(catalogName, databaseName);
        SessionEnvironment environment =
                SessionEnvironment.newBuilder()
                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
                        .registerCatalog(catalogName, defaultCatalog)
                        .registerModuleAtHead(moduleName, new TestModule())
                        .setDefaultCatalog(catalogName)
                        .build();

        SessionHandle sessionHandle = service.openSession(environment);
        TableEnvironmentInternal tableEnv =
                service.getSession(sessionHandle)
                        .createExecutor(new Configuration())
                        .getTableEnvironment();

        Assertions.assertThat(tableEnv.getCurrentCatalog()).isEqualTo(catalogName);
        Assertions.assertThat(tableEnv.getCurrentDatabase()).isEqualTo(databaseName);
        Assertions.assertThat(tableEnv.listModules()).contains(moduleName);
    }

    /** Fetching from an operation that is still running yields the "not ready" result. */
    @Test
    public void testFetchResultsInRunning() throws Exception {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        CountDownLatch startRunningLatch = new CountDownLatch(1);
        CountDownLatch endRunningLatch = new CountDownLatch(1);
        OperationHandle operationHandle =
                this.submitDefaultOperation(
                        sessionHandle,
                        () -> {
                            startRunningLatch.countDown();
                            endRunningLatch.await();
                        });
        try {
            startRunningLatch.await();
            Assertions.assertThat(
                            service.fetchResults(
                                    sessionHandle, operationHandle, 0L, Integer.MAX_VALUE))
                    .isEqualTo(ResultSet.NOT_READY_RESULTS);
        } finally {
            // Always unblock the operation, even when the assertion above fails.
            endRunningLatch.countDown();
        }
    }

    /** Walks an operation from RUNNING to FINISHED and pages through its results. */
    @Test
    public void testGetOperationFinishedAndFetchResults() throws Exception {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        CountDownLatch startRunningLatch = new CountDownLatch(1);
        CountDownLatch endRunningLatch = new CountDownLatch(1);
        OperationHandle operationHandle =
                this.submitDefaultOperation(
                        sessionHandle,
                        () -> {
                            startRunningLatch.countDown();
                            endRunningLatch.await();
                        });
        try {
            startRunningLatch.await();
            Assertions.assertThat(service.getOperationInfo(sessionHandle, operationHandle))
                    .isEqualTo(new OperationInfo(OperationStatus.RUNNING));
        } finally {
            // Let the operation finish; also prevents a stuck operation on assertion failure.
            endRunningLatch.countDown();
        }

        OperationInfo expectedInfo = new OperationInfo(OperationStatus.FINISHED);
        CommonTestUtils.waitUtil(
                () -> service.getOperationInfo(sessionHandle, operationHandle).equals(expectedInfo),
                Duration.ofSeconds(10L),
                "Failed to wait operation finish.");

        // Fetch one row at a time until the service stops handing out a next token.
        Long token = 0L;
        List<RowData> expectedData = this.getDefaultResultSet().getData();
        List<RowData> actualData = new ArrayList<>();
        while (token != null) {
            ResultSet currentResult =
                    service.fetchResults(sessionHandle, operationHandle, token, 1);
            actualData.addAll(Preconditions.checkNotNull(currentResult.getData()));
            token = currentResult.getNextToken();
        }
        Assertions.assertThat(actualData).isEqualTo(expectedData);

        service.closeOperation(sessionHandle, operationHandle);
        Assertions.assertThat(sessionManager.getOperationCount(sessionHandle)).isEqualTo(0);
    }

    /** A running operation reports CANCELED after being cancelled and can then be closed. */
    @Test
    public void testCancelOperation() throws Exception {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        CountDownLatch startRunningLatch = new CountDownLatch(1);
        CountDownLatch endRunningLatch = new CountDownLatch(1);
        OperationHandle operationHandle =
                this.submitDefaultOperation(
                        sessionHandle,
                        () -> {
                            startRunningLatch.countDown();
                            endRunningLatch.await();
                        });
        try {
            startRunningLatch.await();
            Assertions.assertThat(service.getOperationInfo(sessionHandle, operationHandle))
                    .isEqualTo(new OperationInfo(OperationStatus.RUNNING));

            service.cancelOperation(sessionHandle, operationHandle);
            Assertions.assertThat(service.getOperationInfo(sessionHandle, operationHandle))
                    .isEqualTo(new OperationInfo(OperationStatus.CANCELED));

            service.closeOperation(sessionHandle, operationHandle);
            Assertions.assertThat(sessionManager.getOperationCount(sessionHandle)).isEqualTo(0);
        } finally {
            // Safety net: release the operation if the test fails before it gets cancelled.
            endRunningLatch.countDown();
        }
    }

    /** A failing operation ends in ERROR and fetching its results surfaces the cause. */
    @Test
    public void testOperationGetErrorAndFetchError() throws Exception {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        CountDownLatch startRunningLatch = new CountDownLatch(1);
        String msg = "Artificial Exception.";
        OperationHandle operationHandle =
                this.submitDefaultOperation(
                        sessionHandle,
                        () -> {
                            startRunningLatch.countDown();
                            throw new SqlExecutionException(msg);
                        });
        startRunningLatch.await();

        CommonTestUtils.waitUtil(
                () ->
                        service.getOperationInfo(sessionHandle, operationHandle)
                                .getStatus()
                                .equals(OperationStatus.ERROR),
                Duration.ofSeconds(10L),
                "Failed to get expected operation status.");

        AssertionsForClassTypes.assertThatThrownBy(
                        () ->
                                service.fetchResults(
                                        sessionHandle, operationHandle, 0L, Integer.MAX_VALUE))
                .satisfies(FlinkAssertions.anyCauseMatches(SqlExecutionException.class, msg));

        service.closeOperation(sessionHandle, operationHandle);
        Assertions.assertThat(sessionManager.getOperationCount(sessionHandle)).isEqualTo(0);
    }

    /** Configuration passed to {@code executeStatement} must show up in the output of SET. */
    @Test
    public void testExecuteSqlWithConfig() {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        String key = "username";
        String value = "Flink";
        OperationHandle operationHandle =
                service.executeStatement(
                        sessionHandle,
                        "SET",
                        -1L,
                        Configuration.fromMap(Collections.singletonMap(key, value)));

        Long token = 0L;
        List<RowData> settings = new ArrayList<>();
        while (token != null) {
            ResultSet result =
                    service.fetchResults(sessionHandle, operationHandle, token, Integer.MAX_VALUE);
            settings.addAll(result.getData());
            token = result.getNextToken();
        }

        Assertions.assertThat(settings)
                .contains(
                        GenericRowData.of(
                                StringData.fromString(key), StringData.fromString(value)));
    }

    /** The schema request returns the result schema once the operation has produced it. */
    @Test
    public void testGetOperationSchemaUntilOperationIsReady() throws Exception {
        this.runGetOperationSchemaUntilOperationIsReadyOrError(
                this::getDefaultResultSet,
                task -> {
                    Assertions.assertThat(task.get())
                            .isEqualTo(this.getDefaultResultSet().getResultSchema());
                });
    }

    /** The default catalog chosen in the environment is reported as the current catalog. */
    @Test
    public void testGetCurrentCatalog() {
        SessionEnvironment environment =
                SessionEnvironment.newBuilder()
                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
                        .registerCatalog("cat1", new GenericInMemoryCatalog("cat1"))
                        .registerCatalog("cat2", new GenericInMemoryCatalog("cat2"))
                        .setDefaultCatalog("cat2")
                        .build();
        SessionHandle sessionHandle = service.openSession(environment);

        Assertions.assertThat(service.getCurrentCatalog(sessionHandle)).isEqualTo("cat2");
    }

    /** All catalogs registered in the environment are listed. */
    @Test
    public void testListCatalogs() {
        SessionEnvironment environment =
                SessionEnvironment.newBuilder()
                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
                        .registerCatalog("cat1", new GenericInMemoryCatalog("cat1"))
                        .registerCatalog("cat2", new GenericInMemoryCatalog("cat2"))
                        .build();
        SessionHandle sessionHandle = service.openSession(environment);

        Assertions.assertThat(service.listCatalogs(sessionHandle)).contains("cat1", "cat2");
    }

    /** Databases created through SQL statements are listed for their catalog. */
    @Test
    public void testListDatabases() throws Exception {
        SessionEnvironment environment =
                SessionEnvironment.newBuilder()
                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
                        .registerCatalog("cat", new GenericInMemoryCatalog("cat"))
                        .setDefaultCatalog("cat")
                        .build();
        SessionHandle sessionHandle = service.openSession(environment);
        Configuration configuration =
                Configuration.fromMap(service.getSessionConfig(sessionHandle));

        service.executeStatement(sessionHandle, "CREATE DATABASE db1", -1L, configuration);
        OperationHandle operationHandle =
                service.executeStatement(sessionHandle, "CREATE DATABASE db2", -1L, configuration);

        // Only the second statement is awaited before the databases are listed.
        CommonTestUtils.waitUtil(
                () ->
                        service.getOperationInfo(sessionHandle, operationHandle)
                                .getStatus()
                                .isTerminalStatus(),
                Duration.ofSeconds(100L),
                "Failed to wait operation finish.");

        Assertions.assertThat(service.listDatabases(sessionHandle, "cat")).contains("db1", "db2");
    }

    /** Tables and views are listed per database and filtered by the requested kinds. */
    @Test
    public void testListTables() {
        SessionHandle sessionHandle = this.createInitializedSession();

        Assertions.assertThat(
                        service.listTables(
                                sessionHandle,
                                "cat1",
                                "db1",
                                new HashSet<CatalogBaseTable.TableKind>(
                                        Arrays.asList(
                                                CatalogBaseTable.TableKind.TABLE,
                                                CatalogBaseTable.TableKind.VIEW))))
                .isEqualTo(
                        new HashSet<TableInfo>(
                                Arrays.asList(
                                        new TableInfo(
                                                ObjectIdentifier.of("cat1", "db1", "tbl1"),
                                                CatalogBaseTable.TableKind.TABLE),
                                        new TableInfo(
                                                ObjectIdentifier.of("cat1", "db1", "tbl2"),
                                                CatalogBaseTable.TableKind.TABLE),
                                        new TableInfo(
                                                ObjectIdentifier.of("cat1", "db1", "tbl3"),
                                                CatalogBaseTable.TableKind.VIEW),
                                        new TableInfo(
                                                ObjectIdentifier.of("cat1", "db1", "tbl4"),
                                                CatalogBaseTable.TableKind.VIEW))));

        Assertions.assertThat(
                        service.listTables(
                                sessionHandle,
                                "cat1",
                                "db2",
                                Collections.singleton(CatalogBaseTable.TableKind.TABLE)))
                .isEqualTo(
                        Collections.singleton(
                                new TableInfo(
                                        ObjectIdentifier.of("cat1", "db2", "tbl1"),
                                        CatalogBaseTable.TableKind.TABLE)));

        Assertions.assertThat(
                        service.listTables(
                                sessionHandle,
                                "cat2",
                                "db0",
                                Collections.singleton(CatalogBaseTable.TableKind.VIEW)))
                .isEqualTo(Collections.emptySet());
    }

    /** A sample of built-in functions of different kinds is present in the system listing. */
    @Test
    public void testListSystemFunctions() {
        SessionEnvironment environment =
                SessionEnvironment.newBuilder()
                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
                        .registerCatalog("cat1", new GenericInMemoryCatalog("cat1"))
                        .registerCatalog("cat2", new GenericInMemoryCatalog("cat2"))
                        .build();
        SessionHandle sessionHandle = service.openSession(environment);

        Assertions.assertThat(service.listSystemFunctions(sessionHandle))
                .contains(
                        new FunctionInfo(FunctionIdentifier.of("sin"), FunctionKind.SCALAR),
                        new FunctionInfo(FunctionIdentifier.of("sum"), FunctionKind.AGGREGATE),
                        new FunctionInfo(FunctionIdentifier.of("as"), FunctionKind.OTHER));
    }

    /** Temporary system, catalog and temporary catalog functions appear in the UDF listing. */
    @Test
    public void testListUserDefinedFunctions() {
        SessionEnvironment environment =
                SessionEnvironment.newBuilder()
                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
                        .registerCatalog("cat1", new GenericInMemoryCatalog("cat1"))
                        .registerCatalog("cat2", new GenericInMemoryCatalog("cat2"))
                        .build();
        SessionHandle sessionHandle = service.openSession(environment);
        TableEnvironmentInternal tEnv =
                service.getSession(sessionHandle).createExecutor().getTableEnvironment();

        tEnv.createTemporarySystemFunction(
                "count_distinct", JavaUserDefinedAggFunctions.CountDistinct.class);
        tEnv.createFunction("java1", JavaUserDefinedScalarFunctions.JavaFunc1.class);
        tEnv.createTemporaryFunction("table_func0", TableFunc0.class);
        // Registered in another catalog; it is not part of the expected entries below.
        tEnv.createFunction(
                "cat1.default.filter_out_function", JavaUserDefinedScalarFunctions.JavaFunc1.class);

        Assertions.assertThat(
                        service.listUserDefinedFunctions(
                                sessionHandle, "default_catalog", "default_database"))
                .contains(
                        new FunctionInfo(FunctionIdentifier.of("count_distinct")),
                        new FunctionInfo(
                                FunctionIdentifier.of(
                                        ObjectIdentifier.of(
                                                "default_catalog", "default_database", "java1"))),
                        new FunctionInfo(
                                FunctionIdentifier.of(
                                        ObjectIdentifier.of(
                                                "default_catalog",
                                                "default_database",
                                                "table_func0"))));
    }

    /** Looks up a temporary table and a temporary view and checks their definitions. */
    @Test
    public void testGetTable() {
        SessionHandle sessionHandle = this.createInitializedSession();

        ResolvedCatalogTable actualTable =
                (ResolvedCatalogTable)
                        service.getTable(sessionHandle, ObjectIdentifier.of("cat1", "db1", "tbl1"));
        Assertions.assertThat(actualTable.getResolvedSchema())
                .isEqualTo(ResolvedSchema.of(new Column[0]));
        Assertions.assertThat(actualTable.getOptions())
                .isEqualTo(Collections.singletonMap("connector", "values"));

        ResolvedCatalogView actualView =
                (ResolvedCatalogView)
                        service.getTable(sessionHandle, ObjectIdentifier.of("cat1", "db1", "tbl3"));
        Assertions.assertThat(actualView.getOriginalQuery()).isEqualTo("SELECT 1");
    }

    /** Fetching while the operation is cancelled concurrently fails with the CANCELED error. */
    @Test
    public void testCancelOperationAndFetchResultInParallel() {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        CountDownLatch latch = new CountDownLatch(1);
        OperationHandle operationHandle = this.submitDefaultOperation(sessionHandle, latch::await);
        try {
            this.runCancelOrCloseOperationWhenFetchResults(
                    sessionHandle,
                    operationHandle,
                    () -> service.cancelOperation(sessionHandle, operationHandle),
                    new Condition<String>(
                            msg ->
                                    msg.contains(
                                            String.format(
                                                    "Can not fetch results from the %s in %s status.",
                                                    operationHandle, OperationStatus.CANCELED)),
                            "Fetch results with expected error message."));
        } finally {
            latch.countDown();
        }
    }

    /** Fetching while the operation is closed concurrently fails with a "closed" error. */
    @Test
    public void testCloseOperationAndFetchResultInParallel() {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        OperationHandle operationHandle =
                this.submitDefaultOperation(sessionHandle, () -> Thread.sleep(1L));

        // Depending on timing the operation is either already removed or seen as CLOSED.
        this.runCancelOrCloseOperationWhenFetchResults(
                sessionHandle,
                operationHandle,
                () -> service.closeOperation(sessionHandle, operationHandle),
                new Condition<String>(
                        msg ->
                                msg.contains(
                                                String.format(
                                                        "Can not find the submitted operation in the OperationManager with the %s.",
                                                        operationHandle))
                                        || msg.contains(
                                                String.format(
                                                        "Can not fetch results from the %s in %s status.",
                                                        operationHandle, OperationStatus.CLOSED)),
                        "Fetch results with expected error message."));
    }

    /** Cancelling and closing many operations concurrently leaves all of them CLOSED. */
    @Test
    public void testCancelAndCloseOperationInParallel() throws Exception {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        int operationNum = 200;
        List<OperationManager.Operation> operations = new ArrayList<>(operationNum);
        for (int i = 0; i < operationNum; ++i) {
            // Every second operation fails so that both outcomes are covered.
            boolean throwError = i % 2 == 0;
            OperationHandle operationHandle =
                    this.submitDefaultOperation(
                            sessionHandle,
                            () -> {
                                Thread.sleep(100L);
                                if (throwError) {
                                    throw new SqlGatewayException("Artificial Exception.");
                                }
                            });
            operations.add(
                    service.getSession(sessionHandle)
                            .getOperationManager()
                            .getOperation(operationHandle));
            this.threadFactory
                    .newThread(() -> service.cancelOperation(sessionHandle, operationHandle))
                    .start();
            this.threadFactory
                    .newThread(() -> service.closeOperation(sessionHandle, operationHandle))
                    .start();
        }

        CommonTestUtils.waitUtil(
                () ->
                        service.getSession(sessionHandle).getOperationManager().getOperationCount()
                                == 0,
                Duration.ofSeconds(10L),
                "All operations should be closed.");

        for (OperationManager.Operation op : operations) {
            Assertions.assertThat(op.getOperationInfo().getStatus())
                    .isEqualTo(OperationStatus.CLOSED);
        }
    }

    /** Closing the operation manager while threads are submitting leaves no operation behind. */
    @Test
    public void testSubmitOperationAndCloseOperationManagerInParallel1() throws Exception {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        OperationManager manager = service.getSession(sessionHandle).getOperationManager();
        int submitThreadsNum = 100;
        CountDownLatch latch = new CountDownLatch(submitThreadsNum);
        for (int i = 0; i < submitThreadsNum; ++i) {
            this.threadFactory
                    .newThread(
                            () -> {
                                try {
                                    this.submitDefaultOperation(sessionHandle, () -> {});
                                } finally {
                                    // Count down even if the submission is rejected.
                                    latch.countDown();
                                }
                            })
                    .start();
        }

        manager.close();
        latch.await();

        Assertions.assertThat(manager.getOperationCount()).isEqualTo(0);
    }

    /** Closing the operation manager while an operation is executing must not hang. */
    @Test
    public void testSubmitOperationAndCloseOperationManagerInParallel2() throws Exception {
        int count = 3;
        CountDownLatch startRunning = new CountDownLatch(1);
        CountDownLatch terminateRunning = new CountDownLatch(1);
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        for (int i = 0; i < count; ++i) {
            this.threadFactory
                    .newThread(
                            () ->
                                    service.submitOperation(
                                            sessionHandle,
                                            () -> {
                                                startRunning.countDown();
                                                terminateRunning.await();
                                                return this.getDefaultResultSet();
                                            }))
                    .start();
        }
        try {
            startRunning.await();
            service.getSession(sessionHandle).getOperationManager().close();
        } finally {
            terminateRunning.countDown();
        }
    }

    /** All operations submitted to one session run, and the shared counter ends at their number. */
    @Test
    public void testExecuteOperationInSequence() throws Exception {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        AtomicReference<Integer> v = new AtomicReference<>(0);
        int threadNum = 100;
        List<OperationHandle> handles = new ArrayList<>();
        for (int i = 0; i < threadNum; ++i) {
            handles.add(
                    service.submitOperation(
                            sessionHandle,
                            () -> {
                                // NOTE(review): the read-then-write is not atomic, presumably on
                                // purpose so that interleaved execution would lose updates.
                                int origin = v.get();
                                v.set(origin + 1);
                                return this.getDefaultResultSet();
                            }));
        }
        for (OperationHandle handle : handles) {
            CommonTestUtils.waitUtil(
                    () ->
                            service.getOperationInfo(sessionHandle, handle)
                                    .getStatus()
                                    .isTerminalStatus(),
                    Duration.ofSeconds(10L),
                    "Failed to wait operation terminate");
        }

        Assertions.assertThat(v.get()).isEqualTo(threadNum);
    }

    /**
     * After a submission has been rejected, the same session must still be able to run a new
     * operation once capacity is available again.
     */
    @Test
    public void testReleaseLockWhenFailedToSubmitOperation() throws Exception {
        CountDownLatch latch = new CountDownLatch(1);
        try {
            // NOTE(review): 500 presumably matches the worker limit of the test service - confirm.
            int maximumThreads = 500;
            List<SessionHandle> sessions = new ArrayList<>();
            List<OperationHandle> operations = new ArrayList<>();
            for (int i = 0; i < maximumThreads; ++i) {
                SessionHandle blockedSession =
                        service.openSession(this.defaultSessionEnvironment);
                sessions.add(blockedSession);
                operations.add(
                        service.submitOperation(
                                blockedSession,
                                () -> {
                                    latch.await();
                                    return this.getDefaultResultSet();
                                }));
            }

            // One more submission is expected to be rejected.
            SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
            AssertionsForClassTypes.assertThatThrownBy(
                            () ->
                                    service.submitOperation(
                                            sessionHandle,
                                            () -> {
                                                latch.await();
                                                return this.getDefaultResultSet();
                                            }))
                    .satisfies(FlinkAssertions.anyCauseMatches(RejectedExecutionException.class));

            // Unblock the parked operations and wait for the first one to terminate.
            latch.countDown();
            CommonTestUtils.waitUtil(
                    () ->
                            service.getOperationInfo(sessions.get(0), operations.get(0))
                                    .getStatus()
                                    .isTerminalStatus(),
                    Duration.ofSeconds(10L),
                    "Should come to end soon.");

            // The session whose submission was rejected must accept a new operation.
            CountDownLatch success = new CountDownLatch(1);
            service.submitOperation(
                    sessionHandle,
                    () -> {
                        success.countDown();
                        return this.getDefaultResultSet();
                    });
            CommonTestUtils.waitUtil(
                    () -> success.getCount() == 0L,
                    Duration.ofSeconds(10L),
                    "Should come to end.");
        } finally {
            // No-op when already released; otherwise frees the parked operations on failure.
            latch.countDown();
        }
    }

    /** Fetching results from a cancelled operation fails with the CANCELED status error. */
    @Test
    public void testFetchResultsFromCanceledOperation() {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        CountDownLatch latch = new CountDownLatch(1);
        OperationHandle operationHandle = this.submitDefaultOperation(sessionHandle, latch::await);
        try {
            service.cancelOperation(sessionHandle, operationHandle);
            AssertionsForClassTypes.assertThatThrownBy(
                            () ->
                                    service.fetchResults(
                                            sessionHandle, operationHandle, 0L, Integer.MAX_VALUE))
                    .satisfies(
                            FlinkAssertions.anyCauseMatches(
                                    String.format(
                                            "Can not fetch results from the %s in %s status.",
                                            operationHandle, OperationStatus.CANCELED)));
        } finally {
            latch.countDown();
        }
    }

    /** Every request that targets an unknown operation handle fails with "not found". */
    @Test
    public void testRequestNonExistOperation() {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        OperationHandle operationHandle = OperationHandle.create();
        List<RunnableWithException> requests =
                Arrays.asList(
                        () -> service.cancelOperation(sessionHandle, operationHandle),
                        () -> service.getOperationInfo(sessionHandle, operationHandle),
                        () ->
                                service.fetchResults(
                                        sessionHandle, operationHandle, 0L, Integer.MAX_VALUE));

        for (RunnableWithException request : requests) {
            AssertionsForClassTypes.assertThatThrownBy(request::run)
                    .satisfies(
                            FlinkAssertions.anyCauseMatches(
                                    String.format(
                                            "Can not find the submitted operation in the OperationManager with the %s.",
                                            operationHandle)));
        }
    }

    /** The schema request fails with the operation's own error when the operation fails. */
    @Test
    public void testGetOperationSchemaWhenOperationGetError() throws Exception {
        String msg = "Artificial Exception.";
        this.runGetOperationSchemaUntilOperationIsReadyOrError(
                () -> {
                    throw new SqlGatewayException(msg);
                },
                task -> {
                    AssertionsForClassTypes.assertThatThrownBy(task::get)
                            .satisfies(
                                    FlinkAssertions.anyCauseMatches(
                                            SqlGatewayException.class, msg));
                });
    }

    /**
     * Submits an operation that first runs {@code executor} and then returns the default result
     * set.
     *
     * @param sessionHandle session that owns the operation
     * @param executor action executed inside the operation before the result is produced
     * @return handle of the submitted operation
     */
    private OperationHandle submitDefaultOperation(
            SessionHandle sessionHandle, RunnableWithException executor) {
        return service.submitOperation(
                sessionHandle,
                () -> {
                    executor.run();
                    return this.getDefaultResultSet();
                });
    }

    /** Builds the fixed four-row payload (id, name, age) that the test operations return. */
    private ResultSet getDefaultResultSet() {
        List<RowData> data =
                Arrays.asList(
                        GenericRowData.ofKind(
                                RowKind.INSERT, 1L, StringData.fromString("Flink CDC"), 3),
                        GenericRowData.ofKind(
                                RowKind.INSERT, 2L, StringData.fromString("MySql"), null),
                        GenericRowData.ofKind(RowKind.DELETE, 1, null, null),
                        GenericRowData.ofKind(RowKind.UPDATE_AFTER, 2, null, 101));
        ResolvedSchema schema =
                ResolvedSchema.of(
                        Column.physical("id", DataTypes.BIGINT()),
                        Column.physical("name", DataTypes.STRING()),
                        Column.physical("age", DataTypes.INT()));
        return new ResultSet(ResultSet.ResultType.PAYLOAD, null, schema, data);
    }

    /**
     * Starts a schema request while the operation is still held back, then releases the
     * operation and hands the pending request to {@code validator}.
     *
     * @param executor produces the operation's result or throws its error
     * @param validator checks the outcome of the schema request
     * @throws Exception if waiting is interrupted or the validator fails
     */
    private void runGetOperationSchemaUntilOperationIsReadyOrError(
            Callable<ResultSet> executor,
            org.apache.flink.util.function.ThrowingConsumer<FutureTask<ResolvedSchema>, Exception>
                    validator)
            throws Exception {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        CountDownLatch operationIsRunning = new CountDownLatch(1);
        CountDownLatch schemaFetcherIsRunning = new CountDownLatch(1);
        OperationHandle operationHandle =
                service.submitOperation(
                        sessionHandle,
                        () -> {
                            operationIsRunning.await();
                            return executor.call();
                        });
        FutureTask<ResolvedSchema> task =
                new FutureTask<>(
                        () -> {
                            schemaFetcherIsRunning.countDown();
                            return service.getOperationResultSchema(
                                    sessionHandle, operationHandle);
                        });
        this.threadFactory.newThread(task).start();
        try {
            schemaFetcherIsRunning.await();
        } finally {
            // Release the operation even if the wait above is interrupted.
            operationIsRunning.countDown();
        }
        validator.accept(task);
    }

    /**
     * Runs {@code cancelOrClose} on a helper thread while this thread keeps fetching results
     * until the service throws, then verifies the failure and the rows received so far.
     *
     * @param sessionHandle session that owns the operation
     * @param operationHandle operation being fetched from
     * @param cancelOrClose concurrent action; exceptions it throws are ignored on purpose
     * @param condition must match the message of at least one throwable in the cause chain
     */
    private void runCancelOrCloseOperationWhenFetchResults(
            SessionHandle sessionHandle,
            OperationHandle operationHandle,
            RunnableWithException cancelOrClose,
            Condition<String> condition) {
        List<RowData> actual = new ArrayList<>();
        this.threadFactory
                .newThread(
                        () -> {
                            try {
                                cancelOrClose.run();
                            } catch (Exception ignored) {
                                // Best effort: only the effect on the fetching thread matters.
                            }
                        })
                .start();

        AssertionsForClassTypes.assertThatThrownBy(
                        () -> {
                            Long token = 0L;
                            // Keep fetching until an exception is thrown: the token is only
                            // advanced when the service hands out a new one.
                            while (token != null) {
                                ResultSet resultSet =
                                        service.fetchResults(
                                                sessionHandle,
                                                operationHandle,
                                                token,
                                                Integer.MAX_VALUE);
                                if (resultSet.getNextToken() != null) {
                                    token = resultSet.getNextToken();
                                }
                                if (resultSet.getResultType() == ResultSet.ResultType.PAYLOAD) {
                                    actual.addAll(resultSet.getData());
                                }
                            }
                        })
                .satisfies(
                        t -> {
                            // anyMatch consumes the predicate result; a consumer that merely
                            // calls condition.matches(..) would never fail the assertion.
                            FlinkAssertions.assertThatChainOfCauses(t)
                                    .anyMatch(
                                            cause ->
                                                    cause.getMessage() != null
                                                            && condition.matches(
                                                                    cause.getMessage()));
                        });

        Assertions.assertThat(this.getDefaultResultSet().getData()).containsAll(actual);
    }

    /**
     * Opens a session with catalogs {@code cat1} and {@code cat2} and populates them with the
     * databases, tables and views used by the listing and lookup tests.
     *
     * @return handle of the prepared session
     */
    private SessionHandle createInitializedSession() {
        SessionEnvironment environment =
                SessionEnvironment.newBuilder()
                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
                        .registerCatalog("cat1", new GenericInMemoryCatalog("cat1"))
                        .registerCatalog("cat2", new GenericInMemoryCatalog("cat2"))
                        .build();
        SessionHandle sessionHandle = service.openSession(environment);
        TableEnvironmentInternal tableEnv =
                service.getSession(sessionHandle).createExecutor().getTableEnvironment();

        // cat1.db1: one temporary and one permanent table, one temporary and one permanent view.
        tableEnv.executeSql("CREATE DATABASE cat1.db1");
        tableEnv.executeSql("CREATE TEMPORARY TABLE cat1.db1.tbl1 WITH ('connector' = 'values')");
        tableEnv.executeSql("CREATE TABLE cat1.db1.tbl2 WITH('connector' = 'values')");
        tableEnv.executeSql("CREATE TEMPORARY VIEW cat1.db1.tbl3 AS SELECT 1");
        tableEnv.executeSql("CREATE VIEW cat1.db1.tbl4 AS SELECT 1");

        // cat1.db2: one table and one view.
        tableEnv.executeSql("CREATE DATABASE cat1.db2");
        tableEnv.executeSql("CREATE TABLE cat1.db2.tbl1 WITH ('connector' = 'values')");
        tableEnv.executeSql("CREATE VIEW cat1.db2.tbl2 AS SELECT 1");

        // cat2.db0: a single table and no views.
        tableEnv.executeSql("CREATE DATABASE cat2.db0");
        tableEnv.executeSql("CREATE TABLE cat2.db0.tbl0 WITH('connector' = 'values')");
        return sessionHandle;
    }
}

