// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef BASE_TASK_THREAD_POOL_THREAD_GROUP_H_
#define BASE_TASK_THREAD_POOL_THREAD_GROUP_H_

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <vector>

#include "base/base_export.h"
#include "base/dcheck_is_on.h"
#include "base/memory/raw_ptr.h"
#include "base/memory/raw_ptr_exclusion.h"
#include "base/memory/stack_allocated.h"
#include "base/task/common/checked_lock.h"
#include "base/task/thread_pool/priority_queue.h"
#include "base/task/thread_pool/task.h"
#include "base/task/thread_pool/task_source.h"
#include "base/task/thread_pool/tracked_ref.h"
#include "base/task/thread_pool/worker_thread.h"
#include "build/build_config.h"
#include "third_party/abseil-cpp/absl/container/inlined_vector.h"

#if BUILDFLAG(IS_WIN)
#include "base/win/scoped_windows_thread_environment.h"
#endif

namespace base {

class WorkerThreadObserver;

namespace internal {

class TaskTracker;

// Interface and base implementation for a thread group. A thread group is a
// subset of the threads in the thread pool (see GetThreadGroupForTraits() for
// thread group selection logic when posting tasks and creating task runners).
//
// This class is thread-safe.
class BASE_EXPORT ThreadGroup {
 public:
  // Delegate interface for ThreadGroup. The thread group queries it to find
  // out which thread group a TaskSource belongs in when it must be reenqueued.
  class BASE_EXPORT Delegate {
   public:
    virtual ~Delegate() = default;

    // Invoked when a TaskSource with |traits| is non-empty after the
    // ThreadGroup has run a task from it. The implementation must return the
    // thread group in which the TaskSource should be reenqueued.
    virtual ThreadGroup* GetThreadGroupForTraits(const TaskTraits& traits) = 0;
  };

  // Environment that is set up on each worker of a thread group (see the
  // `worker_environment` parameter of Start()).
  enum class WorkerEnvironment {
    // No special worker environment required.
    NONE,
#if BUILDFLAG(IS_WIN)
    // Initialize a COM MTA on the worker.
    COM_MTA,
#endif  // BUILDFLAG(IS_WIN)
  };

  ThreadGroup(const ThreadGroup&) = delete;
  ThreadGroup& operator=(const ThreadGroup&) = delete;
  virtual ~ThreadGroup();

  // Creates threads, allowing existing and future tasks to run. The thread
  // group runs at most `max_tasks` / `max_best_effort_tasks` unblocked tasks
  // with any / BEST_EFFORT priority concurrently. It reclaims unused threads
  // after `suggested_reclaim_time`. It uses `service_thread_task_runner` both
  // to monitor for blocked tasks and to set up FileDescriptorWatcher on worker
  // threads; it must refer to a Thread with MessagePumpType::IO. If specified,
  // it notifies `worker_thread_observer` when a worker enters and exits its
  // main function (the observer must not be destroyed before JoinForTesting()
  // has returned). `worker_environment` specifies the environment in which
  // tasks are executed. `may_block_threshold` is the timeout after which a
  // task in a MAY_BLOCK ScopedBlockingCall is considered blocked (the thread
  // group will choose an appropriate value if none is specified).
  // `synchronous_thread_start_for_testing` is true if this ThreadGroup should
  // synchronously wait for OnMainEntry() after starting each worker. Start()
  // can only be called once. CHECKs on failure.
  virtual void Start(
      size_t max_tasks,
      size_t max_best_effort_tasks,
      TimeDelta suggested_reclaim_time,
      scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,
      WorkerThreadObserver* worker_thread_observer,
      WorkerEnvironment worker_environment,
      bool synchronous_thread_start_for_testing,
      std::optional<TimeDelta> may_block_threshold) = 0;

  // Registers the thread group in TLS.
  void BindToCurrentThread();

  // Resets the thread group in TLS.
  void UnbindFromCurrentThread();

  // Returns true if the thread group is registered in TLS.
  bool IsBoundToCurrentThread() const;

  // Removes |task_source| from |priority_queue_|. Returns a
  // RegisteredTaskSource that evaluates to true if successful, or false if
  // |task_source| is not currently in |priority_queue_|, such as when a worker
  // is running a task from it.
  RegisteredTaskSource RemoveTaskSource(const TaskSource& task_source);

  // Updates the position of the TaskSource in |transaction| in this
  // ThreadGroup's PriorityQueue based on the TaskSource's current traits.
  //
  // Implementations should instantiate a concrete ScopedCommandsExecutor and
  // invoke UpdateSortKeyImpl().
  virtual void UpdateSortKey(TaskSource::Transaction transaction) = 0;

  // Pushes the TaskSource in |transaction_with_task_source| into this
  // ThreadGroup's PriorityQueue and wakes up workers as appropriate.
  //
  // Implementations should instantiate a concrete ScopedCommandsExecutor and
  // invoke PushTaskSourceAndWakeUpWorkersImpl().
  virtual void PushTaskSourceAndWakeUpWorkers(
      RegisteredTaskSourceAndTransaction transaction_with_task_source) = 0;

  // Move all task sources from this ThreadGroup's PriorityQueue to the
  // |destination_thread_group|'s.
  void HandoffAllTaskSourcesToOtherThreadGroup(
      ThreadGroup* destination_thread_group);
  // Move all task sources except the ones with TaskPriority::USER_BLOCKING,
  // from this ThreadGroup's PriorityQueue to the |destination_thread_group|'s.
  void HandoffNonUserBlockingTaskSourcesToOtherThreadGroup(
      ThreadGroup* destination_thread_group);

  // Returns true if a task with |sort_key| running in this thread group should
  // return ASAP, either because its priority is not allowed to run or because
  // work of higher priority is pending. Thread-safe but may return an outdated
  // result (if a task unnecessarily yields due to this, it will simply be
  // re-scheduled).
  bool ShouldYield(TaskSourceSortKey sort_key);

  // Prevents new tasks from starting to run and waits for currently running
  // tasks to complete their execution. It is guaranteed that no thread will do
  // work on behalf of this ThreadGroup after this returns. It is
  // invalid to post a task once this is called. TaskTracker::Flush() can be
  // called before this to complete existing tasks, which might otherwise post a
  // task during JoinForTesting(). This can only be called once.
  virtual void JoinForTesting() = 0;

  // Returns the maximum number of non-blocked tasks that can run concurrently
  // in this ThreadGroup.
  //
  // TODO(fdoray): Remove this method. https://crbug.com/687264
  virtual size_t GetMaxConcurrentNonBlockedTasksDeprecated() const;

  // Wakes up workers as appropriate for the new CanRunPolicy policy. Must be
  // called after an update to CanRunPolicy in TaskTracker.
  virtual void DidUpdateCanRunPolicy() = 0;

  virtual void OnShutdownStarted() = 0;

  // Returns true if a thread group is registered in TLS. Used by diagnostic
  // code to check whether it's inside a ThreadPool task.
  static bool CurrentThreadHasGroup();

  // Returns |max_tasks_|/|max_best_effort_tasks_|.
  size_t GetMaxTasksForTesting() const;
  size_t GetMaxBestEffortTasksForTesting() const;

  // Waits until at least |n| workers are idle. Workers are disallowed from
  // cleaning up during this call. Even so, tests using a custom
  // |suggested_reclaim_time_| need to be careful to invoke this swiftly after
  // unblocking the waited-upon workers: if a worker is already detached by
  // the time this is invoked, it will never make it onto the idle set and
  // this call will hang.
  void WaitForWorkersIdleForTesting(size_t n);

  // Waits until at least |n| workers are idle. |lock_| must be held by the
  // caller.
  void WaitForWorkersIdleLockRequiredForTesting(size_t n)
      EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Waits until all workers are idle.
  void WaitForAllWorkersIdleForTesting();

  // Waits until |n| workers have cleaned up (went through
  // WorkerThread::Delegate::OnMainExit()) since the last call to
  // WaitForWorkersCleanedUpForTesting() (or Start() if that wasn't called yet).
  void WaitForWorkersCleanedUpForTesting(size_t n);

  // Returns the number of workers in this thread group.
  size_t NumberOfWorkersForTesting() const;
  // Returns the number of workers that are idle (i.e. not running tasks).
  size_t NumberOfIdleWorkersForTesting() const;
  // Returns the number of workers that are idle (i.e. not running tasks).
  // |lock_| must be held by the caller.
  virtual size_t NumberOfIdleWorkersLockRequiredForTesting() const
      EXCLUSIVE_LOCKS_REQUIRED(lock_) = 0;

  class ThreadGroupWorkerDelegate;

 protected:
  ThreadGroup(std::string_view histogram_label,
              std::string_view thread_group_label,
              ThreadType thread_type_hint,
              TrackedRef<TaskTracker> task_tracker,
              TrackedRef<Delegate> delegate);

  void StartImpl(
      size_t max_tasks,
      size_t max_best_effort_tasks,
      TimeDelta suggested_reclaim_time,
      scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,
      WorkerThreadObserver* worker_thread_observer,
      WorkerEnvironment worker_environment,
      bool synchronous_thread_start_for_testing = false,
      std::optional<TimeDelta> may_block_threshold =
          std::optional<TimeDelta>());

  // Base class of the ScopedCommandsExecutor that every derived thread group
  // must implement. An executor performs its operations at the end of a scope,
  // when all locks have been released.
  class BaseScopedCommandsExecutor {
   public:
    BaseScopedCommandsExecutor(const BaseScopedCommandsExecutor&) = delete;
    BaseScopedCommandsExecutor& operator=(const BaseScopedCommandsExecutor&) =
        delete;
    virtual ~BaseScopedCommandsExecutor();

    // Each Schedule*() method records an operation for this executor to
    // perform later (see Flush()).
    void ScheduleStart(scoped_refptr<WorkerThread> worker);
    void ScheduleAdjustMaxTasks();
    void ScheduleReleaseTaskSource(RegisteredTaskSource task_source);

    // Unlocks |held_lock| and flushes this executor.
    void FlushWorkerCreation(CheckedLock* held_lock);

   protected:
    explicit BaseScopedCommandsExecutor(ThreadGroup* outer);

    // Carries out the BaseScopedCommandsExecutor-related tasks. Must be called
    // in this class' destructor.
    void Flush();

    // RAW_PTR_EXCLUSION: Performance: visible in sampling profiler and stack
    // scoped, also a back-pointer to the owning object.
    RAW_PTR_EXCLUSION ThreadGroup* outer_ = nullptr;

    // State accumulated by the Schedule*() methods above.
    // NOTE(review): presumably consumed and cleared by Flush() -- confirm
    // against the .cc.
    std::vector<RegisteredTaskSource> task_sources_to_release_;
    absl::InlinedVector<scoped_refptr<WorkerThread>, 2> workers_to_start_;
    bool must_schedule_adjust_max_tasks_ = false;
  };
  virtual std::unique_ptr<BaseScopedCommandsExecutor> GetExecutor() = 0;

  // Allows a task source to be pushed to a ThreadGroup's PriorityQueue at the
  // end of a scope, when all locks have been released.
  class ScopedReenqueueExecutor {
    STACK_ALLOCATED();

   public:
    ScopedReenqueueExecutor();
    ScopedReenqueueExecutor(const ScopedReenqueueExecutor&) = delete;
    ScopedReenqueueExecutor& operator=(const ScopedReenqueueExecutor&) = delete;
    ~ScopedReenqueueExecutor();

    // Records |transaction_with_task_source| together with the
    // |destination_thread_group| in which it should be enqueued, so that the
    // push happens at the end of this executor's scope rather than
    // immediately.
    // NOTE(review): storage is a single std::optional, so this presumably may
    // be called at most once per executor -- confirm against the .cc.
    void SchedulePushTaskSourceAndWakeUpWorkers(
        RegisteredTaskSourceAndTransaction transaction_with_task_source,
        ThreadGroup* destination_thread_group);

   private:
    // A RegisteredTaskSourceAndTransaction and the thread group in which it
    // should be enqueued.
    std::optional<RegisteredTaskSourceAndTransaction>
        transaction_with_task_source_;
    ThreadGroup* destination_thread_group_ = nullptr;
  };

  ThreadGroup(TrackedRef<TaskTracker> task_tracker,
              TrackedRef<Delegate> delegate);

#if BUILDFLAG(IS_WIN)
  static std::unique_ptr<win::ScopedWindowsThreadEnvironment>
  GetScopedWindowsThreadEnvironment(WorkerEnvironment environment);
#endif

  const TrackedRef<TaskTracker> task_tracker_;
  const TrackedRef<Delegate> delegate_;

  // Returns the number of workers required to run all queued BEST_EFFORT task
  // sources allowed to run by the current CanRunPolicy.
  size_t GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired() const
      EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Returns the number of workers required to run all queued
  // USER_VISIBLE/USER_BLOCKING task sources allowed to run by the current
  // CanRunPolicy.
  size_t GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired() const
      EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Ensures that there are enough workers to run queued task sources.
  // |executor| is forwarded from the one received in
  // PushTaskSourceAndWakeUpWorkersImpl().
  virtual void EnsureEnoughWorkersLockRequired(
      BaseScopedCommandsExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_) = 0;

  // Reenqueues a |transaction_with_task_source| from which a Task just ran in
  // the current ThreadGroup into the appropriate ThreadGroup.
  void ReEnqueueTaskSourceLockRequired(
      BaseScopedCommandsExecutor* workers_executor,
      ScopedReenqueueExecutor* reenqueue_executor,
      RegisteredTaskSourceAndTransaction transaction_with_task_source)
      EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Returns the next task source from |priority_queue_| if permitted to run and
  // pops |priority_queue_| if the task source returned no longer needs to be
  // queued (reached its maximum concurrency). Otherwise returns nullptr and
  // pops |priority_queue_| so this can be called again.
  RegisteredTaskSource TakeRegisteredTaskSource(
      BaseScopedCommandsExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Must be invoked by implementations of the corresponding non-Impl() methods.
  void UpdateSortKeyImpl(BaseScopedCommandsExecutor* executor,
                         TaskSource::Transaction transaction);
  void PushTaskSourceAndWakeUpWorkersImpl(
      BaseScopedCommandsExecutor* executor,
      RegisteredTaskSourceAndTransaction transaction_with_task_source);
  void OnShutDownStartedImpl(BaseScopedCommandsExecutor* executor);

  virtual ThreadGroupWorkerDelegate* GetWorkerDelegate(
      WorkerThread* worker) = 0;

  // Returns the desired number of awake workers, given current workload and
  // concurrency limits.
  size_t GetDesiredNumAwakeWorkersLockRequired() const
      EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Examines the list of WorkerThreads and increments |max_tasks_| for each
  // worker that has been within the scope of a MAY_BLOCK ScopedBlockingCall for
  // more than BlockedThreshold(). Reschedules a call if necessary.
  void AdjustMaxTasks();

  // Schedules AdjustMaxTasks() if required.
  void MaybeScheduleAdjustMaxTasksLockRequired(
      BaseScopedCommandsExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Enqueues all task sources from `new_priority_queue` into this thread group.
  void EnqueueAllTaskSources(PriorityQueue* new_priority_queue);

  // Test-only accessor for the delay after which max tasks is raised to
  // compensate for a worker sitting inside a MAY_BLOCK ScopedBlockingCall.
  // The value is one of those set at Start(), hence the after_start() access.
  TimeDelta may_block_threshold_for_testing() const {
    const InitializedInStart& start_params = after_start();
    return start_params.may_block_threshold;
  }

  // Test-only accessor for the interval at which the service thread checks
  // for workers in this thread group that have been in a MAY_BLOCK
  // ScopedBlockingCall for longer than the may-block threshold.
  TimeDelta blocked_workers_poll_period_for_testing() const {
    const InitializedInStart& start_params = after_start();
    return start_params.blocked_workers_poll_period;
  }

  // Starts calling AdjustMaxTasks() periodically on
  // |service_thread_task_runner_|.
  void ScheduleAdjustMaxTasks();

  // Returns true if AdjustMaxTasks() should periodically be called on
  // |service_thread_task_runner_|.
  bool ShouldPeriodicallyAdjustMaxTasksLockRequired()
      EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Updates the minimum priority allowed to run below which tasks should yield.
  // This should be called whenever |num_running_tasks_| or |max_tasks| changes,
  // or when a new task is added to |priority_queue_|.
  void UpdateMinAllowedPriorityLockRequired() EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Increments/decrements the number of tasks of |priority| that are currently
  // running in this thread group. Must be invoked before/after running a task.
  void DecrementTasksRunningLockRequired(TaskPriority priority)
      EXCLUSIVE_LOCKS_REQUIRED(lock_);
  void IncrementTasksRunningLockRequired(TaskPriority priority)
      EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Increments/decrements the number of [best effort] tasks that can run in
  // this thread group.
  void DecrementMaxTasksLockRequired() EXCLUSIVE_LOCKS_REQUIRED(lock_);
  void IncrementMaxTasksLockRequired() EXCLUSIVE_LOCKS_REQUIRED(lock_);
  void DecrementMaxBestEffortTasksLockRequired()
      EXCLUSIVE_LOCKS_REQUIRED(lock_);
  void IncrementMaxBestEffortTasksLockRequired()
      EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Parameters captured at Start(); none of them is modified afterwards.
  struct InitializedInStart {
    InitializedInStart();
    ~InitializedInStart();

#if DCHECK_IS_ON()
    // Becomes true once every other member of this struct has been set.
    bool initialized = false;
#endif

    // Value that |max_tasks_| starts out with.
    size_t initial_max_tasks = 0;

    // Suggested reclaim time for workers.
    TimeDelta suggested_reclaim_time;
    // NOTE(review): presumably disables reclaiming of workers altogether when
    // true -- confirm against the .cc.
    bool no_worker_reclaim = false;

    // Environment to set up on every worker.
    WorkerEnvironment worker_environment = WorkerEnvironment::NONE;

    // Task runner of the service thread.
    scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner;

    // Observer (may be null) notified when a worker enters and exits its main
    // function.
    raw_ptr<WorkerThreadObserver> worker_thread_observer = nullptr;

    // Delay after which max tasks is raised to compensate for a worker that
    // is within a MAY_BLOCK ScopedBlockingCall.
    TimeDelta may_block_threshold;

    // Time between two AdjustMaxTasks() calls while the thread group is at
    // capacity.
    TimeDelta blocked_workers_poll_period;

    // Upper bound on the number of workers that a ThreadGroupSemaphore will
    // create in any one EnsureEnoughWorkers() call.
    int max_num_workers_created = 2;
  };
  InitializedInStart initialized_in_start_;

  // Mutable access to |initialized_in_start_|, for use while its members are
  // still being set. DCHECKs that the struct has not yet been marked
  // initialized.
  InitializedInStart& in_start() {
#if DCHECK_IS_ON()
    DCHECK(!initialized_in_start_.initialized);
#endif
    return initialized_in_start_;
  }
  // Read-only access to |initialized_in_start_|. DCHECKs that the struct has
  // been marked initialized, i.e. that all of its members have been set.
  const InitializedInStart& after_start() const {
#if DCHECK_IS_ON()
    DCHECK(initialized_in_start_.initialized);
#endif
    return initialized_in_start_;
  }

  // Synchronizes accesses to all members of this class which are neither const,
  // atomic, nor immutable after start. Since this lock is a bottleneck to post
  // and schedule work, only simple data structure manipulations are allowed
  // within its scope (no thread creation or wake up).
  mutable CheckedLock lock_{};

  bool disable_fair_scheduling_ GUARDED_BY(lock_){false};

  // PriorityQueue from which all threads of this ThreadGroup get work.
  PriorityQueue priority_queue_ GUARDED_BY(lock_);

  // Priority / worker count pair stored in |max_allowed_sort_key_| and used to
  // decide whether a running TaskSource should yield.
  struct YieldSortKey {
    // Priority of the TaskSource described by this key.
    TaskPriority priority;
    // Worker count of the TaskSource described by this key.
    uint8_t worker_count;
  };
  // Sort key which compares greater than or equal to any other sort key.
  static constexpr YieldSortKey kMaxYieldSortKey = {TaskPriority::BEST_EFFORT,
                                                    0U};

  // When the thread group is at or above capacity and has pending work, this is
  // set to contain the priority and worker count of the next TaskSource to
  // schedule, or kMaxYieldSortKey otherwise. This is used to decide whether a
  // TaskSource should yield. Once ShouldYield() returns true, it is reset to
  // kMaxYieldSortKey to prevent additional unnecessary yielding. This is
  // expected to be always kept up-to-date by derived classes when |lock_| is
  // released. It is annotated as GUARDED_BY(lock_) because it is always updated
  // under the lock (to avoid races with other state during the update) but it
  // is nonetheless always safe to read it without the lock (since it's atomic).
  std::atomic<YieldSortKey> max_allowed_sort_key_ GUARDED_BY(lock_){
      kMaxYieldSortKey};

  const std::string histogram_label_;
  const std::string thread_group_label_;
  const ThreadType thread_type_hint_;

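  // Sequence number for workers of this thread group (presumably used to give
  // each newly created worker a distinct number; confirm against the .cc).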
  size_t worker_sequence_num_ GUARDED_BY(lock_) = 0;

  bool shutdown_started_ GUARDED_BY(lock_) = false;

  // Maximum number of tasks of any priority / BEST_EFFORT priority that can run
  // concurrently in this thread group.
  size_t max_tasks_ GUARDED_BY(lock_) = 0;
  size_t max_best_effort_tasks_ GUARDED_BY(lock_) = 0;

  // Number of tasks of any priority / BEST_EFFORT priority that are currently
  // running in this thread group.
  size_t num_running_tasks_ GUARDED_BY(lock_) = 0;
  size_t num_running_best_effort_tasks_ GUARDED_BY(lock_) = 0;

  // Number of workers running a task of any priority / BEST_EFFORT priority
  // that are within the scope of a MAY_BLOCK ScopedBlockingCall but haven't
  // caused a max tasks increase yet.
  int num_unresolved_may_block_ GUARDED_BY(lock_) = 0;
  int num_unresolved_best_effort_may_block_ GUARDED_BY(lock_) = 0;

  // Signaled when a worker is added to the idle workers set.
  ConditionVariable idle_workers_set_cv_for_testing_ GUARDED_BY(lock_);

  // Whether an AdjustMaxTasks() task was posted to the service thread.
  bool adjust_max_tasks_posted_ GUARDED_BY(lock_) = false;

  // Indicates to the delegates that workers are not permitted to cleanup.
  bool worker_cleanup_disallowed_for_testing_ GUARDED_BY(lock_) = false;

  // Counts the number of workers cleaned up (went through
  // WorkerThreadDelegateImpl::OnMainExit()) since the last call to
  // WaitForWorkersCleanedUpForTesting() (or Start() if that wasn't called yet).
  // |some_workers_cleaned_up_for_testing_| is true if this was ever
  // incremented. Tests with a custom |suggested_reclaim_time_| can wait on a
  // specific number of workers being cleaned up via
  // WaitForWorkersCleanedUpForTesting().
  size_t num_workers_cleaned_up_for_testing_ GUARDED_BY(lock_) = 0;
#if DCHECK_IS_ON()
  bool some_workers_cleaned_up_for_testing_ GUARDED_BY(lock_) = false;
#endif

  // Signaled, if non-null, when |num_workers_cleaned_up_for_testing_| is
  // incremented.
  std::optional<ConditionVariable> num_workers_cleaned_up_for_testing_cv_
      GUARDED_BY(lock_);

  // All workers owned by this thread group.
  std::vector<scoped_refptr<WorkerThread>> workers_ GUARDED_BY(lock_);

  // Null-opt unless |synchronous_thread_start_for_testing| was true in
  // Start(). In that case, it's signaled each time
  // WorkerThreadDelegateImpl::OnMainEntry() completes.
  std::optional<WaitableEvent> worker_started_for_testing_;

  // Set at the start of JoinForTesting().
  bool join_for_testing_started_ GUARDED_BY(lock_) = false;
};

}  // namespace internal
}  // namespace base

#endif  // BASE_TASK_THREAD_POOL_THREAD_GROUP_H_
