基于Flink 1.13.2问题集锦
时间:2023-10-01 14:03:30
问题:
问题:flink sql 使用自定义函数map作为缓存,但有时缓存数据每次都会被清空。打印线程id也一样,以后有时间看源码。
解决方案:静态缓存private static final ...
package com.nj.snx.app.functions; import com.nj.snx.common.LruLinkedHashMap; import org.apache.commons.lang3.StringUtils; import org.apache.flink.table.functions.ScalarFunction; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.function.Consumer; /** * @author :cuicc * @date : 2022/5/16 9:32 */ public class ContainsIndexFunction extends ScalarFunction { //设置静态 private static final LruLinkedHashMap> quotaIdMaps = new LruLinkedHashMap<>(50); /** * @param businessKey 流水号 * @param quotas 逗号隔开 衍生指标需要计算id PC_CODE-INDEX_ID * @param indexId 所有配置中的衍生指标id * @return */ public boolean eval(String businessKey,String quotas,String indexId){ // System.out.println("自定义函数需要加工的衍生指标ID】" quotas ",获取的indexId:" indexId); if(StringUtils.isNotEmpty(quotas)){ List strings = quotaIdMaps.get(businessKey); System.out.println("map----->" strings ":businessKey:" businessKey ";quotas:" quotas ";indexId:" indexId ":当前线程:" Thread.currentThread().getId()); if(strings != null && strings.contains(indexId)){ System.out.println("命中:" indexId ":businessKey:" businessKey ";quotas:" quotas ":当前线程:" Thread.currentThread().getId()); return true; }else if(strings == null){ System.out.println("为null:" strings ":businessKey:" businessKey ";quotas:" quotas ":当前线程:" Thread.currentThread().getId()); String[] quotaIds = quotas.split(","); List quotaIdsList = Arrays.asList(quotaIds); for (int i = 0; i < quotaIdsList.size(); i ) { quotaIdsList.set(i,quotaIdsList.get(i).split("-")[0]); } quotaIdMaps.put(businessKey,quotaIdsList); if(quotaIdsList.contains(indexId)){ return true; } } } return false; } }
问题:使用flink sql的if(条件,a,b)如果B设置为null报错:org.apache.calcite.sql.validate.SqlValidatorException: Illegal use of 'NULL'。
解决:IF(type=1,2,CAST(NULL AS INT))或者IF(type=1,2,CAST(NULL AS STRING))即可解决。
问题:使用over()开窗函数:执行后报告:Flink Window TOPN: The window can only be ordered in ASCENDING mode.
解决方案:一层外套,取TOP-N之后就不会报错了。
1、使用flink mysql cdc 发现bigint unsigned类型字段,capture以后变成字符串类型,用这个分析JsonDebeziumDeserializationSchema。
解决方案:设置debeziumProperties将以下参数传入方法:
properties.setProperty("bigint.unsigned.handling.mode","long");
properties.setProperty("decimal.handling.mode","double");
2、使用Flink-connector-kafka-2.11连接器消耗某个连接器topic时,在kafka消费者组在服务器中找不到?
解:
bin/kafka-consumer-groups.sh --bootstrap-server s203:9092 --group ddd --describe
上面的链接反映了这个问题,实际上是flink没有使用kafka直接控制特性kafka消费,而不是交由kafka去消费,connector自己实现了FlinkKafkaConsumer,且没有按照kafka的feature实现coordinator以及JOIN_GROUOP的逻辑,消费者组在服务器中没有查询。
从kafka服务器找不到,但这些指标可以在Flink Web的Metrics可在指标中查询,
currentOffsets:当前偏移量
committedOffsets:已提交的偏移量。