/*
 * Decompiled with CFR 0.152.
 */
package org.postgresql.core.v3.replication;

import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import org.postgresql.copy.CopyDual;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.ReplicationType;
import org.postgresql.util.GT;
import org.postgresql.util.LOGGER;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
import org.postgresql.util.TraceLogger;

public class V3PGReplicationStream
implements PGReplicationStream {
    private static final long NANOS_PER_MILLISECOND_T = 1000000L;
    public static final long POSTGRES_2000_01_01_EPOCH = 946684800000L;
    private final long updateIntervalTimeT;
    private boolean closeFlagT = false;
    private final ReplicationType replicTypeT;
    private final CopyDual copyDualT;
    private long lastStatusUpdateT;
    private volatile LogSequenceNumber lastReceiveLSN_T = LogSequenceNumber.INVALID_LSN;
    private volatile LogSequenceNumber lastAppliedLSN_T = LogSequenceNumber.INVALID_LSN;
    private volatile LogSequenceNumber lastFlushedLSN_T = LogSequenceNumber.INVALID_LSN;
    private LogSequenceNumber lastServerLSN_T = LogSequenceNumber.INVALID_LSN;

    public V3PGReplicationStream(CopyDual copyDual, LogSequenceNumber startLSN, long updateIntervalMs, ReplicationType replicType) {
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        this.copyDualT = copyDual;
        this.updateIntervalTimeT = updateIntervalMs * 1000000L;
        this.lastStatusUpdateT = System.nanoTime() - updateIntervalMs * 1000000L;
        this.lastReceiveLSN_T = startLSN;
        this.replicTypeT = replicType;
    }

    @Override
    public ByteBuffer read() throws SQLException {
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        this.checkClose_();
        ByteBuffer payload_SB = null;
        while (payload_SB == null && this.copyDualT.isActive()) {
            TraceLogger.logLineInfo(Level.ALL, "lineInfo");
            payload_SB = this.readInternal_(true);
        }
        return payload_SB;
    }

    @Override
    public ByteBuffer readPending() throws SQLException {
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        this.checkClose_();
        return this.readInternal_(false);
    }

    @Override
    public LogSequenceNumber getLastReceiveLSN() {
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        return this.lastReceiveLSN_T;
    }

    @Override
    public LogSequenceNumber getLastFlushedLSN() {
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        return this.lastFlushedLSN_T;
    }

    @Override
    public void setAppliedLSN(LogSequenceNumber applied) {
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        this.lastAppliedLSN_T = applied;
    }

    @Override
    public void setFlushedLSN(LogSequenceNumber flushed) {
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        this.lastFlushedLSN_T = flushed;
    }

    @Override
    public LogSequenceNumber getLastAppliedLSN() {
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        return this.lastAppliedLSN_T;
    }

    @Override
    public void forceUpdateStatus() throws SQLException {
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        this.checkClose_();
        this.updateStatusInternal_(this.lastReceiveLSN_T, this.lastFlushedLSN_T, this.lastAppliedLSN_T, true);
    }

    @Override
    public boolean isClosed() {
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        return this.closeFlagT || !this.copyDualT.isActive();
    }

    private ByteBuffer readInternal_(boolean blockT) throws SQLException {
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        boolean updateStatusRequired = false;
        block4: while (this.copyDualT.isActive()) {
            TraceLogger.logLineInfo(Level.ALL, "lineInfo");
            ByteBuffer byteBuffer = this.receiveNextData_(blockT);
            if (updateStatusRequired || this.isTimeUpdate()) {
                TraceLogger.logLineInfo(Level.ALL, "lineInfo");
                this.timeUpdateStatus();
            }
            if (byteBuffer == null) {
                TraceLogger.logLineInfo(Level.ALL, "lineInfo");
                return null;
            }
            byte codeT = byteBuffer.get();
            switch (codeT) {
                case 107: {
                    TraceLogger.logLineInfo(Level.ALL, "lineInfo");
                    updateStatusRequired = this.processKeepAliveMessage_(byteBuffer);
                    updateStatusRequired |= this.updateIntervalTimeT == 0L;
                    continue block4;
                }
                case 119: {
                    TraceLogger.logLineInfo(Level.ALL, "lineInfo");
                    return this.processXLogData_(byteBuffer);
                }
            }
            TraceLogger.logLineInfo(Level.ALL, "lineInfo");
            throw new PSQLException(GT.tr("Unexpected packet type during replication: {0}", Integer.toString(codeT)), PSQLState.PROTOCOL_VIOLATION);
        }
        return null;
    }

    private ByteBuffer receiveNextData_(boolean blockT) throws SQLException {
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        try {
            byte[] bytes = this.copyDualT.readFromCopy(blockT);
            if (bytes != null) {
                TraceLogger.logLineInfo(Level.ALL, "lineInfo");
                return ByteBuffer.wrap(bytes);
            }
            TraceLogger.logLineInfo(Level.ALL, "lineInfo");
            return null;
        }
        catch (PSQLException exception) {
            TraceLogger.logLineInfo(Level.ALL, "lineInfo");
            if (exception.getCause() instanceof SocketTimeoutException) {
                TraceLogger.logLineInfo(Level.ALL, "lineInfo");
                return null;
            }
            throw exception;
        }
    }

    private boolean isTimeUpdate() {
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        if (this.updateIntervalTimeT == 0L) {
            TraceLogger.logLineInfo(Level.ALL, "lineInfo");
            return false;
        }
        long diffT = System.nanoTime() - this.lastStatusUpdateT;
        return diffT >= this.updateIntervalTimeT;
    }

    private void timeUpdateStatus() throws SQLException {
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        this.updateStatusInternal_(this.lastReceiveLSN_T, this.lastFlushedLSN_T, this.lastAppliedLSN_T, false);
    }

    private void updateStatusInternal_(LogSequenceNumber receivedSeq, LogSequenceNumber flushedSeq, LogSequenceNumber appliedSeq, boolean replyRequiredT) throws SQLException {
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        byte[] reply = this.prepareUpdateStatus(receivedSeq, flushedSeq, appliedSeq, replyRequiredT);
        this.copyDualT.writeToCopy(reply, 0, reply.length);
        this.copyDualT.flushCopy();
        this.lastStatusUpdateT = System.nanoTime();
    }

    private byte[] prepareUpdateStatus(LogSequenceNumber receivedSeq, LogSequenceNumber flushedSeq, LogSequenceNumber appliedSeq, boolean replyRequiredT) {
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        ByteBuffer byteBuffer = ByteBuffer.allocate(34);
        long nowTime = System.nanoTime() / 1000000L;
        long systemClockTime = TimeUnit.MICROSECONDS.convert(nowTime - 946684800000L, TimeUnit.MICROSECONDS);
        if (LOGGER.isLoggable(Level.FINEST)) {
            TraceLogger.logLineInfo(Level.ALL, "lineInfo");
            LOGGER.log(Level.FINEST, " FE=> StandbyStatusUpdate(receivedSeq: {0}, flushed: {1}, applied: {2}, clock: {3})", receivedSeq.asString(), flushedSeq.asString(), appliedSeq.asString(), new Date(nowTime));
        }
        byteBuffer.put((byte)114);
        byteBuffer.putLong(receivedSeq.asLong());
        byteBuffer.putLong(flushedSeq.asLong());
        byteBuffer.putLong(appliedSeq.asLong());
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        byteBuffer.putLong(systemClockTime);
        if (replyRequiredT) {
            TraceLogger.logLineInfo(Level.ALL, "lineInfo");
            byteBuffer.put((byte)1);
        } else {
            TraceLogger.logLineInfo(Level.ALL, "lineInfo");
            byteBuffer.put(receivedSeq == LogSequenceNumber.INVALID_LSN ? (byte)1 : 0);
        }
        this.lastStatusUpdateT = nowTime;
        return byteBuffer.array();
    }

    private boolean processKeepAliveMessage_(ByteBuffer byteBuffer) {
        boolean replyRequiredT;
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        this.lastServerLSN_T = LogSequenceNumber.valueOf(byteBuffer.getLong());
        if (this.lastServerLSN_T.asLong() > this.lastReceiveLSN_T.asLong()) {
            TraceLogger.logLineInfo(Level.ALL, "lineInfo");
            this.lastReceiveLSN_T = this.lastServerLSN_T;
        }
        long lastServerClockTime = byteBuffer.getLong();
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        boolean bl = replyRequiredT = byteBuffer.get() != 0;
        if (LOGGER.isLoggable(Level.FINEST)) {
            TraceLogger.logLineInfo(Level.ALL, "lineInfo");
            Date clockTimeT = new Date(TimeUnit.MILLISECONDS.convert(lastServerClockTime, TimeUnit.MICROSECONDS) + 946684800000L);
            LOGGER.log(Level.FINEST, "  <=BE Keepalive(lastServerWal: {0}, clock: {1} needReply: {2})", this.lastServerLSN_T.asString(), clockTimeT, replyRequiredT);
        }
        return replyRequiredT;
    }

    private ByteBuffer processXLogData_(ByteBuffer byteBuffer) {
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        long startLsnT = byteBuffer.getLong();
        this.lastServerLSN_T = LogSequenceNumber.valueOf(byteBuffer.getLong());
        long systemClockT = byteBuffer.getLong();
        switch (this.replicTypeT) {
            case PHYSICAL: {
                TraceLogger.logLineInfo(Level.ALL, "lineInfo");
                int payloadSize = byteBuffer.limit() - byteBuffer.position();
                this.lastReceiveLSN_T = LogSequenceNumber.valueOf(startLsnT + (long)payloadSize);
                break;
            }
            case LOGICAL: {
                TraceLogger.logLineInfo(Level.ALL, "lineInfo");
                this.lastReceiveLSN_T = LogSequenceNumber.valueOf(startLsnT);
            }
        }
        if (LOGGER.isLoggable(Level.FINEST)) {
            TraceLogger.logLineInfo(Level.ALL, "lineInfo");
            LOGGER.log(Level.FINEST, "  <=BE XLogData(currWal: {0}, lastServerWal: {1}, clock: {2})", this.lastReceiveLSN_T.asString(), this.lastServerLSN_T.asString(), systemClockT);
        }
        return byteBuffer.slice();
    }

    private void checkClose_() throws PSQLException {
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        if (this.isClosed()) {
            TraceLogger.logLineInfo(Level.ALL, "lineInfo");
            throw new PSQLException(GT.tr("This replication stream has been closed.", new Object[0]), PSQLState.CONNECTION_DOES_NOT_EXIST);
        }
    }

    public LogSequenceNumber getLastServerLSN() {
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        return this.lastServerLSN_T;
    }

    @Override
    public void close() throws SQLException {
        TraceLogger.logLineInfo(Level.ALL, "lineInfo");
        if (this.isClosed()) {
            TraceLogger.logLineInfo(Level.ALL, "lineInfo");
            return;
        }
        LOGGER.log(Level.FINEST, " FE=> StopReplication", new Object[0]);
        this.copyDualT.endCopy();
        this.closeFlagT = true;
    }
}

