深入了解跳表(Skip List):Redis Sorted Set 的高效數(shù)據(jù)結(jié)構(gòu)

2024-12-17 12:00 更新

跳表(Skip List)是一種隨機化的數(shù)據(jù)結(jié)構(gòu),基于有序鏈表,通過在鏈表上增加多級索引來提高數(shù)據(jù)的查找效率。它是由 William Pugh 在 1990 年提出的。

為什么 Redis 中的 Sorted Set 使用跳躍表

Redis 的有序集合(Sorted Set)使用跳躍表(Skip List)作為底層實現(xiàn),主要是因為跳躍表在性能、實現(xiàn)復(fù)雜度和靈活性等方面具有顯著優(yōu)勢,可以替代平衡樹的數(shù)據(jù)結(jié)構(gòu)。

跳躍表的原理

跳躍表是一種擴展的有序鏈表,通過維護(hù)多個層級的索引來提高查找效率。每個節(jié)點包含一個數(shù)據(jù)元素和一組指向其他節(jié)點的指針,這些指針分布在不同的層級。最底層的鏈表包含所有元素,而每一層的節(jié)點數(shù)量逐漸減少。這樣,查找操作可以從高層開始,快速跳過不需要的元素,減少遍歷的節(jié)點數(shù),從而提高查找效率。

  • 查找過程:從最高層的頭節(jié)點開始,沿著索引節(jié)點向右查找,如果當(dāng)前節(jié)點的下一個節(jié)點的值大于或等于查找的目標(biāo)值,則向下移動到下一層繼續(xù)查找;否則,向右移動到下一個索引節(jié)點。這個過程一直持續(xù)到找到目標(biāo)節(jié)點或到達(dá)最底層鏈表。

  • 插入和刪除操作:跳躍表支持高效的動態(tài)插入和刪除,時間復(fù)雜度均為 O(log n)。插入時,首先找到插入位置,然后根據(jù)隨機函數(shù)決定新節(jié)點的層數(shù),最后在相應(yīng)的層中插入節(jié)點。

跳躍表的優(yōu)勢

  1. 簡單性:跳躍表的實現(xiàn)相對簡單,易于理解和維護(hù),而平衡樹(如紅黑樹)的實現(xiàn)較為復(fù)雜,容易出錯。

  1. 高效的范圍查詢:跳躍表在進(jìn)行范圍查詢時效率更高,因為它是有序的鏈表,可以直接遍歷后繼節(jié)點,而平衡樹需要中序遍歷,復(fù)雜度更高。

  1. 靈活性:跳躍表的層數(shù)可以根據(jù)需要動態(tài)調(diào)整,適應(yīng)不同的查詢需求。

  1. 高并發(fā)性能:跳躍表的節(jié)點可以支持多個線程同時插入或刪除節(jié)點,而平衡樹和哈希表通常需要復(fù)雜的并發(fā)控制。

  1. 空間效率:跳躍表的空間復(fù)雜度為 O(n),并且通過調(diào)整節(jié)點的抽取間隔,可以靈活平衡空間和時間復(fù)雜度。

正是因為有這些優(yōu)勢,Redis 選擇使用跳躍表來實現(xiàn)有序集合,而不是紅黑樹或其他數(shù)據(jù)結(jié)構(gòu)。這使得 Redis 在處理有序集合時能夠高效地支持插入、刪除和查找操作,同時保持元素的有序性,非常適合實現(xiàn)如排行榜、范圍查詢等功能。

為了講明白跳表的原理,現(xiàn)在我們來模擬一個簡單的跳表實現(xiàn)。

在 Java 中模擬實現(xiàn) Redis 的 Sorted Set 跳表,我們需要定義跳表的數(shù)據(jù)結(jié)構(gòu),包括節(jié)點和跳表本身。以下是一個簡單的實現(xiàn):

  1. import java.util.Random;
  2. public class SkipList {
  3. private static final double P = 0.5; // 節(jié)點晉升的概率
  4. private static final int MAX_LEVEL = 16; // 最大層數(shù)
  5. private int levelCount; // 當(dāng)前跳表的層數(shù)
  6. private Node head; // 頭節(jié)點
  7. private int size; // 跳表中元素的數(shù)量
  8. private Random random; // 隨機數(shù)生成器
  9. public SkipList() {
  10. this.levelCount = 0;
  11. this.size = 0;
  12. this.head = new Node(-1, Integer.MIN_VALUE, MAX_LEVEL);
  13. this.random = new Random();
  14. }
  15. // 節(jié)點定義
  16. private class Node {
  17. int value;
  18. int key;
  19. Node[] forward; // 向前指針數(shù)組
  20. Node(int value, int key, int level) {
  21. this.value = value;
  22. this.key = key;
  23. this.forward = new Node[level + 1];
  24. }
  25. }
  26. // 隨機生成節(jié)點的層數(shù)
  27. private int randomLevel() {
  28. int level = 0;
  29. while (random.nextFloat() < P && level < MAX_LEVEL) {
  30. level++;
  31. }
  32. return level;
  33. }
  34. // 搜索指定值的節(jié)點
  35. public Node search(int key) {
  36. Node current = head;
  37. for (int i = levelCount - 1; i >= 0; i--) {
  38. while (current.forward[i] != null && current.forward[i].key < key) {
  39. current = current.forward[i]; // 沿著當(dāng)前層的指針移動
  40. }
  41. }
  42. current = current.forward[0]; // 移動到第0層
  43. return current;
  44. }
  45. // 插入節(jié)點
  46. public void insert(int key, int value) {
  47. Node[] update = new Node[MAX_LEVEL + 1];
  48. Node current = head;
  49. // 查找插入位置
  50. for (int i = levelCount - 1; i >= 0; i--) {
  51. while (current.forward[i] != null && current.forward[i].key < key) {
  52. current = current.forward[i];
  53. }
  54. update[i] = current;
  55. }
  56. int level = randomLevel(); // 隨機生成層數(shù)
  57. if (level > levelCount) {
  58. levelCount = level;
  59. for (int i = levelCount - 1; i >= 0; i--) {
  60. update[i] = head;
  61. }
  62. }
  63. current = new Node(value, key, level);
  64. for (int i = 0; i < level; i++) {
  65. current.forward[i] = update[i].forward[i];
  66. update[i].forward[i] = current;
  67. }
  68. size++;
  69. }
  70. // 刪除節(jié)點
  71. public void delete(int key) {
  72. Node[] update = new Node[MAX_LEVEL + 1];
  73. Node current = head;
  74. // 查找要刪除的節(jié)點
  75. for (int i = levelCount - 1; i >= 0; i--) {
  76. while (current.forward[i] != null && current.forward[i].key < key) {
  77. current = current.forward[i];
  78. }
  79. update[i] = current;
  80. }
  81. current = current.forward[0];
  82. if (current != null && current.key == key) {
  83. for (int i = 0; i < levelCount; i++) {
  84. if (update[i].forward[i] != current) {
  85. break;
  86. }
  87. update[i].forward[i] = current.forward[i];
  88. }
  89. size--;
  90. while (levelCount > 0 && head.forward[levelCount - 1] == null) {
  91. levelCount--;
  92. }
  93. }
  94. }
  95. // 打印跳表
  96. public void printList() {
  97. Node current = head.forward[0];
  98. while (current != null) {
  99. System.out.println(current.key + ":" + current.value);
  100. current = current.forward[0];
  101. }
  102. }
  103. // 主函數(shù)測試跳表
  104. public static void main(String[] args) {
  105. SkipList list = new SkipList();
  106. list.insert(3, 100);
  107. list.insert(6, 300);
  108. list.insert(4, 400);
  109. list.insert(7, 700);
  110. list.insert(5, 500);
  111. list.printList();
  112. list.delete(3);
  113. list.printList();
  114. }
  115. }

下面,V 哥來解釋一下:

  1. 跳表節(jié)點:每個節(jié)點包含一個值、一個鍵和一個向前指針數(shù)組。向前指針數(shù)組存儲了指向同一層下一個節(jié)點的引用。

  1. 隨機層數(shù):每個節(jié)點的層數(shù)是根據(jù)預(yù)設(shè)的概率 P 隨機生成的,這樣可以模擬出不同高度的索引層。

  1. 搜索操作:從最高層開始,沿著當(dāng)前層的指針移動,直到找到插入點或到達(dá)底層。

  1. 插入操作:首先找到插入位置,然后根據(jù)隨機生成的層數(shù)創(chuàng)建新節(jié)點,并更新每一層的指針。

  1. 刪除操作:找到要刪除的節(jié)點,然后逐層更新指針,最后減少跳表的層數(shù)。

  1. 打印跳表:從底層的頭節(jié)點開始,遍歷打印每個節(jié)點的鍵和值。

從這個簡化版的跳表實現(xiàn)可以看到跳表的基本操作??梢詭椭覀兝斫馓韺崿F(xiàn)的原理。

如何在 Java 中實現(xiàn)跳表的并發(fā)操作?

在 Java 中實現(xiàn)跳表的并發(fā)操作需要考慮線程安全問題??梢酝ㄟ^以下方法來實現(xiàn):

  1. 使用同步塊:在每個公共方法中使用 synchronized 關(guān)鍵字來確保同一時間只有一個線程可以執(zhí)行方法。

  1. 使用鎖:使用 ReentrantLock 或其他并發(fā)鎖來控制對跳表的并發(fā)訪問。

  1. 使用原子引用:使用 AtomicReferenceAtomicReferenceArray 來確保節(jié)點的原子更新。

  1. 使用并發(fā)集合:使用 ConcurrentHashMap 等并發(fā)集合作為輔助工具來實現(xiàn)線程安全的跳表。

以下是一個使用 synchronized 關(guān)鍵字實現(xiàn)線程安全的跳表的示例:

  1. import java.util.Random;
  2. public class ConcurrentSkipList {
  3. private static final double P = 0.5; // 節(jié)點晉升的概率
  4. private static final int MAX_LEVEL = 16; // 最大層數(shù)
  5. private int levelCount; // 當(dāng)前跳表的層數(shù)
  6. private Node head; // 頭節(jié)點
  7. private int size; // 跳表中元素的數(shù)量
  8. private Random random; // 隨機數(shù)生成器
  9. public ConcurrentSkipList() {
  10. this.levelCount = 0;
  11. this.size = 0;
  12. this.head = new Node(-1, Integer.MIN_VALUE, MAX_LEVEL);
  13. this.random = new Random();
  14. }
  15. private class Node {
  16. int value;
  17. int key;
  18. Node[] forward; // 向前指針數(shù)組
  19. Node(int value, int key, int level) {
  20. this.value = value;
  21. this.key = key;
  22. this.forward = new Node[level + 1];
  23. }
  24. }
  25. private int randomLevel() {
  26. int level = 0;
  27. while (random.nextFloat() < P && level < MAX_LEVEL) {
  28. level++;
  29. }
  30. return level;
  31. }
  32. public synchronized Node search(int key) {
  33. Node current = head;
  34. for (int i = levelCount - 1; i >= 0; i--) {
  35. while (current.forward[i] != null && current.forward[i].key < key) {
  36. current = current.forward[i];
  37. }
  38. }
  39. current = current.forward[0];
  40. return current;
  41. }
  42. public synchronized void insert(int key, int value) {
  43. Node[] update = new Node[MAX_LEVEL + 1];
  44. Node current = head;
  45. for (int i = levelCount - 1; i >= 0; i--) {
  46. while (current.forward[i] != null && current.forward[i].key < key) {
  47. current = current.forward[i];
  48. }
  49. update[i] = current;
  50. }
  51. int level = randomLevel();
  52. if (level > levelCount) {
  53. for (int i = levelCount; i < level; i++) {
  54. update[i] = head;
  55. }
  56. levelCount = level;
  57. }
  58. current = new Node(value, key, level);
  59. for (int i = 0; i < level; i++) {
  60. current.forward[i] = update[i].forward[i];
  61. update[i].forward[i] = current;
  62. }
  63. size++;
  64. }
  65. public synchronized void delete(int key) {
  66. Node[] update = new Node[MAX_LEVEL + 1];
  67. Node current = head;
  68. for (int i = levelCount - 1; i >= 0; i--) {
  69. while (current.forward[i] != null && current.forward[i].key < key) {
  70. current = current.forward[i];
  71. }
  72. update[i] = current;
  73. }
  74. current = current.forward[0];
  75. if (current != null && current.key == key) {
  76. for (int i = 0; i < levelCount; i++) {
  77. if (update[i].forward[i] != current) {
  78. break;
  79. }
  80. update[i].forward[i] = current.forward[i];
  81. }
  82. size--;
  83. while (levelCount > 0 && head.forward[levelCount - 1] == null) {
  84. levelCount--;
  85. }
  86. }
  87. }
  88. public synchronized void printList() {
  89. Node current = head.forward[0];
  90. while (current != null) {
  91. System.out.println(current.key + ":" + current.value);
  92. current = current.forward[0];
  93. }
  94. }
  95. public static void main(String[] args) {
  96. ConcurrentSkipList list = new ConcurrentSkipList();
  97. list.insert(3, 100);
  98. list.insert(6, 300);
  99. list.insert(4, 400);
  100. list.insert(7, 700);
  101. list.insert(5, 500);
  102. list.printList();
  103. list.delete(3);
  104. list.printList();
  105. }
  106. }

在這個示例中,我們使用了 synchronized 關(guān)鍵字來確保 search、insertdelete 方法是線程安全的。這會鎖定當(dāng)前對象,確保同一時間只有一個線程可以執(zhí)行這些方法。

請注意,雖然 synchronized 可以提供線程安全,但它也可能導(dǎo)致性能瓶頸,特別是在高并發(fā)環(huán)境下。在實際應(yīng)用中,可以考慮使用更細(xì)粒度的鎖,如 ReentrantLock,或者使用原子引用和其他并發(fā)工具來提高性能。

使用 ReentrantLock 的實現(xiàn)方式

使用 ReentrantLock 實現(xiàn)跳表的并發(fā)操作可以提供比 synchronized 更細(xì)粒度的鎖定,從而提高并發(fā)性能。ReentrantLock 允許您在不同的方法中鎖定和解鎖,甚至可以在不同的類中使用同一個鎖對象。

以下是使用 ReentrantLock 實現(xiàn)線程安全的跳表的示例:

  1. import java.util.Random;
  2. import java.util.concurrent.locks.Lock;
  3. import java.util.concurrent.locks.ReentrantLock;
  4. public class ConcurrentSkipList {
  5. private static final double P = 0.5; // 節(jié)點晉升的概率
  6. private static final int MAX_LEVEL = 16; // 最大層數(shù)
  7. private final Lock lock = new ReentrantLock(); // 創(chuàng)建一個 ReentrantLock 對象
  8. private int levelCount; // 當(dāng)前跳表的層數(shù)
  9. private Node head; // 頭節(jié)點
  10. private int size; // 跳表中元素的數(shù)量
  11. private Random random; // 隨機數(shù)生成器
  12. public ConcurrentSkipList() {
  13. this.levelCount = 0;
  14. this.size = 0;
  15. this.head = new Node(Integer.MIN_VALUE, MAX_LEVEL);
  16. this.random = new Random();
  17. }
  18. private class Node {
  19. int value;
  20. int key;
  21. Node[] forward; // 向前指針數(shù)組
  22. Node(int key, int level) {
  23. this.key = key;
  24. this.forward = new Node[level + 1];
  25. }
  26. }
  27. private int randomLevel() {
  28. int level = 0;
  29. while (random.nextFloat() < P && level < MAX_LEVEL) {
  30. level++;
  31. }
  32. return level;
  33. }
  34. public void search(int key) {
  35. lock.lock(); // 加鎖
  36. try {
  37. Node current = head;
  38. for (int i = levelCount - 1; i >= 0; i--) {
  39. while (current.forward[i] != null && current.forward[i].key < key) {
  40. current = current.forward[i];
  41. }
  42. }
  43. current = current.forward[0];
  44. // ... 處理找到的節(jié)點
  45. } finally {
  46. lock.unlock(); // 釋放鎖
  47. }
  48. }
  49. public void insert(int key, int value) {
  50. lock.lock(); // 加鎖
  51. try {
  52. Node[] update = new Node[MAX_LEVEL + 1];
  53. Node current = head;
  54. for (int i = levelCount - 1; i >= 0; i--) {
  55. while (current.forward[i] != null && current.forward[i].key < key) {
  56. current = current.forward[i];
  57. }
  58. update[i] = current;
  59. }
  60. int level = randomLevel();
  61. if (level > levelCount) {
  62. levelCount = level;
  63. }
  64. current = new Node(value, key, level);
  65. for (int i = 0; i < level; i++) {
  66. current.forward[i] = update[i].forward[i];
  67. update[i].forward[i] = current;
  68. }
  69. size++;
  70. } finally {
  71. lock.unlock(); // 釋放鎖
  72. }
  73. }
  74. public void delete(int key) {
  75. lock.lock(); // 加鎖
  76. try {
  77. Node[] update = new Node[MAX_LEVEL + 1];
  78. Node current = head;
  79. for (int i = levelCount - 1; i >= 1; i--) {
  80. while (current.forward[i] != null && current.forward[i].key < key) {
  81. current = current.forward[i];
  82. }
  83. update[i] = current;
  84. }
  85. current = current.forward[0];
  86. if (current != null && current.key == key) {
  87. for (int i = 0; i < levelCount; i++) {
  88. if (update[i].forward[i] != current) {
  89. break;
  90. }
  91. update[i].forward[i] = current.forward[i];
  92. }
  93. size--;
  94. while (levelCount > 0 && head.forward[levelCount - 1] == null) {
  95. levelCount--;
  96. }
  97. }
  98. } finally {
  99. lock.unlock(); // 釋放鎖
  100. }
  101. }
  102. public void printList() {
  103. lock.lock(); // 加鎖
  104. try {
  105. Node current = head.forward[0];
  106. while (current != null) {
  107. System.out.println(current.key + ":" + current.value);
  108. current = current.forward[0];
  109. }
  110. } finally {
  111. lock.unlock(); // 釋放鎖
  112. }
  113. }
  114. public static void main(String[] args) {
  115. ConcurrentSkipList list = new ConcurrentSkipList();
  116. list.insert(3, 100);
  117. list.insert(6, 300);
  118. list.insert(4, 400);
  119. list.insert(7, 700);
  120. list.insert(5, 500);
  121. list.printList();
  122. list.delete(3);
  123. list.printList();
  124. }
  125. }

在這個示例中,我們使用了 ReentrantLock 對象來控制對跳表的并發(fā)訪問。每個公共方法在執(zhí)行之前都會調(diào)用 lock.lock() 加鎖,并在執(zhí)行完畢后(包括正常執(zhí)行和異常退出)調(diào)用 lock.unlock() 釋放鎖。

使用 ReentrantLock 的好處是它提供了比 synchronized 更靈活的鎖定機制,例如:

  1. 可中斷的鎖定ReentrantLock 允許線程在嘗試獲取鎖時被中斷。

  1. 嘗試非阻塞鎖定ReentrantLock 提供了 tryLock() 方法,允許線程嘗試獲取鎖而不無限等待。

  1. 超時獲取鎖ReentrantLock 還提供了 tryLock(long timeout, TimeUnit unit) 方法,允許線程在指定時間內(nèi)等待鎖。

  1. 公平鎖ReentrantLock 可以選擇是否使用公平鎖,公平鎖會按照線程請求鎖的順序來分配鎖。

  1. 多個條件變量ReentrantLock 可以與多個 Condition 對象配合使用,而 synchronized 只能與一個條件變量配合使用。

理解了以上代碼實現(xiàn)原理后,我們再來理解 Redis Sorted Set 就不難了。

Redis 的 Sorted Set 是一種包含元素和關(guān)聯(lián)分?jǐn)?shù)的數(shù)據(jù)結(jié)構(gòu),它能夠根據(jù)分?jǐn)?shù)對元素進(jìn)行排序。Sorted Set 在 Redis 中的內(nèi)部實現(xiàn)可以是跳躍表(Skip List)和字典(Hash Table)的組合,或者是壓縮列表(Zip List),具體使用哪種實現(xiàn)取決于 Sorted Set 的大小和元素的長度。

跳躍表 + 字典實現(xiàn)

當(dāng) Sorted Set 的元素數(shù)量較多或者元素長度較長時,Redis 使用跳躍表和字典來實現(xiàn) Sorted Set。跳躍表是一種概率平衡的數(shù)據(jù)結(jié)構(gòu),它通過多級索引來提高搜索效率,類似于二分查找。字典則用于快速查找和更新操作。

跳躍表的每個節(jié)點包含以下信息:

  • 元素(member)
  • 分?jǐn)?shù)(score)
  • 后退指針(backward)
  • 多層前進(jìn)指針(forward),每一層都是一個有序鏈表

字典則存儲了元素到分?jǐn)?shù)的映射,以便快速訪問。

壓縮列表實現(xiàn)

當(dāng) Sorted Set 的元素數(shù)量較少(默認(rèn)小于128個),并且所有元素的長度都小于64字節(jié)時,Redis 使用壓縮列表來存儲 Sorted Set。壓縮列表是一種連續(xù)內(nèi)存塊的順序存儲結(jié)構(gòu),它將所有的元素和分?jǐn)?shù)緊湊地存儲在一起,以節(jié)省內(nèi)存空間。

應(yīng)用場景

Sorted Set 常用于以下場景:

  1. 排行榜:例如游戲中的玩家分?jǐn)?shù)排名。
  2. 范圍查詢:例如獲取一定分?jǐn)?shù)范圍內(nèi)的用戶。
  3. 任務(wù)調(diào)度:例如按照任務(wù)的優(yōu)先級執(zhí)行。
  4. 實時排名:例如股票價格的實時排名。

代碼分析

在 Redis 源碼中,Sorted Set 的實現(xiàn)主要在 t_zset.c 文件中。插入操作(zaddCommand)會根據(jù) Sorted Set 的編碼類型(跳躍表或壓縮列表)來執(zhí)行不同的邏輯。如果是跳躍表編碼,那么插入操作會涉及到字典的更新和跳躍表節(jié)點的添加。如果是壓縮列表編碼,則會檢查是否需要轉(zhuǎn)換為跳躍表編碼。

總結(jié)

Sorted Set 是 Redis 提供的一種強大的有序數(shù)據(jù)結(jié)構(gòu),它結(jié)合了跳躍表和字典的優(yōu)點,提供了高效的插入、刪除、更新和范圍查詢操作。通過合理的使用 Sorted Set,可以有效地解決許多實際問題。如何以上內(nèi)容對你有幫助,懇請點贊轉(zhuǎn)發(fā),V 哥在此感謝各位兄弟的支持。88,洗洗睡了。

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號