/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorSystem;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcActorTest;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.runtime.rpc.exceptions.HandshakeException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Tests the handshake that takes place when a gateway is obtained via
 * {@link AkkaRpcService#connect} for an endpoint hosted by another RPC service.
 *
 * <p>Three RPC services, each backed by its own {@link ActorSystem}, are shared by all tests:
 * two regular {@link AkkaRpcService} instances and one {@link WrongVersionAkkaRpcService}
 * whose {@code getVersion()} returns {@code -1}.
 */
public class AkkaRpcActorHandshakeTest extends TestLogger {

    /** Upper bound for every blocking wait in this class, so a stalled handshake fails fast. */
    private static final Time timeout = Time.seconds(10L);

    private static AkkaRpcService akkaRpcService1;
    private static AkkaRpcService akkaRpcService2;
    private static WrongVersionAkkaRpcService wrongVersionAkkaRpcService;

    /** Creates one actor system per RPC service used by the tests. */
    @BeforeClass
    public static void setupClass() {
        final ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
        final ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
        final ActorSystem wrongVersionActorSystem = AkkaUtils.createDefaultActorSystem();

        final AkkaRpcServiceConfiguration akkaRpcServiceConfig =
                AkkaRpcServiceConfiguration.defaultConfiguration();

        akkaRpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
        akkaRpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
        wrongVersionAkkaRpcService =
                new WrongVersionAkkaRpcService(
                        wrongVersionActorSystem,
                        AkkaRpcServiceConfiguration.defaultConfiguration());
    }

    /**
     * Stops all three RPC services and waits (bounded by {@link #timeout}) for their
     * termination futures to complete.
     */
    @AfterClass
    public static void teardownClass() throws Exception {
        final ArrayList<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);

        terminationFutures.add(akkaRpcService1.stopService());
        terminationFutures.add(akkaRpcService2.stopService());
        terminationFutures.add(wrongVersionAkkaRpcService.stopService());

        FutureUtils.waitForAll(terminationFutures)
                .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    }

    /**
     * Connects from {@code akkaRpcService2} to an endpoint started on {@code akkaRpcService1}
     * and checks that an RPC through the resulting gateway returns the value that was set on
     * the endpoint.
     */
    @Test
    public void testVersionMatchBetweenRpcComponents() throws Exception {
        final AkkaRpcActorTest.DummyRpcEndpoint rpcEndpoint =
                new AkkaRpcActorTest.DummyRpcEndpoint(akkaRpcService1);
        final int value = 42;
        rpcEndpoint.setFoobar(value);

        rpcEndpoint.start();

        try {
            // Bound both waits so a stalled connect/RPC fails the test instead of hanging it.
            final AkkaRpcActorTest.DummyRpcGateway dummyRpcGateway =
                    akkaRpcService2
                            .connect(
                                    rpcEndpoint.getAddress(),
                                    AkkaRpcActorTest.DummyRpcGateway.class)
                            .get(timeout.getSize(), timeout.getUnit());

            Assert.assertThat(
                    dummyRpcGateway.foobar().get(timeout.getSize(), timeout.getUnit()),
                    Matchers.equalTo(value));
        } finally {
            RpcUtils.terminateRpcEndpoint(rpcEndpoint, timeout);
        }
    }

    /**
     * Connects from {@link WrongVersionAkkaRpcService} to an endpoint started on
     * {@code akkaRpcService1} and expects the connect future to fail with a
     * {@link HandshakeException}.
     */
    @Test
    public void testVersionMismatchBetweenRpcComponents() throws Exception {
        final AkkaRpcActorTest.DummyRpcEndpoint rpcEndpoint =
                new AkkaRpcActorTest.DummyRpcEndpoint(akkaRpcService1);

        rpcEndpoint.start();

        try {
            wrongVersionAkkaRpcService
                    .connect(rpcEndpoint.getAddress(), AkkaRpcActorTest.DummyRpcGateway.class)
                    .get(timeout.getSize(), timeout.getUnit());
            Assert.fail("Expected HandshakeException.");
        } catch (ExecutionException ee) {
            Assert.assertThat(
                    ExceptionUtils.stripExecutionException(ee),
                    Matchers.instanceOf(HandshakeException.class));
        } finally {
            RpcUtils.terminateRpcEndpoint(rpcEndpoint, timeout);
        }
    }

    /**
     * Connects to a {@code DummyRpcEndpoint} while requesting the unrelated
     * {@link WrongRpcGateway} type and expects the connect future to fail with a
     * {@link HandshakeException}.
     */
    @Test
    public void testWrongGatewayEndpointConnection() throws Exception {
        final AkkaRpcActorTest.DummyRpcEndpoint rpcEndpoint =
                new AkkaRpcActorTest.DummyRpcEndpoint(akkaRpcService1);

        rpcEndpoint.start();

        final CompletableFuture<WrongRpcGateway> futureGateway =
                akkaRpcService2.connect(rpcEndpoint.getAddress(), WrongRpcGateway.class);

        try {
            futureGateway.get(timeout.getSize(), timeout.getUnit());
            Assert.fail("We expected a HandshakeException.");
        } catch (ExecutionException executionException) {
            Assert.assertThat(
                    ExceptionUtils.stripExecutionException(executionException),
                    Matchers.instanceOf(HandshakeException.class));
        } finally {
            RpcUtils.terminateRpcEndpoint(rpcEndpoint, timeout);
        }
    }

    /** Gateway type requested by {@link #testWrongGatewayEndpointConnection()}. */
    private static interface WrongRpcGateway extends RpcGateway {
        public CompletableFuture<Boolean> barfoo();

        public void tell(String var1);
    }

    /** {@link AkkaRpcService} whose reported version is fixed to {@code -1}. */
    private static class WrongVersionAkkaRpcService extends AkkaRpcService {
        WrongVersionAkkaRpcService(
                ActorSystem actorSystem, AkkaRpcServiceConfiguration configuration) {
            super(actorSystem, configuration);
        }

        @Override
        protected int getVersion() {
            return -1;
        }
    }
}

