锐单电子商城 , 一站式电子元器件采购平台!
  • 电话:400-990-0325

Flink_uv统计——使用布隆过滤器

时间:2022-09-16 06:30:01 uv连接器

一:UV(Unique Visitor,独立访客数)是指在一定时间内(如1小时)访问网站的独立访客数量。
在同一统计周期内(如一天内),同一访客的多次访问仅记为一名访客。
一般可以通过用户IP或cookie这两种方式来判断UV值;埋点日志中一般包含userId。
package networkflow

import networkflow.PageView.UserBehavior
import networkflow.UniqueVistorWithBoolean.UvCount
import org.apache.flink.streaming.api._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, ProcessWindowFunction}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis

/** Flink job that counts unique visitors ("uv") per one-hour event-time window.
  *
  * De-duplication is delegated to [[UvCountWithBloom]], which is invoked for every
  * single element because [[MyTrigger]] fires and purges the window on each record.
  */
object UniqueVistorWithBoolean {

  /** Unique-visitor count for one window.
    *
    * @param windowEnd end timestamp of the window
    * @param count     number of distinct visitors counted for that window
    */
  case class UvCount(windowEnd: Long, count: Long)

  /** Builds and runs the streaming job.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Read the raw records from the CSV file on the classpath.
    val resource = getClass.getResource("/UserBehavior.csv")
    val inputStream: DataStream[String] = env.readTextFile(resource.getPath)

    // Parse each comma-separated line into a UserBehavior and assign event timestamps.
    // The timestamp field is multiplied by 1000 (presumably seconds -> milliseconds; verify
    // against the data file).
    val dataStream: DataStream[UserBehavior] = inputStream
      .map(data => {
        val arr = data.split(",")
        UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt, arr(3), arr(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000L)

    val uvStream = dataStream
      .filter(_.behavior == "pv")
      // A single constant key routes every record into the same keyed window.
      .map(data => ("uv", data.userId))
      .keyBy(_._1)
      .timeWindow(Time.hours(1))
      // Custom trigger: fire and purge on every element so the window never buffers data.
      .trigger(new MyTrigger())
      .process(new UvCountWithBloom())

    uvStream.print()

    env.execute("uv with bloom job")
  }
}
// Custom trigger that fires the window and purges its contents for every incoming element.
// Type parameters: the element type (String, Long) and the window type TimeWindow.
class MyTrigger() extends Trigger[(String, Long), TimeWindow] {

  // Called once per element: always evaluate the window function and drop the window contents.
  override def onElement(
      element: (String, Long),
      timestamp: Long,
      window: TimeWindow,
      ctx: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.FIRE_AND_PURGE
  }

  // Processing-time timers never cause a firing.
  override def onProcessingTime(
      time: Long,
      window: TimeWindow,
      ctx: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }

  // Event-time timers never cause a firing.
  override def onEventTime(
      time: Long,
      window: TimeWindow,
      ctx: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }

  // The trigger keeps no state of its own, so there is nothing to clean up.
  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = ()
}

// Hashing half of a Bloom filter: maps a string to a bit offset in [0, size).
// This class defines only the hash function; no bit storage is held here.
class Bloom(size: Long) extends Serializable {
  // Number of addressable bit positions. Expected to be a power of two so that
  // `cap - 1` is an all-ones mask.
  private val cap = size

  /** Polynomial string hash reduced to the filter's capacity.
    *
    * @param value string to hash
    * @param seed  multiplier applied to the running hash before each character is added
    * @return `(cap - 1) & h`, where `h` is the accumulated hash; for a positive
    *         power-of-two capacity this lies in `[0, cap)`
    */
  def hash(value: String, seed: Int): Long = {
    // Accumulate in a Long: result = result * seed + char, for each character in order.
    val result = value.foldLeft(0L)((acc, ch) => acc * seed + ch)
    // Keep only the low bits so the result falls inside the capacity.
    (cap - 1) & result
  }
}

// Custom window function that de-duplicates user ids with a Redis-backed bitmap and keeps
// the per-window unique-visitor count in a Redis hash.
class UvCountWithBloom() extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow] {
  // Redis connection, created lazily on first use.
  lazy val jedis: Jedis = new Jedis("hadoop203", 6379)
  // Bit capacity: 1 << 29 = 2^29 bits = 2^6 (64) * 2^20 (1M) * 2^3 (8 bits per byte), i.e. 64 MB.
  // That is roughly 500 million bit positions, intended to hold on the order of 100 million ids.
  lazy val bloomFilter: Bloom = new Bloom(1 << 29)

  /** Records the newest element's user id in the window's bitmap and bumps the window's count
    * in Redis if that bit was not already set.
    *
    * A window function is normally invoked once when the window fires with all its elements.
    * The original comments state that, with the custom trigger, it runs once per element.
    *
    * @param key      key of the keyed stream (not used)
    * @param context  window context; only the window end timestamp is read
    * @param elements elements currently in the window; only the last one is inspected
    * @param out      output collector
    */
  override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {
    // The window end timestamp is both the Redis key of this window's bitmap and the field
    // name of this window's counter inside the "uvcount" hash.
    val windowEnd = context.window.getEnd.toString
    val uvCountMap = "uvcount"

    elements.lastOption.foreach { case (_, id) =>
      val userId = id.toString
      // Bit offset of this user id in the bitmap; 61 is the tunable hash multiplier.
      val offset = bloomFilter.hash(userId, 61)
      val isExist = jedis.getbit(windowEnd, offset)
      if (!isExist) {
        // First sighting of this offset: read the current count (0 when the field is absent),
        // set the bit, and store count + 1.
        val count = Option(jedis.hget(uvCountMap, windowEnd)).map(_.toLong).getOrElse(0L)
        jedis.setbit(windowEnd, offset, true)
        jedis.hset(uvCountMap, windowEnd, (count + 1).toString)
      }
    }
    // NOTE(review): nothing is ever emitted to `out`, so a downstream print() shows no records.
    // Confirm whether a UvCount(windowEnd, count) should be collected here.
  }
}

锐单商城拥有海量元器件数据手册IC替代型号,打造电子元器件IC百科大全!

相关文章