/*
 * Decompiled with CFR 0.152.
 */
package io.druid.indexer.hadoop;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.JobHelper;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexer.hadoop.DatasourceInputSplit;
import io.druid.indexer.hadoop.SegmentInputRow;
import io.druid.indexer.hadoop.WindowedDataSegment;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.StorageAdapter;
import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
import io.druid.segment.realtime.firehose.WindowedStorageAdapter;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Progressable;

public class DatasourceRecordReader
extends RecordReader<NullWritable, InputRow> {
    private static final Logger logger = new Logger(DatasourceRecordReader.class);
    private DatasourceIngestionSpec spec;
    private IngestSegmentFirehose firehose;
    private int rowNum;
    private Row currRow;
    private List<QueryableIndex> indexes = Lists.newArrayList();
    private List<File> tmpSegmentDirs = Lists.newArrayList();
    private int numRows;

    public void initialize(InputSplit split, final TaskAttemptContext context) throws IOException, InterruptedException {
        this.spec = this.readAndVerifyDatasourceIngestionSpec(context.getConfiguration(), HadoopDruidIndexerConfig.JSON_MAPPER);
        List<WindowedDataSegment> segments = ((DatasourceInputSplit)split).getSegments();
        List adapters = Lists.transform(segments, (Function)new Function<WindowedDataSegment, WindowedStorageAdapter>(){

            public WindowedStorageAdapter apply(WindowedDataSegment segment) {
                try {
                    logger.info("Getting storage path for segment [%s]", new Object[]{segment.getSegment().getIdentifier()});
                    Path path = new Path(JobHelper.getURIFromSegment(segment.getSegment()));
                    logger.info("Fetch segment files from [%s]", new Object[]{path});
                    File dir = Files.createTempDir();
                    DatasourceRecordReader.this.tmpSegmentDirs.add(dir);
                    logger.info("Locally storing fetched segment at [%s]", new Object[]{dir});
                    JobHelper.unzipNoGuava(path, context.getConfiguration(), dir, (Progressable)context);
                    logger.info("finished fetching segment files", new Object[0]);
                    QueryableIndex index = HadoopDruidIndexerConfig.INDEX_IO.loadIndex(dir);
                    DatasourceRecordReader.this.indexes.add(index);
                    DatasourceRecordReader.this.numRows = DatasourceRecordReader.this.numRows + index.getNumRows();
                    return new WindowedStorageAdapter((StorageAdapter)new QueryableIndexStorageAdapter(index), segment.getInterval());
                }
                catch (IOException ex) {
                    throw Throwables.propagate((Throwable)ex);
                }
            }
        });
        this.firehose = new IngestSegmentFirehose(adapters, this.spec.getTransformSpec(), this.spec.getDimensions(), this.spec.getMetrics(), this.spec.getFilter());
    }

    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (this.firehose.hasMore()) {
            this.currRow = this.firehose.nextRow();
            ++this.rowNum;
            return true;
        }
        return false;
    }

    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    public InputRow getCurrentValue() throws IOException, InterruptedException {
        return this.currRow == null ? null : new SegmentInputRow(this.currRow, this.spec.getDimensions());
    }

    public float getProgress() throws IOException, InterruptedException {
        if (this.numRows > 0) {
            return (float)this.rowNum * 1.0f / (float)this.numRows;
        }
        return 0.0f;
    }

    public void close() throws IOException {
        Closeables.close((Closeable)this.firehose, (boolean)true);
        for (QueryableIndex qi : this.indexes) {
            Closeables.close((Closeable)qi, (boolean)true);
        }
        for (File dir : this.tmpSegmentDirs) {
            FileUtils.deleteDirectory((File)dir);
        }
    }

    private DatasourceIngestionSpec readAndVerifyDatasourceIngestionSpec(Configuration config, ObjectMapper jsonMapper) {
        try {
            String schema = (String)Preconditions.checkNotNull((Object)config.get("druid.datasource.schema"), (Object)"null schema");
            logger.info("load schema [%s]", new Object[]{schema});
            DatasourceIngestionSpec spec = (DatasourceIngestionSpec)jsonMapper.readValue(schema, DatasourceIngestionSpec.class);
            if (spec.getDimensions() == null || spec.getDimensions().size() == 0) {
                throw new ISE("load schema does not have dimensions", new Object[0]);
            }
            if (spec.getMetrics() == null || spec.getMetrics().size() == 0) {
                throw new ISE("load schema does not have metrics", new Object[0]);
            }
            return spec;
        }
        catch (IOException ex) {
            throw new RuntimeException("couldn't load segment load spec", ex);
        }
    }
}

