RocksDB寫入資料過程DBImpl::Write()原始碼分析
阿新 • • 發佈:2019-01-06
Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) { if (my_batch == nullptr) { return Status::Corruption("Batch is nullptr!"); } PERF_TIMER_GUARD(write_pre_and_post_process_time); // WriteThread::Writer是一個寫任務的抽象結構,代表了使用者的一次寫操作。其中的batch欄位存有 // 實際需要寫入的資料,sync欄位指明這個寫操作需不需要對事務日誌執行fsync/fdatasync操作,而 // disableWAL指明是否需要寫事務日誌,done欄位在該寫操作完成時設定,timeout_hint_us指明瞭 // 這個寫操作完成時間期限。 // 最後,in_batch_group的比較有意思。在RocksDB內部,對寫入操作做了優化,儘可能地將使用者的寫入 // 批量處理。這其中使用了一個佇列,即write_thread_內部的WriteThread::Writer*佇列。在準備寫佇列頭 // 的任務時,會試著用BuildBatchGroup()構建一個批量任務組,將緊跟著隊頭的其他寫操作任務加入 // 到一個BatchGroup,一次性地寫入資料庫。 WriteThread::Writer w(&mutex_); w.batch = my_batch; w.sync = write_options.sync; w.disableWAL = write_options.disableWAL; w.in_batch_group = false; w.done = false; w.timeout_hint_us = write_options.timeout_hint_us; uint64_t expiration_time = 0; bool has_timeout = false; if (w.timeout_hint_us == 0) { w.timeout_hint_us = WriteThread::kNoTimeOut; } else { expiration_time = env_->NowMicros() + w.timeout_hint_us; has_timeout = true; } if (!write_options.disableWAL) { RecordTick(stats_, WRITE_WITH_WAL); } // ??? WriteContext context; mutex_.Lock(); if (!write_options.disableWAL) { default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_WITH_WAL, 1); } // 將當前寫入任務@w掛入寫佇列,並在mutex_上睡眠等待。等待直到: // 1) 寫操作設定了超時時間,等待超時。或, // 2) @w之前的任務都已完成,@w已處於佇列頭部。或, // 3) @w這個寫任務被別的寫執行緒完成了。 // 第3個條件,任務被別的寫執行緒完成,實際上是被之前的寫任務合併進一個 // WriteBatchGroup中去了。此時的@w會被標記成in_batch_group。有意思的是,在EnterWriteThread() // 裡面,如果因為超時喚醒了,發現當前任務in_batch_group為true,則會繼續等待, // 因為它已經被別的執行緒加入BatchGroup準備寫入資料庫了。 Status status = write_thread_.EnterWriteThread(&w, expiration_time); assert(status.ok() || status.IsTimedOut()); if (status.IsTimedOut()) { mutex_.Unlock(); RecordTick(stats_, WRITE_TIMEDOUT); return Status::TimedOut(); } if (w.done) { // write was done by someone else default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, 1); mutex_.Unlock(); RecordTick(stats_, WRITE_DONE_BY_OTHER); return w.status; } RecordTick(stats_, WRITE_DONE_BY_SELF); default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1); // Once reaches this point, the current writer "w" will try to do its write // job. It may also pick up some of the remaining writers in the "writers_" // when it finds suitable, and finish them in the same write batch. // This is how a write job could be done by the other writer. assert(!single_column_family_mode_ || versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1); uint64_t max_total_wal_size = (db_options_.max_total_wal_size == 0) ? 4 * max_total_in_memory_state_ : db_options_.max_total_wal_size; if (UNLIKELY(!single_column_family_mode_) && alive_log_files_.begin()->getting_flushed == false && total_log_size_ > max_total_wal_size) { // 如果column family有多個,最早的活躍的事務日誌對應的memtable還沒有被寫入磁碟, // 而且當前日誌總大小超過了設定的最大值,那麼就需要分配新的memtable,將老的 // immutable memtable內容寫入磁碟。 uint64_t flush_column_family_if_log_file = alive_log_files_.begin()->number; alive_log_files_.begin()->getting_flushed = true; Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, "Flushing all column families with data in WAL number %" PRIu64 ". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64, flush_column_family_if_log_file, total_log_size_, max_total_wal_size); // no need to refcount because drop is happening in write thread, so can't // happen while we're in the write thread for (auto cfd : *versions_->GetColumnFamilySet()) { if (cfd->IsDropped()) { continue; } if (cfd->GetLogNumber() <= flush_column_family_if_log_file) { status = SetNewMemtableAndNewLogFile(cfd, &context); if (!status.ok()) { break; } cfd->imm()->FlushRequested(); SchedulePendingFlush(cfd); context.schedule_bg_work_ = true; } } } else if (UNLIKELY(write_buffer_.ShouldFlush())) { Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, "Flushing all column families. Write buffer is using %" PRIu64 " bytes out of a total of %" PRIu64 ".", write_buffer_.memory_usage(), write_buffer_.buffer_size()); // no need to refcount because drop is happening in write thread, so can't // happen while we're in the write thread for (auto cfd : *versions_->GetColumnFamilySet()) { if (cfd->IsDropped()) { continue; } if (!cfd->mem()->IsEmpty()) { status = SetNewMemtableAndNewLogFile(cfd, &context); if (!status.ok()) { break; } cfd->imm()->FlushRequested(); SchedulePendingFlush(cfd); context.schedule_bg_work_ = true; } } MaybeScheduleFlushOrCompaction(); } if (UNLIKELY(status.ok() && !bg_error_.ok())) { status = bg_error_; } if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) { status = ScheduleFlushes(&context); } if (UNLIKELY(status.ok() && (write_controller_.IsStopped() || write_controller_.GetDelay() > 0))) { // If writer is stopped, we need to get it going, // so schedule flushes/compactions if (context.schedule_bg_work_) { MaybeScheduleFlushOrCompaction(); } status = DelayWrite(expiration_time); } if (UNLIKELY(status.ok() && has_timeout && env_->NowMicros() > expiration_time)) { status = Status::TimedOut(); } uint64_t last_sequence = versions_->LastSequence(); WriteThread::Writer* last_writer = &w; if (status.ok()) { autovector<WriteBatch*> write_batch_group; write_thread_.BuildBatchGroup(&last_writer, &write_batch_group); // Add to log and apply to memtable. We can release the lock // during this phase since &w is currently responsible for logging // and protects against concurrent loggers and concurrent writes // into memtables { mutex_.Unlock(); WriteBatch* updates = nullptr; if (write_batch_group.size() == 1) { updates = write_batch_group[0]; } else { updates = &tmp_batch_; for (size_t i = 0; i < write_batch_group.size(); ++i) { WriteBatchInternal::Append(updates, write_batch_group[i]); } } const SequenceNumber current_sequence = last_sequence + 1; WriteBatchInternal::SetSequence(updates, current_sequence); int my_batch_count = WriteBatchInternal::Count(updates); last_sequence += my_batch_count; const uint64_t batch_size = WriteBatchInternal::ByteSize(updates); // Record statistics RecordTick(stats_, NUMBER_KEYS_WRITTEN, my_batch_count); RecordTick(stats_, BYTES_WRITTEN, batch_size); if (write_options.disableWAL) { flush_on_destroy_ = true; } PERF_TIMER_STOP(write_pre_and_post_process_time); uint64_t log_size = 0; if (!write_options.disableWAL) { PERF_TIMER_GUARD(write_wal_time); Slice log_entry = WriteBatchInternal::Contents(updates); status = log_->AddRecord(log_entry); total_log_size_ += log_entry.size(); alive_log_files_.back().AddSize(log_entry.size()); log_empty_ = false; log_size = log_entry.size(); RecordTick(stats_, WAL_FILE_BYTES, log_size); if (status.ok() && write_options.sync) { RecordTick(stats_, WAL_FILE_SYNCED); StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS); if (db_options_.use_fsync) { status = log_->file()->Fsync(); } else { status = log_->file()->Sync(); } if (status.ok() && !log_dir_synced_) { // We only sync WAL directory the first time WAL syncing is // requested, so that in case users never turn on WAL sync, // we can avoid the disk I/O in the write code path. status = directories_.GetWalDir()->Fsync(); } log_dir_synced_ = true; } } if (status.ok()) { PERF_TIMER_GUARD(write_memtable_time); status = WriteBatchInternal::InsertInto( updates, column_family_memtables_.get(), write_options.ignore_missing_column_families, 0, this, false); // A non-OK status here indicates iteration failure (either in-memory // writebatch corruption (very bad), or the client specified invalid // column family). This will later on trigger bg_error_. // // Note that existing logic was not sound. Any partial failure writing // into the memtable would result in a state that some write ops might // have succeeded in memtable but Status reports error for all writes. SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence); } PERF_TIMER_START(write_pre_and_post_process_time); if (updates == &tmp_batch_) { tmp_batch_.Clear(); } mutex_.Lock(); // internal stats default_cf_internal_stats_->AddDBStats( InternalStats::BYTES_WRITTEN, batch_size); default_cf_internal_stats_->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, my_batch_count); if (!write_options.disableWAL) { default_cf_internal_stats_->AddDBStats( InternalStats::WAL_FILE_SYNCED, 1); default_cf_internal_stats_->AddDBStats( InternalStats::WAL_FILE_BYTES, log_size); } if (status.ok()) { versions_->SetLastSequence(last_sequence); } } } if (db_options_.paranoid_checks && !status.ok() && !status.IsTimedOut() && bg_error_.ok()) { bg_error_ = status; // stop compaction & fail any further writes } write_thread_.ExitWriteThread(&w, last_writer, status); if (context.schedule_bg_work_) { MaybeScheduleFlushOrCompaction(); } mutex_.Unlock(); if (status.IsTimedOut()) { RecordTick(stats_, WRITE_TIMEDOUT); } return status; }