/*
 * Decompiled with CFR 0.152.
 */
package org.talend.sdk.component.junit;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.Spliterators;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObject;
import javax.json.bind.Jsonb;
import javax.json.bind.JsonbConfig;
import javax.json.spi.JsonProvider;
import org.apache.xbean.finder.filter.Filter;
import org.apache.ziplock.JarLocation;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.talend.sdk.component.api.record.Record;
import org.talend.sdk.component.api.service.injector.Injector;
import org.talend.sdk.component.api.service.record.RecordBuilderFactory;
import org.talend.sdk.component.container.Container;
import org.talend.sdk.component.junit.ComponentsHandler;
import org.talend.sdk.component.junit.ControllableInputFactory;
import org.talend.sdk.component.junit.SimpleCollector;
import org.talend.sdk.component.junit.SimpleFactory;
import org.talend.sdk.component.junit.lang.StreamDecorator;
import org.talend.sdk.component.runtime.base.Lifecycle;
import org.talend.sdk.component.runtime.input.Input;
import org.talend.sdk.component.runtime.input.Mapper;
import org.talend.sdk.component.runtime.manager.ComponentFamilyMeta;
import org.talend.sdk.component.runtime.manager.ComponentManager;
import org.talend.sdk.component.runtime.manager.ContainerComponentRegistry;
import org.talend.sdk.component.runtime.manager.chain.AutoChunkProcessor;
import org.talend.sdk.component.runtime.manager.chain.Job;
import org.talend.sdk.component.runtime.manager.json.PreComputedJsonpProvider;
import org.talend.sdk.component.runtime.output.InputFactory;
import org.talend.sdk.component.runtime.output.OutputFactory;
import org.talend.sdk.component.runtime.output.Processor;
import org.talend.sdk.component.runtime.record.RecordConverters;

public class BaseComponentsHandler
implements ComponentsHandler {
    private static final Logger log = LoggerFactory.getLogger(BaseComponentsHandler.class);
    protected static final Local<State> STATE = BaseComponentsHandler.loadStateHolder();
    private final ThreadLocal<PreState> initState = ThreadLocal.withInitial(PreState::new);
    protected String packageName;
    protected Collection<String> isolatedPackages;

    private static Local<State> loadStateHolder() {
        switch (System.getProperty("talend.component.junit.handler.state", "thread").toLowerCase(Locale.ROOT)) {
            case "static": {
                return new Local.StaticImpl<State>();
            }
        }
        return new Local.ThreadLocalImpl<State>();
    }

    @Override
    public <T> T injectServices(T instance) {
        if (instance == null) {
            return null;
        }
        String plugin = this.getSinglePlugin();
        Map services = ((ComponentManager.AllServices)((Container)this.asManager().findPlugin(plugin).orElseThrow(() -> new IllegalArgumentException("cant find plugin '" + plugin + "'"))).get(ComponentManager.AllServices.class)).getServices();
        ((Injector)Injector.class.cast(services.get(Injector.class))).inject(instance);
        return instance;
    }

    public BaseComponentsHandler withIsolatedPackage(String packageName, String ... packages) {
        this.isolatedPackages = Stream.concat(Stream.of(packageName), Stream.of(packages)).filter(Objects::nonNull).collect(Collectors.toList());
        if (this.isolatedPackages.isEmpty()) {
            this.isolatedPackages = null;
        }
        return this;
    }

    public EmbeddedComponentManager start() {
        EmbeddedComponentManager embeddedComponentManager = new EmbeddedComponentManager(this.packageName){

            /*
             * Enabled force condition propagation
             * Lifted jumps to return sites
             */
            @Override
            protected boolean isContainerClass(Filter filter, String name) {
                if (name == null) {
                    return super.isContainerClass(filter, null);
                }
                if (BaseComponentsHandler.this.isolatedPackages != null) {
                    if (!BaseComponentsHandler.this.isolatedPackages.stream().noneMatch(name::startsWith)) return false;
                }
                if (!super.isContainerClass(filter, name)) return false;
                return true;
            }

            @Override
            public void close() {
                try {
                    State state = STATE.get();
                    if (state.jsonb != null) {
                        try {
                            state.jsonb.close();
                        }
                        catch (Exception exception) {
                            // empty catch block
                        }
                    }
                    STATE.remove();
                    BaseComponentsHandler.this.initState.remove();
                }
                finally {
                    super.close();
                }
            }
        };
        STATE.set(new State(embeddedComponentManager, new ArrayList<Object>(), this.initState.get().emitter, null, null, null, null));
        return embeddedComponentManager;
    }

    @Override
    public Outputs collect(Processor processor, ControllableInputFactory inputs) {
        return this.collect(processor, inputs, 10);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Outputs collect(Processor processor, ControllableInputFactory inputs, int bundleSize) {
        AutoChunkProcessor autoChunkProcessor = new AutoChunkProcessor(bundleSize, processor);
        autoChunkProcessor.start();
        Outputs outputs = new Outputs();
        OutputFactory outputFactory = name -> value -> {
            List aggregator = outputs.data.computeIfAbsent(name, n -> new ArrayList());
            aggregator.add(value);
        };
        try {
            while (inputs.hasMoreData()) {
                autoChunkProcessor.onElement((InputFactory)inputs, outputFactory);
            }
            autoChunkProcessor.flush(outputFactory);
        }
        finally {
            autoChunkProcessor.stop();
        }
        return outputs;
    }

    @Override
    public <T> Stream<T> collect(Class<T> recordType, Mapper mapper, int maxRecords) {
        return this.collect(recordType, mapper, maxRecords, Runtime.getRuntime().availableProcessors());
    }

    @Override
    public <T> Stream<T> collect(final Class<T> recordType, final Mapper mapper, int maxRecords, int concurrency) {
        mapper.start();
        final State state = STATE.get();
        long assess = mapper.assess();
        int proc = Math.max(1, concurrency);
        List mappers = mapper.split(Math.max(assess / (long)proc, 1L));
        switch (mappers.size()) {
            case 0: {
                return Stream.empty();
            }
            case 1: {
                return StreamDecorator.decorate(this.asStream(this.asIterator(((Mapper)mappers.iterator().next()).create(), new AtomicInteger(maxRecords))), collect -> {
                    try {
                        collect.run();
                    }
                    finally {
                        mapper.stop();
                    }
                });
            }
        }
        final AtomicInteger threadCounter = new AtomicInteger(0);
        ExecutorService es = Executors.newFixedThreadPool(mappers.size(), r -> new Thread(r){
            {
                super(x0);
                this.setName(BaseComponentsHandler.this.getClass().getSimpleName() + "-pool-" + Math.abs(mapper.hashCode()) + "-" + threadCounter.incrementAndGet());
            }
        });
        AtomicInteger recordCounter = new AtomicInteger(maxRecords);
        final Semaphore permissions = new Semaphore(0);
        final ConcurrentLinkedQueue records = new ConcurrentLinkedQueue();
        final CountDownLatch latch = new CountDownLatch(mappers.size());
        List tasks = mappers.stream().map(Mapper::create).map(input -> this.asIterator((Input)input, recordCounter)).map(it -> es.submit(() -> {
            try {
                while (it.hasNext()) {
                    Object next = it.next();
                    records.add(next);
                    permissions.release();
                }
            }
            finally {
                latch.countDown();
            }
        })).collect(Collectors.toList());
        es.shutdown();
        final int timeout = Integer.getInteger("talend.component.junit.timeout", 5);
        new Thread(){
            {
                this.setName(BaseComponentsHandler.class.getSimpleName() + "-monitor_" + Math.abs(mapper.hashCode()));
            }

            @Override
            public void run() {
                try {
                    latch.await(timeout, TimeUnit.MINUTES);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                finally {
                    permissions.release();
                }
            }
        }.start();
        return StreamDecorator.decorate(this.asStream(new Iterator<T>(){

            @Override
            public boolean hasNext() {
                try {
                    permissions.acquire();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    Assert.fail((String)e.getMessage());
                }
                return !records.isEmpty();
            }

            @Override
            public T next() {
                Object poll = records.poll();
                if (poll != null) {
                    return BaseComponentsHandler.this.mapRecord(state, recordType, poll);
                }
                return null;
            }
        }), task -> {
            try {
                task.run();
            }
            finally {
                tasks.forEach(f -> {
                    try {
                        f.get(5L, TimeUnit.SECONDS);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    catch (ExecutionException | TimeoutException exception) {
                    }
                    finally {
                        if (!f.isDone() && !f.isCancelled()) {
                            f.cancel(true);
                        }
                    }
                });
            }
        });
    }

    private <T> Stream<T> asStream(Iterator<T> iterator) {
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 1024), false);
    }

    private <T> Iterator<T> asIterator(final Input input, final AtomicInteger counter) {
        input.start();
        return new Iterator<T>(){
            private boolean closed;
            private Object next;

            @Override
            public boolean hasNext() {
                boolean hasNext;
                int remaining = counter.get();
                if (remaining <= 0) {
                    return false;
                }
                this.next = input.next();
                boolean bl = hasNext = this.next != null;
                if (!hasNext && !this.closed) {
                    this.closed = true;
                    input.stop();
                }
                if (hasNext) {
                    counter.decrementAndGet();
                }
                return hasNext;
            }

            @Override
            public T next() {
                return this.next;
            }
        };
    }

    @Override
    public <T> List<T> collectAsList(Class<T> recordType, Mapper mapper) {
        return this.collectAsList(recordType, mapper, 1000);
    }

    @Override
    public <T> List<T> collectAsList(Class<T> recordType, Mapper mapper, int maxRecords) {
        return this.collect(recordType, mapper, maxRecords).collect(Collectors.toList());
    }

    @Override
    public Mapper createMapper(Class<?> componentType, Object configuration) {
        return this.create(Mapper.class, componentType, configuration);
    }

    @Override
    public Processor createProcessor(Class<?> componentType, Object configuration) {
        return this.create(Processor.class, componentType, configuration);
    }

    private <C, T, A> A create(Class<A> api, Class<T> componentType, C configuration) {
        ComponentFamilyMeta.BaseMeta<Lifecycle> meta = this.findMeta(componentType);
        return api.cast(meta.getInstantiator().apply(configuration == null || ((List)meta.getParameterMetas().get()).isEmpty() ? Collections.emptyMap() : SimpleFactory.configurationByExample(configuration, ((List)meta.getParameterMetas().get()).stream().filter(p -> p.getName().equals(p.getPath())).findFirst().map(p -> p.getName() + '.').orElseThrow(() -> new IllegalArgumentException("Didn't find any option and therefore can't convert the configuration instance to a configuration")))));
    }

    private <T> ComponentFamilyMeta.BaseMeta<? extends Lifecycle> findMeta(Class<T> componentType) {
        return this.asManager().find(c -> ((ContainerComponentRegistry)c.get(ContainerComponentRegistry.class)).getComponents().values().stream()).flatMap(f -> Stream.concat(f.getProcessors().values().stream(), f.getPartitionMappers().values().stream())).filter(m -> m.getType().getName().equals(componentType.getName())).findFirst().orElseThrow(() -> new IllegalArgumentException("No component " + componentType));
    }

    @Override
    public <T> List<T> collect(Class<T> recordType, String family, String component, int version, Map<String, String> configuration) {
        Job.components().component("in", family + "://" + component + "?__version=" + version + configuration.entrySet().stream().map(entry -> (String)entry.getKey() + "=" + (String)entry.getValue()).collect(Collectors.joining("&", "&", ""))).component("collector", "test://collector").connections().from("in").to("collector").build().run();
        return this.getCollectedData(recordType);
    }

    @Override
    public <T> void process(Iterable<T> inputs, String family, String component, int version, Map<String, String> configuration) {
        this.setInputData(inputs);
        Job.components().component("emitter", "test://emitter").component("out", family + "://" + component + "?__version=" + version + configuration.entrySet().stream().map(entry -> (String)entry.getKey() + "=" + (String)entry.getValue()).collect(Collectors.joining("&", "&", ""))).connections().from("emitter").to("out").build().run();
    }

    @Override
    public ComponentManager asManager() {
        return BaseComponentsHandler.STATE.get().manager;
    }

    @Override
    public <T> T findService(String plugin, Class<T> serviceClass) {
        return serviceClass.cast(((ComponentManager.AllServices)((Container)this.asManager().findPlugin(plugin).orElseThrow(() -> new IllegalArgumentException("cant find plugin '" + plugin + "'"))).get(ComponentManager.AllServices.class)).getServices().get(serviceClass));
    }

    @Override
    public <T> T findService(Class<T> serviceClass) {
        return this.findService(this.getSinglePlugin(), serviceClass);
    }

    public Set<String> getTestPlugins() {
        return new HashSet<String>(((EmbeddedComponentManager)((Object)EmbeddedComponentManager.class.cast(this.asManager()))).testPlugins);
    }

    @Override
    public <T> void setInputData(Iterable<T> data) {
        State state = STATE.get();
        if (state == null) {
            this.initState.get().emitter = data.iterator();
        } else {
            state.emitter = data.iterator();
        }
    }

    @Override
    public <T> List<T> getCollectedData(Class<T> recordType) {
        State state = STATE.get();
        return state.collector.stream().filter(r -> recordType.isInstance(r) || JsonObject.class.isInstance(r) || Record.class.isInstance(r)).map(r -> this.mapRecord(state, recordType, r)).collect(Collectors.toList());
    }

    public void resetState() {
        State state = STATE.get();
        if (state == null) {
            STATE.remove();
        } else {
            state.collector.clear();
            state.emitter = Collections.emptyIterator();
        }
    }

    private String getSinglePlugin() {
        return Optional.of(((EmbeddedComponentManager)((Object)EmbeddedComponentManager.class.cast(this.asManager()))).testPlugins).filter(c -> !c.isEmpty()).map(c -> (String)c.iterator().next()).orElseThrow(() -> new IllegalStateException("No component plugin found"));
    }

    private <T> T mapRecord(State state, Class<T> recordType, Object r) {
        if (recordType.isInstance(r)) {
            return recordType.cast(r);
        }
        if (Record.class == recordType) {
            return recordType.cast(new RecordConverters().toRecord(state.registry, r, state::jsonb, state::recordBuilderFactory));
        }
        return recordType.cast(new RecordConverters().toType(state.registry, r, recordType, state::jsonBuilderFactory, state::jsonProvider, state::jsonb, state::recordBuilderFactory));
    }

    static interface Local<T> {
        public void set(T var1);

        public T get();

        public void remove();

        public static class ThreadLocalImpl<T>
        implements Local<T> {
            private final ThreadLocal<T> threadLocal = new ThreadLocal();

            @Override
            public void set(T value) {
                this.threadLocal.set(value);
            }

            @Override
            public T get() {
                return this.threadLocal.get();
            }

            @Override
            public void remove() {
                this.threadLocal.remove();
            }
        }

        public static class StaticImpl<T>
        implements Local<T> {
            private final AtomicReference<T> state = new AtomicReference();

            @Override
            public void set(T value) {
                this.state.set(value);
            }

            @Override
            public T get() {
                return this.state.get();
            }

            @Override
            public void remove() {
                this.state.set(null);
            }
        }
    }

    public static class Outputs {
        private final Map<String, List<?>> data = new HashMap();

        public int size() {
            return this.data.size();
        }

        public Set<String> keys() {
            return this.data.keySet();
        }

        public <T> List<T> get(Class<T> type, String name) {
            return this.data.get(name);
        }
    }

    public static class EmbeddedComponentManager
    extends ComponentManager {
        private final ComponentManager oldInstance;
        private final List<String> testPlugins;

        private EmbeddedComponentManager(String componentPackage) {
            super(EmbeddedComponentManager.findM2(), "TALEND-INF/dependencies.txt", "org.talend.sdk.component:type=component,value=%s");
            this.testPlugins = this.addJarContaining(Thread.currentThread().getContextClassLoader(), componentPackage.replace('.', '/'));
            this.container.builder("component-runtime-junit.jar", JarLocation.jarLocation(SimpleCollector.class).getAbsolutePath()).create();
            this.oldInstance = (ComponentManager)CONTEXTUAL_INSTANCE.get();
            CONTEXTUAL_INSTANCE.set(this);
        }

        public void close() {
            try {
                super.close();
            }
            finally {
                CONTEXTUAL_INSTANCE.compareAndSet(this, this.oldInstance);
            }
        }

        protected boolean isContainerClass(Filter filter, String name) {
            return true;
        }
    }

    protected static class State {
        final ComponentManager manager;
        final Collection<Object> collector;
        final RecordConverters.MappingMetaRegistry registry = new RecordConverters.MappingMetaRegistry();
        Iterator<?> emitter;
        volatile Jsonb jsonb;
        volatile JsonProvider jsonProvider;
        volatile JsonBuilderFactory jsonBuilderFactory;
        volatile RecordBuilderFactory recordBuilderFactory;

        synchronized Jsonb jsonb() {
            if (this.jsonb == null) {
                this.jsonb = this.manager.getJsonbProvider().create().withProvider((JsonProvider)new PreComputedJsonpProvider("test", this.manager.getJsonpProvider(), this.manager.getJsonpParserFactory(), this.manager.getJsonpWriterFactory(), this.manager.getJsonpBuilderFactory(), this.manager.getJsonpGeneratorFactory(), this.manager.getJsonpReaderFactory())).withConfig(new JsonbConfig().setProperty("johnzon.cdi.activated", (Object)false)).build();
            }
            return this.jsonb;
        }

        synchronized JsonProvider jsonProvider() {
            if (this.jsonProvider == null) {
                this.jsonProvider = this.manager.getJsonpProvider();
            }
            return this.jsonProvider;
        }

        synchronized JsonBuilderFactory jsonBuilderFactory() {
            if (this.jsonBuilderFactory == null) {
                this.jsonBuilderFactory = this.manager.getJsonpBuilderFactory();
            }
            return this.jsonBuilderFactory;
        }

        synchronized RecordBuilderFactory recordBuilderFactory() {
            if (this.recordBuilderFactory == null) {
                this.recordBuilderFactory = (RecordBuilderFactory)this.manager.getRecordBuilderFactoryProvider().apply("test");
            }
            return this.recordBuilderFactory;
        }

        public State(ComponentManager manager, Collection<Object> collector, Iterator<?> emitter, Jsonb jsonb, JsonProvider jsonProvider, JsonBuilderFactory jsonBuilderFactory, RecordBuilderFactory recordBuilderFactory) {
            this.manager = manager;
            this.collector = collector;
            this.emitter = emitter;
            this.jsonb = jsonb;
            this.jsonProvider = jsonProvider;
            this.jsonBuilderFactory = jsonBuilderFactory;
            this.recordBuilderFactory = recordBuilderFactory;
        }
    }

    static class PreState {
        Iterator<?> emitter;

        PreState() {
        }
    }
}

