create table t1(
    c1 int primary key,
    c2 int
);
create table t2(
    d1 int primary key, 
    d2 int
);
insert into t1 values(1,1);
insert into t1 values(2,2);
insert into t1 values(3,3);
insert into t1 values(4,4);
insert into t1 values(5,5);
insert into t1 values(6,6);
insert into t1 values(7,7);
insert into t2 values(1,1);
insert into t2 values(2,2);
insert into t2 values(3,3);
insert into t2 values(4,4);
insert into t2 values(5,5);
insert into t2 values(6,6);
insert into t2 values(7,7);
在Mysql8.0.20之后(包含),Mysql在实际执行过程中大量的使用了HashJoin,只要是以前能够用到BNL(Blocked Nested Loop Join)的地方,都会选择HashJoin的方式来代替,众所周知,HashJoin只能适用于等值查询,譬如下面的语句:
explain format=tree SELECT *  FROM t1 JOIN t2 ON t1.c2=t2.d2;
/*explain的结果如下*/
| EXPLAIN                                                                                                                                                         |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| -> Inner hash join (t2.d2 = t1.c2)  (cost=6.10 rows=7)
    -> Table scan on t2  (cost=0.05 rows=7)
    -> Hash
        -> Table scan on t1  (cost=0.95 rows=7)
 |
因为上面join列都没有使用到索引,所以对于这个语句,Mysql会优先使用HashJoin,但是如果使用到了索引,那么就依旧会采用Nested Loop Join的方式,比如下面的语句:
explain format=tree SELECT * FROM t1 JOIN t2 ON t1.c1=t2.d2;
| EXPLAIN                                                                                                                                                                                                                                 |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| -> Nested loop inner join  (cost=3.40 rows=7)
    -> Filter: (t2.d2 is not null)  (cost=0.95 rows=7)
        -> Table scan on t2  (cost=0.95 rows=7)
    -> Single-row index lookup on t1 using PRIMARY (c1=t2.d2)  (cost=0.26 rows=1)
 |
重点来了,在Mysql中,即使join条件没有采用等值的方式,那么也会采用HashJoin的方式,比如下面的语句:
 explain format=tree SELECT * FROM t1 JOIN t2 ON t1.c2 > t2.d2;
----------------------------------------------+
| EXPLAIN                                                                                                                                                                                                                         |
+-------------------------------------------------------------------------------------------------------------+
/*可以看到,join条件变成了filter的方式,但是join方式还是采用了HashJoin的方式*/
| -> Filter: (t1.c2 > t2.d2)  (cost=6.10 rows=16)
    -> Inner hash join (no condition)  (cost=6.10 rows=16)
        -> Table scan on t2  (cost=0.07 rows=7)
        -> Hash
            -> Table scan on t1  (cost=0.95 rows=7)
 |
可以看到,join条件变成了filter的方式,但是join方式还是采用了HashJoin的方式,那么Mysql是怎么实现非等值情况下的HashJoin的呢?这里我们分析下代码。
本示例代码采用如下的sql语句来进行示例:
SELECT * FROM t1 JOIN t2 ON t1.c1=t2.d2
上述语句在Mysql里面使用到的Iterator如下:
HashJoinIterator
 TableScanIterator
 TableScanIterator
在HashJoin中,主要是流程如下:
左表扫描数据  ->  构建Hash表 -> 右表扫描数据 -> 在Hash进行probe操作 -> 将结果返回给用户
重要函数调用的流程图大体如下:
左表扫描数据并构建Hash表的流程
sql_union.cc里面的ExecuteIteratorQuery函数,会在执行前对iterator进行如下操作:  if (m_root_iterator->Init()) {
    return true;
  }
HashJoinIterator的Init函数介绍如下:  // Prepare to read the build input into the hash map.
  // m_build_input:为做表的对象,这里也就是t1表的TableScanIterator,先进行init操作,里面其实就是准备扫描表相关的准备工作
  if (m_build_input->Init()) {
    DBUG_ASSERT(thd()->is_error());  // my_error should have been called.
    return true;
  }
  // We always start out by doing everything in memory.
  m_hash_join_type = HashJoinType::IN_MEMORY;
  m_write_to_probe_row_saving = false;
//省略部分代码
  // Build the hash table
  // 扫描左表,第一次进行构建Hash表的操作,这是一个比较重要的函数
  if (BuildHashTable()) {
    DBUG_ASSERT(thd()->is_error());  // my_error should have been called.
    return true;
  }
  if (m_state == State::END_OF_ROWS) {
    // BuildHashTable() decided that the join is done (the build input is
    // empty, and we are in an inner-/semijoin. Anti-/outer join must output
    // NULL-complemented rows from the probe input).
    return false;
  }
  // 进行右表的准备工作。
  return InitProbeIterator();
BuildHashTable()函数如下:  if (!m_build_iterator_has_more_rows) {
    m_state = State::END_OF_ROWS;
    return false;
  }
//省略部分代码
// 初始化m_row_buffer成员变量,该变量本质上存储了一个std::unordered_multimap对象
  if (InitRowBuffer()) {
    return true;
  }
  const bool reject_duplicate_keys = RejectDuplicateKeys();
  const bool store_rows_with_null_in_join_key = m_join_type == JoinType::OUTER;
  // If Init() is called multiple times (e.g., if hash join is inside an
  // dependent subquery), we must clear the NULL row flag, as it may have been
  // set by the previous executing of this hash join.
  m_build_input->SetNullRowFlag(/*is_null_row=*/false);
  PFSBatchMode batch_mode(m_build_input.get());
//在Init阶段,Mysql会尽可能的多从底层读取数据构建Hash表。
  for (;;) {  // Termination condition within loop.
    int res = m_build_input->Read();
    if (res == 1) {
      DBUG_ASSERT(thd()->is_error());  // my_error should have been called.
      return true;
    }
//省略部分代码
//将底层扫描的数据放在m_row_buffer map里面,该函数下面会详细介绍。
    const hash_join_buffer::StoreRowResult store_row_result =
        m_row_buffer.StoreRow(thd(), reject_duplicate_keys,
                              store_rows_with_null_in_join_key);
//校验返回的结果,分别为:存储成功,buffer满了需要下盘和致命错误三种情况。
    switch (store_row_result) {
      case hash_join_buffer::StoreRowResult::ROW_STORED:
        break;
      case hash_join_buffer::StoreRowResult::BUFFER_FULL: {
        // The row buffer is full, so start spilling to disk (if allowed). Note
        // that the row buffer checks for OOM _after_ the row was inserted, so
        // we should always manage to insert at least one row.
        DBUG_ASSERT(!m_row_buffer.empty());
        // If we are not allowed to spill to disk, just go on to reading from
        // the probe iterator.
        if (!m_allow_spill_to_disk) {
          if (m_join_type != JoinType::INNER) {
            // Enable probe row saving, so that unmatched probe rows are written
            // to the probe row saving file. After the next refill of the hash
            // table, we will read rows from the probe row saving file, ensuring
            // that we only read unmatched probe rows.
            InitWritingToProbeRowSavingFile();
          }
          SetReadingProbeRowState();
          return false;
        }
        // Ideally, we would use the estimated row count from the iterator. But
        // not all iterators has the row count available (i.e.
        // RemoveDuplicatesIterator), so get the row count directly from the
        // QEP_TAB.
        const QEP_TAB *last_table_in_join =
            m_build_input_tables.tables().back().qep_tab;
        if (InitializeChunkFiles(
                last_table_in_join->position()->prefix_rowcount,
                m_row_buffer.size(), kMaxChunks, m_probe_input_tables,
                m_build_input_tables,
                /*include_match_flag_for_probe=*/m_join_type == JoinType::OUTER,
                &m_chunk_files_on_disk)) {
          DBUG_ASSERT(thd()->is_error());  // my_error should have been called.
          return true;
        }
        // Write out the remaining rows from the build input out to chunk files.
        // The probe input will be written out to chunk files later; we will do
        // it _after_ we have checked the probe input for matches against the
        // rows that are already written to the hash table. An alternative
        // approach would be to write out the remaining rows from the build
        // _and_ the rows that already are in the hash table. In that case, we
        // could also write out the entire probe input to disk here as well. But
        // we don not want to waste the rows that we already have stored in
        // memory.
        //
        // We never write out rows with NULL in condition for the build/right
        // input, as these rows will never match in a join condition.
        if (WriteRowsToChunks(thd(), m_build_input.get(), m_build_input_tables,
                              m_join_conditions, kChunkPartitioningHashSeed,
                              &m_chunk_files_on_disk,
                              true /* write_to_build_chunks */,
                              false /* write_rows_with_null_in_join_key */,
                              &m_temporary_row_and_join_key_buffer)) {
          DBUG_ASSERT(thd()->is_error());  // my_error should have been called.
          return true;
        }
        // Flush and position all chunk files from the build input at the
        // beginning.
        for (ChunkPair &chunk_pair : m_chunk_files_on_disk) {
          if (chunk_pair.build_chunk.Rewind()) {
            DBUG_ASSERT(
                thd()->is_error());  // my_error should have been called.
            return true;
          }
        }
        SetReadingProbeRowState();
        return false;
      }
      case hash_join_buffer::StoreRowResult::FATAL_ERROR:
        // An unrecoverable error. Most likely, malloc failed, so report OOM.
        // Note that we cannot say for sure how much memory we tried to allocate
        // when failing, so just report ‘join_buffer_size‘ as the amount of
        // memory we tried to allocate.
        my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR),
                 thd()->variables.join_buff_size);
        return true;
    }
  }
HashJoinRowBuffer::StoreRow函数
StoreRowResult HashJoinRowBuffer::StoreRow(
    THD *thd, bool reject_duplicate_keys,
    bool store_rows_with_null_in_condition) {
  // Make the key from the join conditions.
  m_buffer.length(0);
    //所有的condition都是equal的形式,append_join_key_for_hash_join函数里面会根据不同的数据类型,将数据拷贝到m_buffer里面。
  for (const HashJoinCondition &hash_join_condition : m_join_conditions) {
    bool null_in_join_condition =
        hash_join_condition.join_condition()->append_join_key_for_hash_join(
            thd, m_tables.tables_bitmap(), hash_join_condition, &m_buffer);
    if (null_in_join_condition && !store_rows_with_null_in_condition) {
      // SQL NULL values will never match in an inner join or semijoin, so skip
      // the row.
      return StoreRowResult::ROW_STORED;
    }
  }
  // TODO(efroseth): We should probably use an unordered_map instead of multimap
  // for these cases so we do not have to hash and lookup twice.
    //有的语句是会reject 重复key的,所以这里专门做一个检查。
  if (reject_duplicate_keys &&
      contains(Key(pointer_cast<const uchar *>(m_buffer.ptr()),
                   m_buffer.length()))) {
    return StoreRowResult::ROW_STORED;
  }
  // Allocate the join key on the same MEM_ROOT that the hash table is
  // allocated on, so it has the same lifetime as the rest of the contents in
  // the hash map (until Clear() is called on the HashJoinBuffer).
    //从m_buffer中拷贝join key到join_key_data中,方便后续放入map中的操作
  const size_t join_key_size = m_buffer.length();
  uchar *join_key_data = nullptr;
  if (join_key_size > 0) {
    join_key_data = m_mem_root.ArrayAlloc<uchar>(join_key_size);
    if (join_key_data == nullptr) {
      return StoreRowResult::FATAL_ERROR;
    }
    memcpy(join_key_data, m_buffer.ptr(), join_key_size);
  }
    //将行中的具体内容拷贝m_buffer中
  // Save the contents of all columns marked for reading.
  if (StoreFromTableBuffers(m_tables, &m_buffer)) {
    return StoreRowResult::FATAL_ERROR;
  }
  // Give the row the same lifetime as the hash map, by allocating it on the
  // same MEM_ROOT as the hash map is allocated on.
    //从m_buffer中取出具体的数据内容,然后放在row对象中,方便后面插入到map中,目前的版本中,BufferRow对象只是Key的一个重命名,本质上两者是同一个类对象。
  const size_t row_size = m_buffer.length();
  uchar *row = nullptr;
  if (row_size > 0) {
    row = m_mem_root.ArrayAlloc<uchar>(row_size);
    if (row == nullptr) {
      return StoreRowResult::FATAL_ERROR;
    }
    memcpy(row, m_buffer.ptr(), row_size);
  }
    //将查询到的数据和join key封装为Key和BufferRow对象,放入到map中
  m_last_row_stored = m_hash_map->emplace(Key(join_key_data, join_key_size),
                                          BufferRow(row, row_size));
  if (m_mem_root.allocated_size() > m_max_mem_available) {
    return StoreRowResult::BUFFER_FULL;
  }
  return StoreRowResult::ROW_STORED;
}
InitProbeIterator函数
bool HashJoinIterator::InitProbeIterator() {
  DBUG_ASSERT(m_state == State::READING_ROW_FROM_PROBE_ITERATOR);
    //本质上是一个TableScanIterator,因此这里也只是简单的Init操作,并没有进行真正的probe操作
  if (m_probe_input->Init()) {
    return true;
  }
  if (m_probe_input_batch_mode) {
    m_probe_input->StartPSIBatchMode();
  }
  return false;
}
右表扫描数据并进行probe操作。
首先绝大多数情况下,probe操作是在Read阶段执行的,如下ExecuteIteratorQuery函数里面的Read循环。
    for (;;) {
        //在这个例子里面,就是HashJoinIterator进行Read操作。
      int error = m_root_iterator->Read();
      DBUG_EXECUTE_IF("bug13822652_1", thd->killed = THD::KILL_QUERY;);
      if (error > 0 || thd->is_error())  // Fatal error
        return true;
      else if (error < 0)
        break;
      else if (thd->killed)  // Aborted by user
      {
        thd->send_kill_message();
        return true;
      }
      ++*send_records_ptr;
      if (query_result->send_data(thd, *fields)) {
        return true;
      }
      thd->get_stmt_da()->inc_current_row_for_condition();
    }
HashJoinIterator::Read函数如下:
int HashJoinIterator::Read() {
  for (;;) {
    if (thd()->killed) {  // Aborted by user.
      thd()->send_kill_message();
      return 1;
    }
    switch (m_state) {
      case State::LOADING_NEXT_CHUNK_PAIR:
        if (ReadNextHashJoinChunk()) {
          return 1;
        }
        break;
            //大多数情况下,会进入到该分支。
      case State::READING_ROW_FROM_PROBE_ITERATOR:
        if (ReadRowFromProbeIterator()) {
          return 1;
        }
        break;
		//省略部分代码,省略的case大部分都是内存无法存下,需要下盘的情况。
      case State::READING_FIRST_ROW_FROM_HASH_TABLE:
       //开始从hashtable中获取数据,准备返回给用户
      case State::READING_FROM_HASH_TABLE: {
          //这里的操作是将HashTable里面的数据重新复制给qep_tab->table()里面的相关的成员变量中,具体的行数据存储在qep_tab->table()->record数组中。该函数下面的会介绍
        const int res = ReadNextJoinedRowFromHashTable();
        if (res == 0) {
          // A joined row is ready, so send it to the client.
          return 0;
        }
        if (res == -1) {
          // No more matching rows in the hash table, or antijoin found a
          // matching row. Read a new row from the probe input.
          continue;
        }
        // An error occured, so abort the join.
        DBUG_ASSERT(res == 1);
        return res;
      }
      case State::END_OF_ROWS:
        return -1;
    }
  }
  // Unreachable.
  DBUG_ASSERT(false);
  return 1;
}
HashJoinIterator::ReadRowFromProbeIterator函数如下:
bool HashJoinIterator::ReadRowFromProbeIterator() {
  DBUG_ASSERT(m_current_chunk == -1);
    //从右表中读取具体的数据
  int result = m_probe_input->Read();
  if (result == 1) {
    DBUG_ASSERT(thd()->is_error());  // my_error should have been called.
    return true;
  }
  if (result == 0) {
    RequestRowId(m_probe_input_tables.tables());
    // A row from the probe iterator is ready.
      //开始从hash表中进行probe操作
    LookupProbeRowInHashTable();
    return false;
  }
    //下面的代码省略掉,这里只介绍最简单的场景
}
void HashJoinIterator::LookupProbeRowInHashTable()函数如下:
void HashJoinIterator::LookupProbeRowInHashTable() {
  if (m_join_conditions.empty()) {
    // Skip the call to equal_range in case we don‘t have any join conditions.
    // This can save up to 20% in case of multi-table joins.
    m_hash_map_iterator = m_row_buffer.begin();
    m_hash_map_end = m_row_buffer.end();
      //因为没有join condition(本质上是一个笛卡尔积),所以这里不进行probe操作,直接更新state,下一次循环就可以直接从hashJoin中读取到join后的行
    m_state = State::READING_FIRST_ROW_FROM_HASH_TABLE;
    return;
  }
  // Extract the join key from the probe input, and use that key as the lookup
  // key in the hash table.
    //类似于HashJoinRowBuffer::StoreRow里面的操作,将probe里面的行构建成hashMap可以用的Key
  bool null_in_join_key = ConstructJoinKey(
      thd(), m_join_conditions, m_probe_input_tables.tables_bitmap(),
      &m_temporary_row_and_join_key_buffer);
  if (null_in_join_key) {
    if (m_join_type == JoinType::ANTI || m_join_type == JoinType::OUTER) {
      // SQL NULL was found, and we will never find a matching row in the hash
      // table. Let us indicate that, so that a null-complemented row is
      // returned.
      m_hash_map_iterator = m_row_buffer.end();
      m_hash_map_end = m_row_buffer.end();
      m_state = State::READING_FIRST_ROW_FROM_HASH_TABLE;
    } else {
      SetReadingProbeRowState();
    }
    return;
  }
  hash_join_buffer::Key key(
      pointer_cast<const uchar *>(m_temporary_row_and_join_key_buffer.ptr()),
      m_temporary_row_and_join_key_buffer.length());
    //这里是一个简单的trick,如果只关心第一行的数据,那么直接使用unordered_multimap的find函数可以更低的时间复杂度,否则使用equal_range函数
  if ((m_join_type == JoinType::SEMI || m_join_type == JoinType::ANTI) &&
      m_extra_condition == nullptr) {
    // find() has a better average complexity than equal_range() (constant vs.
    // linear in the number of matching elements). And for semijoins, we are
    // only interested in the first match anyways, so this may give a nice
    // speedup. An exception to this is if we have any "extra" conditions that
    // needs to be evaluated after the hash table lookup, but before the row is
    // returned; we may need to read through the entire hash table to find a row
    // that satisfies the extra condition(s).
    m_hash_map_iterator = m_row_buffer.find(key);
    m_hash_map_end = m_row_buffer.end();
  } else {
    auto range = m_row_buffer.equal_range(key);
    m_hash_map_iterator = range.first;
    m_hash_map_end = range.second;
  }
    //ok,join后的数据已经准备好了,设置状态为,下一次循环直接将结果返回给用户
  m_state = State::READING_FIRST_ROW_FROM_HASH_TABLE;
}
关于HashJoinIterator::ReadNextJoinedRowFromHashTable()函数
int HashJoinIterator::ReadNextJoinedRowFromHashTable() {
  int res;
  bool passes_extra_conditions = false;
  do {
      //将jointable里面的行读取出来,拷贝qep_tab()->table()->records数组里。注意的是,之前存储的时候,采用的是pack的方式存储的,现在还原出来需要unpack。至于probe表的数据,因为已经在record数组里面的,所以这里也就不需要任何额外的操作了。
    res = ReadJoinedRow();
    // ReadJoinedRow() can only return 0 (row is ready) or -1 (EOF).
    DBUG_ASSERT(res == 0 || res == -1);
    // Evaluate any extra conditions that are attached to this iterator before
    // we return a row.
    if (res == 0) {
        //一些额外的condition evaluate,不是filter,filter会专门创建iterator来执行。
      passes_extra_conditions = JoinedRowPassesExtraConditions();
      if (thd()->is_error()) {
        // Evaluation of extra conditions raised an error, so abort the join.
        return 1;
      }
      if (!passes_extra_conditions) {
        // Advance to the next matching row in the hash table. Note that the
        // iterator stays in the state READING_FIRST_ROW_FROM_HASH_TABLE even
        // though we are not actually reading the first row anymore. This is
        // because WriteProbeRowToDiskIfApplicable() needs to know if this is
        // the first row that matches both the join condition and any extra
        // conditions; only unmatched rows will be written to disk.
        ++m_hash_map_iterator;
      }
    }
  } while (res == 0 && !passes_extra_conditions);
  // The row passed all extra conditions (or we are out of rows in the hash
  // table), so we can now write the row to disk.
  // Inner and outer joins: Write out all rows from the probe input (given that
  //   we have degraded into on-disk hash join).
  // Semijoin and antijoin: Write out rows that do not have any matching row in
  //   the hash table.
    //检查是否需要下盘
  if (WriteProbeRowToDiskIfApplicable()) {
    return 1;
  }
    //外查询的null处理
  if (res == -1) {
    // If we did not find a matching row in the hash table, antijoin and outer
    // join should ouput the last row read from the probe input together with a
    // NULL-complemented row from the build input. However, in case of on-disk
    // antijoin, a row from the probe input can match a row from the build input
    // that has already been written out to disk. So for on-disk antijoin, we
    // cannot output any rows until we have started reading from chunk files.
    //
    // On-disk outer join is a bit more tricky; we can only output a
    // NULL-complemented row if the probe row did not match anything from the
    // build input while doing any of the probe phases. We can have multiple
    // probe phases if e.g. a build chunk file is too big to fit in memory; we
    // would have to read the build chunk in multiple smaller chunks while doing
    // a probe phase for each of these smaller chunks. To keep track of this,
    // each probe row is prefixed with a match flag in the chunk files.
    bool return_null_complemented_row = false;
    if ((on_disk_hash_join() && m_current_chunk == -1) ||
        m_write_to_probe_row_saving) {
      return_null_complemented_row = false;
    } else if (m_join_type == JoinType::ANTI) {
      return_null_complemented_row = true;
    } else if (m_join_type == JoinType::OUTER &&
               m_state == State::READING_FIRST_ROW_FROM_HASH_TABLE &&
               !m_probe_row_match_flag) {
      return_null_complemented_row = true;
    }
    SetReadingProbeRowState();
    if (return_null_complemented_row) {
      m_build_input->SetNullRowFlag(true);
      return 0;
    }
    return -1;
  }
// 省略部分代码
  ++m_hash_map_iterator;
  return 0;
}
可以看到,在Mysql的处理中,如果join条件里面没有等值查询的话,直接的表现就是HashJoinIterator里面的m_join_conditions值为空,那么在往HashTable里面emplace的时候,创建的Key对象,代码如下:
    //将查询到的数据和join key封装为Key和BufferRow对象,放入到map中
  m_last_row_stored = m_hash_map->emplace(Key(join_key_data, join_key_size),
                                          BufferRow(row, row_size));
此时的join_key_data固定为NULL,而join_key_size固定为0,也就是说,这个map里面的key只有一个,与之对应的value则有多个。实际上表现出来的结果就是做了一个简单的笛卡尔积,然后在进行条件过滤,本质上并没有进行传统HashJoin的probe操作。
原文:https://www.cnblogs.com/seancheer/p/14127503.html