/*
 * Decompiled with CFR 0.152.
 */
package com.taosdata.jdbc.ws;

import com.taosdata.jdbc.AbstractConnection;
import com.taosdata.jdbc.TSDBError;
import com.taosdata.jdbc.common.Column;
import com.taosdata.jdbc.common.ColumnInfo;
import com.taosdata.jdbc.common.DataLengthCfg;
import com.taosdata.jdbc.common.SerializeBlock;
import com.taosdata.jdbc.common.TableInfo;
import com.taosdata.jdbc.enums.FieldBindType;
import com.taosdata.jdbc.rs.ConnectionParam;
import com.taosdata.jdbc.utils.ReqId;
import com.taosdata.jdbc.utils.SyncObj;
import com.taosdata.jdbc.ws.AbsWSPreparedStatement;
import com.taosdata.jdbc.ws.Transport;
import com.taosdata.jdbc.ws.entity.Action;
import com.taosdata.jdbc.ws.entity.Code;
import com.taosdata.jdbc.ws.entity.CommonResp;
import com.taosdata.jdbc.ws.entity.Request;
import com.taosdata.jdbc.ws.stmt2.entity.Field;
import com.taosdata.jdbc.ws.stmt2.entity.RequestFactory;
import com.taosdata.jdbc.ws.stmt2.entity.Stmt2ExecResp;
import com.taosdata.jdbc.ws.stmt2.entity.Stmt2PrepareResp;
import com.taosdata.jdbc.ws.stmt2.entity.Stmt2Resp;
import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WSEWPreparedStatement
extends AbsWSPreparedStatement {
    /** Logger for this class. */
    private static final Logger log = LoggerFactory.getLogger(WSEWPreparedStatement.class);
    /** When true, {@code copyMap} duplicates {@code byte[]} parameter values before a row is queued. */
    private final boolean copyData;
    /** Number of worker tasks and row queues; read from {@code param.getBackendWriteThreadNum()}. */
    private final int writeThreadNum;
    /** Fixed-size pool that runs the {@link WorkerThread} tasks. */
    private ThreadPoolExecutor writerThreads;
    /** One bounded queue per worker; {@code addBatch} picks the queue from the table-name hash. */
    private ArrayList<ArrayBlockingQueue<Map<Integer, Column>>> writeQueueList;
    /** Rows queued but not yet accounted for by a worker; {@code waitWriteCompleted} waits until it is 0. */
    private final AtomicInteger remainingUnprocessedRows = new AtomicInteger(0);
    /** Sum of affected-row counts reported by the workers; read and reset by {@code executeUpdate}. */
    private final AtomicInteger batchInsertedRows = new AtomicInteger(0);
    /** Flush counter bumped by {@code waitWriteCompleted}; workers signal {@code syncObj} when they see it change. */
    private final AtomicInteger flushIn = new AtomicInteger(0);
    /** Worker tasks, in the same order as {@code writeQueueList}. */
    private List<WorkerThread> workerThreadList;
    /** Used by workers to wake the thread blocked in {@code waitWriteCompleted}. */
    private final SyncObj syncObj = new SyncObj();
    /** Number of successful {@code addBatch} calls since the last {@code executeBatch}. */
    private int addBatchCounts = 0;

    /**
     * Creates the statement, one bounded row queue per worker and one server-side
     * statement per worker, then starts the workers on a fixed-size pool.
     *
     * <p>If any worker fails to initialise (non-success response code or an exception),
     * every statement opened so far is released on a best-effort basis, the pool is shut
     * down, and the failure is thrown. Release failures are attached as suppressed
     * exceptions so they never hide the original cause.
     *
     * @param transport   transport shared by this statement and all workers
     * @param param       connection parameters (thread count, queue capacity, copy flag, ...)
     * @param database    passed through to the superclass
     * @param connection  passed through to the superclass
     * @param sql         statement text each worker prepares
     * @param instanceId  passed through to the superclass
     * @param prepareResp passed through to the superclass
     * @throws SQLException if a worker's statement cannot be initialised
     */
    public WSEWPreparedStatement(Transport transport, ConnectionParam param, String database, AbstractConnection connection, String sql, Long instanceId, Stmt2PrepareResp prepareResp) throws SQLException {
        super(transport, param, database, connection, sql, instanceId, prepareResp);
        this.copyData = param.isCopyData();
        this.writeThreadNum = param.getBackendWriteThreadNum();
        this.writerThreads = (ThreadPoolExecutor)Executors.newFixedThreadPool(this.writeThreadNum);
        this.writeQueueList = new ArrayList<>(this.writeThreadNum);
        this.workerThreadList = new ArrayList<>(this.writeThreadNum);
        try {
            for (int i = 0; i < this.writeThreadNum; ++i) {
                this.writeQueueList.add(new ArrayBlockingQueue<>(param.getCacheSizeByRow()));
            }
            for (int i = 0; i < this.writeThreadNum; ++i) {
                WorkerThread workerThread = new WorkerThread(this.writeQueueList.get(i), sql, transport, param, this.closed, this.remainingUnprocessedRows, this.batchInsertedRows, this.flushIn, this.syncObj);
                this.workerThreadList.add(workerThread);
                CommonResp res = workerThread.initStmt();
                if (res.getCode() != Code.SUCCESS.getCode()) {
                    throw new SQLException("(0x" + Integer.toHexString(res.getCode()) + "):" + res.getMessage());
                }
            }
        }
        catch (SQLException | RuntimeException e) {
            // Undo partial construction: close every statement that was opened and
            // stop the pool so nothing is left behind by a failed constructor.
            for (WorkerThread workerThread : this.workerThreadList) {
                try {
                    workerThread.releaseStmt();
                }
                catch (SQLException | RuntimeException releaseError) {
                    e.addSuppressed(releaseError);
                }
            }
            this.writerThreads.shutdown();
            throw e;
        }
        for (WorkerThread workerThread : this.workerThreadList) {
            this.writerThreads.submit(workerThread);
        }
    }

    /**
     * Returns a new map with the same keys as {@code originalMap}.
     *
     * <p>Values are shared with the source map, except that when {@code copyData} is
     * enabled every column whose data is a {@code byte[]} is replaced by a new
     * {@link Column} wrapping a copy of that array (same type and index).
     *
     * @param originalMap bindings to snapshot
     * @return a fresh {@link HashMap} holding the snapshot
     */
    private Map<Integer, Column> copyMap(Map<Integer, Column> originalMap) {
        HashMap<Integer, Column> snapshot = new HashMap<Integer, Column>();
        for (Map.Entry<Integer, Column> binding : originalMap.entrySet()) {
            Column column = binding.getValue();
            if (this.copyData && column.getData() instanceof byte[]) {
                byte[] duplicate = ((byte[])column.getData()).clone();
                column = new Column(duplicate, column.getType(), column.getIndex());
            }
            snapshot.put(binding.getKey(), column);
        }
        return snapshot;
    }

    /**
     * Flushes pending rows via {@link #executeUpdate()}.
     *
     * @return the negation of {@code isInsert}, evaluated after the flush
     * @throws SQLException with code 8964 if the statement is closed, with code 8963
     *                      if the statement is not an insert, or whatever
     *                      {@link #executeUpdate()} throws
     */
    @Override
    public boolean execute() throws SQLException {
        if (this.isClosed()) {
            throw TSDBError.createSQLException(8964);
        }
        if (this.isInsert) {
            this.executeUpdate();
            return !this.isInsert;
        }
        throw TSDBError.createSQLException(8963, "Only support insert.");
    }

    /**
     * Always fails: this statement only handles inserts.
     *
     * @throws SQLException always, with code 8963
     */
    @Override
    public ResultSet executeQuery() throws SQLException {
        SQLException insertOnly = TSDBError.createSQLException(8963, "Only support insert.");
        throw insertOnly;
    }

    /**
     * Waits until every queued row has been handled by the workers, then returns
     * the accumulated affected-row count and resets it to zero.
     *
     * <p>Each worker's pending error is fetched and cleared. If any worker reported
     * one, an {@link SQLException} with vendor code 9104 is thrown. The first worker
     * error is attached as the cause, and errors from the other workers are added to
     * it as suppressed exceptions so their stack traces are not lost.
     *
     * @return number of rows the workers reported as inserted since the last call
     * @throws SQLException with code 8964 if the statement is closed, or with vendor
     *                      code 9104 if any worker failed
     */
    @Override
    public int executeUpdate() throws SQLException {
        if (this.isClosed()) {
            throw TSDBError.createSQLException(8964);
        }
        this.waitWriteCompleted();
        Exception firstError = null;
        for (WorkerThread workerThread : this.workerThreadList) {
            // Always fetch so that every worker's error slot is cleared.
            Exception workerError = workerThread.getAndClearLastError();
            if (workerError == null) {
                continue;
            }
            if (firstError == null) {
                firstError = workerError;
            } else if (workerError != firstError) {
                firstError.addSuppressed(workerError);
            }
        }
        int totalRowsInserted = this.batchInsertedRows.getAndSet(0);
        if (firstError != null) {
            throw new SQLException("InsertedRows: " + totalRowsInserted + ", ErrorInfo: " + firstError.getMessage(), "", 9104, firstError);
        }
        return totalRowsInserted;
    }

    /**
     * Requests a flush by bumping {@code flushIn}, then blocks on {@code syncObj}
     * until {@code remainingUnprocessedRows} reaches zero.
     *
     * <p>An interrupt does not abort the wait, because the rows still have to be
     * written. It is remembered and the thread's interrupt status is restored once
     * the wait finishes, so callers can still observe it.
     */
    private void waitWriteCompleted() {
        this.flushIn.incrementAndGet();
        boolean interrupted = false;
        try {
            while (this.remainingUnprocessedRows.get() != 0) {
                try {
                    this.syncObj.await();
                }
                catch (InterruptedException e) {
                    // Re-interrupting here would make the next await() fail at once;
                    // defer it until the flush is complete.
                    interrupted = true;
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Verifies that variable-length values fit the byte size of their field.
     *
     * <p>A position is checked only when {@code DataLengthCfg} has no length for the
     * column's type and the field is not the table-name field. For {@code byte[]}
     * data the array length is compared; for {@code String} data the length of
     * {@code getBytes()} (default charset) is compared. Other data types are skipped.
     *
     * @param map bound columns keyed by 1-based parameter index
     * @throws SQLException with code 8963 if a value is longer than {@code field.getBytes()};
     *                      the message carries the 0-based field position
     */
    private void checkDataLength(Map<Integer, Column> map) throws SQLException {
        for (int pos = 0; pos < this.fields.size(); ++pos) {
            Field field = (Field)this.fields.get(pos);
            Column column = map.get(pos + 1);
            boolean needsCheck = DataLengthCfg.getDataLength(column.getType()) == null
                    && field.getBindType() != FieldBindType.TAOS_FIELD_TBNAME.getValue();
            if (!needsCheck) {
                continue;
            }
            int byteLength;
            if (column.getData() instanceof byte[]) {
                byteLength = ((byte[])column.getData()).length;
            } else if (column.getData() instanceof String) {
                byteLength = ((String)column.getData()).getBytes().length;
            } else {
                continue;
            }
            if (byteLength > field.getBytes()) {
                throw TSDBError.createSQLException(8963, "data length is too long, column index " + pos);
            }
        }
    }

    /**
     * Snapshots the currently bound parameters as one row and hands it to the
     * worker queue selected by the hash of the row's table name.
     *
     * <p>Blocks while the selected queue is full. If the calling thread is
     * interrupted while blocked, the row is not queued and not counted, the
     * interrupt status is restored, and an {@link SQLException} is thrown.
     *
     * @throws SQLException with code 8963 if the number of bound parameters does not
     *                      match the number of fields, if the table name is neither a
     *                      {@code String} nor a {@code byte[]}, or if the strict length
     *                      check fails; also thrown when queueing is interrupted
     */
    @Override
    public void addBatch() throws SQLException {
        if (this.colOrderedMap.size() != this.fields.size()) {
            throw TSDBError.createSQLException(8963, "Only support standard jdbc bind api.");
        }
        Map<Integer, Column> map = this.copyMap(this.colOrderedMap);
        if (this.param.isStrictCheck()) {
            this.checkDataLength(map);
        }
        Object tableName = map.get(this.toBeBindTableNameIndex + 1).getData();
        int hashCode;
        if (tableName instanceof String) {
            hashCode = tableName.hashCode();
        } else if (tableName instanceof byte[]) {
            hashCode = Arrays.hashCode((byte[])tableName);
        } else {
            throw TSDBError.createSQLException(8963, "error type tbname.");
        }
        // Negating Integer.MIN_VALUE yields Integer.MIN_VALUE again, which would give a
        // negative queue index. Map that single value to 0; every other hash keeps the
        // same queue as before.
        int nonNegativeHash = hashCode == Integer.MIN_VALUE ? 0 : Math.abs(hashCode);
        int index = nonNegativeHash % this.writeThreadNum;
        try {
            this.writeQueueList.get(index).put(map);
        }
        catch (InterruptedException e) {
            // The row never reached the queue, so it must not be counted as pending:
            // otherwise a later flush would wait forever for a row no worker will see.
            Thread.currentThread().interrupt();
            throw new SQLException("Interrupted while queueing row for writing", e);
        }
        this.remainingUnprocessedRows.incrementAndGet();
        ++this.addBatchCounts;
    }

    /**
     * Reports one result per {@code addBatch()} call made since the previous
     * {@code executeBatch()} and resets that counter.
     *
     * <p>Every entry is {@code -2}, the value of {@code java.sql.Statement.SUCCESS_NO_INFO}.
     * This method does not itself wait for the queued rows to be written.
     *
     * @return array of length {@code addBatchCounts}, filled with {@code -2}
     */
    @Override
    public int[] executeBatch() throws SQLException {
        int[] results = new int[this.addBatchCounts];
        Arrays.fill(results, -2);
        this.addBatchCounts = 0;
        return results;
    }

    /**
     * Flushes pending rows, closes the statement, waits for the worker tasks to
     * finish and releases every worker's server-side statement.
     *
     * <p>Calling this on an already closed statement is a no-op. The closed check
     * now runs before the flush, so a repeated {@code close()} cannot block. The wait
     * for the workers uses {@code awaitTermination} instead of a sleep loop; an
     * interrupt received while waiting is remembered and restored at the end rather
     * than turning the loop into a busy spin. All workers are released even if one
     * release fails; the first failure is thrown afterwards with the rest suppressed.
     *
     * @throws SQLException if the superclass close or a statement release fails
     */
    @Override
    public void close() throws SQLException {
        if (this.isClosed()) {
            return;
        }
        this.waitWriteCompleted();
        super.close();
        boolean interrupted = false;
        if (this.writerThreads != null) {
            // shutdown() only stops new submissions; running workers keep draining
            // their queues and exit on their own once the closed flag is visible.
            this.writerThreads.shutdown();
            while (!this.writerThreads.isTerminated()) {
                try {
                    this.writerThreads.awaitTermination(1L, TimeUnit.SECONDS);
                }
                catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        }
        SQLException releaseError = null;
        for (WorkerThread workerThread : this.workerThreadList) {
            try {
                workerThread.releaseStmt();
            }
            catch (SQLException e) {
                if (releaseError == null) {
                    releaseError = e;
                } else {
                    releaseError.addSuppressed(e);
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        if (releaseError != null) {
            throw releaseError;
        }
    }

    /**
     * Always fails: no result-set metadata is available from this statement.
     *
     * @throws SQLException always, with code 8962
     */
    @Override
    public ResultSetMetaData getMetaData() throws SQLException {
        SQLException unsupported = TSDBError.createSQLException(8962, "Fast write mode only support insert.");
        throw unsupported;
    }

    /**
     * Not available on this statement.
     *
     * @throws SQLException always, with code 8962
     */
    @Override
    public void columnDataAddBatch() throws SQLException {
        SQLException unsupported = TSDBError.createSQLException(8962);
        throw unsupported;
    }

    /**
     * Not available on this statement.
     *
     * @throws SQLException always, with code 8962
     */
    @Override
    public void columnDataExecuteBatch() throws SQLException {
        SQLException unsupported = TSDBError.createSQLException(8962);
        throw unsupported;
    }

    /**
     * Not available on this statement.
     *
     * @throws SQLException always, with code 8962
     */
    @Override
    public void columnDataCloseBatch() throws SQLException {
        SQLException unsupported = TSDBError.createSQLException(8962);
        throw unsupported;
    }

    static class WorkerThread
    implements Runnable {
        /** Logger for the worker. */
        private static final Logger log = LoggerFactory.getLogger(WorkerThread.class);
        /** Queue of rows this worker consumes in {@code run()}. */
        private final ArrayBlockingQueue<Map<Integer, Column>> dataQueue;
        /** Upper bound on rows gathered per loop iteration; from {@code param.getBatchSizeByRow()}. */
        private final int batchSize;
        /** Statement text sent with the prepare request. */
        private final String sql;
        /** Request id used for this worker's requests; assigned in {@code initStmt()}. */
        private long reqId;
        /** Statement id returned by the init response; 0 until {@code initStmt()} succeeds. */
        private long stmtId = 0L;
        /** Field descriptions returned by the prepare response. */
        private List<Field> fields;
        /** Zero-based position in {@code fields} of the table-name field. */
        private int toBeBindTableNameIndex;
        /** Number of fields whose bind type is TAOS_FIELD_TAG. */
        private int toBeBindTagCount;
        /** Number of fields whose bind type is TAOS_FIELD_COL. */
        private int toBeBindColCount;
        /** Precision taken from the first field of the prepare response. */
        private int precision;
        /** Last value of {@code transport.getReconnectCount()} this worker reacted to. */
        private int reconnectCount = 0;
        /** Tables gathered for the current batch, keyed by table-name buffer; cleared after each write. */
        private final HashMap<ByteBuffer, TableInfo> tableInfoMap = new HashMap();
        /** Table currently being filled by {@code processOneRow}. */
        private TableInfo tableInfo = TableInfo.getEmptyTableInfo();
        /** Transport used for all requests. */
        private final Transport transport;
        /** Connection parameters; read for the retry count. */
        private final ConnectionParam connectionParam;
        /** Closed flag supplied by the owning statement; ends the loop once the queue is empty. */
        private final AtomicBoolean isClosed;
        /** Shared pending-row counter, decremented by this worker after each iteration that took rows. */
        private final AtomicInteger remainingUnprocessedRows;
        /** Shared counter to which affected-row counts are added. */
        private final AtomicInteger batchInsertedRows;
        /** Shared flush counter; a change seen while the queue is empty triggers {@code syncObj.signal()}. */
        private final AtomicInteger flushIn;
        /** Signalled to wake the thread waiting for the flush. */
        private final SyncObj syncObj;
        /** Most recent failure of this worker; handed out and cleared by {@code getAndClearLastError()}. */
        private Exception lastError;

        /**
         * Stores the collaborators shared with the owning statement. No I/O happens
         * here; the server-side statement is created later by {@code initStmt()}.
         *
         * @param taskQueue                queue this worker drains
         * @param sql                      statement text to prepare
         * @param transport                transport used for all requests
         * @param param                    connection parameters (batch size, retry count)
         * @param isClosed                 closed flag of the owning statement
         * @param remainingUnprocessedRows shared pending-row counter
         * @param batchInsertedRows        shared affected-row counter
         * @param flushIn                  shared flush counter
         * @param syncObj                  object signalled to wake a waiting flush
         */
        public WorkerThread(ArrayBlockingQueue<Map<Integer, Column>> taskQueue, String sql, Transport transport, ConnectionParam param, AtomicBoolean isClosed, AtomicInteger remainingUnprocessedRows, AtomicInteger batchInsertedRows, AtomicInteger flushIn, SyncObj syncObj) {
            // Connection-level collaborators.
            this.transport = transport;
            this.connectionParam = param;
            this.sql = sql;
            // Work source and its per-iteration limit.
            this.dataQueue = taskQueue;
            this.batchSize = param.getBatchSizeByRow();
            // State shared with the owning statement.
            this.isClosed = isClosed;
            this.flushIn = flushIn;
            this.syncObj = syncObj;
            this.remainingUnprocessedRows = remainingUnprocessedRows;
            this.batchInsertedRows = batchInsertedRows;
        }

        /**
         * Creates and prepares this worker's server-side statement.
         *
         * <p>Sends an init request with a freshly generated request id. On success it
         * stores the request id and statement id, sends the prepare request, and
         * derives the table-name index, tag count, column count and precision from the
         * returned fields. If either response carries a non-success code, that
         * response is returned and the previously known field layout is left untouched.
         *
         * @return the failing response, or the prepare response on success; callers
         *         must check {@code getCode()}
         * @throws SQLException if the transport fails to send a request
         */
        public CommonResp initStmt() throws SQLException {
            long newReqId = ReqId.getReqID();
            // Use the newly generated id for the init request itself; the field still
            // holds the previous (or initial zero) value at this point.
            Request request = RequestFactory.generateInit(newReqId, true, true);
            Stmt2Resp resp = (Stmt2Resp)this.transport.send(request);
            if (Code.SUCCESS.getCode() != resp.getCode()) {
                return resp;
            }
            this.reqId = newReqId;
            this.stmtId = resp.getStmtId();
            Request prepare = RequestFactory.generatePrepare(this.stmtId, this.reqId, this.sql);
            Stmt2PrepareResp prepareResp = (Stmt2PrepareResp)this.transport.send(prepare);
            if (Code.SUCCESS.getCode() != prepareResp.getCode()) {
                // Do not touch the field layout on failure; the caller inspects the code.
                return prepareResp;
            }
            this.fields = prepareResp.getFields();
            if (!this.fields.isEmpty()) {
                this.precision = this.fields.get(0).getPrecision();
            }
            this.toBeBindTagCount = 0;
            this.toBeBindColCount = 0;
            for (int i = 0; i < this.fields.size(); ++i) {
                Field field = this.fields.get(i);
                if (field.getBindType() == FieldBindType.TAOS_FIELD_TBNAME.getValue()) {
                    this.toBeBindTableNameIndex = i;
                }
                if (field.getBindType() == FieldBindType.TAOS_FIELD_TAG.getValue()) {
                    ++this.toBeBindTagCount;
                }
                if (field.getBindType() == FieldBindType.TAOS_FIELD_COL.getValue()) {
                    ++this.toBeBindColCount;
                }
            }
            return prepareResp;
        }

        /**
         * Sends a close request for this worker's statement.
         *
         * <p>Does nothing when no statement id has been assigned yet ({@code stmtId == 0})
         * or when the transport reports that it is not connected.
         *
         * @throws SQLException if sending the close request fails
         */
        public void releaseStmt() throws SQLException {
            if (this.stmtId == 0L || !this.transport.isConnected()) {
                return;
            }
            Request closeRequest = RequestFactory.generateClose(this.stmtId, this.reqId);
            this.transport.send(closeRequest);
        }

        /**
         * Worker loop: repeatedly gathers up to {@code batchSize} rows from the queue,
         * groups them by table, writes them with {@code writeBlockWithRetry()} and
         * subtracts the handled rows from {@code remainingUnprocessedRows}.
         *
         * <p>The loop ends once the owning statement is closed and the queue is empty.
         * While the queue is empty, a change of {@code flushIn} makes the worker signal
         * {@code syncObj} so a waiting flush can re-check the pending-row counter.
         *
         * <p>Accounting is based on the rows actually removed from the queue. A row is
         * counted as soon as it is dequeued, so a failure while binding it (or an
         * interrupt part-way through a batch) can neither leave the counter above zero
         * forever nor subtract rows that are still queued.
         *
         * <p>When binding or writing fails with an {@link SQLException}, the error is
         * recorded in {@code lastError} and the partially built batch is discarded, so
         * it cannot leak into the next batch.
         */
        @Override
        public void run() {
            int flushInLocal = 0;
            while (!this.isClosed.get() || !this.dataQueue.isEmpty()) {
                // Rows removed from the queue in this iteration, whether or not they
                // were bound or written successfully.
                int dequeuedRows = 0;
                try {
                    if (this.dataQueue.isEmpty()) {
                        if (this.flushIn.get() != flushInLocal) {
                            flushInLocal = this.flushIn.get();
                            this.syncObj.signal();
                        }
                        Map<Integer, Column> first = this.dataQueue.poll(10L, TimeUnit.MILLISECONDS);
                        if (first == null) {
                            continue;
                        }
                        ++dequeuedRows;
                        this.processOneRow(first);
                    }
                    int size = Math.min(this.dataQueue.size(), this.batchSize - dequeuedRows);
                    for (int i = 0; i < size; ++i) {
                        Map<Integer, Column> row = this.dataQueue.take();
                        ++dequeuedRows;
                        this.processOneRow(row);
                    }
                    if (!this.isTableInfoEmpty()) {
                        this.tableInfoMap.put(this.tableInfo.getTableName(), this.tableInfo);
                    }
                    this.writeBlockWithRetry();
                    this.tableInfo = TableInfo.getEmptyTableInfo();
                    this.tableInfoMap.clear();
                }
                catch (InterruptedException e) {
                    // Rows already bound stay in tableInfo and go out with the next write.
                    Thread.currentThread().interrupt();
                }
                catch (SQLException e) {
                    this.lastError = e;
                    log.error("Error in write data to server, stmt id: {}, req id: {}, rows: {}, code: {}, msg: {}", this.stmtId, this.reqId, dequeuedRows, e.getErrorCode(), e.getMessage());
                    // The batch may be half-bound; drop it so the failure is reported
                    // once and the next batch starts from a clean state.
                    this.tableInfo = TableInfo.getEmptyTableInfo();
                    this.tableInfoMap.clear();
                }
                finally {
                    if (dequeuedRows > 0) {
                        this.remainingUnprocessedRows.addAndGet(-dequeuedRows);
                    }
                }
            }
            this.syncObj.signal();
        }

        /**
         * Serialises the tables gathered in {@code tableInfoMap} and sends them with a
         * bind request followed by an exec request, for at most
         * {@code connectionParam.getRetryTimes()} attempts.
         *
         * <p>On success the affected-row count is added to {@code batchInsertedRows}.
         * A failed attempt is retried when the transport's reconnect count has changed
         * (the statement is re-initialised first) or when the error code is 8990, 8961
         * or 8984. Any other failure stops the loop at once.
         *
         * <p>Whenever the method gives up (serialisation failure, non-retryable error,
         * or attempts exhausted) the error is stored in {@code lastError} so the owning
         * statement can report it. Nothing is sent and nothing is recorded when the
         * configured retry count is not positive.
         *
         * @throws SQLException if re-initialising the statement after a reconnect fails
         */
        private void writeBlockWithRetry() throws SQLException {
            int maxAttempts = this.connectionParam.getRetryTimes();
            for (int attempt = 0; attempt < maxAttempts; ++attempt) {
                ByteBuf rawBlock;
                try {
                    rawBlock = SerializeBlock.getStmt2BindBlock(this.reqId, this.stmtId, this.tableInfoMap, this.toBeBindTableNameIndex, this.toBeBindTagCount, this.toBeBindColCount, this.precision);
                    log.trace("buffer allocated: {}", Integer.toHexString(System.identityHashCode(rawBlock)));
                }
                catch (Exception e) {
                    // Serialising the same data again would fail the same way.
                    this.lastError = e;
                    log.error("Error in serialize data to block, stmt id: {}, req id: {}", this.stmtId, this.reqId, e);
                    break;
                }
                try {
                    Stmt2Resp bindResp = (Stmt2Resp)this.transport.send(Action.STMT2_BIND.getAction(), this.reqId, rawBlock);
                    if (Code.SUCCESS.getCode() != bindResp.getCode()) {
                        throw new SQLException("(0x" + Integer.toHexString(bindResp.getCode()) + "):" + bindResp.getMessage());
                    }
                    Request request = RequestFactory.generateExec(this.stmtId, this.reqId);
                    Stmt2ExecResp resp = (Stmt2ExecResp)this.transport.send(request);
                    if (Code.SUCCESS.getCode() != resp.getCode()) {
                        throw new SQLException("(0x" + Integer.toHexString(resp.getCode()) + "):" + resp.getMessage());
                    }
                    this.batchInsertedRows.addAndGet(resp.getAffected());
                    return;
                }
                catch (SQLException e) {
                    if (attempt == maxAttempts - 1) {
                        this.lastError = e;
                    }
                    log.error("Error in writeBlockWithRetry, stmt id: {}, req id: {}, retry times: {}, code: {}, msg: {}", this.stmtId, this.reqId, attempt, e.getErrorCode(), e.getMessage());
                    int realReconnectCount = this.transport.getReconnectCount();
                    if (this.reconnectCount != realReconnectCount) {
                        log.error("connection reestablished, need to init stmt obj");
                        this.reconnectCount = realReconnectCount;
                        this.initStmt();
                        continue;
                    }
                    boolean retryable = e.getErrorCode() == 8990 || e.getErrorCode() == 8961 || e.getErrorCode() == 8984;
                    if (!retryable) {
                        // Giving up before the last attempt: record the error so the
                        // lost batch is reported instead of vanishing silently.
                        this.lastError = e;
                        break;
                    }
                }
                finally {
                    log.trace("buffer {}, refCnt: {}", Integer.toHexString(System.identityHashCode(rawBlock)), rawBlock.refCnt());
                }
            }
        }

        /**
         * Tells whether the table currently being filled holds nothing yet.
         *
         * @return true when its table-name buffer has capacity 0 and both its tag
         *         list and its data list are empty
         */
        private boolean isTableInfoEmpty() {
            if (this.tableInfo.getTableName().capacity() != 0) {
                return false;
            }
            if (!this.tableInfo.getTagInfo().isEmpty()) {
                return false;
            }
            return this.tableInfo.getDataList().isEmpty();
        }

        /**
         * Appends one row's values to the data columns of an existing table entry.
         *
         * <p>For each {@link ColumnInfo} in the table's data list, the bound column
         * stored under that column's index is looked up and its data is added.
         *
         * @param tableInfo     table entry to extend
         * @param colOrderedMap bound columns of the row, keyed by parameter index
         */
        private void bindColToTableInfo(TableInfo tableInfo, Map<Integer, Column> colOrderedMap) {
            for (ColumnInfo target : tableInfo.getDataList()) {
                Column source = colOrderedMap.get(target.getIndex());
                target.add(source.getData());
            }
        }

        /**
         * Adds one row to the batch being built.
         *
         * <p>If the current table entry is empty, the whole row is bound into it.
         * Otherwise the row's table name decides where the values go:
         * <ul>
         *   <li>same name as the current entry: values are appended to it;</li>
         *   <li>name already present in {@code tableInfoMap}: values are appended to that entry;</li>
         *   <li>new name: the current entry is stored in {@code tableInfoMap} and a
         *       fresh entry is started with this row.</li>
         * </ul>
         *
         * @param colOrderedMap bound columns of the row, keyed by 1-based parameter index
         * @throws SQLException with code 8963 if the table name is neither a
         *                      {@code String} nor a {@code byte[]}, or if binding fails
         */
        public void processOneRow(Map<Integer, Column> colOrderedMap) throws SQLException {
            if (this.isTableInfoEmpty()) {
                AbsWSPreparedStatement.bindAllToTableInfo(this.fields, colOrderedMap, this.tableInfo);
                return;
            }

            Object rawName = colOrderedMap.get(this.toBeBindTableNameIndex + 1).getData();
            ByteBuffer rowTable;
            if (rawName instanceof byte[]) {
                rowTable = ByteBuffer.wrap((byte[])rawName);
            } else if (rawName instanceof String) {
                rowTable = ByteBuffer.wrap(((String)rawName).getBytes());
            } else {
                throw TSDBError.createSQLException(8963, "table name must be string or binary");
            }

            if (this.tableInfo.getTableName().equals(rowTable)) {
                this.bindColToTableInfo(this.tableInfo, colOrderedMap);
                return;
            }

            // Only non-null entries are ever stored, so a null lookup means "absent".
            TableInfo parked = this.tableInfoMap.get(rowTable);
            if (parked != null) {
                this.bindColToTableInfo(parked, colOrderedMap);
                return;
            }

            this.tableInfoMap.put(this.tableInfo.getTableName(), this.tableInfo);
            this.tableInfo = TableInfo.getEmptyTableInfo();
            AbsWSPreparedStatement.bindAllToTableInfo(this.fields, colOrderedMap, this.tableInfo);
        }

        /**
         * Hands out the most recently recorded failure and resets the slot.
         *
         * @return the stored exception, or {@code null} if none was recorded since the last call
         */
        public Exception getAndClearLastError() {
            try {
                return this.lastError;
            }
            finally {
                // Runs after the return value has been captured.
                this.lastError = null;
            }
        }
    }
}

