OceanBase: Generating Distributed Plans

Updated 2021-06-30 09:55

The OceanBase Database optimizer generates a distributed execution plan in the following two phases.

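1. Phase one: ignoring the physical distribution of the data, generate the optimal execution plan based on local relational optimization. After the local plan is generated, the optimizer checks whether the query accesses multiple partitions, or whether it accesses a local single-partition table while the user has forced parallel query execution with a hint.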

2. Phase two: generate the distributed plan. Following the execution plan tree, insert EXCHANGE nodes wherever data must be redistributed, which turns the original local plan tree into a distributed execution plan.

Operators in distributed execution plans

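Generating a distributed plan amounts to finding the right places in the original plan tree to insert EXCHANGE operators. While traversing the plan tree top-down, the optimizer decides whether an EXCHANGE operator is needed based on how each operator processes its data and how the data of its input operators is partitioned.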

The following example shows the simplest case, a single-table scan:

obclient>CREATE TABLE t1 (v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected (0.12 sec)

obclient>EXPLAIN SELECT * FROM t1\G;
*************************** 1. row ***************************
Query Plan:
======================================================
|ID|OPERATOR               |NAME    |EST. ROWS|COST  |
------------------------------------------------------
|0 |PX COORDINATOR         |        |500000   |545109|
|1 | EXCHANGE OUT DISTR    |:EX10000|500000   |320292|
|2 |  PX PARTITION ITERATOR|        |500000   |320292|
|3 |   TABLE SCAN          |T1      |500000   |320292|
======================================================

Outputs & filters:
-------------------------------------
  0 - output([T1.V1], [T1.V2]), filter(nil)
  1 - output([T1.V1], [T1.V2]), filter(nil), dop=1
  2 - output([T1.V1], [T1.V2]), filter(nil)
  3 - output([T1.V1], [T1.V2]), filter(nil),
      access([T1.V1], [T1.V2]), partitions(p[0-4])

When t1 is a partitioned table, a paired EXCHANGE operator can be inserted above TABLE SCAN, so that TABLE SCAN and EXCHANGE OUT are packaged into a single job that can be executed in parallel.
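The wrapping step described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: `PlanNode`, `to_distributed`, and `render` are hypothetical names, not OceanBase internals, and the receiving side of the exchange is modeled simply as the PX COORDINATOR node that appears at the top of the plan shown above.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class PlanNode:
    """Hypothetical plan-tree node; not OceanBase's internal representation."""
    op: str
    name: str = ""
    partitions: int = 1  # number of partitions the node reads
    children: List["PlanNode"] = field(default_factory=list)


def to_distributed(local_plan: PlanNode) -> PlanNode:
    """Wrap a scan over a multi-partition table in an exchange pair.

    A scan that touches a single partition is returned unchanged,
    because no data has to move between nodes.
    """
    if local_plan.op == "TABLE SCAN" and local_plan.partitions > 1:
        iterator = PlanNode("PX PARTITION ITERATOR", children=[local_plan])
        exchange_out = PlanNode("EXCHANGE OUT DISTR", ":EX10000", children=[iterator])
        return PlanNode("PX COORDINATOR", children=[exchange_out])
    return local_plan


def render(node: PlanNode, depth: int = 0) -> List[str]:
    """Return the operator names, indented one space per tree level."""
    lines = [" " * depth + node.op]
    for child in node.children:
        lines.extend(render(child, depth + 1))
    return lines


if __name__ == "__main__":
    scan = PlanNode("TABLE SCAN", "t1", partitions=5)
    print("\n".join(render(to_distributed(scan))))
```

Running the script prints the same operator nesting as the OPERATOR column of the plan above. A real optimizer applies this decision at every operator of the tree, not only at the scan.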

Single-input pushdown operators

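Single-input pushdown operators mainly include AGGREGATION, SORT, GROUP BY, and LIMIT. Except for LIMIT, each of these operators has an operation key. If the operation key matches the distribution of the input data, one-stage aggregation can be used, also known as Partition-Wise Aggregation. If the operation key does not match the distribution of the input data, two-stage aggregation is required, and the aggregation operator must be pushed down.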

The following example shows one-stage aggregation:

obclient>CREATE TABLE t2 (v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 4;
Query OK, 0 rows affected (0.12 sec)

obclient>EXPLAIN SELECT SUM(v1) FROM t2 GROUP BY v1\G;
*************************** 1. row ***************************
Query Plan:
======================================================
|ID|OPERATOR               |NAME    |EST. ROWS|COST  |
------------------------------------------------------
|0 |PX COORDINATOR         |        |101      |357302|
|1 | EXCHANGE OUT DISTR    |:EX10000|101      |357297|
|2 |  PX PARTITION ITERATOR|        |101      |357297|
|3 |   MERGE GROUP BY      |        |101      |357297|
|4 |    TABLE SCAN         |t2      |400000   |247403|
======================================================

Outputs & filters:
-------------------------------------
  0 - output([T_FUN_SUM(t2.v1)]), filter(nil)
  1 - output([T_FUN_SUM(t2.v1)]), filter(nil), dop=1
  2 - output([T_FUN_SUM(t2.v1)]), filter(nil)
  3 - output([T_FUN_SUM(t2.v1)]), filter(nil),
      group([t2.v1]), agg_func([T_FUN_SUM(t2.v1)])
  4 - output([t2.v1]), filter(nil),
      access([t2.v1]), partitions(p[0-3])

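The following example shows two-stage aggregation. Here the plan sums v1 grouped by v2, which is not the partitioning key of t2: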

============================================================
|ID|OPERATOR                     |NAME    |EST. ROWS|COST  |
------------------------------------------------------------
|0 |PX COORDINATOR               |        |101      |561383|
|1 | EXCHANGE OUT DISTR          |:EX10001|101      |561374|
|2 |  HASH GROUP BY              |        |101      |561374|
|3 |   EXCHANGE IN DISTR         |        |101      |408805|
|4 |    EXCHANGE OUT DISTR (HASH)|:EX10000|101      |408795|
|5 |     HASH GROUP BY           |        |101      |408795|
|6 |      PX PARTITION ITERATOR  |        |400000   |256226|
|7 |       TABLE SCAN            |t2      |400000   |256226|
============================================================

Outputs & filters:
-------------------------------------
  0 - output([T_FUN_SUM(T_FUN_SUM(t2.v1))]), filter(nil)
  1 - output([T_FUN_SUM(T_FUN_SUM(t2.v1))]), filter(nil), dop=1
  2 - output([T_FUN_SUM(T_FUN_SUM(t2.v1))]), filter(nil),
      group([t2.v2]), agg_func([T_FUN_SUM(T_FUN_SUM(t2.v1))])
  3 - output([t2.v2], [T_FUN_SUM(t2.v1)]), filter(nil)
  4 - (#keys=1, [t2.v2]), output([t2.v2], [T_FUN_SUM(t2.v1)]), filter(nil), dop=1
  5 - output([t2.v2], [T_FUN_SUM(t2.v1)]), filter(nil),
      group([t2.v2]), agg_func([T_FUN_SUM(t2.v1)])
  6 - output([t2.v1], [t2.v2]), filter(nil)
  7 - output([t2.v1], [t2.v2]), filter(nil),
      access([t2.v1], [t2.v2]), partitions(p[0-3])
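To see why the two-stage plan above still yields the correct totals, the following Python sketch simulates it on a handful of rows. It is a toy model, not OceanBase code: the function names, the sample rows, and the use of `v1 % 4` to stand in for HASH(v1) partitioning are all assumptions made for illustration.

```python
from collections import defaultdict


def partial_sum_by_key(rows):
    """Stage 1: inside one partition, sum v1 per v2 (the pushed-down GROUP BY)."""
    sums = defaultdict(int)
    for v1, v2 in rows:
        sums[v2] += v1
    return dict(sums)


def hash_redistribute(partials, workers):
    """Exchange: route each (v2, partial sum) to a worker chosen by hash of v2."""
    buckets = [[] for _ in range(workers)]
    for partial in partials:
        for v2, partial_sum in partial.items():
            buckets[hash(v2) % workers].append((v2, partial_sum))
    return buckets


def final_sum(bucket):
    """Stage 2: sum the partial sums; every v2 value lands on exactly one worker."""
    sums = defaultdict(int)
    for v2, partial_sum in bucket:
        sums[v2] += partial_sum
    return dict(sums)


def two_stage_sum(partitions, workers=2):
    partials = [partial_sum_by_key(rows) for rows in partitions]
    result = {}
    for bucket in hash_redistribute(partials, workers):
        result.update(final_sum(bucket))
    return result


# Sample (v1, v2) rows, split into 4 partitions by v1 % 4 (an assumed stand-in
# for HASH(v1) PARTITIONS 4).
rows = [(1, 10), (2, 10), (3, 20), (5, 20), (4, 10)]
partitions = [[r for r in rows if r[0] % 4 == p] for p in range(4)]

if __name__ == "__main__":
    print(two_stage_sum(partitions))
```

Rows with the same v2 are spread over several partitions, so no partition can finish the aggregation alone. The first stage shrinks each partition to at most one row per v2 value before the exchange, which is the benefit of pushing the aggregation down.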

Binary-input operators

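For binary-input operators, the main case to consider is the JOIN operator. For JOIN, the distributed execution plan and the data redistribution method are chosen mainly by rules. JOIN has the following three main join methods: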

  • Partition-Wise Join

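    When both tables are partitioned tables with the same partitioning method and the same physical distribution, and the join condition is on the partitioning key, the join can be performed partition by partition. For example: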

    obclient>CREATE TABLE t3 (v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 4;
    Query OK, 0 rows affected (0.12 sec)
    
    obclient>EXPLAIN SELECT * FROM t2, t3 WHERE t2.v1 = t3.v1\G;
    *************************** 1. row ***************************
    Query Plan: 
    ===========================================================
    |ID|OPERATOR               |NAME    |EST. ROWS |COST      |
    -----------------------------------------------------------
    |0 |PX COORDINATOR         |        |1568160000|1227554264|
    |1 | EXCHANGE OUT DISTR    |:EX10000|1568160000|930670004 |
    |2 |  PX PARTITION ITERATOR|        |1568160000|930670004 |
    |3 |   MERGE JOIN          |        |1568160000|930670004 |
    |4 |    TABLE SCAN         |t2      |400000    |256226    |
    |5 |    TABLE SCAN         |t3      |400000    |256226    |
    ===========================================================
    
    Outputs & filters:
    -------------------------------------
      0 - output([t2.v1], [t2.v2], [t3.v1], [t3.v2]), filter(nil)
      1 - output([t2.v1], [t2.v2], [t3.v1], [t3.v2]), filter(nil), dop=1
      2 - output([t2.v1], [t2.v2], [t3.v1], [t3.v2]), filter(nil)
      3 - output([t2.v1], [t2.v2], [t3.v1], [t3.v2]), filter(nil),
          equal_conds([t2.v1 = t3.v1]), other_conds(nil)
      4 - output([t2.v1], [t2.v2]), filter(nil),
          access([t2.v1], [t2.v2]), partitions(p[0-3])
      5 - output([t3.v1], [t3.v2]), filter(nil),
          access([t3.v1], [t3.v2]), partitions(p[0-3])
  • Partial Partition-Wise Join

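    When one of the two tables is partitioned and the other is not, or when both are partitioned but the join key matches the partitioning key of only one of them, the data of the other table is redistributed according to the partition distribution of that partitioned table. For example: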

    obclient>CREATE TABLE t4 (v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 3;
    Query OK, 0 rows affected (0.12 sec)
    
    obclient>EXPLAIN SELECT * FROM t4, t2 WHERE t2.v1 = t4.v1\G;
    *************************** 1. row ***************************
    Query Plan:
    ===========================================================
    |ID|OPERATOR                     |NAME    |EST. ROWS|COST |
    -----------------------------------------------------------
    |0 |PX COORDINATOR               |        |11880    |17658|
    |1 | EXCHANGE OUT DISTR          |:EX10001|11880    |15409|
    |2 |  NESTED-LOOP JOIN           |        |11880    |15409|
    |3 |   EXCHANGE IN DISTR         |        |3        |37   |
    |4 |    EXCHANGE OUT DISTR (PKEY)|:EX10000|3        |37   |
    |5 |     PX PARTITION ITERATOR   |        |3        |37   |
    |6 |      TABLE SCAN             |t4      |3        |37   |
    |7 |   PX PARTITION ITERATOR     |        |3960     |2561 |
    |8 |    TABLE SCAN               |t2      |3960     |2561 |
    ===========================================================
    
    Outputs & filters:
    -------------------------------------
      0 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil)
      1 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil), dop=1
      2 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil),
          conds(nil), nl_params_([t4.v1])
      3 - output([t4.v1], [t4.v2]), filter(nil)
      4 - (#keys=1, [t4.v1]), output([t4.v1], [t4.v2]), filter(nil), dop=1
      5 - output([t4.v1], [t4.v2]), filter(nil)
      6 - output([t4.v1], [t4.v2]), filter(nil),
          access([t4.v1], [t4.v2]), partitions(p[0-2])
      7 - output([t2.v1], [t2.v2]), filter(nil)
      8 - output([t2.v1], [t2.v2]), filter(nil),
          access([t2.v1], [t2.v2]), partitions(p[0-3])
  • Data redistribution

    When the join key is unrelated to the partitioning keys of both tables, the optimizer applies rules to choose between the BROADCAST and HASH HASH redistribution methods. For example:

    Note

    The two redistribution methods in the following examples can be selected only when the degree of parallelism is greater than 1.

    obclient>EXPLAIN SELECT /*+ PARALLEL(2)*/* FROM t4, t2 WHERE t2.v2 = t4.v2\G;
    *************************** 1. row ***************************
    Query Plan:
    =================================================================
    |ID|OPERATOR                          |NAME    |EST. ROWS|COST  |
    -----------------------------------------------------------------
    |0 |PX COORDINATOR                    |        |11880    |396863|
    |1 | EXCHANGE OUT DISTR               |:EX10001|11880    |394614|
    |2 |  HASH JOIN                       |        |11880    |394614|
    |3 |   EXCHANGE IN DISTR              |        |3        |37    |
    |4 |    EXCHANGE OUT DISTR (BROADCAST)|:EX10000|3        |37    |
    |5 |     PX BLOCK ITERATOR            |        |3        |37    |
    |6 |      TABLE SCAN                  |t4      |3        |37    |
    |7 |   PX PARTITION ITERATOR          |        |400000   |256226|
    |8 |    TABLE SCAN                    |t2      |400000   |256226|
    =================================================================
    
    Outputs & filters:
    -------------------------------------
      0 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil)
      1 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil), dop=2
      2 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil),
          equal_conds([t2.v2 = t4.v2]), other_conds(nil)
      3 - output([t4.v1], [t4.v2]), filter(nil)
      4 - output([t4.v1], [t4.v2]), filter(nil), dop=2
      5 - output([t4.v1], [t4.v2]), filter(nil)
      6 - output([t4.v1], [t4.v2]), filter(nil),
          access([t4.v1], [t4.v2]), partitions(p[0-2])
      7 - output([t2.v1], [t2.v2]), filter(nil)
      8 - output([t2.v1], [t2.v2]), filter(nil),
          access([t2.v1], [t2.v2]), partitions(p[0-3])
          
    
    obclient>EXPLAIN SELECT /*+ PQ_DISTRIBUTE(t2 HASH HASH) PARALLEL(2)*/* FROM t4, t2 
                  WHERE t2.v2 = t4.v2\G;
    *************************** 1. row ***************************
    Query Plan:
    ============================================================
    |ID|OPERATOR                     |NAME    |EST. ROWS|COST  |
    ------------------------------------------------------------
    |0 |PX COORDINATOR               |        |11880    |434727|
    |1 | EXCHANGE OUT DISTR          |:EX10002|11880    |432478|
    |2 |  HASH JOIN                  |        |11880    |432478|
    |3 |   EXCHANGE IN DISTR         |        |3        |37    |
    |4 |    EXCHANGE OUT DISTR (HASH)|:EX10000|3        |37    |
    |5 |     PX BLOCK ITERATOR       |        |3        |37    |
    |6 |      TABLE SCAN             |t4      |3        |37    |
    |7 |   EXCHANGE IN DISTR         |        |400000   |294090|
    |8 |    EXCHANGE OUT DISTR (HASH)|:EX10001|400000   |256226|
    |9 |     PX PARTITION ITERATOR   |        |400000   |256226|
    |10|      TABLE SCAN             |t2      |400000   |256226|
    ============================================================
    
    Outputs & filters:
    -------------------------------------
      0 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil)
      1 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil), dop=2
      2 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil),
          equal_conds([t2.v2 = t4.v2]), other_conds(nil)
      3 - output([t4.v1], [t4.v2]), filter(nil)
      4 - (#keys=1, [t4.v2]), output([t4.v1], [t4.v2]), filter(nil), dop=2
      5 - output([t4.v1], [t4.v2]), filter(nil)
      6 - output([t4.v1], [t4.v2]), filter(nil),
          access([t4.v1], [t4.v2]), partitions(p[0-2])
      7 - output([t2.v1], [t2.v2]), filter(nil)
      8 - (#keys=1, [t2.v2]), output([t2.v1], [t2.v2]), filter(nil), dop=2
      9 - output([t2.v1], [t2.v2]), filter(nil)
      10 - output([t2.v1], [t2.v2]), filter(nil),
          access([t2.v1], [t2.v2]), partitions(p[0-3])
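The three join methods above can be summarized as a small decision function. The Python sketch below is a simplification built only from the rules stated in this section; `TableInfo`, its `layout` tag, and `choose_join_distribution` are hypothetical names, and the real optimizer weighs more inputs than this (for instance, it decides between BROADCAST and HASH HASH, which the sketch leaves open).

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class TableInfo:
    """Hypothetical description of a table's partitioning."""
    name: str
    partition_key: Optional[str]  # None for a non-partitioned table
    partitions: int = 1
    layout: str = ""  # assumed tag: equal tags mean the same physical distribution


def choose_join_distribution(left: TableInfo, right: TableInfo,
                             left_key: str, right_key: str) -> str:
    """Pick a join method from the join keys and the partitioning of both sides."""
    left_match = left.partition_key is not None and left.partition_key == left_key
    right_match = right.partition_key is not None and right.partition_key == right_key

    same_distribution = (left.partitions == right.partitions
                         and left.layout == right.layout)
    if left_match and right_match and same_distribution:
        # Matching partitions already sit together: join partition by partition.
        return "PARTITION-WISE"
    if left_match or right_match:
        # Redistribute the other side to follow the matching table's partitions.
        return "PARTIAL PARTITION-WISE"
    # The join key is unrelated to either partitioning key.
    return "BROADCAST or HASH HASH"


# Tables modeled on the examples in this section.
t2 = TableInfo("t2", "v1", partitions=4, layout="hash4")
t3 = TableInfo("t3", "v1", partitions=4, layout="hash4")
t4 = TableInfo("t4", "v1", partitions=3, layout="hash3")

if __name__ == "__main__":
    print(choose_join_distribution(t2, t3, "v1", "v1"))
    print(choose_join_distribution(t4, t2, "v1", "v1"))
    print(choose_join_distribution(t4, t2, "v2", "v2"))
```

With these inputs the function returns the same method as the corresponding plan above: a partition-wise join for t2 and t3, a partial partition-wise join for t4 and t2 on v1 (t4 has a different number of partitions, so it is redistributed), and a full redistribution for the join on v2.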