/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.redis.runtime.datasource;

import io.quarkus.redis.datasource.RedisDataSource;
import io.quarkus.redis.datasource.pubsub.PubSubCommands;
import io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands;
import io.quarkus.redis.runtime.datasource.AbstractRedisCommandGroup;
import io.smallrye.mutiny.helpers.ParameterValidation;
import java.time.Duration;
import java.util.List;
import java.util.function.Consumer;

/**
 * Blocking implementation of {@link PubSubCommands} layered on top of a
 * {@link ReactivePubSubCommands} instance.
 *
 * <p>Every operation is forwarded to the reactive delegate and the calling thread
 * then awaits the outcome for at most the timeout supplied at construction time.
 *
 * @param <V> the type of the messages published and received
 */
public class BlockingPubSubCommandsImpl<V> extends AbstractRedisCommandGroup implements PubSubCommands<V> {

    /** Reactive commands that perform the actual work. */
    private final ReactivePubSubCommands<V> reactive;

    /**
     * Creates the blocking command group.
     *
     * @param ds the data source, handed to the parent command group
     * @param reactive the reactive commands every call is delegated to
     * @param timeout the maximum time to wait for each delegated call, also handed to the parent
     */
    public BlockingPubSubCommandsImpl(RedisDataSource ds, ReactivePubSubCommands<V> reactive, Duration timeout) {
        super(ds, timeout);
        this.reactive = reactive;
    }

    /**
     * Publishes {@code message} on {@code channel} through the reactive delegate
     * and waits for the call to finish, up to the configured timeout.
     */
    @Override
    public void publish(String channel, V message) {
        reactive.publish(channel, message)
                .await().atMost(timeout);
    }

    // ---- channel subscriptions ------------------------------------------------------

    /**
     * Subscribes to a single channel without end/failure callbacks; forwards to the
     * four-argument overload with {@code null} for both callbacks.
     */
    @Override
    public PubSubCommands.RedisSubscriber subscribe(String channel, Consumer<V> onMessage) {
        return subscribe(channel, onMessage, null, null);
    }

    /**
     * Subscribes to several channels without end/failure callbacks; forwards to the
     * four-argument overload with {@code null} for both callbacks.
     */
    @Override
    public PubSubCommands.RedisSubscriber subscribe(List<String> channels, Consumer<V> onMessage) {
        return subscribe(channels, onMessage, null, null);
    }

    /**
     * Subscribes to a single channel. The channel is checked for {@code null} first and
     * then wrapped in a one-element list for the list-based overload.
     */
    @Override
    public PubSubCommands.RedisSubscriber subscribe(String channel, Consumer<V> onMessage, Runnable onEnd,
            Consumer<Throwable> onException) {
        ParameterValidation.nonNull(channel, "channel");
        List<String> single = List.of(channel);
        return subscribe(single, onMessage, onEnd, onException);
    }

    /**
     * Subscribes to the given channels through the reactive delegate, wraps the resulting
     * reactive subscriber in a blocking one and waits for it, up to the configured timeout.
     */
    @Override
    public PubSubCommands.RedisSubscriber subscribe(List<String> channels, Consumer<V> onMessage, Runnable onEnd,
            Consumer<Throwable> onException) {
        return reactive.subscribe(channels, onMessage, onEnd, onException)
                .map(BlockingRedisSubscriber::new)
                .await().atMost(timeout);
    }

    // ---- pattern subscriptions ------------------------------------------------------

    /**
     * Subscribes to a single pattern without end/failure callbacks; forwards to the
     * four-argument overload with {@code null} for both callbacks.
     */
    @Override
    public PubSubCommands.RedisSubscriber subscribeToPattern(String pattern, Consumer<V> onMessage) {
        return subscribeToPattern(pattern, onMessage, null, null);
    }

    /**
     * Subscribes to several patterns without end/failure callbacks; forwards to the
     * four-argument overload with {@code null} for both callbacks.
     */
    @Override
    public PubSubCommands.RedisSubscriber subscribeToPatterns(List<String> patterns, Consumer<V> onMessage) {
        return subscribeToPatterns(patterns, onMessage, null, null);
    }

    /**
     * Subscribes to a single pattern. The pattern is checked for {@code null} first and
     * then wrapped in a one-element list for the list-based overload.
     */
    @Override
    public PubSubCommands.RedisSubscriber subscribeToPattern(String pattern, Consumer<V> onMessage, Runnable onEnd,
            Consumer<Throwable> onException) {
        ParameterValidation.nonNull(pattern, "pattern");
        List<String> single = List.of(pattern);
        return subscribeToPatterns(single, onMessage, onEnd, onException);
    }

    /**
     * Subscribes to the given patterns through the reactive delegate, wraps the resulting
     * reactive subscriber in a blocking one and waits for it, up to the configured timeout.
     */
    @Override
    public PubSubCommands.RedisSubscriber subscribeToPatterns(List<String> patterns, Consumer<V> onMessage,
            Runnable onEnd, Consumer<Throwable> onException) {
        return reactive.subscribeToPatterns(patterns, onMessage, onEnd, onException)
                .map(BlockingRedisSubscriber::new)
                .await().atMost(timeout);
    }

    /**
     * Blocking view of a reactive subscriber: each unsubscribe request is forwarded to the
     * wrapped subscriber and awaited for at most the enclosing instance's timeout.
     */
    private class BlockingRedisSubscriber implements PubSubCommands.RedisSubscriber {

        /** The reactive subscriber that carries out the unsubscribe requests. */
        private final ReactivePubSubCommands.ReactiveRedisSubscriber delegate;

        private BlockingRedisSubscriber(ReactivePubSubCommands.ReactiveRedisSubscriber reactiveRedisSubscriber) {
            this.delegate = reactiveRedisSubscriber;
        }

        /** Forwards the given channels to the wrapped subscriber's unsubscribe and waits for completion. */
        @Override
        public void unsubscribe(String... channels) {
            delegate.unsubscribe(channels)
                    .await().atMost(BlockingPubSubCommandsImpl.this.timeout);
        }

        /** Calls the wrapped subscriber's no-argument unsubscribe and waits for completion. */
        @Override
        public void unsubscribe() {
            delegate.unsubscribe()
                    .await().atMost(BlockingPubSubCommandsImpl.this.timeout);
        }
    }
}

