SparkSQL并发写入orc、parquet表的异常问题排查

文章目录

  • 一、问题描述
  • 二、Hive 执行overwrite语句时没有删除旧数据的原因
  • 三、SparkSQL 失败的原因
  • 四、解决方案
    • 1、排查过程
    • 2、解决方案
    • 3、spark.sql.hive.convertInsertingPartitionedTable参数的作用

一、问题描述

有业务反馈某张表的分区下有重复数据,该分区数据的写入任务之前曾用sparkSQL执行过,跑失败了后切换成Hive执行成功了。看了下该分区对应的目录,发现目录下同时存在sparkSQL和Hive SQL生成的文件(也就是Hive任务执行时未删除旧的sparkSQL的数据文件)。因此查询的时候会发现数据重复的现象

执行的写入sql都是insert overwrite,因此正常写入数据之前要删除旧的数据才合理。但显然后面执行的Hive SQL执行时并未删除旧的SparkSQL生成的数据文件。

二、Hive 执行overwrite语句时没有删除旧数据的原因

当Hive执行insert overwrite写入数据到分区时,根据分区的元数据是否存在来决定是否要清空分区目录下的所有文件:

1、如果分区元数据存在(HiveMetaStore中有分区记录),则清空分区下的所有元数据

2、如果分区元数据不存在(仅针对外部表),Hive不会去自动推测分区对应的路径,也就不会去删除该分区下的所有文件

https://cloudera.ericlin.me/2015/05/hive-insert-overwrite-does-not-remove-existing-data/

后面排查了下HiveMetaStore的元数据,发现该分区的创建时间是在22:57分(Hive SQL执行完后),因此可以推测出之前失败的SparkSQL任务虽然生成了数据文件,但是未生成对应的Hive元数据,因此出现这种情况。

三、SparkSQL 失败的原因

在spark执行任务时,会创建一个临时目录,这个临时目录路径为 ${outputPath}/_temporary (outputPath为任务设置的OutputFormat的outputPath)。spark会将执行过程中生成的文件先落地到临时目录中,最终任务执行成功了才全部移动到最终的输出目录。

最后,这个临时目录会在任务执行结束后被删除。

具体的流程可见:https://blog.csdn.net/u013332124/article/details/92001346

经过多次测试发现,SparkSQL执行某张表的分区写入时,它生成的临时目录位于表路径下。如果这张表不是ORC或者Parquet表,它的临时目录就和Hive比较像,如/a/test/.hive-staging_hive_2020-10-23_16-41-55_549_7302943708666306032-5 (/a/test为表路径),如果这张表是ORC或者Parquet表,sparkSQL生成的临时目录就变成/a/test/_temporary。

上面业务写入的那张表是ORC表,因此如果有多个任务在同时写入,就会有问题:任务结束时spark要删除掉这个临时目录,而其他的任务也在使用这个临时目录。

上面的业务SparkSQL任务的错误原因就可以理通了:

比如任务A执行结束,要删除/a/test/_temporary这个临时目录,但是任务B还在执行,下面还有任务B的一些临时文件存在,这时候任务A执行的删除操作就会报错,具体信息如下图:

上图提示临时目录下还有其他的文件,因此无法删除临时目录。同时,SparkSQL在删除完临时目录后才会添加Hive元数据,因此这里删除临时目录失败就导致了后面的数据重复的问题

Hive在写入数据的时候也会创建临时目录,但是在非动态分区的写入模式下,Hive创建的临时目录是在具体的分区路径下,比如/a/test/datep=20201022,因此各个分区的写入任务是可以同时并行的。

四、解决方案

1、排查过程

在网上查了一圈,均为发现有合适的解决方案,所以打算自己跟一下sparkSQL生成执行计划的代码,看一下为何内部表可以根据Hive的规范生成临时目录,而外部表却不行。

首先我们通过arthas跟踪FileOutputFormat.setOutputPath()方法的堆栈,发现了内部表和外部表的区别:

我们发现设置输出目录时外部表使用的执行计划类是InsertIntoHadoopFsRelationCommand,而内部表的则是InsertIntoHiveTable。所以,我们得需要从执行计划生成代码入手,看为什么生成的执行计划不同。

后面就是结合arthas一路跟代码,最终定位到了RelationConversions类。

/**
 * Relation conversion from metastore relations to data source relations for better performance
 *
 * - When writing to non-partitioned Hive-serde Parquet/Orc tables
 * - When scanning Hive-serde Parquet/ORC tables
 *
 * This rule must be run before all other DDL post-hoc resolution rules, i.e.
 * `PreprocessTableCreation`, `PreprocessTableInsertion`, `DataSourceAnalysis` and `HiveAnalysis`.
 */
case class RelationConversions(
    conf: SQLConf,
    sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
  private def isConvertible(relation: HiveTableRelation): Boolean = {
    isConvertible(relation.tableMeta)
  }

  private def isConvertible(tableMeta: CatalogTable): Boolean = {
    val serde = tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
    serde.contains("parquet") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
      serde.contains("orc") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_ORC)
  }

  private val metastoreCatalog = sessionCatalog.metastoreCatalog

  override def apply(plan: LogicalPlan): LogicalPlan = {
    plan resolveOperators {
      // Write path
      case InsertIntoStatement(
           r: HiveTableRelation, partition, query, overwrite, ifPartitionNotExists)
          if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
            (!r.isPartitioned || SQLConf.get.getConf(HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE))
            && isConvertible(r) =>
        InsertIntoStatement(metastoreCatalog.convert(r), partition,
          query, overwrite, ifPartitionNotExists)

      // Read path
      case relation: HiveTableRelation
          if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
        metastoreCatalog.convert(relation)

      // CTAS
      case CreateTable(tableDesc, mode, Some(query))
          if DDLUtils.isHiveTable(tableDesc) && tableDesc.partitionColumnNames.isEmpty &&
            isConvertible(tableDesc) && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_CTAS) =>
        // validation is required to be done here before relation conversion.
        DDLUtils.checkDataColNames(tableDesc.copy(schema = query.schema))
        OptimizedCreateHiveTableAsSelectCommand(
          tableDesc, query, query.output.map(_.name), mode)
    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

SparkSQL的执行计划会经过RelationConversions类进行一些转换,比如将HiveTableRelation转换为HadoopFsRelation。

这里的代码逻辑是碰到Hive表存储格式如果是ORC或者Parquet,并且spark.sql.hive.convertInsertingPartitionedTable、spark.sql.hive.convertMetastoreParquet / spark.sql.hive.convertMetastoreOrc都为true,则SparkSQL会进行执行计划的转换。

2、解决方案

因此只要我们将spark.sql.hive.convertInsertingPartitionedTable和spark.sql.hive.convertMetastoreParquet / spark.sql.hive.convertMetastoreOrc都设置为false,即可解决我们遇到的问题

3、spark.sql.hive.convertInsertingPartitionedTable参数的作用

Hive写入Parquet/Orc表时,实现了自己的一个SerDe,Spark觉的Hive的SerDe性能比较低,于是实现了自己的SerDe。因此碰到Parquet、Orc的表数据写入时,SparkSQL默认使用自己内部的SerDe(这里感觉Spark没考虑到并发写入不同分区的问题)。

(0)

相关推荐

  • Impala 3.4在网易的最新实践

    编辑整理:甘顺 出品平台:DataFunTalk 导读:Impala是Cloudera公司主导开发的交互式查询系统,它提供SQL语义和计算能力,但是本身并不存储数据.本次分享会聚焦于Impala在网易 ...

  • 大数据开发技术之Spark SQL的多种使用方法

    Spark SQL支持多种数据源,如JDBC.HDFS.HBase.它的内部组件,如SQL的语法解析器.分析器等支持重定义进行扩展,能更好的满足不同的业务场景.与Spark Core无缝集成,提供了D ...

  • Hive,Hive on Spark和SparkSQL区别

    Hive on Mapreduce Hive的原理大家可以参考这篇大数据时代的技术hive:hive介绍,实际的一些操作可以看这篇笔记:新手的Hive指南,至于还有兴趣看Hive优化方法可以看看我总结 ...

  • 大数据查询Druid,Impala,Presto,SparkSQL对比

    一.OLAP和OLTP的区别 OLAP(On-Line Analytical Processing)联机分析处理,也称为面向交易的处理过程,其基本特征是前台接收的用户数据可以立即传送到计算中心进行处理 ...

  • Spark计算引擎之SparkSQL详解

    一.Spark SQL 二. Spark SQL 1. Spark SQL概述 1.1. Spark SQL的前世今生 Shark是一个为Spark设计的大规模数据仓库系统,它与Hive兼容.Shar ...

  • HeapDump性能社区Young GC异常问题排查实战案例精选合集

    在高并发下,Java程序的非正常GC带来的影响往往会被进一步放大.不管是「GC频率过快」还是「GC耗时太长」,由于GC期间都存在Stop The World问题,因此很容易导致服务超时,引发性能问题. ...

  • 实务丨如何从三张表判断财务数据异常?

    有一天,我正在极其认真地研究一份财务报告,忽然听到背后老大幽幽的一句:看这么仔细看啥?这数一看就有问题.然后她就飘走了... 我一边感慨'姜还是老的辣',一边开始思考这样一个问题:究竟如何才能一眼就判 ...

  • 实务 I 如何从三张表判断财务数据异常?

    有一天,我正在极其认真地研究一份财务报告,忽然听到背后老大幽幽的一句:看这么仔细看啥?这数一看就有问题.然后她就飘走了... 我一边感慨'姜还是老的辣',一边开始思考这样一个问题:究竟如何才能一眼就判 ...

  • 从三张表就能财务数据异常,你学会了吗?

    导语:上市是一项任重而道远的工作,财务准备更是IPO过程中最关键的环节之一.而作为企业家,看懂基础的财务数据是非常重要的. 财务工作的基本要求,就是细致.严谨,既要保证工作成果的准确性,也要善于从小细 ...

  • 孤独症儿童异常行为对策表,建议家长收藏!

    导读:他们有着明亮清澈的双眸,却极力避免眼神接触:不厌其烦地重复着某个动作,却不愿与人语言交流:对某个玩具情有独钟,却很难建立一段长期的友谊--他们是自闭症患者,又称孤独症. 如夜幕中孤独闪烁着的星星 ...

  • 当尿液出现这些异常表现时,或与这5个因素有关,不要毫不关心

    人体的尿液是食物.水等代谢之后的产物,这一代谢过程需要身体多个消化器官的参与,因此,从某种程度上来说,尿液的正常与否,也可以反映身体的健康状况. 正常的尿液是浅黄或透明的,不伴有特殊的气味,但不少人在 ...

  • spark利用sparkSQL将数据写入hive两种通用方式实现及比较

    spark利用sparkSQL将数据写入hive两种通用方式实现及比较

  • 肝脏出现异常,脸部先知道,当脸上出现这3种表现时,要引起重视

    肝脏是人体中至关重要的代谢器官,人体摄入的食物需要经过肝脏的代谢与分解,才能最终被身体所吸收,在这个过程当中,肝脏还会排出对身体有害的毒素与浊物,守护身体的健康.从一定程度上来说,肝脏部位的健康决定了 ...

  • Excel表异常变大又慢又卡,如何瘦身减肥?

    点击上方蓝字关注 Excel应用大全 置顶公众号或设为星标,避免收不到文章 每天分享Excel应用技巧,让你不仅用得上,还用的爽! 微信公众号:Excel应用大全(ExcelApp520) 个人微信号 ...