/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import com.google.common.annotations.VisibleForTesting;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.EnumSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderUtil;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.PeerCache;
import org.apache.hadoop.hdfs.client.ClientMmap;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;

@InterfaceAudience.Private
public class RemoteBlockReader2
implements BlockReader {
    static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class);
    private final Peer peer;
    private final DatanodeID datanodeID;
    private final PeerCache peerCache;
    private final ReadableByteChannel in;
    private DataChecksum checksum;
    private final PacketReceiver packetReceiver = new PacketReceiver(true);
    private ByteBuffer curDataSlice = null;
    private long lastSeqNo = -1L;
    private long startOffset;
    private final String filename;
    private final int bytesPerChecksum;
    private final int checksumSize;
    private long bytesNeededToFinish;
    private final boolean isLocal;
    private final boolean verifyChecksum;
    private boolean sentStatusCode = false;
    byte[] skipBuf = null;
    ByteBuffer checksumBytes = null;
    int dataLeft = 0;

    @VisibleForTesting
    public Peer getPeer() {
        return this.peer;
    }

    @Override
    public synchronized int read(byte[] buf, int off, int len) throws IOException {
        if (this.curDataSlice == null || this.curDataSlice.remaining() == 0 && this.bytesNeededToFinish > 0L) {
            this.readNextPacket();
        }
        if (this.curDataSlice.remaining() == 0) {
            return -1;
        }
        int nRead = Math.min(this.curDataSlice.remaining(), len);
        this.curDataSlice.get(buf, off, nRead);
        return nRead;
    }

    public int read(ByteBuffer buf) throws IOException {
        if (this.curDataSlice == null || this.curDataSlice.remaining() == 0 && this.bytesNeededToFinish > 0L) {
            this.readNextPacket();
        }
        if (this.curDataSlice.remaining() == 0) {
            return -1;
        }
        int nRead = Math.min(this.curDataSlice.remaining(), buf.remaining());
        ByteBuffer writeSlice = this.curDataSlice.duplicate();
        writeSlice.limit(writeSlice.position() + nRead);
        buf.put(writeSlice);
        this.curDataSlice.position(writeSlice.position());
        return nRead;
    }

    private void readNextPacket() throws IOException {
        this.packetReceiver.receiveNextPacket(this.in);
        PacketHeader curHeader = this.packetReceiver.getHeader();
        this.curDataSlice = this.packetReceiver.getDataSlice();
        assert (this.curDataSlice.capacity() == curHeader.getDataLen());
        if (LOG.isTraceEnabled()) {
            LOG.trace((Object)("DFSClient readNextPacket got header " + curHeader));
        }
        if (!curHeader.sanityCheck(this.lastSeqNo)) {
            throw new IOException("BlockReader: error in packet header " + curHeader);
        }
        if (curHeader.getDataLen() > 0) {
            int chunks = 1 + (curHeader.getDataLen() - 1) / this.bytesPerChecksum;
            int checksumsLen = chunks * this.checksumSize;
            assert (this.packetReceiver.getChecksumSlice().capacity() == checksumsLen) : "checksum slice capacity=" + this.packetReceiver.getChecksumSlice().capacity() + " checksumsLen=" + checksumsLen;
            this.lastSeqNo = curHeader.getSeqno();
            if (this.verifyChecksum && this.curDataSlice.remaining() > 0) {
                this.checksum.verifyChunkedSums(this.curDataSlice, this.packetReceiver.getChecksumSlice(), this.filename, curHeader.getOffsetInBlock());
            }
            this.bytesNeededToFinish -= (long)curHeader.getDataLen();
        }
        if (curHeader.getOffsetInBlock() < this.startOffset) {
            int newPos = (int)(this.startOffset - curHeader.getOffsetInBlock());
            this.curDataSlice.position(newPos);
        }
        if (this.bytesNeededToFinish <= 0L) {
            this.readTrailingEmptyPacket();
            if (this.verifyChecksum) {
                this.sendReadResult(DataTransferProtos.Status.CHECKSUM_OK);
            } else {
                this.sendReadResult(DataTransferProtos.Status.SUCCESS);
            }
        }
    }

    @Override
    public synchronized long skip(long n) throws IOException {
        long nSkipped;
        int ret;
        if (this.skipBuf == null) {
            this.skipBuf = new byte[this.bytesPerChecksum];
        }
        for (nSkipped = 0L; nSkipped < n; nSkipped += (long)ret) {
            int toSkip = (int)Math.min(n - nSkipped, (long)this.skipBuf.length);
            ret = this.read(this.skipBuf, 0, toSkip);
            if (ret > 0) continue;
            return nSkipped;
        }
        return nSkipped;
    }

    private void readTrailingEmptyPacket() throws IOException {
        if (LOG.isTraceEnabled()) {
            LOG.trace((Object)"Reading empty packet at end of read");
        }
        this.packetReceiver.receiveNextPacket(this.in);
        PacketHeader trailer = this.packetReceiver.getHeader();
        if (!trailer.isLastPacketInBlock() || trailer.getDataLen() != 0) {
            throw new IOException("Expected empty end-of-read packet! Header: " + trailer);
        }
    }

    protected RemoteBlockReader2(String file, String bpid, long blockId, DataChecksum checksum, boolean verifyChecksum, long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, DatanodeID datanodeID, PeerCache peerCache) {
        this.isLocal = DFSClient.isLocalAddress(NetUtils.createSocketAddr((String)datanodeID.getXferAddr()));
        this.peer = peer;
        this.datanodeID = datanodeID;
        this.in = peer.getInputStreamChannel();
        this.checksum = checksum;
        this.verifyChecksum = verifyChecksum;
        this.startOffset = Math.max(startOffset, 0L);
        this.filename = file;
        this.peerCache = peerCache;
        this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
        this.bytesPerChecksum = this.checksum.getBytesPerChecksum();
        this.checksumSize = this.checksum.getChecksumSize();
    }

    @Override
    public synchronized void close() throws IOException {
        this.packetReceiver.close();
        this.startOffset = -1L;
        this.checksum = null;
        if (this.peerCache != null && this.sentStatusCode) {
            this.peerCache.put(this.datanodeID, this.peer);
        } else {
            this.peer.close();
        }
    }

    void sendReadResult(DataTransferProtos.Status statusCode) {
        assert (!this.sentStatusCode) : "already sent status code to " + this.peer;
        try {
            RemoteBlockReader2.writeReadResult(this.peer.getOutputStream(), statusCode);
            this.sentStatusCode = true;
        }
        catch (IOException e) {
            LOG.info((Object)("Could not send read status (" + (Object)((Object)statusCode) + ") to datanode " + this.peer.getRemoteAddressString() + ": " + e.getMessage()));
        }
    }

    static void writeReadResult(OutputStream out, DataTransferProtos.Status statusCode) throws IOException {
        DataTransferProtos.ClientReadStatusProto.newBuilder().setStatus(statusCode).build().writeDelimitedTo(out);
        out.flush();
    }

    public static String getFileName(InetSocketAddress s, String poolId, long blockId) {
        return s.toString() + ":" + poolId + ":" + blockId;
    }

    @Override
    public int readAll(byte[] buf, int offset, int len) throws IOException {
        return BlockReaderUtil.readAll(this, buf, offset, len);
    }

    @Override
    public void readFully(byte[] buf, int off, int len) throws IOException {
        BlockReaderUtil.readFully(this, buf, off, len);
    }

    public static BlockReader newBlockReader(String file, ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, long startOffset, long len, boolean verifyChecksum, String clientName, Peer peer, DatanodeID datanodeID, PeerCache peerCache, CachingStrategy cachingStrategy) throws IOException {
        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
        new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, verifyChecksum, cachingStrategy);
        DataInputStream in = new DataInputStream(peer.getInputStream());
        DataTransferProtos.BlockOpResponseProto status = DataTransferProtos.BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
        RemoteBlockReader2.checkSuccess(status, peer, block, file);
        DataTransferProtos.ReadOpChecksumInfoProto checksumInfo = status.getReadOpChecksumInfo();
        DataChecksum checksum = DataTransferProtoUtil.fromProto(checksumInfo.getChecksum());
        long firstChunkOffset = checksumInfo.getChunkOffset();
        if (firstChunkOffset < 0L || firstChunkOffset > startOffset || firstChunkOffset <= startOffset - (long)checksum.getBytesPerChecksum()) {
            throw new IOException("BlockReader: error in first chunk offset (" + firstChunkOffset + ") startOffset is " + startOffset + " for file " + file);
        }
        return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(), checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID, peerCache);
    }

    static void checkSuccess(DataTransferProtos.BlockOpResponseProto status, Peer peer, ExtendedBlock block, String file) throws IOException {
        if (status.getStatus() != DataTransferProtos.Status.SUCCESS) {
            if (status.getStatus() == DataTransferProtos.Status.ERROR_ACCESS_TOKEN) {
                throw new InvalidBlockTokenException("Got access token error for OP_READ_BLOCK, self=" + peer.getLocalAddressString() + ", remote=" + peer.getRemoteAddressString() + ", for file " + file + ", for pool " + block.getBlockPoolId() + " block " + block.getBlockId() + "_" + block.getGenerationStamp());
            }
            throw new IOException("Got error for OP_READ_BLOCK, self=" + peer.getLocalAddressString() + ", remote=" + peer.getRemoteAddressString() + ", for file " + file + ", for pool " + block.getBlockPoolId() + " block " + block.getBlockId() + "_" + block.getGenerationStamp());
        }
    }

    @Override
    public int available() throws IOException {
        return 131072;
    }

    @Override
    public boolean isLocal() {
        return this.isLocal;
    }

    @Override
    public boolean isShortCircuit() {
        return false;
    }

    @Override
    public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
        return null;
    }
}

