一、问题
flink编写自定义函数,调用时报错:Invalid input arguments
自定义函数和调用代码如下:
package cn.hmb.flink.udf.table;import org.apache.flink.table.functions.TableFunction;public class MultilineByValueTranslate extends TableFunction<Integer> { public void eval(int number) { for (int i =0; i < number; i++) { collect(1); } }}
@Override protected String handle(List<InputConfig> input, OperatorConfig opt, List<OutputConfig> output, Map<Object, String> tableMap) { InputConfig in = input.get(0); VerticalValueSplitConfig config = (VerticalValueSplitConfig) opt; StringBuffer script = new StringBuffer(); script.append("CREATE TEMPORARY FUNCTION IF NOT EXISTS multiline_by_value AS 'cn.hmb.flink.udf.table.MultilineByValueTranslate';"); script.append(System.lineSeparator()); output.forEach(out -> script.append(String.format("insert into %s select %s from (select %s from %s where %s) temp, %s where %s;", tableMap.get(out), getOutputFields(out, config, input), getFields(in.getFields()), tableMap.get(in), getInFilter(in, opt), getExtendTables(config), getOutFilter(out, opt) )).append(System.lineSeparator()) ); return script.toString(); } private String getExtendTables(VerticalValueSplitConfig config) { return String.format("LATERAL TABLE(multiline_by_value(`%s`)) as t_%s(`%s`)", config.getField(), UUID.randomUUID().toString().replace("-", ""), config.getExtendName()); }
二、问题分析根据查询数据,flink有两种INT数据类型:
- INT
- INT NOT NULL
查看报错信息,这里需要传输INT的参数类型 NOT NULL,必须是INT NOT NULL数据类型。我传递的参数是数据库中的一个字段值,flink无法确定是否为空。我定制函数中的参数类型是int,int不能为空,因此需要将参数类型改为包装类型。
三、解决问题修改Integer的自定义函数参数类型
package cn.hmb.flink.udf.table;import org.apache.flink.table.functions.TableFunction;public class MultilineByValueTranslate extends TableFunction<Integer> { public void eval(Integer number) { for (int i =0; i < number; i++) { collect(1); } }}