当前位置: 首页 > 图灵资讯 > 技术篇> Flink自定义函数报错,无效的输入参数

Flink自定义函数报错,无效的输入参数

来源:图灵教育
时间:2023-06-01 09:49:36

一、问题

flink编写自定义函数,调用时报错:Invalid input arguments

Flink自定义函数报错,无效的输入参数_flink

自定义函数和调用代码如下:

package cn.hmb.flink.udf.table;import org.apache.flink.table.functions.TableFunction;public class MultilineByValueTranslate extends TableFunction<Integer> {    public void eval(int number) {        for (int i =0; i < number; i++) {            collect(1);        }    }}

@Override    protected String handle(List<InputConfig> input, OperatorConfig opt, List<OutputConfig> output, Map<Object, String> tableMap) {        InputConfig in = input.get(0);        VerticalValueSplitConfig config = (VerticalValueSplitConfig) opt;        StringBuffer script = new StringBuffer();        script.append("CREATE TEMPORARY FUNCTION IF NOT EXISTS multiline_by_value AS 'cn.hmb.flink.udf.table.MultilineByValueTranslate';");        script.append(System.lineSeparator());        output.forEach(out ->                script.append(String.format("insert into %s select %s from (select %s from %s where %s) temp, %s where %s;",                        tableMap.get(out),                        getOutputFields(out, config, input),                        getFields(in.getFields()),                        tableMap.get(in),                        getInFilter(in, opt),                        getExtendTables(config),                        getOutFilter(out, opt)                )).append(System.lineSeparator())        );        return script.toString();    }    private String getExtendTables(VerticalValueSplitConfig config) {        return String.format("LATERAL TABLE(multiline_by_value(`%s`)) as t_%s(`%s`)",                config.getField(),                UUID.randomUUID().toString().replace("-", ""),                config.getExtendName());    }

二、问题分析

根据查询数据,flink有两种INT数据类型:

  • INT
  • INT NOT NULL

查看报错信息,这里需要传输INT的参数类型 NOT NULL,必须是INT NOT NULL数据类型。我传递的参数是数据库中的一个字段值,flink无法确定是否为空。我定制函数中的参数类型是int,int不能为空,因此需要将参数类型改为包装类型。

三、解决问题

修改Integer的自定义函数参数类型

package cn.hmb.flink.udf.table;import org.apache.flink.table.functions.TableFunction;public class MultilineByValueTranslate extends TableFunction<Integer> {    public void eval(Integer number) {        for (int i =0; i < number; i++) {            collect(1);        }    }}