/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import org.apache.log4j.Level;
import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.api.MessageOutputStream;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.Log4jUtils;
import org.apache.ratis.util.SizeInBytes;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;

public abstract class MessageStreamApiTests<CLUSTER extends MiniRaftCluster>
extends BaseTest
implements MiniRaftCluster.Factory.Get<CLUSTER> {
    public static final int NUM_SERVERS = 3;
    private static final SizeInBytes SUBMESSAGE_SIZE = SizeInBytes.ONE_KB;

    public MessageStreamApiTests() {
        Log4jUtils.setLogLevel((Logger)RaftServer.Division.LOG, (Level)Level.DEBUG);
        Log4jUtils.setLogLevel((Logger)RaftClient.LOG, (Level)Level.DEBUG);
    }

    @Test
    public void testStream() throws Exception {
        RaftProperties p = this.getProperties();
        p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SimpleStateMachine4Testing.class, StateMachine.class);
        this.runWithNewCluster(3, arg_0 -> this.runTestStream(arg_0));
    }

    void runTestStream(CLUSTER cluster) throws Exception {
        RaftTestUtil.waitForLeader(cluster);
        int numParts = 9;
        int endOfRequest = 6;
        StringBuilder key = new StringBuilder();
        try (RaftClient client = cluster.createClient();
             MessageOutputStream out = client.getMessageStreamApi().stream();){
            for (int i = 1; i <= 9; ++i) {
                key.append(i);
                out.sendAsync((Message)new RaftTestUtil.SimpleMessage(i + ""), i == 6);
            }
        }
        String k = key.toString();
        try (RaftClient client = cluster.createClient();){
            String k1 = k.substring(0, 6);
            RaftClientReply r1 = client.io().sendReadOnly((Message)new RaftTestUtil.SimpleMessage(k1));
            Assert.assertTrue((boolean)r1.isSuccess());
            String k2 = k.substring(6);
            RaftClientReply r2 = client.io().sendReadOnly((Message)new RaftTestUtil.SimpleMessage(k2));
            Assert.assertTrue((boolean)r2.isSuccess());
        }
    }

    @Test
    public void testStreamAsync() throws Exception {
        RaftProperties p = this.getProperties();
        RaftClientConfigKeys.MessageStream.setSubmessageSize((RaftProperties)p, (SizeInBytes)SUBMESSAGE_SIZE);
        p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SimpleStateMachine4Testing.class, StateMachine.class);
        this.runWithNewCluster(3, arg_0 -> this.runTestStreamAsync(arg_0));
        RaftClientConfigKeys.MessageStream.setSubmessageSize((RaftProperties)p);
    }

    void runTestStreamAsync(CLUSTER cluster) throws Exception {
        RaftClientReply reply;
        RaftTestUtil.waitForLeader(cluster);
        ByteString bytes = ByteString.EMPTY;
        int i = 0;
        while (i < 10) {
            String s = (char)(65 + i) + "1234567";
            this.LOG.info("s=" + s);
            ByteString b = ByteString.copyFrom((String)s, (Charset)StandardCharsets.UTF_8);
            Assert.assertEquals((long)8L, (long)b.size());
            for (int j = 0; j < 128; ++j) {
                bytes = bytes.concat(b);
            }
            Assert.assertEquals((long)(++i * SUBMESSAGE_SIZE.getSizeInt()), (long)bytes.size());
        }
        try (RaftClient client = cluster.createClient();){
            reply = (RaftClientReply)client.getMessageStreamApi().streamAsync(Message.valueOf((ByteString)bytes)).get();
            Assert.assertTrue((boolean)reply.isSuccess());
        }
        client = cluster.createClient();
        var4_5 = null;
        try {
            reply = client.io().sendReadOnly((Message)new RaftTestUtil.SimpleMessage(bytes.toString(StandardCharsets.UTF_8)));
            Assert.assertTrue((boolean)reply.isSuccess());
        }
        catch (Throwable throwable) {
            var4_5 = throwable;
            throw throwable;
        }
        finally {
            if (client != null) {
                if (var4_5 != null) {
                    try {
                        client.close();
                    }
                    catch (Throwable throwable) {
                        var4_5.addSuppressed(throwable);
                    }
                } else {
                    client.close();
                }
            }
        }
    }
}

