Spark Troubleshooting

The _temporary directory causes an error when loading dynamic partitions

The following error occurred while running a Spark job:

org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: get partition: Value for key partition_app_type is null or empty;
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:104)
at org.apache.spark.sql.hive.HiveExternalCatalog.loadDynamicPartitions(HiveExternalCatalog.scala:888)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.run(InsertIntoHiveTable.scala:385)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:59)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:68)
at org.apache.spark.sql.execution.adaptive.QueryStage.executeCollect(QueryStage.scala:164)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:183)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:68)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:898)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:691)
at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:64)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:340)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:379)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:311)
at org.apache.hadoop.hive.cli.CliDriver.processReader(CliDriver.java:464)
at org.apache.hadoop.hive.cli.CliDriver.processFile(CliDriver.java:480)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:174)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:779)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: get partition: Value for key partition_app_type is null or empty
at org.apache.hadoop.hive.ql.metadata.Hive.getPartition(Hive.java:1693)
at org.apache.hadoop.hive.ql.metadata.Hive.getPartition(Hive.java:1664)
at org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1276)
at org.apache.hadoop.hive.ql.metadata.Hive.loadDynamicPartitions(Hive.java:1515)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.sql.hive.client.Shim_v0_14.loadDynamicPartitions(HiveShim.scala:912)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadDynamicPartitions$1.apply$mcV$sp(HiveClientImpl.scala:793)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadDynamicPartitions$1.apply(HiveClientImpl.scala:791)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadDynamicPartitions$1.apply(HiveClientImpl.scala:791)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:320)
at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:258)
at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:257)
at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:303)
at org.apache.spark.sql.hive.client.HiveClientImpl.loadDynamicPartitions(HiveClientImpl.scala:791)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadDynamicPartitions$1.apply$mcV$sp(HiveExternalCatalog.scala:900)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadDynamicPartitions$1.apply(HiveExternalCatalog.scala:888)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadDynamicPartitions$1.apply(HiveExternalCatalog.scala:888)
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:95)
... 27 more
Error in query: org.apache.hadoop.hive.ql.metadata.HiveException: get partition: Value for key partition_app_type is null or empty;

After the Spark job was submitted, the error above was thrown when the loadDynamicPartitions method was called.
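
For context, this is the kind of statement that takes that code path. The table and column names below are invented for illustration; only the offline database and the partition columns (partition_date, partition_app_type) come from the logs in this note.

import org.apache.spark.sql.SparkSession

// Hypothetical reproduction only: offline.app_metrics / offline.app_metrics_src
// and their columns are made up; the partition columns match the logs below.
val spark = SparkSession.builder()
  .appName("dynamic-partition-insert")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("SET hive.exec.dynamic.partition=true")
spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")

// A dynamic-partition insert like this runs through InsertIntoHiveTable.run ->
// HiveExternalCatalog.loadDynamicPartitions, matching the stack trace above.
spark.sql(
  """INSERT OVERWRITE TABLE offline.app_metrics
    |PARTITION (partition_date, partition_app_type)
    |SELECT app_id, pv, partition_date, partition_app_type
    |FROM offline.app_metrics_src
    |""".stripMargin)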

Extra logging was added in Hive to check which directories get picked up when the dynamic partitions are loaded:

20/07/13 20:59:34,560 WARN Hive: rwm Test - add path to validPartitions : hdfs://router/user/hive/warehouse/offline.db/.hive-staging_hive_2020-07-13_20-53-58_275_3808957393659244553-1/-ext-10000/_temporary
20/07/13 20:59:34,561 WARN Hive: rwm Test - add path to validPartitions : hdfs://router/user/hive/warehouse/offline.db/.hive-staging_hive_2020-07-13_20-53-58_275_3808957393659244553-1/-ext-10000/partition_app_type=1
20/07/13 20:59:34,561 WARN Hive: rwm Test - add path to validPartitions : hdfs://router/user/hive/warehouse/offline.db/.hive-staging_hive_2020-07-13_20-53-58_275_3808957393659244553-1/-ext-10000/partition_app_type=2
20/07/13 20:59:34,562 WARN Hive: rwm Test - add path to validPartitions : hdfs://router/user/hive/warehouse/offline.db/.hive-staging_hive_2020-07-13_20-53-58_275_3808957393659244553-1/-ext-10000/partition_app_type=3
20/07/13 20:59:34,562 WARN Hive: rwm Test - add path to validPartitions : hdfs://router/user/hive/warehouse/offline.db/.hive-staging_hive_2020-07-13_20-53-58_275_3808957393659244553-1/-ext-10000/partition_app_type=4
20/07/13 20:59:34,563 WARN Hive: rwm Test - add path to validPartitions : hdfs://router/user/hive/warehouse/offline.db/.hive-staging_hive_2020-07-13_20-53-58_275_3808957393659244553-1/-ext-10000/partition_app_type=5
20/07/13 20:59:34,564 WARN Hive: rwm Test - add path to validPartitions : hdfs://router/user/hive/warehouse/offline.db/.hive-staging_hive_2020-07-13_20-53-58_275_3808957393659244553-1/-ext-10000/partition_app_type=6

More logging was added in Hive's getPartition method:

20/07/13 20:59:35,413 WARN Hive: rwm Test ---- fullPartSpec info:
20/07/13 20:59:35,413 WARN Hive: rwm Test ---- key=partition_date, value=20200706
20/07/13 20:59:35,413 WARN Hive: rwm Test ---- key=partition_app_type, value=4
20/07/13 20:59:35,413 WARN Hive: rwm Test : getPartition: name='partition_date' value='20200706'
20/07/13 20:59:35,413 WARN Hive: rwm Test : getPartition: name='partition_app_type' value='4'
20/07/13 20:59:35,569 WARN Hive: rwm Test : getPartition: name='partition_date' value='20200706'
20/07/13 20:59:35,569 WARN Hive: rwm Test : getPartition: name='partition_app_type' value='4'
20/07/13 20:59:35,789 WARN Hive: rwm Test ----
20/07/13 20:59:35,789 WARN Hive: rwm Test ---- fullPartSpec info:
20/07/13 20:59:35,789 WARN Hive: rwm Test ---- key=partition_date, value=20200706
20/07/13 20:59:35,789 WARN Hive: rwm Test ---- key=partition_app_type, value=
20/07/13 20:59:35,789 WARN Hive: rwm Test : getPartition: name='partition_date' value='20200706'
20/07/13 20:59:35,789 WARN Hive: rwm Test : getPartition: name='partition_app_type' value=''

This shows that the exception above is thrown exactly when the _temporary directory is parsed as if it were a partition directory.
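
The reason is the directory name itself: a real partition directory is named key=value, while _temporary carries no value for the dynamic partition column. A simplified illustration of that parsing step (this is not Hive's actual code, just the shape of the problem):

// Simplified illustration only; Hive's real parsing lives in its own utilities.
def specFromDirName(dirName: String): Option[(String, String)] =
  dirName.split("=", 2) match {
    case Array(key, value) => Some(key -> value) // e.g. partition_app_type=4
    case _                 => None               // "_temporary" has no key=value form
  }

println(specFromDirName("partition_app_type=4")) // Some((partition_app_type,4))
println(specFromDirName("_temporary"))           // None

With no value recoverable from "_temporary", the fullPartSpec handed to getPartition ends up with an empty value for partition_app_type (value='' in the log above), which is exactly the "Value for key partition_app_type is null or empty" check that fails.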

The _temporary directory is the temporary directory a Spark write job uses while writing data, and it is also part of how commit consistency of the output is guaranteed. It comes from the FileOutputCommitter's workPath, which the newTaskTempFile method uses to build each task's output path. In theory this directory is cleaned up once the job is committed (after commitJob is called), but because of speculative execution, some speculative task attempts may still be running after commitJob, so the directory does not get deleted.
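
For reference, a small standalone sketch of where that _temporary path comes from. The output path below is a local placeholder, not the real .hive-staging_.../-ext-10000 directory, and the attempt id is made up:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

// Placeholder output path; in the failing job it is the -ext-10000 staging dir on HDFS.
val output = new Path("file:///tmp/ext-10000-demo")
val attempt = new TaskAttemptID("20200713", 0, TaskType.MAP, 3, 0)
val context = new TaskAttemptContextImpl(new Configuration(), attempt)
val committer = new FileOutputCommitter(output, context)

// getWorkPath points inside <output>/_temporary/...; Spark's newTaskTempFile builds
// each task attempt's file path under this work path, which is why a straggling
// speculative attempt can leave _temporary behind even after commitJob.
println(committer.getWorkPath)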

The current workaround is to check, before calling loadDynamicPartitions, whether _temporary still exists, and delete it if it does. Since commitJob has already completed at that point, _temporary is assumed to be no longer needed. After adding this logic, the job's output was compared with the original run and the row counts matched, so the workaround is considered viable.
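
A minimal sketch of that workaround, assuming loadPath is the -ext-10000 directory about to be handed to loadDynamicPartitions (the method and variable names here are illustrative, not the actual patch):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Illustrative sketch: drop a stale _temporary directory before loading dynamic partitions.
def dropStaleTemporaryDir(loadPath: Path, conf: Configuration): Unit = {
  val fs = loadPath.getFileSystem(conf)
  val tempDir = new Path(loadPath, "_temporary")
  // commitJob has already run at this point, so anything still left under
  // _temporary belongs to straggling speculative task attempts and can be removed.
  if (fs.exists(tempDir)) {
    fs.delete(tempDir, true)
  }
}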