/*
 * Decompiled with CFR 0.152.
 */
package tachyon.worker.nio;

import com.google.common.base.Throwables;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AbstractSelector;
import java.nio.channels.spi.SelectorProvider;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tachyon.Constants;
import tachyon.conf.CommonConf;
import tachyon.worker.BlocksLocker;
import tachyon.worker.DataServer;
import tachyon.worker.hierarchy.StorageDir;
import tachyon.worker.nio.DataServerMessage;

/**
 * A {@link DataServer} built on non-blocking NIO channels and a single selector thread.
 *
 * <p>The constructor binds a {@link ServerSocketChannel} to the given address and starts a
 * listener thread that runs {@link #run()}. Each accepted connection is read until a complete
 * block request has arrived; the requested block is then locked through the {@link BlocksLocker},
 * a response message is queued, and the connection is switched to write interest. When the
 * response has been sent completely (or sending fails) the connection is closed and the block
 * lock is released.
 *
 * <p>All per-connection handling happens on the listener thread; {@link #close()} may be called
 * from any thread and signals the listener through the volatile {@code mShutdown} flag.
 */
public class NIODataServer
implements Runnable,
DataServer {
    private static final Logger LOG = LoggerFactory.getLogger((String)Constants.LOGGER_TYPE);

    /** Address the server socket is bound to. */
    private final InetSocketAddress mAddress;
    /** Listening channel; assigned by {@link #initSelector()}. */
    private ServerSocketChannel mServerChannel;
    /** Selector that multiplexes the listening channel and all client channels. */
    private Selector mSelector;
    /** Responses that are queued for (or in the middle of) being written, keyed by connection. */
    private final Map<SocketChannel, DataServerMessage> mSendingData =
            Collections.synchronizedMap(new HashMap<SocketChannel, DataServerMessage>());
    /** Requests that are in the middle of being received, keyed by connection. */
    private final Map<SocketChannel, DataServerMessage> mReceivingData =
            Collections.synchronizedMap(new HashMap<SocketChannel, DataServerMessage>());
    /** Used to lock a block while its data is being served and to unlock it afterwards. */
    private final BlocksLocker mBlockLocker;
    /** Thread that executes {@link #run()}. */
    private final Thread mListenerThread;
    /** Set by {@link #close()} to ask the listener loop to stop. */
    private volatile boolean mShutdown = false;
    /** Set once the listener loop has exited, for whatever reason. */
    private volatile boolean mShutdowned = false;

    /**
     * Creates the data server, binds it to {@code address} and starts the listener thread.
     *
     * @param address the address to bind the server socket to
     * @param locker the locker used to lock and unlock blocks while they are being served
     */
    public NIODataServer(InetSocketAddress address, BlocksLocker locker) {
        LOG.info("Starting DataServer @ " + address);
        CommonConf.assertValidPort(address);
        this.mAddress = address;
        this.mBlockLocker = locker;
        try {
            this.mSelector = this.initSelector();
            this.mListenerThread = new Thread(this);
            this.mListenerThread.start();
        }
        catch (IOException e) {
            LOG.error(e.getMessage() + this.mAddress, (Throwable)e);
            throw Throwables.propagate((Throwable)e);
        }
    }

    /**
     * Accepts a pending connection and registers it with the selector for reading.
     *
     * @param key the selection key of the listening channel
     * @throws IOException if accepting or configuring the new connection fails
     */
    private void accept(SelectionKey key) throws IOException {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        if (socketChannel == null) {
            // A non-blocking accept() returns null when no connection is pending any more.
            return;
        }
        boolean registered = false;
        try {
            socketChannel.configureBlocking(false);
            socketChannel.register(this.mSelector, SelectionKey.OP_READ);
            registered = true;
        }
        finally {
            if (!registered) {
                // Do not leak the accepted socket; the original failure keeps propagating.
                try {
                    socketChannel.close();
                }
                catch (IOException ex) {
                    LOG.warn("Unable to close accepted socket channel", (Throwable)ex);
                }
            }
        }
    }

    /**
     * Asks the listener loop to stop and closes the listening channel and the selector.
     *
     * @throws IOException if closing the listening channel or the selector fails
     */
    @Override
    public void close() throws IOException {
        this.mShutdown = true;
        try {
            this.mServerChannel.close();
        }
        finally {
            // Close the selector even if closing the server channel failed.
            this.mSelector.close();
        }
    }

    /**
     * @return the local port the listening socket is bound to
     */
    @Override
    public int getPort() {
        return this.mServerChannel.socket().getLocalPort();
    }

    /**
     * Opens the selector, opens and binds the non-blocking listening channel, and registers the
     * channel with the selector for accept events. If any step fails, whatever was opened so far
     * is closed again and the original failure is propagated.
     *
     * @return the selector the listening channel is registered with
     * @throws IOException if the selector or the listening channel cannot be set up
     */
    private Selector initSelector() throws IOException {
        Selector socketSelector = SelectorProvider.provider().openSelector();
        boolean initialized = false;
        try {
            this.mServerChannel = ServerSocketChannel.open();
            this.mServerChannel.configureBlocking(false);
            this.mServerChannel.socket().bind(this.mAddress);
            this.mServerChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
            initialized = true;
            return socketSelector;
        }
        finally {
            if (!initialized) {
                // Failures while cleaning up are only logged so that the original failure,
                // not the close failure, reaches the caller.
                if (this.mServerChannel != null) {
                    try {
                        this.mServerChannel.close();
                    }
                    catch (IOException ex) {
                        LOG.warn("Unable to close server socket channel", (Throwable)ex);
                    }
                }
                try {
                    socketSelector.close();
                }
                catch (IOException ex) {
                    LOG.warn("Unable to close socket selector", (Throwable)ex);
                }
            }
        }
    }

    /**
     * @return {@code true} once the listener loop has exited
     */
    @Override
    public boolean isClosed() {
        return this.mShutdowned;
    }

    /**
     * Closes a client connection and drops all per-connection state kept for it. A failure to
     * close the channel is logged rather than propagated so that a single bad connection cannot
     * stop the listener loop.
     *
     * @param key the selection key of the connection
     * @param socketChannel the channel of the connection
     */
    private void closeConnection(SelectionKey key, SocketChannel socketChannel) {
        try {
            socketChannel.close();
        }
        catch (IOException e) {
            LOG.error(e.getMessage(), (Throwable)e);
        }
        key.cancel();
        this.mReceivingData.remove(socketChannel);
        this.mSendingData.remove(socketChannel);
    }

    /**
     * Reads request bytes from a connection. Once the request message is complete, the requested
     * block is locked, its data is loaded, and a response message is queued for
     * {@link #write(SelectionKey)}; the lock is released there when the connection is finished.
     *
     * <p>The connection is closed when receiving fails, when {@code recv} reports {@code -1},
     * or when the request carries a non-positive block id.
     *
     * @param key the selection key of the readable connection
     * @throws IOException declared for callers; receive failures are handled inside this method
     */
    private void read(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel)key.channel();
        DataServerMessage tMessage = this.mReceivingData.get(socketChannel);
        if (tMessage == null) {
            // First bytes on this connection: start a fresh request message.
            tMessage = DataServerMessage.createBlockRequestMessage();
            this.mReceivingData.put(socketChannel, tMessage);
        }

        int numRead;
        try {
            numRead = tMessage.recv(socketChannel);
        }
        catch (IOException e) {
            LOG.warn("Failed to receive request, closing connection", (Throwable)e);
            this.closeConnection(key, socketChannel);
            return;
        }
        if (numRead == -1) {
            this.closeConnection(key, socketChannel);
            return;
        }
        if (!tMessage.isMessageReady()) {
            // Request is still incomplete; wait for the next read event.
            return;
        }

        long blockId = tMessage.getBlockId();
        if (blockId <= 0L) {
            LOG.error("Invalid block id " + blockId);
            // No response will ever be produced for this request, so do not keep the
            // connection and its completed request message around.
            this.closeConnection(key, socketChannel);
            return;
        }

        // The request is complete: from now on only wait for the channel to become writable.
        key.interestOps(SelectionKey.OP_WRITE);
        LOG.info("Get request for blockId: {}", (Object)blockId);
        int lockId = this.mBlockLocker.getLockId();
        StorageDir storageDir = this.mBlockLocker.lock(blockId, lockId);
        ByteBuffer data;
        int dataLen = 0;
        try {
            data = storageDir.getBlockData(blockId, tMessage.getOffset(), (int)tMessage.getLength());
            storageDir.accessBlock(blockId);
            dataLen = data.limit();
        }
        catch (Exception e) {
            // Any failure to load the block still results in a response, carrying no data.
            LOG.error(e.getMessage(), (Throwable)e);
            data = null;
            dataLen = 0;
        }
        DataServerMessage tResponseMessage = DataServerMessage.createBlockResponseMessage(true, blockId, tMessage.getOffset(), dataLen, data);
        tResponseMessage.setLockId(lockId);
        this.mSendingData.put(socketChannel, tResponseMessage);
    }

    /**
     * Listener loop: waits on the selector and dispatches every ready key until
     * {@link #close()} is called. An exception raised while the server is not shutting down is
     * logged and rethrown wrapped in a {@link RuntimeException}, which ends the loop. In every
     * case the server is marked as closed when the loop exits.
     */
    @Override
    public void run() {
        try {
            while (!this.mShutdown) {
                try {
                    this.mSelector.select();
                    if (this.mShutdown) {
                        break;
                    }
                    Iterator<SelectionKey> selectKeys = this.mSelector.selectedKeys().iterator();
                    while (selectKeys.hasNext()) {
                        SelectionKey key = selectKeys.next();
                        selectKeys.remove();
                        this.processKey(key);
                    }
                }
                catch (Exception e) {
                    LOG.error(e.getMessage(), (Throwable)e);
                    if (this.mShutdown) {
                        break;
                    }
                    throw new RuntimeException(e);
                }
            }
        }
        finally {
            // Also reached when the loop dies with an exception, so that isClosed() does not
            // keep reporting a dead listener as running.
            this.mShutdowned = true;
        }
    }

    /**
     * Dispatches one selected key to the accept, read or write handler. Invalid keys are skipped.
     *
     * @param key the selected key
     * @throws IOException if accepting or reading fails
     */
    private void processKey(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return;
        }
        if (key.isAcceptable()) {
            this.accept(key);
        } else if (key.isReadable()) {
            this.read(key);
        } else if (key.isWritable()) {
            this.write(key);
        }
    }

    /**
     * Sends (more of) the queued response for a connection. When the response has been sent
     * completely, or sending fails, the connection is closed, the response message is closed and
     * the block lock taken in {@link #read(SelectionKey)} is released.
     *
     * @param key the selection key of the writable connection
     */
    private void write(SelectionKey key) {
        SocketChannel socketChannel = (SocketChannel)key.channel();
        DataServerMessage sendMessage = this.mSendingData.get(socketChannel);
        if (sendMessage == null) {
            // Nothing is queued for this connection, so there is nothing to send or unlock.
            LOG.error("No pending response for writable channel, closing connection");
            this.closeConnection(key, socketChannel);
            return;
        }

        boolean closeChannel = false;
        try {
            sendMessage.send(socketChannel);
        }
        catch (IOException e) {
            closeChannel = true;
            LOG.error(e.getMessage(), (Throwable)e);
        }
        if (sendMessage.finishSending() || closeChannel) {
            this.closeConnection(key, socketChannel);
            try {
                sendMessage.close();
            }
            finally {
                // Release the block lock even if closing the message fails.
                this.mBlockLocker.unlock(Math.abs(sendMessage.getBlockId()), sendMessage.getLockId());
            }
        }
    }
}

