/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connectors.tubemq;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.tubemq.TubemqOptions;
import org.apache.flink.runtime.net.ConnectionUtils;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TimeUtils;
import org.apache.tubemq.client.config.ConsumerConfig;
import org.apache.tubemq.client.config.TubeClientConfig;
import org.apache.tubemq.client.consumer.ConsumerResult;
import org.apache.tubemq.client.consumer.PullMessageConsumer;
import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.tubemq.corebase.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TubemqSourceFunction<T>
extends RichParallelSourceFunction<T>
implements CheckpointedFunction {
    private static final Logger LOG = LoggerFactory.getLogger(TubemqSourceFunction.class);
    private static final String TUBE_OFFSET_STATE = "tube-offset-state";
    private static final String SPLIT_COMMA = ",";
    private static final String SPLIT_COLON = ":";
    private final String masterAddress;
    private final String topic;
    private final TreeSet<String> tidSet;
    private final String consumerGroup;
    private final DeserializationSchema<T> deserializationSchema;
    private final String sessionKey;
    private final boolean consumeFromMax;
    private final Duration messageNotFoundWaitPeriod;
    private final Duration maxIdleTime;
    private volatile boolean running;
    private transient ListState<Tuple2<String, Long>> offsetsState;
    private transient Map<String, Long> currentOffsets;
    private transient TubeSingleSessionFactory messageSessionFactory;
    private transient PullMessageConsumer messagePullConsumer;

    public TubemqSourceFunction(String masterAddress, String topic, TreeSet<String> tidSet, String consumerGroup, DeserializationSchema<T> deserializationSchema, Configuration configuration) {
        Preconditions.checkNotNull((Object)masterAddress, (String)"The master address must not be null.");
        Preconditions.checkNotNull((Object)topic, (String)"The topic must not be null.");
        Preconditions.checkNotNull(tidSet, (String)"The tid set must not be null.");
        Preconditions.checkNotNull((Object)consumerGroup, (String)"The consumer group must not be null.");
        Preconditions.checkNotNull(deserializationSchema, (String)"The deserialization schema must not be null.");
        Preconditions.checkNotNull((Object)configuration, (String)"The configuration must not be null.");
        this.masterAddress = masterAddress;
        this.topic = topic;
        this.tidSet = tidSet;
        this.consumerGroup = consumerGroup;
        this.deserializationSchema = deserializationSchema;
        this.sessionKey = configuration.getString(TubemqOptions.SESSION_KEY);
        this.consumeFromMax = configuration.getBoolean(TubemqOptions.BOOTSTRAP_FROM_MAX);
        this.messageNotFoundWaitPeriod = TimeUtils.parseDuration((String)configuration.getString(TubemqOptions.MESSAGE_NOT_FOUND_WAIT_PERIOD));
        this.maxIdleTime = TimeUtils.parseDuration((String)configuration.getString(TubemqOptions.SOURCE_MAX_IDLE_TIME));
    }

    public void initializeState(FunctionInitializationContext context) throws Exception {
        TupleTypeInfo typeInformation = new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO});
        ListStateDescriptor stateDescriptor = new ListStateDescriptor(TUBE_OFFSET_STATE, (TypeInformation)typeInformation);
        OperatorStateStore stateStore = context.getOperatorStateStore();
        this.offsetsState = stateStore.getListState(stateDescriptor);
        this.currentOffsets = new HashMap<String, Long>();
        if (context.isRestored()) {
            for (Tuple2 tubeOffset : (Iterable)this.offsetsState.get()) {
                this.currentOffsets.put((String)tubeOffset.f0, (Long)tubeOffset.f1);
            }
            LOG.info("Successfully restore the offsets {}.", this.currentOffsets);
        } else {
            LOG.info("No restore offsets.");
        }
    }

    public void open(Configuration parameters) throws Exception {
        String firstAddress = this.masterAddress.split(SPLIT_COMMA)[0];
        String[] firstAddressSegments = firstAddress.split(SPLIT_COLON);
        String firstHost = firstAddressSegments[0];
        int firstPort = Integer.parseInt(firstAddressSegments[1]);
        InetSocketAddress firstSocketAddress = new InetSocketAddress(firstHost, firstPort);
        InetAddress localAddress = ConnectionUtils.findConnectingAddress((InetSocketAddress)firstSocketAddress, (long)2000L, (long)400L);
        String localhost = localAddress.getHostAddress();
        ConsumerConfig consumerConfig = new ConsumerConfig(localhost, this.masterAddress, this.consumerGroup);
        consumerConfig.setConsumeModel(this.consumeFromMax ? 1 : 0);
        consumerConfig.setMsgNotFoundWaitPeriodMs(this.messageNotFoundWaitPeriod.toMillis());
        int numTasks = this.getRuntimeContext().getNumberOfParallelSubtasks();
        this.messageSessionFactory = new TubeSingleSessionFactory((TubeClientConfig)consumerConfig);
        this.messagePullConsumer = this.messageSessionFactory.createPullConsumer(consumerConfig);
        this.messagePullConsumer.subscribe(this.topic, this.tidSet);
        this.messagePullConsumer.completeSubscribe(this.sessionKey, numTasks, true, this.currentOffsets);
        this.running = true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run(SourceFunction.SourceContext<T> ctx) throws Exception {
        Instant lastConsumeInstant = Instant.now();
        while (this.running) {
            ConsumerResult consumeResult = this.messagePullConsumer.getMessage();
            if (!consumeResult.isSuccess()) {
                LOG.info("Could not consume messages from tubemq (errcode: {}, errmsg: {}).", (Object)consumeResult.getErrCode(), (Object)consumeResult.getErrMsg());
                Duration idleTime = Duration.between(lastConsumeInstant, Instant.now());
                if (idleTime.compareTo(this.maxIdleTime) <= 0) continue;
                LOG.info("Mark this source as temporarily idle.");
                ctx.markAsTemporarilyIdle();
                continue;
            }
            List messageList = consumeResult.getMessageList();
            ArrayList records = new ArrayList();
            if (messageList != null) {
                lastConsumeInstant = Instant.now();
                for (Message message : messageList) {
                    Object record = this.deserializationSchema.deserialize(message.getData());
                    records.add(record);
                }
            }
            Iterator iterator = ctx.getCheckpointLock();
            synchronized (iterator) {
                for (Object record : records) {
                    ctx.collect(record);
                }
                this.currentOffsets.put(consumeResult.getPartitionKey(), consumeResult.getCurrOffset());
            }
            ConsumerResult confirmResult = this.messagePullConsumer.confirmConsume(consumeResult.getConfirmContext(), true);
            if (confirmResult.isSuccess()) continue;
            LOG.warn("Could not confirm messages to tubemq (errcode: {}, errmsg: {}).", (Object)confirmResult.getErrCode(), (Object)confirmResult.getErrMsg());
        }
    }

    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        this.offsetsState.clear();
        for (Map.Entry<String, Long> entry : this.currentOffsets.entrySet()) {
            this.offsetsState.add((Object)new Tuple2((Object)entry.getKey(), (Object)entry.getValue()));
        }
        LOG.info("Successfully save the offsets in checkpoint {}: {}.", (Object)context.getCheckpointId(), this.currentOffsets);
    }

    public void cancel() {
        this.running = false;
    }

    public void close() throws Exception {
        this.cancel();
        if (this.messagePullConsumer != null) {
            try {
                this.messagePullConsumer.shutdown();
            }
            catch (Throwable t) {
                LOG.warn("Could not properly shutdown the tubemq pull consumer.", t);
            }
        }
        if (this.messageSessionFactory != null) {
            try {
                this.messageSessionFactory.shutdown();
            }
            catch (Throwable t) {
                LOG.warn("Could not properly shutdown the tubemq session factory.", t);
            }
        }
        super.close();
        LOG.info("Closed the tubemq source.");
    }
}

