锐单电子商城 , 一站式电子元器件采购平台!
  • 电话:400-990-0325

基于Flink 1.13.2问题集锦

时间:2023-10-01 14:03:30 固态继电器s203zl

问题:

问题:flink sql 使用自定义函数map作为缓存,但有时缓存数据每次都会被清空。打印线程id也一样,以后有时间看源码。

解决方案:静态缓存private static final ...

package com.nj.snx.app.functions;  import com.nj.snx.common.LruLinkedHashMap; import org.apache.commons.lang3.StringUtils; import org.apache.flink.table.functions.ScalarFunction;  import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.function.Consumer;  /**  * @author :cuicc  * @date : 2022/5/16 9:32  */ public class ContainsIndexFunction  extends ScalarFunction {      //设置静态     private static final LruLinkedHashMap> quotaIdMaps = new LruLinkedHashMap<>(50);      /**      * @param businessKey 流水号      * @param quotas  逗号隔开 衍生指标需要计算id  PC_CODE-INDEX_ID      * @param indexId 所有配置中的衍生指标id      * @return      */    public boolean eval(String businessKey,String quotas,String indexId){ //       System.out.println("自定义函数需要加工的衍生指标ID】" quotas ",获取的indexId:" indexId);        if(StringUtils.isNotEmpty(quotas)){            List strings = quotaIdMaps.get(businessKey);            System.out.println("map----->" strings ":businessKey:" businessKey ";quotas:" quotas ";indexId:" indexId ":当前线程:" Thread.currentThread().getId());            if(strings != null && strings.contains(indexId)){                System.out.println("命中:" indexId ":businessKey:" businessKey ";quotas:" quotas ":当前线程:" Thread.currentThread().getId());                return true;            }else if(strings == null){                System.out.println("为null:" strings ":businessKey:" businessKey ";quotas:" quotas ":当前线程:" Thread.currentThread().getId());                String[] quotaIds = quotas.split(",");                List quotaIdsList = Arrays.asList(quotaIds);                for (int i = 0; i < quotaIdsList.size(); i  ) {                    quotaIdsList.set(i,quotaIdsList.get(i).split("-")[0]);                }                quotaIdMaps.put(businessKey,quotaIdsList);                if(quotaIdsList.contains(indexId)){                    return true;                }            }        }        return false;    } } 

问题:使用flink sql的if(条件,a,b)如果B设置为null报错:org.apache.calcite.sql.validate.SqlValidatorException: Illegal use of 'NULL'

解决:IF(type=1,2,CAST(NULL AS INT))或者IF(type=1,2,CAST(NULL AS STRING))即可解决。

问题:使用over()开窗函数:执行后报告:Flink Window TOPN: The window can only be ordered in ASCENDING mode.

解决方案:一层外套,取TOP-N之后就不会报错了。

1、使用flink mysql cdc 发现bigint unsigned类型字段,capture以后变成字符串类型,用这个分析JsonDebeziumDeserializationSchema。

解决方案:设置debeziumProperties将以下参数传入方法:

properties.setProperty("bigint.unsigned.handling.mode","long");

properties.setProperty("decimal.handling.mode","double");

2、使用Flink-connector-kafka-2.11连接器消耗某个连接器topic时,在kafka消费者组在服务器中找不到?

解:

bin/kafka-consumer-groups.sh  --bootstrap-server s203:9092 --group ddd --describe

没有相应的消费者。[FLINK-11325] Flink Consumer Kafka Topic Not Found ConsumerID - ASF JIRAhttps://issues.apache.org/jira/browse/FLINK-11325

上面的链接反映了这个问题,实际上是flink没有使用kafka直接控制特性kafka消费,而不是交由kafka去消费,connector自己实现了FlinkKafkaConsumer,且没有按照kafka的feature实现coordinator以及JOIN_GROUOP的逻辑,消费者组在服务器中没有查询。

从kafka服务器找不到,但这些指标可以在Flink Web的Metrics可在指标中查询,

currentOffsets:当前偏移量
committedOffsets:已提交的偏移量

锐单商城拥有海量元器件数据手册IC替代型号,打造电子元器件IC百科大全!

相关文章