This document explains how we are planning to add support in Hive's optimizer for pushing filters down into physical access methods. This is an important optimization for minimizing the amount of data scanned and processed by an access method (e.g. for an indexed key lookup), as well as reducing the amount of data passed into Hive for further query evaluation.
Below are the main use cases we are targeting.
There are a number of different parts to the overall effort.
To achieve the loosest possible coupling, we are going to use a string as the primary representation for the filter. In particular, the string will be in the form produced when Hive unparses an ExprNodeDesc, e.g. ((key >= 100) and (key < 200)).
In general, this comes out as valid SQL, although it may not always match the original SQL exactly; for example, x BETWEEN y AND z becomes ((x >= y) and (x <= z)).
Column names in this string are unqualified references to the columns of the table over which the filter operates, as they are known in the Hive metastore. These column names may be different from those known to the underlying storage; for example, the HBase storage handler maps Hive column names to HBase column names (qualified by column family). Mapping from Hive column names is the responsibility of the code interpreting the filter string.
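As a rough illustration of that mapping responsibility, here is a self-contained sketch (the ColumnNameMapping class, its methods, and the cf:rowkey mapping are invented for this example; a real storage handler such as the HBase handler performs this translation on a parsed expression, driven by its table properties):

```java
import java.util.HashMap;
import java.util.Map;

// Toy illustration: code interpreting the filter string must translate
// the unqualified Hive column names it contains into the names known to
// the underlying storage (e.g. HBase family:qualifier pairs).
public class ColumnNameMapping {
    private final Map<String, String> hiveToStorage = new HashMap<>();

    public void map(String hiveName, String storageName) {
        hiveToStorage.put(hiveName, storageName);
    }

    // Rewrites a filter string by substituting storage-level column names.
    // (A real implementation would operate on the expression tree, not
    // on raw text.)
    public String rewrite(String filter) {
        String result = filter;
        for (Map.Entry<String, String> e : hiveToStorage.entrySet()) {
            result = result.replaceAll("\\b" + e.getKey() + "\\b", e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        ColumnNameMapping m = new ColumnNameMapping();
        m.map("key", "cf:rowkey"); // hypothetical HBase-style mapping
        System.out.println(m.rewrite("(key >= 100)"));
    }
}
```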
As mentioned above, we want to avoid duplication in code which interprets the filter string (e.g. parsing). As a first cut, we will provide access to the ExprNodeDesc tree by passing it along in serialized form as an optional companion to the filter string. In followups, we will provide parsing utilities for the string form.
We will also provide an IndexPredicateAnalyzer class capable of detecting simple subexpressions in an ExprNodeDesc tree. In followups, we will provide support for discriminating and combining more complex indexable subexpressions.
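The kind of analysis involved can be sketched with a self-contained toy (ToyPredicateAnalyzer and its string-based representation are invented for illustration; Hive's actual IndexPredicateAnalyzer operates on an ExprNodeDesc tree, not on text):

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch of what an index predicate analyzer does: given a predicate
// that is a conjunction of subexpressions, separate the "simple" ones
// (column <op> constant), which an index or storage handler could serve,
// from everything else.
public class ToyPredicateAnalyzer {
    // A conjunct is "simple" if it looks like: <identifier> <op> <literal>
    static boolean isSimple(String conjunct) {
        return conjunct.trim().matches("\\w+\\s*(<=|>=|=|<|>)\\s*\\w+");
    }

    // Splits an AND-ed predicate into indexable and residual conjuncts;
    // element 0 of the result is the indexable list, element 1 the rest.
    static List<List<String>> analyze(String predicate) {
        List<String> indexable = new ArrayList<>();
        List<String> residual = new ArrayList<>();
        for (String conjunct : predicate.split("(?i)\\s+and\\s+")) {
            (isSimple(conjunct) ? indexable : residual).add(conjunct);
        }
        List<List<String>> result = new ArrayList<>();
        result.add(indexable);
        result.add(residual);
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> r = analyze("x > 3 and upper(y) = 'XYZ'");
        System.out.println("indexable: " + r.get(0));
        System.out.println("residual:  " + r.get(1));
    }
}
```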
The approach for passing the filter down to the input format will follow a pattern similar to what is already in place for pushing column projections down:

- org.apache.hadoop.hive.serde2.ColumnProjectionUtils encapsulates the pushdown communication
- HiveInputFormat calls ColumnProjectionUtils to set the projection pushdown property (READ_COLUMN_IDS_CONF_STR) on a jobConf before instantiating a RecordReader
- the RecordReader calls ColumnProjectionUtils to access this property

For filter pushdown:

- HiveInputFormat sets properties hive.io.filter.text (string form) and hive.io.filter.expr.serialized (serialized form of ExprNodeDesc) in the job conf before calling getSplits as well as before instantiating a record reader

Note that getSplits needs to be involved since the selectivity of the filter may prune away some of the splits which would otherwise be accessed. (In theory, column projection could also influence the split boundaries, but we'll leave that for a followup.)
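The handshake can be sketched as follows, with java.util.Properties standing in for a Hadoop JobConf (the FilterPushdownConf class and pushFilter method are invented for this example; the two property names come from the text above):

```java
import java.util.Properties;

// Sketch of the pushdown handshake. HiveInputFormat would set these two
// properties before calling getSplits and before creating a record
// reader; the storage handler's input format reads them back out.
public class FilterPushdownConf {
    static final String FILTER_TEXT = "hive.io.filter.text";
    static final String FILTER_EXPR = "hive.io.filter.expr.serialized";

    static void pushFilter(Properties conf, String filterText, String serializedExpr) {
        conf.setProperty(FILTER_TEXT, filterText);
        conf.setProperty(FILTER_EXPR, serializedExpr);
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        pushFilter(conf, "((key >= 100) and (key < 200))", "<serialized ExprNodeDesc>");
        // The record reader side retrieves the filter:
        System.out.println(conf.getProperty(FILTER_TEXT));
    }
}
```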
So, where will HiveInputFormat get the filter expression to be passed down? Again, we can start with the pattern for column projections:

- org.apache.hadoop.hive.ql.optimizer.ColumnPrunerProcFactory's ColumnPrunerTableScanProc populates the pushdown information in TableScanOperator
- HiveInputFormat.initColumnsNeeded retrieves this information from the TableScanOperator

For filter pushdown, the equivalent is TableScanPPD in org.apache.hadoop.hive.ql.ppd.OpProcFactory. Currently, it calls createFilter, which collapses the pushed-down expressions into a single expression called condn and then sticks that on a new FilterOperator. We can call condn.getExprString() and store the result on TableScanOperator.
The Hive configuration parameter hive.optimize.ppd.storage can be used to enable or disable pushing filters down to the storage handler; it is enabled by default. However, if hive.optimize.ppd is disabled, pushdown to storage handlers is implicitly disabled as well.
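The interaction of the two settings amounts to a simple conjunction, sketched here (PpdConfig and its method are invented names, not Hive code):

```java
// Sketch of how the two settings combine: pushdown to a storage handler
// requires both hive.optimize.ppd and hive.optimize.ppd.storage, so
// turning off general predicate pushdown also turns off storage pushdown.
public class PpdConfig {
    static boolean storagePushdownEnabled(boolean ppd, boolean ppdStorage) {
        return ppd && ppdStorage;
    }

    public static void main(String[] args) {
        // hive.optimize.ppd=false wins even if hive.optimize.ppd.storage=true
        System.out.println(storagePushdownEnabled(false, true));
    }
}
```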
We are starting with non-native tables only; we'll revisit this for pushing filters down to indexes and builtin storage formats such as RCFile.
Consider a filter like x > 3 AND upper(y) = 'XYZ'. Suppose a storage handler is capable of implementing the range scan for x > 3, but does not have a facility for evaluating upper.
In order for this to be possible, the storage handler needs to be able to negotiate the decomposition with Hive. This means that Hive gives the storage handler the entire filter, and the storage handler passes back a "residual": the portion that needs to be evaluated by Hive. A null residual indicates that the storage handler was able to deal with the entire filter on its own (in which case no FilterOperator is needed). In order to support this interaction, we will introduce a new (optional) interface, HiveStoragePredicateHandler, to be implemented by storage handlers.
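The negotiation can be modeled with a self-contained toy (ToyPredicateHandler and its string-based predicates are invented for illustration; in Hive the predicates are ExprNodeDesc trees, and the field names follow the pushedPredicate/residualPredicate terminology used below):

```java
// Toy model of the predicate decomposition handshake. Hive hands the
// storage handler the full filter; the handler returns the part it can
// evaluate (pushedPredicate) and the leftover that Hive must still apply
// in a FilterOperator (residualPredicate, null if the handler covered
// everything). Returning null overall means no pushdown was possible.
public class ToyPredicateHandler {
    static class DecomposedPredicate {
        String pushedPredicate;
        String residualPredicate; // null => no FilterOperator needed
    }

    // This toy handler can push simple range comparisons but nothing
    // involving function calls (detected crudely by a '(' in the conjunct).
    static DecomposedPredicate decomposePredicate(String predicate) {
        StringBuilder pushed = new StringBuilder();
        StringBuilder residual = new StringBuilder();
        for (String conjunct : predicate.split("(?i)\\s+and\\s+")) {
            StringBuilder target = conjunct.contains("(") ? residual : pushed;
            if (target.length() > 0) target.append(" and ");
            target.append(conjunct);
        }
        if (pushed.length() == 0) return null; // nothing pushable
        DecomposedPredicate result = new DecomposedPredicate();
        result.pushedPredicate = pushed.toString();
        result.residualPredicate = residual.length() == 0 ? null : residual.toString();
        return result;
    }

    public static void main(String[] args) {
        DecomposedPredicate d = decomposePredicate("x > 3 and upper(y) = 'XYZ'");
        System.out.println("pushed:   " + d.pushedPredicate);
        System.out.println("residual: " + d.residualPredicate);
    }
}
```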
Hive's optimizer (during predicate pushdown) calls the decomposePredicate method, passing in the full expression and receiving back the decomposition (or null to indicate that no pushdown was possible). The pushedPredicate gets passed back to the storage handler's input format later, and the residualPredicate is attached to the FilterOperator.
It is assumed that storage handlers which are sophisticated enough to implement this interface are suitable for tight coupling to the ExprNodeDesc representation.
Again, this interface is optional, and pushdown is still possible even without it. If the storage handler does not implement this interface, Hive will always implement the entire expression in the FilterOperator, but it will still provide the expression to the storage handler's input format; the storage handler is free to implement as much or as little as it wants.