/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connectors.tubemq;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.tubemq.TubemqOptions;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.tubemq.client.config.TubeClientConfig;
import org.apache.tubemq.client.factory.MessageSessionFactory;
import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.tubemq.client.producer.MessageProducer;
import org.apache.tubemq.client.producer.MessageSentResult;
import org.apache.tubemq.corebase.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TubemqSinkFunction<T>
extends RichSinkFunction<T>
implements CheckpointedFunction {
    private static final Logger LOG = LoggerFactory.getLogger(TubemqSinkFunction.class);
    private static final String SYSTEM_HEADER_TIME_FORMAT = "yyyyMMddHHmm";
    private final String masterAddress;
    private final String topic;
    private final String tid;
    private final SerializationSchema<T> serializationSchema;
    private transient MessageProducer producer;
    private transient MessageSessionFactory sessionFactory;
    private final int maxRetries;

    public TubemqSinkFunction(String topic, String masterAddress, SerializationSchema<T> serializationSchema, Configuration configuration) {
        Preconditions.checkNotNull((Object)topic, (String)"The topic must not be null.");
        Preconditions.checkNotNull((Object)masterAddress, (String)"The master address must not be null.");
        Preconditions.checkNotNull(serializationSchema, (String)"The serialization schema must not be null.");
        Preconditions.checkNotNull((Object)configuration, (String)"The configuration must not be null.");
        this.topic = topic;
        this.masterAddress = masterAddress;
        this.serializationSchema = serializationSchema;
        this.tid = configuration.getString(TubemqOptions.TID);
        this.maxRetries = configuration.getInteger(TubemqOptions.MAX_RETRIES);
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) {
    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        TubeClientConfig tubeClientConfig = new TubeClientConfig(this.masterAddress);
        this.sessionFactory = new TubeSingleSessionFactory(tubeClientConfig);
        this.producer = this.sessionFactory.createProducer();
        HashSet<String> hashSet = new HashSet<String>();
        hashSet.add(this.topic);
        this.producer.publish(hashSet);
    }

    public void invoke(T in, SinkFunction.Context context) throws Exception {
        int retries = 0;
        Exception exception = null;
        while (this.maxRetries <= 0 || retries < this.maxRetries) {
            try {
                MessageSentResult sendResult;
                byte[] body = this.serializationSchema.serialize(in);
                Message message = new Message(this.topic, body);
                if (StringUtils.isNotBlank((CharSequence)this.tid)) {
                    SimpleDateFormat sdf = new SimpleDateFormat(SYSTEM_HEADER_TIME_FORMAT);
                    long currTimeMillis = System.currentTimeMillis();
                    message.putSystemHeader(this.tid, sdf.format(new Date(currTimeMillis)));
                }
                if ((sendResult = this.producer.sendMessage(message)).isSuccess()) {
                    return;
                }
                LOG.warn("Send msg fail, error code: {}, error message: {}", (Object)sendResult.getErrCode(), (Object)sendResult.getErrMsg());
            }
            catch (Exception e) {
                LOG.warn("Could not properly send the message to hippo (retries: {}).", (Object)retries, (Object)e);
                ++retries;
                exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, exception);
            }
        }
        throw new IOException("Could not properly send the message to hippo.", exception);
    }

    public void close() throws Exception {
        try {
            if (this.producer != null) {
                this.producer.shutdown();
                this.producer = null;
            }
            if (this.sessionFactory != null) {
                this.sessionFactory.shutdown();
                this.sessionFactory = null;
            }
        }
        catch (Throwable e) {
            LOG.error("Shutdown producer error", e);
        }
        finally {
            super.close();
        }
    }
}

