package org.apache.flink.changelog.fs;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.UUID;
import org.apache.flink.changelog.fs.StateChangeUploadScheduler;
import org.apache.flink.changelog.fs.StateChangeUploader;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/changelog/fs/StateChangeFsUploader.class */
public class StateChangeFsUploader implements StateChangeUploader {
    private static final Logger LOG = LoggerFactory.getLogger(StateChangeFsUploader.class);
    private final Path basePath;
    private final FileSystem fileSystem;
    private final boolean compression;
    private final int bufferSize;
    private final ChangelogStorageMetricGroup metrics;
    private final StateChangeFormat format = new StateChangeFormat();
    private final Clock clock = SystemClock.getInstance();

    public StateChangeFsUploader(Path path, FileSystem fileSystem, boolean z, int i, ChangelogStorageMetricGroup changelogStorageMetricGroup) {
        this.basePath = path;
        this.fileSystem = fileSystem;
        this.compression = z;
        this.bufferSize = i;
        this.metrics = changelogStorageMetricGroup;
    }

    @Override // org.apache.flink.changelog.fs.StateChangeUploader
    public StateChangeUploader.UploadTasksResult upload(Collection<StateChangeUploadScheduler.UploadTask> collection) throws IOException {
        String generateFileName = generateFileName();
        LOG.debug("upload {} tasks to {}", Integer.valueOf(collection.size()), generateFileName);
        Path path = new Path(this.basePath, generateFileName);
        try {
            return uploadWithMetrics(path, collection);
        } catch (IOException e) {
            this.metrics.getUploadFailuresCounter().inc();
            Closer create = Closer.create();
            Throwable th = null;
            try {
                try {
                    create.register(() -> {
                        throw e;
                    });
                    collection.forEach(uploadTask -> {
                        create.register(() -> {
                            uploadTask.fail(e);
                        });
                    });
                    create.register(() -> {
                        this.fileSystem.delete(path, true);
                    });
                    if (create == null) {
                        return null;
                    }
                    if (0 == 0) {
                        create.close();
                        return null;
                    }
                    try {
                        create.close();
                        return null;
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                        return null;
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (Throwable th4) {
                if (create != null) {
                    if (th != null) {
                        try {
                            create.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        create.close();
                    }
                }
                throw th4;
            }
        }
    }

    private StateChangeUploader.UploadTasksResult uploadWithMetrics(Path path, Collection<StateChangeUploadScheduler.UploadTask> collection) throws IOException {
        this.metrics.getUploadsCounter().inc();
        long relativeTimeNanos = this.clock.relativeTimeNanos();
        StateChangeUploader.UploadTasksResult upload = upload(path, collection);
        this.metrics.getUploadLatenciesNanos().update(this.clock.relativeTimeNanos() - relativeTimeNanos);
        this.metrics.getUploadSizes().update(upload.getStateSize());
        return upload;
    }

    private StateChangeUploader.UploadTasksResult upload(Path path, Collection<StateChangeUploadScheduler.UploadTask> collection) throws IOException {
        FSDataOutputStream create = this.fileSystem.create(path, FileSystem.WriteMode.NO_OVERWRITE);
        Throwable th = null;
        try {
            create.write(this.compression ? 1 : 0);
            OutputStreamWithPos wrap = wrap(create);
            Throwable th2 = null;
            try {
                try {
                    HashMap hashMap = new HashMap();
                    for (StateChangeUploadScheduler.UploadTask uploadTask : collection) {
                        hashMap.put(uploadTask, this.format.write(wrap, uploadTask.changeSets));
                    }
                    StateChangeUploader.UploadTasksResult uploadTasksResult = new StateChangeUploader.UploadTasksResult(hashMap, new FileStateHandle(path, wrap.getPos()));
                    if (wrap != null) {
                        if (0 != 0) {
                            try {
                                wrap.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            wrap.close();
                        }
                    }
                    return uploadTasksResult;
                } finally {
                }
            } catch (Throwable th4) {
                if (wrap != null) {
                    if (th2 != null) {
                        try {
                            wrap.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        wrap.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (create != null) {
                if (0 != 0) {
                    try {
                        create.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    create.close();
                }
            }
        }
    }

    private OutputStreamWithPos wrap(FSDataOutputStream fSDataOutputStream) throws IOException {
        return new OutputStreamWithPos(new BufferedOutputStream((this.compression ? SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE).decorateWithCompression(fSDataOutputStream), this.bufferSize));
    }

    private String generateFileName() {
        return UUID.randomUUID().toString();
    }

    @Override // java.lang.AutoCloseable
    public void close() {
    }
}
