/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.impl.engine;

import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.camel.AsyncCallback;
import org.apache.camel.StaticService;
import org.apache.camel.api.management.ManagedAttribute;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.spi.ReactiveExecutor;
import org.apache.camel.support.service.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ManagedResource(description="Managed ReactiveExecutor")
public class DefaultReactiveExecutor
extends ServiceSupport
implements ReactiveExecutor,
StaticService {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultReactiveExecutor.class);
    private final ThreadLocal<Worker> workers = ThreadLocal.withInitial(new Supplier<Worker>(){

        @Override
        public Worker get() {
            DefaultReactiveExecutor.this.createdWorkers.incrementAndGet();
            return new Worker(DefaultReactiveExecutor.this);
        }
    });
    private final AtomicInteger createdWorkers = new AtomicInteger();
    private final AtomicInteger runningWorkers = new AtomicInteger();
    private final AtomicLong pendingTasks = new AtomicLong();

    public void scheduleMain(Runnable runnable, String description) {
        if (description != null) {
            runnable = DefaultReactiveExecutor.describe(runnable, description);
        }
        this.workers.get().schedule(runnable, true, true, false);
    }

    public void schedule(Runnable runnable, String description) {
        if (description != null) {
            runnable = DefaultReactiveExecutor.describe(runnable, description);
        }
        this.workers.get().schedule(runnable, true, false, false);
    }

    public void scheduleSync(Runnable runnable, String description) {
        if (description != null) {
            runnable = DefaultReactiveExecutor.describe(runnable, description);
        }
        this.workers.get().schedule(runnable, false, true, true);
    }

    public boolean executeFromQueue() {
        return this.workers.get().executeFromQueue();
    }

    @ManagedAttribute(description="Number of created workers")
    public int getCreatedWorkers() {
        return this.createdWorkers.get();
    }

    @ManagedAttribute(description="Number of running workers")
    public int getRunningWorkers() {
        return this.runningWorkers.get();
    }

    @ManagedAttribute(description="Number of pending tasks")
    public long getPendingTasks() {
        return this.pendingTasks.get();
    }

    public void callback(final AsyncCallback callback) {
        this.schedule(new Runnable(){

            @Override
            public void run() {
                callback.done(false);
            }

            public String toString() {
                return "Callback[" + callback + "]";
            }
        });
    }

    private static Runnable describe(final Runnable runnable, final String description) {
        return new Runnable(){

            @Override
            public void run() {
                runnable.run();
            }

            public String toString() {
                return description;
            }
        };
    }

    protected void doStart() throws Exception {
    }

    protected void doStop() throws Exception {
    }

    private static class Worker {
        private final DefaultReactiveExecutor executor;
        private volatile LinkedList<Runnable> queue = new LinkedList();
        private volatile LinkedList<LinkedList<Runnable>> back;
        private volatile boolean running;

        public Worker(DefaultReactiveExecutor executor) {
            this.executor = executor;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void schedule(Runnable runnable, boolean first, boolean main, boolean sync) {
            block17: {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Schedule [first={}, main={}, sync={}]: {}", new Object[]{first, main, sync, runnable});
                }
                if (main && !this.queue.isEmpty()) {
                    if (this.back == null) {
                        this.back = new LinkedList();
                    }
                    this.back.push(this.queue);
                    this.queue = new LinkedList();
                }
                if (first) {
                    this.queue.addFirst(runnable);
                    this.executor.pendingTasks.incrementAndGet();
                } else {
                    this.queue.addLast(runnable);
                    this.executor.pendingTasks.incrementAndGet();
                }
                if (!this.running || sync) {
                    this.running = true;
                    this.executor.runningWorkers.incrementAndGet();
                    block5: while (true) {
                        while (true) {
                            Runnable polled;
                            if ((polled = this.queue.poll()) == null) {
                                if (this.back != null && !this.back.isEmpty()) {
                                    this.queue = this.back.poll();
                                    continue;
                                }
                                break block17;
                            }
                            try {
                                this.executor.pendingTasks.decrementAndGet();
                                if (LOG.isTraceEnabled()) {
                                    LOG.trace("Running: {}", (Object)runnable);
                                }
                                polled.run();
                                continue block5;
                            }
                            catch (Throwable t) {
                                LOG.warn("Error executing reactive work due to " + t.getMessage() + ". This exception is ignored.", t);
                                continue;
                            }
                            break;
                        }
                    }
                    finally {
                        this.running = false;
                        this.executor.runningWorkers.decrementAndGet();
                    }
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Queuing reactive work: {}", (Object)runnable);
                }
            }
        }

        boolean executeFromQueue() {
            Runnable polled;
            Runnable runnable = polled = this.queue != null ? this.queue.poll() : null;
            if (polled == null) {
                return false;
            }
            try {
                this.executor.pendingTasks.decrementAndGet();
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Running: {}", (Object)polled);
                }
                polled.run();
            }
            catch (Throwable t) {
                LOG.warn("Error executing reactive work due to " + t.getMessage() + ". This exception is ignored.", t);
            }
            return true;
        }
    }
}

