package org.apache.flink.table.runtime.operators.python.scalar;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.python.AbstractEmbeddedPythonFunctionOperator;
import org.apache.flink.streaming.api.utils.ProtoUtils;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.functions.python.PythonEnv;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.runtime.operators.python.utils.StreamRecordRowDataWrappingCollector;
import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import pemja.core.PythonInterpreter;

@Internal
/* loaded from: input_file:org/apache/flink/table/runtime/operators/python/scalar/EmbeddedPythonScalarFunctionOperator.class */
public class EmbeddedPythonScalarFunctionOperator extends AbstractEmbeddedPythonFunctionOperator<RowData> implements OneInputStreamOperator<RowData, RowData>, BoundedOneInput {
    private static final long serialVersionUID = 1;
    private final PythonFunctionInfo[] scalarFunctions;
    private final int[] udfInputOffsets;
    protected final RowType inputType;
    protected final RowType udfInputType;
    protected final RowType udfOutputType;
    private GeneratedProjection forwardedFieldGeneratedProjection;
    private GenericRowData reuseResultRowData;
    private transient StreamRecordRowDataWrappingCollector rowDataWrapper;
    private transient Projection<RowData, BinaryRowData> forwardedFieldProjection;
    private transient PythonTypeUtils.DataConverter[] userDefinedFunctionInputConverters;
    private transient Object[] userDefinedFunctionInputArgs;
    private transient PythonTypeUtils.DataConverter[] userDefinedFunctionOutputConverters;
    private transient boolean isOneArg;
    private transient boolean isOneFieldResult;

    public EmbeddedPythonScalarFunctionOperator(Configuration configuration, PythonFunctionInfo[] pythonFunctionInfoArr, RowType rowType, RowType rowType2, RowType rowType3, int[] iArr) {
        super(configuration);
        this.inputType = (RowType) Preconditions.checkNotNull(rowType);
        this.udfInputType = (RowType) Preconditions.checkNotNull(rowType2);
        this.udfOutputType = (RowType) Preconditions.checkNotNull(rowType3);
        this.udfInputOffsets = (int[]) Preconditions.checkNotNull(iArr);
        this.scalarFunctions = (PythonFunctionInfo[]) Preconditions.checkNotNull(pythonFunctionInfoArr);
    }

    public EmbeddedPythonScalarFunctionOperator(Configuration configuration, PythonFunctionInfo[] pythonFunctionInfoArr, RowType rowType, RowType rowType2, RowType rowType3, int[] iArr, GeneratedProjection generatedProjection) {
        this(configuration, pythonFunctionInfoArr, rowType, rowType2, rowType3, iArr);
        this.forwardedFieldGeneratedProjection = (GeneratedProjection) Preconditions.checkNotNull(generatedProjection);
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractEmbeddedPythonFunctionOperator, org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void open() throws Exception {
        this.isOneArg = this.udfInputOffsets.length == 1;
        this.isOneFieldResult = this.udfOutputType.getFieldCount() == 1;
        super.open();
        this.rowDataWrapper = new StreamRecordRowDataWrappingCollector(this.output);
        this.reuseResultRowData = new GenericRowData(this.udfOutputType.getFieldCount());
        this.userDefinedFunctionInputConverters = (PythonTypeUtils.DataConverter[]) new RowType((List) Arrays.stream(this.udfInputOffsets).mapToObj(i -> {
            return (RowType.RowField) this.inputType.getFields().get(i);
        }).collect(Collectors.toList())).getFields().stream().map((v0) -> {
            return v0.getType();
        }).map(PythonTypeUtils::toDataConverter).toArray(i2 -> {
            return new PythonTypeUtils.DataConverter[i2];
        });
        this.userDefinedFunctionInputArgs = new Object[this.udfInputOffsets.length];
        this.userDefinedFunctionOutputConverters = (PythonTypeUtils.DataConverter[]) this.udfOutputType.getFields().stream().map((v0) -> {
            return v0.getType();
        }).map(PythonTypeUtils::toDataConverter).toArray(i3 -> {
            return new PythonTypeUtils.DataConverter[i3];
        });
        if (this.forwardedFieldGeneratedProjection != null) {
            this.forwardedFieldProjection = (Projection) this.forwardedFieldGeneratedProjection.newInstance(Thread.currentThread().getContextClassLoader());
        }
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractEmbeddedPythonFunctionOperator
    public void openPythonInterpreter(String str, Map<String, String> map) {
        LOG.info("Create Operation in multi-threads.");
        this.interpreter.exec("from pyflink.fn_execution.utils.operation_utils import create_scalar_operation_from_proto");
        this.interpreter.set("proto", getUserDefinedFunctionsProto().toByteArray());
        PythonInterpreter pythonInterpreter = this.interpreter;
        Object[] objArr = new Object[2];
        objArr[0] = this.isOneArg ? "True" : "False";
        objArr[1] = this.isOneFieldResult ? "True" : "False";
        pythonInterpreter.exec(String.format("scalar_operation = create_scalar_operation_from_proto(proto, %s, %s)", objArr));
        this.interpreter.invokeMethod("scalar_operation", "open", new Object[0]);
    }

    public void endInput() {
        if (this.interpreter != null) {
            this.interpreter.invokeMethod("scalar_operation", "close", new Object[0]);
        }
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractEmbeddedPythonFunctionOperator, org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public PythonEnv getPythonEnv() {
        return this.scalarFunctions[0].getPythonFunction().getPythonEnv();
    }

    public void processElement(StreamRecord<RowData> streamRecord) {
        RowData rowData = (RowData) streamRecord.getValue();
        Object obj = null;
        if (this.userDefinedFunctionInputArgs.length > 1) {
            for (int i = 0; i < this.userDefinedFunctionInputArgs.length; i++) {
                this.userDefinedFunctionInputArgs[i] = this.userDefinedFunctionInputConverters[i].toExternal(rowData, this.udfInputOffsets[i]);
            }
            obj = this.userDefinedFunctionInputArgs;
        } else if (this.userDefinedFunctionInputArgs.length == 1) {
            obj = this.userDefinedFunctionInputConverters[0].toExternal(rowData, this.udfInputOffsets[0]);
        }
        if (this.isOneFieldResult) {
            this.reuseResultRowData.setField(0, this.userDefinedFunctionOutputConverters[0].toInternal(this.interpreter.invokeMethod("scalar_operation", "process_element", obj)));
        } else {
            Object[] objArr = (Object[]) this.interpreter.invokeMethod("scalar_operation", "process_element", obj);
            for (int i2 = 0; i2 < objArr.length; i2++) {
                this.reuseResultRowData.setField(i2, this.userDefinedFunctionOutputConverters[i2].toInternal(objArr[i2]));
            }
        }
        if (this.forwardedFieldProjection != null) {
            this.rowDataWrapper.collect(new JoinedRowData(this.forwardedFieldProjection.apply(rowData).copy(), this.reuseResultRowData));
        } else {
            this.rowDataWrapper.collect((RowData) this.reuseResultRowData);
        }
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    protected void invokeFinishBundle() throws Exception {
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractEmbeddedPythonFunctionOperator
    public FlinkFnApi.UserDefinedFunctions getUserDefinedFunctionsProto() {
        FlinkFnApi.UserDefinedFunctions.Builder newBuilder = FlinkFnApi.UserDefinedFunctions.newBuilder();
        for (PythonFunctionInfo pythonFunctionInfo : this.scalarFunctions) {
            newBuilder.addUdfs(ProtoUtils.getUserDefinedFunctionProto(pythonFunctionInfo));
        }
        newBuilder.setMetricEnabled(((Boolean) this.config.get(PythonOptions.PYTHON_METRIC_ENABLED)).booleanValue());
        newBuilder.setProfileEnabled(((Boolean) this.config.get(PythonOptions.PYTHON_PROFILE_ENABLED)).booleanValue());
        return newBuilder.build();
    }
}
