package com.goodow.realtime.channel.impl;

import com.goodow.realtime.channel.Bus;
import com.goodow.realtime.channel.BusHook;
import com.goodow.realtime.channel.Message;
import com.goodow.realtime.core.Handler;
import com.goodow.realtime.core.Platform;
import com.goodow.realtime.json.Json;
import com.goodow.realtime.json.JsonArray;
import com.goodow.realtime.json.JsonObject;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: classes.dex */
public class ReliableSubscribeBus extends BusProxy {
    static final /* synthetic */ boolean $assertionsDisabled;
    public static final String ACKNOWLEDGE_DELAY_MILLIS = "acknowledgeDelayMillis";
    public static final String PUBLISH_CHANNEL = "publish_channel";
    public static final String SEQUENCE_NUMBER = "sequence_number_key";
    private static final Logger log;
    private final int acknowledgeDelayMillis;
    private final JsonObject acknowledgeNumbers;
    private final JsonObject acknowledgeScheduled;
    private final JsonObject currentSequences;
    private final JsonObject knownHeadSequences;
    private final JsonObject pendings;
    private final String publishChannel;
    private final String sequenceNumberKey;

    static {
        $assertionsDisabled = !ReliableSubscribeBus.class.desiredAssertionStatus();
        log = Logger.getLogger(ReliableSubscribeBus.class.getName());
    }

    public ReliableSubscribeBus(Bus bus, JsonObject jsonObject) {
        super(bus);
        this.sequenceNumberKey = (jsonObject == null || !jsonObject.has(SEQUENCE_NUMBER)) ? "v" : jsonObject.getString(SEQUENCE_NUMBER);
        this.publishChannel = (jsonObject == null || !jsonObject.has(PUBLISH_CHANNEL)) ? "realtime/store" : jsonObject.getString(PUBLISH_CHANNEL);
        this.acknowledgeDelayMillis = (jsonObject == null || !jsonObject.has(ACKNOWLEDGE_DELAY_MILLIS)) ? 3000 : (int) jsonObject.getNumber(ACKNOWLEDGE_DELAY_MILLIS);
        this.pendings = Json.createObject();
        this.currentSequences = Json.createObject();
        this.knownHeadSequences = Json.createObject();
        this.acknowledgeScheduled = Json.createObject();
        this.acknowledgeNumbers = Json.createObject();
        bus.setHook(new BusHookProxy() { // from class: com.goodow.realtime.channel.impl.ReliableSubscribeBus.1
            @Override // com.goodow.realtime.channel.impl.BusHookProxy
            protected BusHook delegate() {
                return ReliableSubscribeBus.this.hook;
            }

            @Override // com.goodow.realtime.channel.impl.BusHookProxy, com.goodow.realtime.channel.BusHook
            public boolean handleReceiveMessage(Message<?> message) {
                if (ReliableSubscribeBus.this.hook == null || ReliableSubscribeBus.this.hook.handleReceiveMessage(message)) {
                    return ReliableSubscribeBus.this.onReceiveMessage(message);
                }
                return false;
            }

            @Override // com.goodow.realtime.channel.impl.BusHookProxy, com.goodow.realtime.channel.BusHook
            public boolean handleUnsubscribe(String str) {
                if (ReliableSubscribeBus.this.needProcess(str)) {
                    ReliableSubscribeBus.this.pendings.remove(str);
                    ReliableSubscribeBus.this.currentSequences.remove(str);
                    ReliableSubscribeBus.this.knownHeadSequences.remove(str);
                    ReliableSubscribeBus.this.acknowledgeScheduled.remove(str);
                    ReliableSubscribeBus.this.acknowledgeNumbers.remove(str);
                }
                return super.handleUnsubscribe(str);
            }
        });
    }

    private void initSequenceNumber(String str, double d) {
        this.currentSequences.set(str, d);
        this.knownHeadSequences.set(str, d);
        this.pendings.set(str, Json.createObject());
    }

    private void scheduleAcknowledgment(final String str) {
        if (this.acknowledgeScheduled.has(str)) {
            return;
        }
        this.acknowledgeScheduled.set(str, true);
        Platform.scheduler().scheduleDelay(this.acknowledgeDelayMillis, new Handler<Void>() { // from class: com.goodow.realtime.channel.impl.ReliableSubscribeBus.3
            @Override // com.goodow.realtime.core.Handler
            public void handle(Void r9) {
                if (ReliableSubscribeBus.this.acknowledgeScheduled.has(str)) {
                    ReliableSubscribeBus.this.acknowledgeScheduled.remove(str);
                    double number = ReliableSubscribeBus.this.knownHeadSequences.getNumber(str);
                    double number2 = ReliableSubscribeBus.this.currentSequences.getNumber(str);
                    if (number <= number2 || (ReliableSubscribeBus.this.acknowledgeNumbers.has(str) && number <= ReliableSubscribeBus.this.acknowledgeNumbers.getNumber(str))) {
                        ReliableSubscribeBus.log.log(Level.FINE, "No need to catchup");
                        return;
                    }
                    ReliableSubscribeBus.this.acknowledgeNumbers.set(str, number);
                    ReliableSubscribeBus.log.log(Level.CONFIG, "Catching up to " + number);
                    ReliableSubscribeBus.this.catchup(str, number2);
                }
            }
        });
    }

    private void scheduleMessages(final JsonArray jsonArray) {
        Platform.scheduler().scheduleDeferred(new Handler<Void>() { // from class: com.goodow.realtime.channel.impl.ReliableSubscribeBus.4
            @Override // com.goodow.realtime.core.Handler
            public void handle(Void r3) {
                jsonArray.forEach(new JsonArray.ListIterator<Message<?>>() { // from class: com.goodow.realtime.channel.impl.ReliableSubscribeBus.4.1
                    @Override // com.goodow.realtime.json.JsonArray.ListIterator
                    public void call(int i, Message<?> message) {
                        ReliableSubscribeBus.this.delegate.publishLocal(message.topic(), message.body());
                    }
                });
            }
        });
    }

    protected void catchup(final String str, double d) {
        String substring = str.substring(this.publishChannel.length() + 1);
        this.delegate.send(this.publishChannel + "/_ops", Json.createObject().set("id", substring.substring(0, substring.lastIndexOf("/_watch"))).set("from", 1.0d + d), new Handler<Message<JsonArray>>() { // from class: com.goodow.realtime.channel.impl.ReliableSubscribeBus.2
            @Override // com.goodow.realtime.core.Handler
            public void handle(Message<JsonArray> message) {
                final String replyTopic = message.replyTopic();
                message.body().forEach(new JsonArray.ListIterator() { // from class: com.goodow.realtime.channel.impl.ReliableSubscribeBus.2.1
                    @Override // com.goodow.realtime.json.JsonArray.ListIterator
                    public void call(int i, Object obj) {
                        ReliableSubscribeBus.this.onReceiveMessage(new MessageImpl(false, false, ReliableSubscribeBus.this, str, replyTopic, obj));
                    }
                });
            }
        });
    }

    @Override // com.goodow.realtime.channel.impl.BusProxy, com.goodow.realtime.channel.Bus
    public void close() {
        super.close();
        this.pendings.clear();
        this.currentSequences.clear();
        this.knownHeadSequences.clear();
        this.acknowledgeScheduled.clear();
        this.acknowledgeNumbers.clear();
    }

    protected double getSequenceNumber(String str, Object obj) {
        return ((JsonObject) obj).getNumber(this.sequenceNumberKey);
    }

    protected boolean needProcess(String str) {
        return str.startsWith(new StringBuilder().append(this.publishChannel).append("/").toString()) && str.endsWith("/_watch") && !str.contains("/_presence/");
    }

    protected boolean onReceiveMessage(Message<?> message) {
        String str;
        String str2 = message.topic();
        Object body = message.body();
        if (!needProcess(str2)) {
            return true;
        }
        double sequenceNumber = getSequenceNumber(str2, body);
        if (!this.currentSequences.has(str2)) {
            initSequenceNumber(str2, sequenceNumber);
            return true;
        }
        double number = this.currentSequences.getNumber(str2);
        if (sequenceNumber <= number) {
            log.log(Level.CONFIG, "Old dup at sequence " + sequenceNumber + ", current is now " + number);
            return false;
        }
        JsonObject object = this.pendings.getObject(str2);
        if (((Message) object.get("" + sequenceNumber)) != null) {
            if (!$assertionsDisabled && sequenceNumber <= 1.0d + number) {
                throw new AssertionError("should not have pending data");
            }
            log.log(Level.CONFIG, "Dup message: " + message);
            return false;
        }
        this.knownHeadSequences.set(str2, Math.max(this.knownHeadSequences.getNumber(str2), sequenceNumber));
        if (sequenceNumber > 1.0d + number) {
            object.set("" + sequenceNumber, message);
            log.log(Level.CONFIG, "Missed message, current sequence=" + number + " incoming sequence=" + sequenceNumber);
            scheduleAcknowledgment(str2);
            return false;
        }
        if (!$assertionsDisabled && sequenceNumber != 1.0d + number) {
            throw new AssertionError("other cases should have been caught");
        }
        JsonArray createArray = Json.createArray();
        while (true) {
            createArray.push(message);
            number += 1.0d;
            this.currentSequences.set(str2, number);
            str = (1.0d + number) + "";
            message = (Message) object.get(str);
            if (message == null) {
                break;
            }
            object.remove(str);
        }
        scheduleMessages(createArray);
        if ($assertionsDisabled || !object.has(str)) {
            return false;
        }
        throw new AssertionError();
    }

    public void synchronizeSequenceNumber(String str, double d) {
        if (!$assertionsDisabled && (this.currentSequences.has(str) || this.knownHeadSequences.has(str) || this.pendings.has(str))) {
            throw new AssertionError();
        }
        initSequenceNumber(str, d);
        catchup(str, d);
    }
}
