OceanBase EXCHANGE

2021-06-30 09:14 更新

EXCHANGE 算子用于線程間進行數(shù)據(jù)交互的算子。

EXCHANGE 算子適用于在分布式場景,一般都是成對出現(xiàn)的,數(shù)據(jù)源端有一個 OUT 算子,目的端會有一個 IN 算子。

EXCH-IN/OUT

EXCH-IN/OUT 即 EXCHANGE IN/ EXCHANGE OUT 用于將多個分區(qū)上的數(shù)據(jù)匯聚到一起,發(fā)送到查詢所在的主節(jié)點上。

如下例所示,下面的查詢中訪問了 5 個分區(qū)的數(shù)據(jù)(p0-p4),其中 1 號算子接受 2 號算子產(chǎn)生的輸出,并將數(shù)據(jù)傳出;0 號算子接收多個分區(qū)上 1 號算子產(chǎn)生的輸出,并將結(jié)果匯總輸出。

obclient>CREATE TABLE t (c1 INT, c2 INT) PARTITION BY HASH(c1) PARTITIONS 5;
Query OK, 0 rows affected (0.12 sec)

obclient>EXPLAIN SELECT * FROM t\G;
*************************** 1. row ***************************
Query Plan:
==============================================
|ID|OPERATOR           |NAME|EST. ROWS|COST  |
----------------------------------------------
|0 |EXCHANGE IN DISTR  |    |500000   |545109|
|1 | EXCHANGE OUT DISTR|    |500000   |320292|
|2 |  TABLE SCAN       |T   |500000   |320292|
==============================================

Outputs & filters:
-------------------------------------
  0 - output([T.C1], [T.C2]), filter(nil)
  1 - output([T.C1], [T.C2]), filter(nil)
  2 - output([T.C1], [T.C2]), filter(nil),
      access([T.C1], [T.C2]), partitions(p[0-4])

上述示例的執(zhí)行計劃展示中的 outputs & filters 詳細列出了 EXCH-IN/OUT 算子的輸出信息如下:

信息名稱

含義

output

該算子輸出的表達式。

filter

該算子上的過濾條件。

由于示例中 EXCH-IN/OUT 算子沒有設(shè)置 filter,所以為 nil。

EXCH-IN/OUT (REMOTE)

EXCH-IN/OUT (REMOTE) 算子用于將遠程的數(shù)據(jù)(單個分區(qū)的數(shù)據(jù))拉回本地。

如下例所示,在 A 機器上創(chuàng)建了一張非分區(qū)表,在 B 機器上執(zhí)行查詢,讀取該表的數(shù)據(jù)。此時,由于待讀取的數(shù)據(jù)在遠程,執(zhí)行計劃中分配了 0 號算子和 1 號算子來拉取遠程的數(shù)據(jù)。其中,1 號算子在 A 機器上執(zhí)行,讀取 t 表的數(shù)據(jù),并將數(shù)據(jù)傳出;0 號算子在 B 機器上執(zhí)行,接收 1 號算子產(chǎn)生的輸出。

obclient>CREATE TABLE t (c1 INT, c2 INT);
Query OK, 0 rows affected (0.12 sec)

obclient>EXPLAIN SELECT * FROM t\G;
*************************** 1. row ***************************
Query Plan:
===============================================
|ID|OPERATOR            |NAME|EST. ROWS|COST  |
-----------------------------------------------
|0 |EXCHANGE IN REMOTE  |    |100000   |109029|
|1 | EXCHANGE OUT REMOTE|    |100000   |64066 |
|2 |  TABLE SCAN        |T   |100000   |64066 |
===============================================

Outputs & filters:
-------------------------------------
  0 - output([T.C1], [T.C2]), filter(nil)
  1 - output([T.C1], [T.C2]), filter(nil)
  2 - output([T.C1], [T.C2]), filter(nil),
      access([T.C1], [T.C2]), partitions(p0)

上述示例的執(zhí)行計劃展示中的 outputs & filters 詳細列出了 EXCH-IN/OUT (REMOTE) 算子的輸出信息,字段的含義與 EXCH-IN/OUT 算子相同。

EXCH-IN/OUT (PKEY)

EXCH-IN/OUT (PKEY) 算子用于數(shù)據(jù)重分區(qū)。它通常用于二元算子中,將一側(cè)孩子節(jié)點的數(shù)據(jù)按照另外一些孩子節(jié)點的分區(qū)方式進行重分區(qū)。

如下示例中,該查詢是對兩個分區(qū)表的數(shù)據(jù)進行聯(lián)接,執(zhí)行計劃將 s 表的數(shù)據(jù)按照 t 的分區(qū)方式進行重分區(qū),4 號算子的輸入是 s 表掃描的結(jié)果,對于 s 表的每一行,該算子會根據(jù) t 表的數(shù)據(jù)分區(qū),以及根據(jù)查詢的聯(lián)接條件,確定一行數(shù)據(jù)應(yīng)該發(fā)送到哪個節(jié)點進行。

此外,可以看到 3 號算子是一個 EXCHANGE IN MERGE SORT DISTR,它是一個特殊的 EXCHANGE IN 算子,它用于在匯總多個分區(qū)的數(shù)據(jù)時,會進行一定的歸并排序,在這個執(zhí)行計劃中,3 號算子接收到的每個分區(qū)的數(shù)據(jù)都是按照 c1 有序排列的,它會對每個接收到的數(shù)據(jù)進行歸并排序,從而保證結(jié)果輸出結(jié)果也是按照 c1 有序排列的。

obclient>CREATE TABLE t (c1 INT, c2 INT) PARTITION BY HASH(c1) PARTITIONS 5;
Query OK, 0 rows affected (0.12 sec)

obclient>CREATE TABLE s (c1 INT PRIMARY KEY, c2 INT) PARTITION BY HASH(c1) PARTITIONS 4;
Query OK, 0 rows affected (0.12 sec)

obclient>EXPLAIN SELECT * FROM s, t WHERE s.c1 = t.c1\G;
*************************** 1. row ***************************
Query Plan:
 ===============================================================
|ID|OPERATOR                       |NAME|EST. ROWS |COST      |
---------------------------------------------------------------
|0 |EXCHANGE IN DISTR              |    |1960200000|3090308367|
|1 | EXCHANGE OUT DISTR            |    |1960200000|1327558071|
|2 |  MERGE JOIN                   |    |1960200000|1327558071|
|3 |   EXCHANGE IN MERGE SORT DISTR|    |400000    |436080    |
|4 |    EXCHANGE OUT DISTR (PKEY)  |    |400000    |256226    |
|5 |     TABLE SCAN                |S   |400000    |256226    |
|6 |   TABLE SCAN                  |T   |500000    |320292    |
===============================================================

Outputs & filters:
-------------------------------------
  0 - output([S.C1], [S.C2], [T.C1], [T.C2]), filter(nil)
  1 - output([S.C1], [S.C2], [T.C1], [T.C2]), filter(nil)
  2 - output([S.C1], [S.C2], [T.C1], [T.C2]), filter(nil),
      equal_conds([S.C1 = T.C1]), other_conds(nil)
  3 - output([S.C1], [S.C2]), filter(nil), sort_keys([S.C1, ASC])
  4 - (#keys=1, [S.C1]), output([S.C1], [S.C2]), filter(nil)
  5 - output([S.C1], [S.C2]), filter(nil),
      access([S.C1], [S.C2]), partitions(p[0-3])
  6 - output([T.C1], [T.C2]), filter(nil),
      access([T.C1], [T.C2]), partitions(p[0-4])

上述示例的執(zhí)行計劃展示中的 outputs & filters 詳細列出了 EXCH-IN/OUT (PKEY) 算子的輸出信息如下:

信息名稱

含義

output

該算子輸出的表達式。

filter

該算子上的過濾條件。

由于示例中 EXCH-IN/OUT(PKEY)算子沒有設(shè)置 filter,所以為 nil。

pkey

按照哪一列進行重分區(qū)。

例如,#keys=1, [s.c1] 表示按照 c1 這一列重分區(qū)

EXCH-IN/OUT (HASH)

EXCH-IN/OUT (HASH) 算子用于對數(shù)據(jù)使用一組 HASH 函數(shù)進行重分區(qū)。

如下例所示的執(zhí)行計劃中,3-5 號以及 7-8 號是兩組使用 HASH 重分區(qū)的 EXCHANGE 算子。這兩組算子的作用是把 t 表和 s 表的數(shù)據(jù)按照一組新的 HASH 函數(shù)打散成多份,在這個例子中 HASH 的列為 t.c2 和 s.c2,這保證了 c2 取值相同的行會被分發(fā)到同一份中?;谥胤謪^(qū)之后的數(shù)據(jù),2 號算子 HASH JOIN 會對每一份數(shù)據(jù)按照 t.c2= s.c2 進行聯(lián)接。

此外,由于查詢中執(zhí)行了并行度為 2,計劃中展示了 dop = 2 (dop 是 Degree of Parallelism 的縮寫)。

obclient>CREATE TABLE t (c1 INT, c2 INT) PARTITION BY HASH(c1) PARTITIONS 4;
Query OK, 0 rows affected (0.12 sec)

obclient>CREATE TABLE s (c1 INT, c2 INT) PARTITION BY HASH(c1) PARTITIONS 4;
Query OK, 0 rows affected (0.12 sec)

obclient>EXPLAIN SELECT /*+PQ_DISTRIBUTE(@"SEL$1" ("TEST.S"@"SEL$1" ) HASH HASH), 
             PARALLEL(2)*/ * FROM t, s WHERE t.c2 = s.c2\G;
*************************** 1. row ***************************
Query Plan:
=================================================================
|ID|OPERATOR                     |NAME    |EST. ROWS |COST      |
-----------------------------------------------------------------
|0 |PX COORDINATOR               |        |1568160000|2473629500|
|1 | EXCHANGE OUT DISTR          |:EX10002|1568160000|1063429263|
|2 |  HASH JOIN                  |        |1568160000|1063429263|
|3 |   EXCHANGE IN DISTR         |        |400000    |436080    |
|4 |    EXCHANGE OUT DISTR (HASH)|:EX10000|400000    |256226    |
|5 |     PX PARTITION ITERATOR   |        |400000    |256226    |
|6 |      TABLE SCAN             |T       |400000    |256226    |
|7 |   EXCHANGE IN DISTR         |        |400000    |436080    |
|8 |    EXCHANGE OUT DISTR (HASH)|:EX10001|400000    |256226    |
|9 |     PX PARTITION ITERATOR   |        |400000    |256226    |
|10|      TABLE SCAN             |S       |400000    |256226    |
=================================================================

Outputs & filters:
-------------------------------------
  0 - output([T.C1], [T.C2], [S.C1], [S.C2]), filter(nil)
  1 - output([T.C1], [T.C2], [S.C1], [S.C2]), filter(nil), dop=2
  2 - output([T.C1], [T.C2], [S.C1], [S.C2]), filter(nil),
      equal_conds([T.C2 = S.C2]), other_conds(nil)
  3 - output([T.C1], [T.C2]), filter(nil)
  4 - (#keys=1, [T.C2]), output([T.C1], [T.C2]), filter(nil), dop=2
  5 - output([T.C1], [T.C2]), filter(nil)
  6 - output([T.C1], [T.C2]), filter(nil),
      access([T.C1], [T.C2]), partitions(p[0-3])
  7 - output([S.C1], [S.C2]), filter(nil)
  8 - (#keys=1, [S.C2]), output([S.C1], [S.C2]), filter(nil), dop=2
  9 - output([S.C1], [S.C2]), filter(nil)
  10 - output([S.C1], [S.C2]), filter(nil),
      access([S.C1], [S.C2]), partitions(p[0-3])

其中,PX PARTITION ITERATO 算子用于按照分區(qū)粒度迭代數(shù)據(jù),詳細信息請參見 GI。

上述示例的執(zhí)行計劃展示中的 outputs & filters 詳細列出了 EXCH-IN/OUT (HASH) 算子的輸出信息如下:

信息名稱

含義

output

該算子輸出的表達式。

filter

該算子上的過濾條件。

由于示例中 EXCH-IN/OUT (HASH) 算子沒有設(shè)置 filter,所以為 nil。

pkey

按照哪一列進行 HASH 重分區(qū)。

例如,#keys=1, [s.c2] 表示按照 c2 這一列進行 HASH 重分區(qū)。

EXCH-IN/OUT(BROADCAST)

EXCH-IN/OUT(BROADCAST) 算子用于對輸入數(shù)據(jù)使用 BROADCAST 的方法進行重分區(qū),它會將數(shù)據(jù)廣播到其他線程上。

如下示例的執(zhí)行計劃中,3-4 號是一組使用 BROADCAST 重分區(qū)方式的 EXCHANGE 算子。它會將 t 表的數(shù)據(jù)廣播到每個線程上,s 表每個分區(qū)的數(shù)據(jù)都會嘗試和被廣播的 t 表數(shù)據(jù)進行聯(lián)接。

obclient>CREATE TABLE t (c1 INT, c2 INT) PARTITION BY HASH(c1) PARTITIONS 4;
Query OK, 0 rows affected (0.12 sec)

obclient>CREATE TABLE s (c1 INT, c2 INT) PARTITION BY HASH(c1) PARTITIONS 4;
Query OK, 0 rows affected (0.12 sec)

obclient>INSERT INTO s VALUES (1, 1), (2, 2), (3, 3), (4, 4);
Query OK, 1 rows affected (0.12 sec)

obclient>EXPALIN SELECT  /*+PARALLEL(2) */ * FROM t, s WHERE t.c2 = s.c2\G;
*************************** 1. row ***************************
Query Plan:
======================================================================
|ID|OPERATOR                          |NAME    |EST. ROWS |COST      |
----------------------------------------------------------------------
|0 |PX COORDINATOR                    |        |1568160000|2473449646|
|1 | EXCHANGE OUT DISTR               |:EX10001|1568160000|1063249409|
|2 |  HASH JOIN                       |        |1568160000|1063249409|
|3 |   EXCHANGE IN DISTR              |        |400000    |436080    |
|4 |    EXCHANGE OUT DISTR (BROADCAST)|:EX10000|400000    |256226    |
|5 |     PX PARTITION ITERATOR        |        |400000    |256226    |
|6 |      TABLE SCAN                  |T       |400000    |256226    |
|7 |   PX PARTITION ITERATOR          |        |400000    |256226    |
|8 |    TABLE SCAN                    |S       |400000    |256226    |
======================================================================

Outputs & filters:
-------------------------------------
  0 - output([T.C1], [T.C2], [S.C1], [S.C2]), filter(nil)
  1 - output([T.C1], [T.C2], [S.C1], [S.C2]), filter(nil), dop=2
  2 - output([T.C1], [T.C2], [S.C1], [S.C2]), filter(nil),
      equal_conds([T.C2 = S.C2]), other_conds(nil)
  3 - output([T.C1], [T.C2]), filter(nil)
  4 - output([T.C1], [T.C2]), filter(nil), dop=2
  5 - output([T.C1], [T.C2]), filter(nil)
  6 - output([T.C1], [T.C2]), filter(nil),
      access([T.C1], [T.C2]), partitions(p[0-3])
  7 - output([S.C1], [S.C2]), filter(nil)
  8 - output([S.C1], [S.C2]), filter(nil),
      access([S.C1], [S.C2]), partitions(p[0-3])

上述示例的執(zhí)行計劃展示中的 outputs & filters 詳細列出了 EXCH-IN/OUT (BROADCAST) 算子的信息,字段的含義與 EXCH-IN/OUT 算子相同。


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號