package org.apache.flink.connector.pulsar.source.reader.split;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
import org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.class */
public class PulsarUnorderedPartitionSplitReader<OUT> extends PulsarPartitionSplitReaderBase<OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarUnorderedPartitionSplitReader.class);
    private static final Duration REDELIVER_TIME = Duration.ofSeconds(3);
    private final TransactionCoordinatorClient coordinatorClient;

    @Nullable
    private Transaction uncommittedTransaction;

    public PulsarUnorderedPartitionSplitReader(PulsarClient pulsarClient, PulsarAdmin pulsarAdmin, SourceConfiguration sourceConfiguration, PulsarDeserializationSchema<OUT> pulsarDeserializationSchema, TransactionCoordinatorClient transactionCoordinatorClient) {
        super(pulsarClient, pulsarAdmin, sourceConfiguration, pulsarDeserializationSchema);
        this.coordinatorClient = transactionCoordinatorClient;
    }

    @Override // org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase
    protected Message<byte[]> pollMessage(Duration duration) throws ExecutionException, InterruptedException, PulsarClientException {
        Message<byte[]> receive = this.pulsarConsumer.receive(Math.toIntExact(duration.toMillis()), TimeUnit.MILLISECONDS);
        if (receive == null) {
            return null;
        }
        if (!this.sourceConfiguration.isEnableAutoAcknowledgeMessage()) {
            if (this.uncommittedTransaction == null) {
                this.uncommittedTransaction = newTransaction();
            }
            try {
                this.pulsarConsumer.acknowledgeAsync(receive.getMessageId(), this.uncommittedTransaction).get();
            } catch (InterruptedException e) {
                PulsarExceptionUtils.sneakyClient(() -> {
                    this.pulsarConsumer.reconsumeLater(receive, REDELIVER_TIME.toMillis(), TimeUnit.MILLISECONDS);
                });
                Thread.currentThread().interrupt();
                throw e;
            } catch (ExecutionException e2) {
                PulsarExceptionUtils.sneakyClient(() -> {
                    this.pulsarConsumer.reconsumeLater(receive, REDELIVER_TIME.toMillis(), TimeUnit.MILLISECONDS);
                });
                throw e2;
            }
        }
        return receive;
    }

    @Override // org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase
    protected void finishedPollMessage(Message<byte[]> message) {
        if (this.sourceConfiguration.isEnableAutoAcknowledgeMessage()) {
            PulsarExceptionUtils.sneakyClient(() -> {
                this.pulsarConsumer.acknowledge(message);
            });
        }
        message.release();
    }

    @Override // org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase
    protected void afterCreatingConsumer(PulsarPartitionSplit pulsarPartitionSplit, Consumer<byte[]> consumer) {
        TxnID uncommittedTransactionId = pulsarPartitionSplit.getUncommittedTransactionId();
        if (uncommittedTransactionId != null) {
            if (this.coordinatorClient != null) {
                try {
                    this.coordinatorClient.abort(uncommittedTransactionId);
                } catch (TransactionCoordinatorClientException e) {
                    LOG.error("Failed to abort the uncommitted transaction {} when restart the reader", uncommittedTransactionId, e);
                }
            }
            consumer.redeliverUnacknowledgedMessages();
        }
    }

    public PulsarPartitionSplitState snapshotState(long j) {
        PulsarPartitionSplitState pulsarPartitionSplitState = new PulsarPartitionSplitState(this.registeredSplit);
        if (this.uncommittedTransaction != null) {
            TxnID txnID = this.uncommittedTransaction.getTxnID();
            this.uncommittedTransaction = newTransaction();
            pulsarPartitionSplitState.setUncommittedTransactionId(txnID);
        }
        return pulsarPartitionSplitState;
    }

    private Transaction newTransaction() {
        return PulsarTransactionUtils.createTransaction(this.pulsarClient, this.sourceConfiguration.getTransactionTimeoutMillis());
    }

    @Override // org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase
    public /* bridge */ /* synthetic */ void close() {
        super.close();
    }

    @Override // org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase
    public /* bridge */ /* synthetic */ void wakeUp() {
        super.wakeUp();
    }

    @Override // org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase
    public /* bridge */ /* synthetic */ void handleSplitsChanges(SplitsChange splitsChange) {
        super.handleSplitsChanges(splitsChange);
    }

    @Override // org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase
    public /* bridge */ /* synthetic */ RecordsWithSplitIds fetch() throws IOException {
        return super.fetch();
    }
}
