博客
关于我
sparkstreaming消费kafka数据,如果发生消息积压,如何处理?
阅读量:634 次
发布时间:2019-03-14

本文共 2246 字,大约阅读时间需要 7 分钟。

为什么会出现消息积压?

原因分析

在默认配置下,Spark Streaming通过接收器(或者Direct方式)以生产者生成数据的速率接收数据。当批处理时间(batch processing time)超过批处理间隔(batch interval)的时候,即每个批次数据处理的时间都比Spark Streaming批处理间隔时间长;随着越来越多的数据被接收,而数据的处理速度无法跟上,系统可能会出现数据堆积,进一步可能导致Executor端内存溢出(OOM)的问题从而引发失败。

在Spark 1.5版本之前,为了解决这个问题,对于基于接收器的数据接收器,我们可以通过配置spark.streaming.receiver.maxRate参数来限制每个接收器每秒钟最大可以接收的记录数量;对于Direct Approach的数据接收,我们可以通过配置spark.streaming.kafka.maxRatePerPartition参数来限制每次作业中每个Kafka分区最少读取的记录条数。通过这种方式限制接收速率,能够合理配合处理能力。然而,这种方法还有几个问题:

  • 需要手动估算集群的处理速度以及消息数据的产生速度
  • 这两种方式都需要人工干预,修改好相关参数后,还需要手动重启Spark Streaming
  • 如果当前集群的处理能力高于我们配置的maxRate,而且生产者产生的数据量高于maxRate,这样会导致集群资源利用率低下,并且也会导致数据无法及时处理
  • 反压机制的介绍

    那么是否有办法不需要人工干预让Spark Streaming系统自动处理这些问题呢?是的!Spark 1.5版本引入了反压(Back Pressure)机制,其通过动态收集系统的一些数据来自动适配集群的数据处理能力。

    之前的Spark Streaming架构

    在Spark 1.5版本之前,Spark Streaming的架构如下:

  • 数据源源不断通过接收器接收,当数据被接收到后,它会被存储到Block Manager中;为了避免数据丢失,还会备份到其他的Block Manager中
  • Receiver Tracker接收到这些存储的Block IDs,并维护一个时间到这些Block IDs的关系
  • Job Generator每隔batchInterval的时间就生成一个JobSet
  • Job Scheduler运行上面生成的JobSet
  • Spark 1.5及以后的架构

    为了实现对数据传输速率的自动调节,在原有架构上新增了一个名为RateController的组件,它集成在StreamingListener中,负责监控所有作业的onBatchCompleted事件。基于processingDelay,samplingDelay、当前批次记录条数及处理完成事件,RateController可以估算出一个速率。这个速率会被用来更新流的每秒能处理的最大记录数。速率估算器(RateEstimator)有多种实现,但目前Spark 2.2只支持基于PID的速率估算器。

    具体来说,RateController内会存下计算好的最大速率,这个速率在处理完onBatchCompleted事件后会被推送到ReceiverSupervisorImpl。这样接收器就知道每秒能接收多少条数据。需要注意的是,当用户配置了spark.streaming.kafka.maxRatePerPartition时,最终接收的数据量取决于三个值中的最小值,即每个接收器或每个Kafka分区每秒能处理的数据不会超过spark.streaming.receiver.maxRate或spark.streaming.kafka.maxRatePerPartition的值。

    具体流程可以参考下图所示(图中描述了详细的数据流和反压机制的调节过程)。

    如何启用反压机制

    在Spark中启用反压机制非常简单,只需将spark.streaming.backpressure.enabled设置为true即可,默认值为false。

    在使用反压机制时,还需要注意一些相关参数:

    • spark.streaming.backpressure.initialRate:在启用反压机制时,每个接收器接收第一批数据时的最大初始速率,默认值未设置
    • spark.streaming.backpressure.rateEstimator:速率估算器类,默认值为pid,目前Spark只支持这个,用户可以根据需要实现
    • spark.streaming.backpressure.pid.proportional:用于响应错误的权重(最后批次和当前批次之间的变化),默认值为1,只能设置为非负数
    • spark.streaming.backpressure.pid.integral:用于对错误积累做出响应,有抑制作用,默认值为0.2
    • spark.streaming.backpressure.pid.derived:用于对错误趋势做出响应,可能会导致批次大小波动,但也能辅助快速增加或减少处理能力,默认值为0
    • spark.streaming.backpressure.pid.minRate:可以估算的最低费率,,默认值为100,设置为非负数

    通过以上设置,可以有效地利用反压机制来动态调整数据处理速率,最大限度地发挥集群的处理能力,同时避免因为消息堆积而引发的资源浪费或系统故障。

    转载地址:http://tfooz.baihongyu.com/

    你可能感兴趣的文章
    Oracle 序列sequence 开始于某个值(10)执行完nextval 发现查出的值比10还小的解释
    查看>>
    oracle 执行一条查询语句,把数据加载到页面或者前台发生的事情
    查看>>
    oracle 批量生成建同义词语句和付权语句
    查看>>
    oracle 抓包工具,shell 安装oracle和pfring(抓包) 及自动环境配置
    查看>>
    Oracle 拆分以逗号分隔的字符串为多行数据
    查看>>
    Oracle 排序中使用nulls first 或者nulls last 语法
    查看>>
    oracle 插入date日期类型的数据、插入从表中查出的数据,使用表中的默认数据
    查看>>
    Oracle 操作笔记
    查看>>
    oracle 数据库 安装 和优化
    查看>>
    oracle 数据库dg搭建规范1
    查看>>
    Oracle 数据库常用SQL语句(1)
    查看>>
    Oracle 数据库特殊查询总结
    查看>>
    Oracle 数据类型
    查看>>
    oracle 数据迁移 怎么保证 和原表的数据顺序一致_一个比传统数据库快 1001000 倍的数据库,来看一看?...
    查看>>
    oracle 时间函数
    查看>>
    oracle 时间转化函数及常见函数 .
    查看>>
    Oracle 权限(grant、revoke)
    查看>>
    oracle 查询clob
    查看>>
    Oracle 比较 B-tree 和 Bitmap 索引
    查看>>
    Oracle 注意点大全
    查看>>