/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/profiling/perf/perf_producer.h"

#include <optional>
#include <random>
#include <utility>
#include <vector>

#include <unistd.h>

#include <unwindstack/Error.h>
#include <unwindstack/Unwinder.h>

#include "perfetto/base/logging.h"
#include "perfetto/base/task_runner.h"
#include "perfetto/ext/base/file_utils.h"
#include "perfetto/ext/base/metatrace.h"
#include "perfetto/ext/base/string_utils.h"
#include "perfetto/ext/base/utils.h"
#include "perfetto/ext/base/weak_ptr.h"
#include "perfetto/ext/tracing/core/basic_types.h"
#include "perfetto/ext/tracing/core/producer.h"
#include "perfetto/ext/tracing/core/tracing_service.h"
#include "perfetto/ext/tracing/ipc/producer_ipc_client.h"
#include "perfetto/tracing/core/data_source_config.h"
#include "perfetto/tracing/core/data_source_descriptor.h"
#include "src/profiling/common/callstack_trie.h"
#include "src/profiling/common/proc_cmdline.h"
#include "src/profiling/common/producer_support.h"
#include "src/profiling/common/profiler_guardrails.h"
#include "src/profiling/common/unwind_support.h"
#include "src/profiling/perf/common_types.h"
#include "src/profiling/perf/event_reader.h"

#include "protos/perfetto/common/builtin_clock.pbzero.h"
#include "protos/perfetto/common/perf_events.gen.h"
#include "protos/perfetto/common/perf_events.pbzero.h"
#include "protos/perfetto/config/profiling/perf_event_config.gen.h"
#include "protos/perfetto/trace/profiling/profile_packet.pbzero.h"
#include "protos/perfetto/trace/trace_packet.pbzero.h"
#include "protos/perfetto/trace/trace_packet_defaults.pbzero.h"

namespace perfetto {
namespace profiling {
namespace {

// TODO(b/151835887): on Android, when using signals, there exists a vulnerable
// window between a process image being replaced by execve, and the new
// libc instance reinstalling the proper signal handlers. During this window,
// the signal disposition is defaulted to terminating the process.
// This is a best-effort mitigation from the daemon's side, using a heuristic
// that most execve calls follow a fork. So if we get a sample for a very fresh
// process, the grace period will give it a chance to get to
// a properly initialised state prior to getting signalled. This doesn't help
// cases when a mature process calls execve, or when the target gets descheduled
// (since this is a naive walltime wait).
// The proper fix is in the platform, see bug for progress.
constexpr uint32_t kProcDescriptorsAndroidDelayMs = 50;

constexpr uint32_t kMemoryLimitCheckPeriodMs = 1000;

constexpr uint32_t kInitialConnectionBackoffMs = 100;
constexpr uint32_t kMaxConnectionBackoffMs = 30 * 1000;

constexpr char kProducerName[] = "perfetto.traced_perf";
constexpr char kDataSourceName[] = "linux.perf";

size_t NumberOfCpus() {
  return static_cast<size_t>(sysconf(_SC_NPROCESSORS_CONF));
}

std::vector<uint32_t> GetOnlineCpus() {
  size_t cpu_count = NumberOfCpus();
  if (cpu_count == 0) {
    return {};
  }

  static constexpr char kOnlineValue[] = "1\n";
  std::vector<uint32_t> online_cpus;
  online_cpus.reserve(cpu_count);
  for (uint32_t cpu = 0; cpu < cpu_count; ++cpu) {
    std::string res;
    base::StackString<1024> path("/sys/devices/system/cpu/cpu%u/online", cpu);
    if (!base::ReadFile(path.c_str(), &res)) {
      // Always consider CPU 0 to be online if the "online" file does not exist
      // for it. There seem to be several assumptions in the kernel which make
      // CPU 0 special so this is a pretty safe bet.
      if (cpu != 0) {
        return {};
      }
      res = kOnlineValue;
    }
    if (res != kOnlineValue) {
      continue;
    }
    online_cpus.push_back(cpu);
  }
  return online_cpus;
}

int32_t ToBuiltinClock(int32_t clockid) {
  switch (clockid) {
    case CLOCK_REALTIME:
      return protos::pbzero::BUILTIN_CLOCK_REALTIME;
    case CLOCK_MONOTONIC:
      return protos::pbzero::BUILTIN_CLOCK_MONOTONIC;
    case CLOCK_MONOTONIC_RAW:
      return protos::pbzero::BUILTIN_CLOCK_MONOTONIC_RAW;
    case CLOCK_BOOTTIME:
      return protos::pbzero::BUILTIN_CLOCK_BOOTTIME;
    // Should never get invalid input here as otherwise the syscall itself
    // would've failed earlier.
    default:
      return protos::pbzero::BUILTIN_CLOCK_UNKNOWN;
  }
}

TraceWriter::TracePacketHandle StartTracePacket(TraceWriter* trace_writer) {
  auto packet = trace_writer->NewTracePacket();
  packet->set_sequence_flags(
      protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE);
  return packet;
}

void WritePerfEventDefaultsPacket(const EventConfig& event_config,
                                  TraceWriter* trace_writer) {
  auto packet = trace_writer->NewTracePacket();
  packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count()));
  packet->set_timestamp_clock_id(protos::pbzero::BUILTIN_CLOCK_BOOTTIME);

  // start new incremental state generation:
  packet->set_sequence_flags(
      protos::pbzero::TracePacket::SEQ_INCREMENTAL_STATE_CLEARED);

  // default packet timestamp clock for the samples:
  perf_event_attr* perf_attr = event_config.perf_attr();
  auto* defaults = packet->set_trace_packet_defaults();
  int32_t builtin_clock = ToBuiltinClock(perf_attr->clockid);
  defaults->set_timestamp_clock_id(static_cast<uint32_t>(builtin_clock));

  auto* perf_defaults = defaults->set_perf_sample_defaults();
  auto* timebase_pb = perf_defaults->set_timebase();

  // frequency/period:
  if (perf_attr->freq) {
    timebase_pb->set_frequency(perf_attr->sample_freq);
  } else {
    timebase_pb->set_period(perf_attr->sample_period);
  }

  // timebase event:
  const PerfCounter& timebase = event_config.timebase_event();
  switch (timebase.event_type()) {
    case PerfCounter::Type::kBuiltinCounter: {
      timebase_pb->set_counter(
          static_cast<protos::pbzero::PerfEvents::Counter>(timebase.counter));
      break;
    }
    case PerfCounter::Type::kTracepoint: {
      auto* tracepoint_pb = timebase_pb->set_tracepoint();
      tracepoint_pb->set_name(timebase.tracepoint_name);
      tracepoint_pb->set_filter(timebase.tracepoint_filter);
      break;
    }
    case PerfCounter::Type::kRawEvent: {
      auto* raw_pb = timebase_pb->set_raw_event();
      raw_pb->set_type(timebase.attr_type);
      raw_pb->set_config(timebase.attr_config);
      raw_pb->set_config1(timebase.attr_config1);
      raw_pb->set_config2(timebase.attr_config2);
      break;
    }
  }

  // optional name to identify the counter during parsing:
  if (!timebase.name.empty()) {
    timebase_pb->set_name(timebase.name);
  }

  // follower events:
  for (const auto& e : event_config.follower_events()) {
    auto* followers_pb = perf_defaults->add_followers();
    followers_pb->set_name(e.name);

    switch (e.event_type()) {
      case PerfCounter::Type::kBuiltinCounter: {
        followers_pb->set_counter(
            static_cast<protos::pbzero::PerfEvents::Counter>(e.counter));
        break;
      }
      case PerfCounter::Type::kTracepoint: {
        auto* tracepoint_pb = followers_pb->set_tracepoint();
        tracepoint_pb->set_name(e.tracepoint_name);
        tracepoint_pb->set_filter(e.tracepoint_filter);
        break;
      }
      case PerfCounter::Type::kRawEvent: {
        auto* raw_pb = followers_pb->set_raw_event();
        raw_pb->set_type(e.attr_type);
        raw_pb->set_config(e.attr_config);
        raw_pb->set_config1(e.attr_config1);
        raw_pb->set_config2(e.attr_config2);
        break;
      }
    }
  }

  // Not setting timebase.timestamp_clock since the field that matters during
  // parsing is the root timestamp_clock_id set above.

  // Record the random shard we've chosen so that the post-processing can infer
  // which processes would've been unwound if sampled. In particular this lets
  // us distinguish between "running but not chosen" and "running and chosen,
  // but not sampled" cases.
  const auto& process_sharding = event_config.filter().process_sharding;
  if (process_sharding.has_value()) {
    perf_defaults->set_process_shard_count(process_sharding->shard_count);
    perf_defaults->set_chosen_process_shard(process_sharding->chosen_shard);
  }
}

uint32_t TimeToNextReadTickMs(DataSourceInstanceID ds_id, uint32_t period_ms) {
  // Normally, we'd schedule the next tick at the next |period_ms|
  // boundary of the boot clock. However, to avoid aligning the read tasks of
  // all concurrent data sources, we select a deterministic offset based on the
  // data source id.
  std::minstd_rand prng(static_cast<std::minstd_rand::result_type>(ds_id));
  std::uniform_int_distribution<uint32_t> dist(0, period_ms - 1);
  uint32_t ds_period_offset = dist(prng);

  uint64_t now_ms = static_cast<uint64_t>(base::GetWallTimeMs().count());
  return period_ms - ((now_ms - ds_period_offset) % period_ms);
}

protos::pbzero::Profiling::CpuMode ToCpuModeEnum(uint16_t perf_cpu_mode) {
  using Profiling = protos::pbzero::Profiling;
  switch (perf_cpu_mode) {
    case PERF_RECORD_MISC_KERNEL:
      return Profiling::MODE_KERNEL;
    case PERF_RECORD_MISC_USER:
      return Profiling::MODE_USER;
    case PERF_RECORD_MISC_HYPERVISOR:
      return Profiling::MODE_HYPERVISOR;
    case PERF_RECORD_MISC_GUEST_KERNEL:
      return Profiling::MODE_GUEST_KERNEL;
    case PERF_RECORD_MISC_GUEST_USER:
      return Profiling::MODE_GUEST_USER;
    default:
      return Profiling::MODE_UNKNOWN;
  }
}

protos::pbzero::Profiling::StackUnwindError ToProtoEnum(
    unwindstack::ErrorCode error_code) {
  using Profiling = protos::pbzero::Profiling;
  switch (error_code) {
    case unwindstack::ERROR_NONE:
      return Profiling::UNWIND_ERROR_NONE;
    case unwindstack::ERROR_MEMORY_INVALID:
      return Profiling::UNWIND_ERROR_MEMORY_INVALID;
    case unwindstack::ERROR_UNWIND_INFO:
      return Profiling::UNWIND_ERROR_UNWIND_INFO;
    case unwindstack::ERROR_UNSUPPORTED:
      return Profiling::UNWIND_ERROR_UNSUPPORTED;
    case unwindstack::ERROR_INVALID_MAP:
      return Profiling::UNWIND_ERROR_INVALID_MAP;
    case unwindstack::ERROR_MAX_FRAMES_EXCEEDED:
      return Profiling::UNWIND_ERROR_MAX_FRAMES_EXCEEDED;
    case unwindstack::ERROR_REPEATED_FRAME:
      return Profiling::UNWIND_ERROR_REPEATED_FRAME;
    case unwindstack::ERROR_INVALID_ELF:
      return Profiling::UNWIND_ERROR_INVALID_ELF;
    case unwindstack::ERROR_SYSTEM_CALL:
      return Profiling::UNWIND_ERROR_SYSTEM_CALL;
    case unwindstack::ERROR_THREAD_TIMEOUT:
      return Profiling::UNWIND_ERROR_THREAD_TIMEOUT;
    case unwindstack::ERROR_THREAD_DOES_NOT_EXIST:
      return Profiling::UNWIND_ERROR_THREAD_DOES_NOT_EXIST;
    case unwindstack::ERROR_BAD_ARCH:
      return Profiling::UNWIND_ERROR_BAD_ARCH;
    case unwindstack::ERROR_MAPS_PARSE:
      return Profiling::UNWIND_ERROR_MAPS_PARSE;
    case unwindstack::ERROR_INVALID_PARAMETER:
      return Profiling::UNWIND_ERROR_INVALID_PARAMETER;
    case unwindstack::ERROR_PTRACE_CALL:
      return Profiling::UNWIND_ERROR_PTRACE_CALL;
  }
  return Profiling::UNWIND_ERROR_UNKNOWN;
}

}  // namespace

// static
bool PerfProducer::ShouldRejectDueToFilter(
    pid_t pid,
    const TargetFilter& filter,
    bool skip_cmdline,
    base::FlatSet<std::string>* additional_cmdlines,
    std::function<bool(std::string*)> read_proc_pid_cmdline) {
  PERFETTO_CHECK(additional_cmdlines);

  std::string cmdline;
  bool have_cmdline = false;
  if (!skip_cmdline)
    have_cmdline = read_proc_pid_cmdline(&cmdline);

  const char* binname = "";
  if (have_cmdline) {
    binname = glob_aware::FindBinaryName(cmdline.c_str(), cmdline.size());
  }

  auto has_matching_pattern = [](const std::vector<std::string>& patterns,
                                 const char* cmd, const char* name) {
    for (const std::string& pattern : patterns) {
      if (glob_aware::MatchGlobPattern(pattern.c_str(), cmd, name)) {
        return true;
      }
    }
    return false;
  };

  if (have_cmdline &&
      has_matching_pattern(filter.exclude_cmdlines, cmdline.c_str(), binname)) {
    PERFETTO_DLOG("Explicitly rejecting samples for pid [%d] due to cmdline",
                  static_cast<int>(pid));
    return true;
  }
  if (filter.exclude_pids.count(pid)) {
    PERFETTO_DLOG("Explicitly rejecting samples for pid [%d] due to pid",
                  static_cast<int>(pid));
    return true;
  }

  if (have_cmdline &&
      has_matching_pattern(filter.cmdlines, cmdline.c_str(), binname)) {
    return false;
  }
  if (filter.pids.count(pid)) {
    return false;
  }

  // Empty allow filter means keep everything that isn't explicitly excluded.
  if (filter.cmdlines.empty() && filter.pids.empty() &&
      !filter.additional_cmdline_count &&
      !filter.process_sharding.has_value()) {
    return false;
  }

  // Niche option: process sharding to amortise systemwide unwinding costs.
  // Selects a subset of all processes by using the low order bits of their pid.
  if (filter.process_sharding.has_value()) {
    uint32_t upid = static_cast<uint32_t>(pid);
    if (upid % filter.process_sharding->shard_count ==
        filter.process_sharding->chosen_shard) {
      PERFETTO_DLOG("Process sharding: keeping pid [%d]",
                    static_cast<int>(pid));
      return false;
    } else {
      PERFETTO_DLOG("Process sharding: rejecting pid [%d]",
                    static_cast<int>(pid));
      return true;
    }
  }

  // Niche option: additionally remember the first seen N process cmdlines, and
  // keep all processes with those names.
  if (have_cmdline) {
    if (additional_cmdlines->count(cmdline)) {
      return false;
    }
    if (additional_cmdlines->size() < filter.additional_cmdline_count) {
      additional_cmdlines->insert(cmdline);
      return false;
    }
  }

  PERFETTO_DLOG("Rejecting samples for pid [%d]", static_cast<int>(pid));
  return true;
}

PerfProducer::PerfProducer(ProcDescriptorGetter* proc_fd_getter,
                           base::TaskRunner* task_runner)
    : task_runner_(task_runner),
      proc_fd_getter_(proc_fd_getter),
      unwinding_worker_(this),
      weak_factory_(this) {
  proc_fd_getter->SetDelegate(this);
}

void PerfProducer::SetupDataSource(DataSourceInstanceID,
                                   const DataSourceConfig&) {}

void PerfProducer::StartDataSource(DataSourceInstanceID ds_id,
                                   const DataSourceConfig& config) {
  uint64_t tracing_session_id = config.tracing_session_id();
  PERFETTO_LOG("StartDataSource(ds %zu, session %" PRIu64 ", name %s)",
               static_cast<size_t>(ds_id), tracing_session_id,
               config.name().c_str());

  if (config.name() == MetatraceWriter::kDataSourceName) {
    StartMetatraceSource(ds_id, static_cast<BufferID>(config.target_buffer()));
    return;
  }

  // linux.perf data source
  if (config.name() != kDataSourceName)
    return;

  // Tracepoint name -> id lookup in case the config asks for tracepoints:
  auto tracepoint_id_lookup = [this](const std::string& group,
                                     const std::string& name) {
    if (!tracefs_)  // lazy init or retry
      tracefs_ = FtraceProcfs::CreateGuessingMountPoint();
    if (!tracefs_)  // still didn't find an accessible tracefs
      return 0u;
    return tracefs_->ReadEventId(group, name);
  };

  protos::gen::PerfEventConfig event_config_pb;
  if (!event_config_pb.ParseFromString(config.perf_event_config_raw())) {
    PERFETTO_ELOG("PerfEventConfig could not be parsed.");
    return;
  }

  // Unlikely: handle a callstack sampling option that shares a random decision
  // between all data sources within a tracing session. Instead of introducing
  // session-scoped data, we replicate the decision in each per-DS EventConfig.
  std::optional<ProcessSharding> process_sharding;
  uint32_t shard_count =
      event_config_pb.callstack_sampling().scope().process_shard_count();
  if (shard_count > 0) {
    process_sharding =
        GetOrChooseCallstackProcessShard(tracing_session_id, shard_count);
  }

  std::optional<EventConfig> event_config = EventConfig::Create(
      event_config_pb, config, process_sharding, tracepoint_id_lookup);
  if (!event_config.has_value()) {
    PERFETTO_ELOG("PerfEventConfig rejected.");
    return;
  }

  std::vector<uint32_t> online_cpus = GetOnlineCpus();
  if (online_cpus.empty()) {
    PERFETTO_ELOG("No online CPUs found.");
    return;
  }

  std::vector<EventReader> per_cpu_readers;
  for (uint32_t cpu : online_cpus) {
    std::optional<EventReader> event_reader =
        EventReader::ConfigureEvents(cpu, event_config.value());
    if (!event_reader.has_value()) {
      PERFETTO_ELOG("Failed to set up perf events for cpu%" PRIu32
                    ", discarding data source.",
                    cpu);
      return;
    }
    per_cpu_readers.emplace_back(std::move(event_reader.value()));
  }

  auto buffer_id = static_cast<BufferID>(config.target_buffer());
  auto writer = endpoint_->CreateTraceWriter(buffer_id);

  // Construct the data source instance.
  std::map<DataSourceInstanceID, DataSourceState>::iterator ds_it;
  bool inserted;
  std::tie(ds_it, inserted) = data_sources_.emplace(
      std::piecewise_construct, std::forward_as_tuple(ds_id),
      std::forward_as_tuple(event_config.value(), tracing_session_id,
                            std::move(writer), std::move(per_cpu_readers)));
  PERFETTO_CHECK(inserted);
  DataSourceState& ds = ds_it->second;

  // Start the configured events.
  for (auto& per_cpu_reader : ds.per_cpu_readers) {
    per_cpu_reader.EnableEvents();
  }

  WritePerfEventDefaultsPacket(ds.event_config, ds.trace_writer.get());

  InterningOutputTracker::WriteFixedInterningsPacket(
      ds_it->second.trace_writer.get(),
      protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE);

  // Inform unwinder of the new data source instance, and optionally start a
  // periodic task to clear its cached state.
  auto unwind_mode = (ds.event_config.unwind_mode() ==
                      protos::gen::PerfEventConfig::UNWIND_FRAME_POINTER)
                         ? Unwinder::UnwindMode::kFramePointer
                         : Unwinder::UnwindMode::kUnwindStack;
  unwinding_worker_->PostStartDataSource(ds_id, ds.event_config.kernel_frames(),
                                         unwind_mode);
  if (ds.event_config.unwind_state_clear_period_ms()) {
    unwinding_worker_->PostClearCachedStatePeriodic(
        ds_id, ds.event_config.unwind_state_clear_period_ms());
  }

  // Kick off periodic read task.
  auto tick_period_ms = ds.event_config.read_tick_period_ms();
  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostDelayedTask(
      [weak_this, ds_id] {
        if (weak_this)
          weak_this->TickDataSourceRead(ds_id);
      },
      TimeToNextReadTickMs(ds_id, tick_period_ms));

  // Optionally kick off periodic memory footprint limit check.
  uint32_t max_daemon_memory_kb = event_config_pb.max_daemon_memory_kb();
  if (max_daemon_memory_kb > 0) {
    task_runner_->PostDelayedTask(
        [weak_this, ds_id, max_daemon_memory_kb] {
          if (weak_this)
            weak_this->CheckMemoryFootprintPeriodic(ds_id,
                                                    max_daemon_memory_kb);
        },
        kMemoryLimitCheckPeriodMs);
  }
}

void PerfProducer::CheckMemoryFootprintPeriodic(DataSourceInstanceID ds_id,
                                                uint32_t max_daemon_memory_kb) {
  auto ds_it = data_sources_.find(ds_id);
  if (ds_it == data_sources_.end())
    return;  // stop recurring

  GuardrailConfig gconfig = {};
  gconfig.memory_guardrail_kb = max_daemon_memory_kb;

  ProfilerMemoryGuardrails footprint_snapshot;
  if (footprint_snapshot.IsOverMemoryThreshold(gconfig)) {
    PurgeDataSource(ds_id);
    return;  // stop recurring
  }

  // repost
  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostDelayedTask(
      [weak_this, ds_id, max_daemon_memory_kb] {
        if (weak_this)
          weak_this->CheckMemoryFootprintPeriodic(ds_id, max_daemon_memory_kb);
      },
      kMemoryLimitCheckPeriodMs);
}

void PerfProducer::StopDataSource(DataSourceInstanceID ds_id) {
  PERFETTO_LOG("StopDataSource(%zu)", static_cast<size_t>(ds_id));

  // Metatrace: stop immediately (will miss the events from the
  // asynchronous shutdown of the primary data source).
  auto meta_it = metatrace_writers_.find(ds_id);
  if (meta_it != metatrace_writers_.end()) {
    meta_it->second.WriteAllAndFlushTraceWriter([] {});
    metatrace_writers_.erase(meta_it);
    return;
  }

  auto ds_it = data_sources_.find(ds_id);
  if (ds_it == data_sources_.end()) {
    // Most likely, the source is missing due to an abrupt stop (via
    // |PurgeDataSource|). Tell the service that we've stopped the source now,
    // so that it doesn't wait for the ack until the timeout.
    endpoint_->NotifyDataSourceStopped(ds_id);
    return;
  }

  // Start shutting down the reading frontend, which will propagate the stop
  // further as the intermediate buffers are cleared.
  DataSourceState& ds = ds_it->second;
  InitiateReaderStop(&ds);
}

// The perf data sources ignore flush requests, as flushing would be
// unnecessarily complicated given out-of-order unwinding and proc-fd timeouts.
// Instead of responding to explicit flushes, we can ensure that we're otherwise
// well-behaved (do not reorder packets too much), and let the service scrape
// the SMB.
void PerfProducer::Flush(FlushRequestID flush_id,
                         const DataSourceInstanceID* data_source_ids,
                         size_t num_data_sources,
                         FlushFlags) {
  // Flush metatracing if requested.
  for (size_t i = 0; i < num_data_sources; i++) {
    auto ds_id = data_source_ids[i];
    PERFETTO_DLOG("Flush(%zu)", static_cast<size_t>(ds_id));

    auto meta_it = metatrace_writers_.find(ds_id);
    if (meta_it != metatrace_writers_.end()) {
      meta_it->second.WriteAllAndFlushTraceWriter([] {});
    }
  }

  endpoint_->NotifyFlushComplete(flush_id);
}

void PerfProducer::ClearIncrementalState(
    const DataSourceInstanceID* data_source_ids,
    size_t num_data_sources) {
  for (size_t i = 0; i < num_data_sources; i++) {
    auto ds_id = data_source_ids[i];
    PERFETTO_DLOG("ClearIncrementalState(%zu)", static_cast<size_t>(ds_id));

    if (metatrace_writers_.find(ds_id) != metatrace_writers_.end())
      continue;

    auto ds_it = data_sources_.find(ds_id);
    if (ds_it == data_sources_.end()) {
      PERFETTO_DLOG("ClearIncrementalState(%zu): did not find matching entry",
                    static_cast<size_t>(ds_id));
      continue;
    }
    DataSourceState& ds = ds_it->second;

    WritePerfEventDefaultsPacket(ds.event_config, ds.trace_writer.get());

    // Forget which incremental state we've emitted before.
    ds.interning_output.ClearHistory();
    InterningOutputTracker::WriteFixedInterningsPacket(
        ds.trace_writer.get(),
        protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE);

    // Drop the cross-datasource callstack interning trie. This is not
    // necessary for correctness (the preceding step is sufficient). However,
    // incremental clearing is likely to be used in ring buffer traces, where
    // it makes sense to reset the trie's size periodically, and this is a
    // reasonable point to do so. The trie keeps the monotonic interning IDs,
    // so there is no confusion for other concurrent data sources. We do not
    // bother with clearing concurrent sources' interning output trackers as
    // their footprint should be trivial.
    callstack_trie_.ClearTrie();
  }
}

void PerfProducer::TickDataSourceRead(DataSourceInstanceID ds_id) {
  auto it = data_sources_.find(ds_id);
  if (it == data_sources_.end()) {
    PERFETTO_DLOG("TickDataSourceRead(%zu): source gone",
                  static_cast<size_t>(ds_id));
    return;
  }
  DataSourceState& ds = it->second;

  PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_READ_TICK);

  // Make a pass over all per-cpu readers.
  uint64_t max_samples = ds.event_config.samples_per_tick_limit();
  bool more_records_available = false;
  for (EventReader& reader : ds.per_cpu_readers) {
    if (ReadAndParsePerCpuBuffer(&reader, max_samples, ds_id, &ds)) {
      more_records_available = true;
    }
  }

  // Wake up the unwinder as we've (likely) pushed samples into its queue.
  unwinding_worker_->PostProcessQueue();

  if (PERFETTO_UNLIKELY(ds.status == DataSourceState::Status::kShuttingDown) &&
      !more_records_available) {
    unwinding_worker_->PostInitiateDataSourceStop(ds_id);
  } else {
    // otherwise, keep reading
    auto tick_period_ms = it->second.event_config.read_tick_period_ms();
    auto weak_this = weak_factory_.GetWeakPtr();
    task_runner_->PostDelayedTask(
        [weak_this, ds_id] {
          if (weak_this)
            weak_this->TickDataSourceRead(ds_id);
        },
        TimeToNextReadTickMs(ds_id, tick_period_ms));
  }
}

bool PerfProducer::ReadAndParsePerCpuBuffer(EventReader* reader,
                                            uint64_t max_samples,
                                            DataSourceInstanceID ds_id,
                                            DataSourceState* ds) {
  PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_READ_CPU);

  // If the kernel ring buffer dropped data, record it in the trace.
  size_t cpu = reader->cpu();
  auto records_lost_callback = [this, ds_id, cpu](uint64_t records_lost) {
    auto weak_this = weak_factory_.GetWeakPtr();
    task_runner_->PostTask([weak_this, ds_id, cpu, records_lost] {
      if (weak_this)
        weak_this->EmitRingBufferLoss(ds_id, cpu, records_lost);
    });
  };

  for (uint64_t i = 0; i < max_samples; i++) {
    std::optional<ParsedSample> sample =
        reader->ReadUntilSample(records_lost_callback);
    if (!sample) {
      return false;  // caught up to the writer
    }

    // Counter-only mode: skip the unwinding stage, serialise the sample
    // immediately.
    const EventConfig& event_config = ds->event_config;
    if (!event_config.sample_callstacks()) {
      CompletedSample output;
      output.common = sample->common;
      EmitSample(ds_id, std::move(output));
      continue;
    }

    // Sampling either or both of userspace and kernel callstacks.
    pid_t pid = sample->common.pid;
    auto& process_state = ds->process_states[pid];  // insert if new

    // Asynchronous proc-fd lookup timed out.
    if (process_state == ProcessTrackingStatus::kFdsTimedOut) {
      PERFETTO_DLOG("Skipping sample for pid [%d]: kFdsTimedOut",
                    static_cast<int>(pid));
      EmitSkippedSample(ds_id, std::move(sample.value()),
                        SampleSkipReason::kReadStage);
      continue;
    }

    // Previously excluded, e.g. due to failing the target filter check.
    if (process_state == ProcessTrackingStatus::kRejected) {
      PERFETTO_DLOG("Skipping sample for pid [%d]: kRejected",
                    static_cast<int>(pid));
      continue;
    }

    // Seeing pid for the first time. We need to consider whether the process
    // is a kernel thread, and which callstacks we're recording.
    //
    // {user} stacks -> user processes: signal for proc-fd lookup
    //               -> kthreads: reject
    //
    // {kernel} stacks -> user processes: accept without proc-fds
    //                 -> kthreads: accept without proc-fds
    //
    // {kernel+user} stacks -> user processes: signal for proc-fd lookup
    //                      -> kthreads: accept without proc-fds
    //
    if (process_state == ProcessTrackingStatus::kInitial) {
      PERFETTO_DLOG("New pid: [%d]", static_cast<int>(pid));

      // Kernel threads (which have no userspace state) are never relevant if
      // we're not recording kernel callchains.
      bool is_kthread = !sample->regs;  // no userspace regs
      if (is_kthread && !event_config.kernel_frames()) {
        process_state = ProcessTrackingStatus::kRejected;
        continue;
      }

      // Check whether samples for this new process should be dropped due to
      // the target filtering. Kernel threads don't have a cmdline, but we
      // still check against pid inclusion/exclusion.
      if (ShouldRejectDueToFilter(
              pid, event_config.filter(), is_kthread, &ds->additional_cmdlines,
              [pid](std::string* cmdline) {
                return glob_aware::ReadProcCmdlineForPID(pid, cmdline);
              })) {
        process_state = ProcessTrackingStatus::kRejected;
        continue;
      }

      // At this point, sampled process is known to be of interest.
      if (!is_kthread && event_config.user_frames()) {
        // Start resolving the proc-fds. Response is async.
        process_state = ProcessTrackingStatus::kFdsResolving;
        InitiateDescriptorLookup(ds_id, pid,
                                 event_config.remote_descriptor_timeout_ms());
        // note: fallthrough
      } else {
        // Either a kernel thread (no need to obtain proc-fds), or a userspace
        // process but we're not recording userspace callstacks.
        process_state = ProcessTrackingStatus::kAccepted;
        unwinding_worker_->PostRecordNoUserspaceProcess(ds_id, pid);
        // note: fallthrough
      }
    }

    PERFETTO_CHECK(process_state == ProcessTrackingStatus::kAccepted ||
                   process_state == ProcessTrackingStatus::kFdsResolving);

    // If we're only interested in the kernel callchains, then userspace
    // process samples are relevant only if they were sampled during kernel
    // context.
    if (!event_config.user_frames() &&
        sample->common.cpu_mode == PERF_RECORD_MISC_USER) {
      PERFETTO_DLOG("Skipping usermode sample for kernel-only config");
      continue;
    }

    // Optionally: drop sample if above a given threshold of sampled stacks
    // that are waiting in the unwinding queue.
    uint64_t max_footprint_bytes = event_config.max_enqueued_footprint_bytes();
    uint64_t sample_stack_size = sample->stack.size();
    if (max_footprint_bytes) {
      uint64_t footprint_bytes = unwinding_worker_->GetEnqueuedFootprint();
      if (footprint_bytes + sample_stack_size >= max_footprint_bytes) {
        PERFETTO_DLOG("Skipping sample enqueueing due to footprint limit.");
        EmitSkippedSample(ds_id, std::move(sample.value()),
                          SampleSkipReason::kUnwindEnqueue);
        continue;
      }
    }

    // Push the sample into the unwinding queue if there is room.
    auto& queue = unwinding_worker_->unwind_queue();
    WriteView write_view = queue.BeginWrite();
    if (write_view.valid) {
      queue.at(write_view.write_pos) =
          UnwindEntry{ds_id, std::move(sample.value())};
      queue.CommitWrite();
      unwinding_worker_->IncrementEnqueuedFootprint(sample_stack_size);
    } else {
      PERFETTO_DLOG("Unwinder queue full, skipping sample");
      EmitSkippedSample(ds_id, std::move(sample.value()),
                        SampleSkipReason::kUnwindEnqueue);
    }
  }  // for (i < max_samples)

  // Most likely more events in the kernel buffer. Though we might be exactly on
  // the boundary due to |max_samples|.
  return true;
}

// Note: first-fit makes descriptor request fulfillment not true FIFO. But the
// edge-cases where it matters are very unlikely.
void PerfProducer::OnProcDescriptors(pid_t pid,
                                     uid_t uid,
                                     base::ScopedFile maps_fd,
                                     base::ScopedFile mem_fd) {
  // Find first-fit data source that requested descriptors for the process.
  for (auto& it : data_sources_) {
    DataSourceState& ds = it.second;
    auto proc_status_it = ds.process_states.find(pid);
    if (proc_status_it == ds.process_states.end())
      continue;

    // TODO(rsavitski): consider checking ProcessTrackingStatus before
    // CanProfile.
    if (!CanProfile(ds.event_config.raw_ds_config(), uid,
                    ds.event_config.target_installed_by())) {
      PERFETTO_DLOG("Not profileable: pid [%d], uid [%d] for DS [%zu]",
                    static_cast<int>(pid), static_cast<int>(uid),
                    static_cast<size_t>(it.first));
      continue;
    }

    // Match against either resolving, or expired state. In the latter
    // case, it means that the async response was slow enough that we've marked
    // the lookup as expired (but can now recover for future samples).
    auto proc_status = proc_status_it->second;
    if (proc_status == ProcessTrackingStatus::kFdsResolving ||
        proc_status == ProcessTrackingStatus::kFdsTimedOut) {
      PERFETTO_DLOG("Handing off proc-fds for pid [%d] to DS [%zu]",
                    static_cast<int>(pid), static_cast<size_t>(it.first));

      proc_status_it->second = ProcessTrackingStatus::kAccepted;
      unwinding_worker_->PostAdoptProcDescriptors(
          it.first, pid, std::move(maps_fd), std::move(mem_fd));
      return;  // done
    }
  }
  PERFETTO_DLOG(
      "Discarding proc-fds for pid [%d] as found no outstanding requests.",
      static_cast<int>(pid));
}

void PerfProducer::InitiateDescriptorLookup(DataSourceInstanceID ds_id,
                                            pid_t pid,
                                            uint32_t timeout_ms) {
  if (!proc_fd_getter_->RequiresDelayedRequest()) {
    StartDescriptorLookup(ds_id, pid, timeout_ms);
    return;
  }

  // Delay lookups on Android. See comment on |kProcDescriptorsAndroidDelayMs|.
  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostDelayedTask(
      [weak_this, ds_id, pid, timeout_ms] {
        if (weak_this)
          weak_this->StartDescriptorLookup(ds_id, pid, timeout_ms);
      },
      kProcDescriptorsAndroidDelayMs);
}

void PerfProducer::StartDescriptorLookup(DataSourceInstanceID ds_id,
                                         pid_t pid,
                                         uint32_t timeout_ms) {
  proc_fd_getter_->GetDescriptorsForPid(pid);

  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostDelayedTask(
      [weak_this, ds_id, pid] {
        if (weak_this)
          weak_this->EvaluateDescriptorLookupTimeout(ds_id, pid);
      },
      timeout_ms);
}

void PerfProducer::EvaluateDescriptorLookupTimeout(DataSourceInstanceID ds_id,
                                                   pid_t pid) {
  auto ds_it = data_sources_.find(ds_id);
  if (ds_it == data_sources_.end())
    return;

  DataSourceState& ds = ds_it->second;
  auto proc_status_it = ds.process_states.find(pid);
  if (proc_status_it == ds.process_states.end())
    return;

  // If the request is still outstanding, mark the process as expired (causing
  // outstanding and future samples to be discarded).
  auto proc_status = proc_status_it->second;
  if (proc_status == ProcessTrackingStatus::kFdsResolving) {
    PERFETTO_DLOG("Descriptor lookup timeout of pid [%d] for DS [%zu]",
                  static_cast<int>(pid), static_cast<size_t>(ds_it->first));

    proc_status_it->second = ProcessTrackingStatus::kFdsTimedOut;
    // Also inform the unwinder of the state change (so that it can discard any
    // of the already-enqueued samples).
    unwinding_worker_->PostRecordTimedOutProcDescriptors(ds_id, pid);
  }
}

void PerfProducer::PostEmitSample(DataSourceInstanceID ds_id,
                                  CompletedSample sample) {
  // hack: c++11 lambdas can't be moved into, so stash the sample on the heap.
  CompletedSample* raw_sample = new CompletedSample(std::move(sample));
  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_this, ds_id, raw_sample] {
    if (weak_this)
      weak_this->EmitSample(ds_id, std::move(*raw_sample));
    delete raw_sample;
  });
}

void PerfProducer::EmitSample(DataSourceInstanceID ds_id,
                              CompletedSample sample) {
  auto ds_it = data_sources_.find(ds_id);
  if (ds_it == data_sources_.end()) {
    PERFETTO_DLOG("EmitSample(ds: %zu): source gone",
                  static_cast<size_t>(ds_id));
    return;
  }
  DataSourceState& ds = ds_it->second;

  // intern callsite
  GlobalCallstackTrie::Node* callstack_root =
      callstack_trie_.CreateCallsite(sample.frames, sample.build_ids);
  uint64_t callstack_iid = callstack_root->id();

  // start packet, timestamp domain defaults to monotonic_raw
  auto packet = StartTracePacket(ds.trace_writer.get());
  packet->set_timestamp(sample.common.timestamp);

  // write new interning data (if any)
  protos::pbzero::InternedData* interned_out = packet->set_interned_data();
  ds.interning_output.WriteCallstack(callstack_root, &callstack_trie_,
                                     interned_out);

  // write the sample itself
  auto* perf_sample = packet->set_perf_sample();
  perf_sample->set_cpu(sample.common.cpu);
  perf_sample->set_pid(static_cast<uint32_t>(sample.common.pid));
  perf_sample->set_tid(static_cast<uint32_t>(sample.common.tid));
  perf_sample->set_cpu_mode(ToCpuModeEnum(sample.common.cpu_mode));
  perf_sample->set_timebase_count(sample.common.timebase_count);

  for (size_t i = 0; i < sample.common.follower_counts.size(); ++i) {
    perf_sample->add_follower_counts(sample.common.follower_counts[i]);
  }

  perf_sample->set_callstack_iid(callstack_iid);
  if (sample.unwind_error != unwindstack::ERROR_NONE) {
    perf_sample->set_unwind_error(ToProtoEnum(sample.unwind_error));
  }
}

void PerfProducer::EmitRingBufferLoss(DataSourceInstanceID ds_id,
                                      size_t cpu,
                                      uint64_t records_lost) {
  auto ds_it = data_sources_.find(ds_id);
  if (ds_it == data_sources_.end())
    return;
  DataSourceState& ds = ds_it->second;
  PERFETTO_DLOG("DataSource(%zu): cpu%zu lost [%" PRIu64 "] records",
                static_cast<size_t>(ds_id), cpu, records_lost);

  // The data loss record relates to a single ring buffer, and indicates loss
  // since the last successfully-written record in that buffer. Therefore the
  // data loss record itself has no timestamp.
  // We timestamp the packet with the boot clock for packet ordering purposes,
  // but it no longer has a (precise) interpretation relative to the sample
  // stream from that per-cpu buffer. See the proto comments for more details.
  auto packet = StartTracePacket(ds.trace_writer.get());
  packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count()));
  packet->set_timestamp_clock_id(
      protos::pbzero::BuiltinClock::BUILTIN_CLOCK_BOOTTIME);

  auto* perf_sample = packet->set_perf_sample();
  perf_sample->set_cpu(static_cast<uint32_t>(cpu));
  perf_sample->set_kernel_records_lost(records_lost);
}

void PerfProducer::PostEmitUnwinderSkippedSample(DataSourceInstanceID ds_id,
                                                 ParsedSample sample) {
  PostEmitSkippedSample(ds_id, std::move(sample),
                        SampleSkipReason::kUnwindStage);
}

void PerfProducer::PostEmitSkippedSample(DataSourceInstanceID ds_id,
                                         ParsedSample sample,
                                         SampleSkipReason reason) {
  // hack: c++11 lambdas can't be moved into, so stash the sample on the heap.
  ParsedSample* raw_sample = new ParsedSample(std::move(sample));
  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_this, ds_id, raw_sample, reason] {
    if (weak_this)
      weak_this->EmitSkippedSample(ds_id, std::move(*raw_sample), reason);
    delete raw_sample;
  });
}

void PerfProducer::EmitSkippedSample(DataSourceInstanceID ds_id,
                                     ParsedSample sample,
                                     SampleSkipReason reason) {
  auto ds_it = data_sources_.find(ds_id);
  if (ds_it == data_sources_.end())
    return;
  DataSourceState& ds = ds_it->second;

  // Note: timestamp defaults to the monotonic_raw domain.
  auto packet = StartTracePacket(ds.trace_writer.get());
  packet->set_timestamp(sample.common.timestamp);
  auto* perf_sample = packet->set_perf_sample();
  perf_sample->set_cpu(sample.common.cpu);
  perf_sample->set_pid(static_cast<uint32_t>(sample.common.pid));
  perf_sample->set_tid(static_cast<uint32_t>(sample.common.tid));
  perf_sample->set_cpu_mode(ToCpuModeEnum(sample.common.cpu_mode));
  perf_sample->set_timebase_count(sample.common.timebase_count);

  for (size_t i = 0; i < sample.common.follower_counts.size(); ++i) {
    perf_sample->add_follower_counts(sample.common.follower_counts[i]);
  }

  using PerfSample = protos::pbzero::PerfSample;
  switch (reason) {
    case SampleSkipReason::kReadStage:
      perf_sample->set_sample_skipped_reason(
          PerfSample::PROFILER_SKIP_READ_STAGE);
      break;
    case SampleSkipReason::kUnwindEnqueue:
      perf_sample->set_sample_skipped_reason(
          PerfSample::PROFILER_SKIP_UNWIND_ENQUEUE);
      break;
    case SampleSkipReason::kUnwindStage:
      perf_sample->set_sample_skipped_reason(
          PerfSample::PROFILER_SKIP_UNWIND_STAGE);
      break;
  }
}

void PerfProducer::InitiateReaderStop(DataSourceState* ds) {
  PERFETTO_DLOG("InitiateReaderStop");
  PERFETTO_CHECK(ds->status != DataSourceState::Status::kShuttingDown);

  ds->status = DataSourceState::Status::kShuttingDown;
  for (auto& event_reader : ds->per_cpu_readers) {
    event_reader.DisableEvents();
  }
}

void PerfProducer::PostFinishDataSourceStop(DataSourceInstanceID ds_id) {
  auto weak_producer = weak_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_producer, ds_id] {
    if (weak_producer)
      weak_producer->FinishDataSourceStop(ds_id);
  });
}

void PerfProducer::FinishDataSourceStop(DataSourceInstanceID ds_id) {
  PERFETTO_LOG("FinishDataSourceStop(%zu)", static_cast<size_t>(ds_id));
  auto ds_it = data_sources_.find(ds_id);
  if (ds_it == data_sources_.end()) {
    PERFETTO_DLOG("FinishDataSourceStop(%zu): source gone",
                  static_cast<size_t>(ds_id));
    return;
  }
  DataSourceState& ds = ds_it->second;
  PERFETTO_CHECK(ds.status == DataSourceState::Status::kShuttingDown);

  ds.trace_writer->Flush();
  data_sources_.erase(ds_it);

  endpoint_->NotifyDataSourceStopped(ds_id);

  // Clean up resources if there are no more active sources.
  if (data_sources_.empty()) {
    callstack_trie_.ClearTrie();  // purge internings
    base::MaybeReleaseAllocatorMemToOS();
  }
}

// TODO(rsavitski): maybe make the tracing service respect premature
// producer-driven stops, and then issue a NotifyDataSourceStopped here.
// Alternatively (and at the expense of higher complexity) introduce a new data
// source status of "tombstoned", and propagate it until the source is stopped
// by the service (this would technically allow for stricter lifetime checking
// of data sources, and help with discarding periodic flushes).
// TODO(rsavitski): Purging while stopping will currently leave the stop
// unacknowledged. Consider checking whether the DS is stopping here, and if so,
// notifying immediately after erasing.
void PerfProducer::PurgeDataSource(DataSourceInstanceID ds_id) {
  auto ds_it = data_sources_.find(ds_id);
  if (ds_it == data_sources_.end())
    return;
  DataSourceState& ds = ds_it->second;

  PERFETTO_LOG("Stopping DataSource(%zu) prematurely",
               static_cast<size_t>(ds_id));

  unwinding_worker_->PostPurgeDataSource(ds_id);

  // Write a packet indicating the abrupt stop.
  {
    auto packet = StartTracePacket(ds.trace_writer.get());
    packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count()));
    packet->set_timestamp_clock_id(
        protos::pbzero::BuiltinClock::BUILTIN_CLOCK_BOOTTIME);
    auto* perf_sample = packet->set_perf_sample();
    auto* producer_event = perf_sample->set_producer_event();
    producer_event->set_source_stop_reason(
        protos::pbzero::PerfSample::ProducerEvent::PROFILER_STOP_GUARDRAIL);
  }

  ds.trace_writer->Flush();
  data_sources_.erase(ds_it);

  // Clean up resources if there are no more active sources.
  if (data_sources_.empty()) {
    callstack_trie_.ClearTrie();  // purge internings
    base::MaybeReleaseAllocatorMemToOS();
  }
}

// Either:
// * choose a random number up to |shard_count|.
// * reuse a choice made previously by a data source within this tracing
//   session. The config option requires that all data sources within one config
//   have the same shard count.
std::optional<ProcessSharding> PerfProducer::GetOrChooseCallstackProcessShard(
    uint64_t tracing_session_id,
    uint32_t shard_count) {
  for (auto& it : data_sources_) {
    const DataSourceState& ds = it.second;
    const auto& sharding = ds.event_config.filter().process_sharding;
    if ((ds.tracing_session_id != tracing_session_id) || !sharding.has_value())
      continue;

    // Found existing data source, reuse its decision while doing best-effort
    // error reporting (logging) if the shard count is not the same.
    if (sharding->shard_count != shard_count) {
      PERFETTO_ELOG(
          "Mismatch of process_shard_count between data sources in tracing "
          "session %" PRIu64 ". Overriding shard count to match.",
          tracing_session_id);
    }
    return sharding;
  }

  // First data source in this session, choose random shard.
  std::random_device r;
  std::minstd_rand minstd(r());
  std::uniform_int_distribution<uint32_t> dist(0, shard_count - 1);
  uint32_t chosen_shard = dist(minstd);

  ProcessSharding ret;
  ret.shard_count = shard_count;
  ret.chosen_shard = chosen_shard;

  PERFETTO_DCHECK(ret.shard_count && ret.chosen_shard < ret.shard_count);
  return ret;
}

void PerfProducer::StartMetatraceSource(DataSourceInstanceID ds_id,
                                        BufferID target_buffer) {
  auto writer = endpoint_->CreateTraceWriter(target_buffer);

  auto it_and_inserted = metatrace_writers_.emplace(
      std::piecewise_construct, std::make_tuple(ds_id), std::make_tuple());
  PERFETTO_DCHECK(it_and_inserted.second);
  // Note: only the first concurrent writer will actually be active.
  metatrace_writers_[ds_id].Enable(task_runner_, std::move(writer),
                                   metatrace::TAG_ANY);
}

void PerfProducer::ConnectWithRetries(const char* socket_name) {
  PERFETTO_DCHECK(state_ == kNotStarted);
  state_ = kNotConnected;

  ResetConnectionBackoff();
  producer_socket_name_ = socket_name;
  ConnectService();
}

void PerfProducer::ConnectService() {
  PERFETTO_DCHECK(state_ == kNotConnected);
  state_ = kConnecting;
  endpoint_ = ProducerIPCClient::Connect(
      producer_socket_name_, this, kProducerName, task_runner_,
      TracingService::ProducerSMBScrapingMode::kEnabled);
}

void PerfProducer::IncreaseConnectionBackoff() {
  connection_backoff_ms_ *= 2;
  if (connection_backoff_ms_ > kMaxConnectionBackoffMs)
    connection_backoff_ms_ = kMaxConnectionBackoffMs;
}

void PerfProducer::ResetConnectionBackoff() {
  connection_backoff_ms_ = kInitialConnectionBackoffMs;
}

void PerfProducer::OnConnect() {
  PERFETTO_DCHECK(state_ == kConnecting);
  state_ = kConnected;
  ResetConnectionBackoff();
  PERFETTO_LOG("Connected to the service");

  {
    // linux.perf
    DataSourceDescriptor desc;
    desc.set_name(kDataSourceName);
    desc.set_handles_incremental_state_clear(true);
    desc.set_will_notify_on_stop(true);
    endpoint_->RegisterDataSource(desc);
  }
  {
    // metatrace
    DataSourceDescriptor desc;
    desc.set_name(MetatraceWriter::kDataSourceName);
    endpoint_->RegisterDataSource(desc);
  }
  // Used by tracebox to synchronize with traced_probes being registered.
  if (all_data_sources_registered_cb_) {
    endpoint_->Sync(all_data_sources_registered_cb_);
  }
}

void PerfProducer::OnDisconnect() {
  PERFETTO_DCHECK(state_ == kConnected || state_ == kConnecting);
  PERFETTO_LOG("Disconnected from tracing service");

  auto weak_producer = weak_factory_.GetWeakPtr();
  if (state_ == kConnected)
    return task_runner_->PostTask([weak_producer] {
      if (weak_producer)
        weak_producer->Restart();
    });

  state_ = kNotConnected;
  IncreaseConnectionBackoff();
  task_runner_->PostDelayedTask(
      [weak_producer] {
        if (weak_producer)
          weak_producer->ConnectService();
      },
      connection_backoff_ms_);
}

void PerfProducer::Restart() {
  // We lost the connection with the tracing service. At this point we need
  // to reset all the data sources. Trying to handle that manually is going to
  // be error prone. What we do here is simply destroy the instance and
  // recreate it again.
  base::TaskRunner* task_runner = task_runner_;
  const char* socket_name = producer_socket_name_;
  ProcDescriptorGetter* proc_fd_getter = proc_fd_getter_;

  // Invoke destructor and then the constructor again.
  this->~PerfProducer();
  new (this) PerfProducer(proc_fd_getter, task_runner);

  ConnectWithRetries(socket_name);
}

}  // namespace profiling
}  // namespace perfetto
