/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.chaining;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.FlatMapDriver;
import org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.TaskTestBase;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/**
 * Verifies the record-count I/O metrics reported for a batch task that consists of a head
 * operator with one operator chained behind it.
 *
 * <p>Both operators run {@link DuplicatingFlatMapFunction}, which emits every input record
 * twice. The test checks the {@code numRecordsIn}/{@code numRecordsOut} counters of the task as
 * a whole and of each of the two operators individually.
 */
public class ChainedOperatorsMetricTest
extends TaskTestBase {
    private static final int MEMORY_MANAGER_SIZE = 1024 * 1024 * 3;
    private static final int NETWORK_BUFFER_SIZE = 1024;
    private static final TypeSerializerFactory<Record> serFact = RecordSerializerFactory.get();
    private final List<Record> outList = new ArrayList<>();
    private static final String HEAD_OPERATOR_NAME = "headoperator";
    private static final String CHAINED_OPERATOR_NAME = "chainedoperator";

    /**
     * Runs a head flat-map operator with one chained flat-map operator and asserts the
     * record counters at task level and at operator level.
     *
     * @throws Exception if setting up or invoking the task fails
     */
    @Test
    public void testOperatorIOMetricReuse() throws Exception {
        // Environment: replace the default mock environment with one that carries a real
        // TaskMetricGroup so that the counters can be read back after the task has run.
        this.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
        this.mockEnv = new MockEnvironmentBuilder()
                .setTaskName(HEAD_OPERATOR_NAME)
                .setManagedMemorySize(MEMORY_MANAGER_SIZE)
                .setInputSplitProvider(this.inputSplitProvider)
                .setBufferSize(NETWORK_BUFFER_SIZE)
                .setMetricGroup(
                        new TaskMetricGroup(
                                NoOpMetricRegistry.INSTANCE,
                                UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(),
                                new JobVertexID(),
                                new ExecutionAttemptID(),
                                "task",
                                0,
                                0))
                .build();

        final int keyCnt = 100;
        final int valCnt = 20;
        final int numRecords = keyCnt * valCnt;
        // Each DuplicatingFlatMapFunction doubles the number of records passing through it.
        final int headOutputRecords = numRecords * 2;
        final int chainedOutputRecords = headOutputRecords * 2;

        this.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
        this.addOutput(this.outList);

        // The chained operator has to be configured before the head task is registered.
        this.addChainedOperator();

        // Create the head operator and run the whole chain.
        this.registerTask(FlatMapDriver.class, DuplicatingFlatMapFunction.class);
        final BatchTask<?, ?> testTask = new BatchTask<>(this.mockEnv);
        testTask.invoke();

        Assert.assertEquals(chainedOutputRecords, this.outList.size());

        final TaskMetricGroup taskMetricGroup = this.mockEnv.getMetricGroup();

        // Task-level I/O metrics: input of the head operator, output of the last chained one.
        final TaskIOMetricGroup taskIoMetrics = taskMetricGroup.getIOMetricGroup();
        final Counter taskRecordsIn = taskIoMetrics.getNumRecordsInCounter();
        final Counter taskRecordsOut = taskIoMetrics.getNumRecordsOutCounter();
        Assert.assertEquals(numRecords, taskRecordsIn.getCount());
        Assert.assertEquals(chainedOutputRecords, taskRecordsOut.getCount());

        // Head operator I/O metrics.
        final OperatorIOMetricGroup headIoMetrics =
                taskMetricGroup.getOrAddOperator(HEAD_OPERATOR_NAME).getIOMetricGroup();
        final Counter headRecordsIn = headIoMetrics.getNumRecordsInCounter();
        final Counter headRecordsOut = headIoMetrics.getNumRecordsOutCounter();
        Assert.assertEquals(numRecords, headRecordsIn.getCount());
        Assert.assertEquals(headOutputRecords, headRecordsOut.getCount());

        // Chained operator I/O metrics.
        final OperatorIOMetricGroup chainedIoMetrics =
                taskMetricGroup.getOrAddOperator(CHAINED_OPERATOR_NAME).getIOMetricGroup();
        final Counter chainedRecordsIn = chainedIoMetrics.getNumRecordsInCounter();
        final Counter chainedRecordsOut = chainedIoMetrics.getNumRecordsOutCounter();
        Assert.assertEquals(headOutputRecords, chainedRecordsIn.getCount());
        Assert.assertEquals(chainedOutputRecords, chainedRecordsOut.getCount());
    }

    /**
     * Adds a {@link ChainedFlatMapDriver} running {@link DuplicatingFlatMapFunction} to the
     * task configuration under the name {@link #CHAINED_OPERATOR_NAME}.
     */
    private void addChainedOperator() {
        final TaskConfig chainedConfig = new TaskConfig(new Configuration());

        // input
        chainedConfig.addInputToGroup(0);
        chainedConfig.setInputSerializer(serFact, 0);

        // output
        chainedConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        chainedConfig.setOutputSerializer(serFact);

        // driver
        chainedConfig.setDriverStrategy(DriverStrategy.FLAT_MAP);

        // user code
        chainedConfig.setStubWrapper(new UserCodeClassWrapper<>(DuplicatingFlatMapFunction.class));

        this.getTaskConfig().addChainedTask(ChainedFlatMapDriver.class, chainedConfig, CHAINED_OPERATOR_NAME);
    }

    /** Flat-map function that emits every incoming record twice. */
    public static class DuplicatingFlatMapFunction
    extends RichFlatMapFunction<Record, Record> {
        private static final long serialVersionUID = -1152068682935346164L;

        /**
         * Forwards {@code value} to {@code out} two times.
         *
         * @param value the incoming record
         * @param out the collector receiving the duplicated record
         * @throws Exception declared by the overridden method; not thrown by this code itself
         */
        @Override
        public void flatMap(Record value, Collector<Record> out) throws Exception {
            out.collect(value);
            out.collect(value);
        }
    }
}

