# -*- coding: utf-8 -*-
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Wrapper library around ts_mon.

This library provides some wrapper functionality around ts_mon, to make it more
friendly to developers. It also provides import safety, in case ts_mon is not
deployed with your code.
"""

from __future__ import division
from __future__ import print_function

import collections
import contextlib
import ssl
import time
from functools import wraps

import six
from six.moves import queue as Queue

from autotest_lib.utils.frozen_chromite.lib import cros_logging as logging

try:
  from infra_libs import ts_mon
except (ImportError, RuntimeError):
  ts_mon = None


# This number is chosen because 1.16^100 seconds is about
# 32 days. This is a good compromise between bucket size
# and dynamic range.
_SECONDS_BUCKET_FACTOR = 1.16

# If none, we create metrics in this process. Otherwise, we send metrics via
# this Queue to a dedicated flushing processes.
# These attributes are set by chromite.lib.ts_mon_config.SetupTsMonGlobalState.
FLUSHING_PROCESS = None
MESSAGE_QUEUE = None

_MISSING = object()

MetricCall = collections.namedtuple('MetricCall', [
    'metric_name', 'metric_args', 'metric_kwargs',
    'method', 'method_args', 'method_kwargs',
    'reset_after'
])


def _FlushingProcessClosed():
  """Returns whether the metrics flushing process has been closed."""
  return (FLUSHING_PROCESS is not None and
          FLUSHING_PROCESS.exitcode is not None)


class ProxyMetric(object):
  """Redirects any method calls to the message queue."""
  def __init__(self, metric, metric_args, metric_kwargs):
    self.metric = metric
    self.metric_args = metric_args
    self.reset_after = metric_kwargs.pop('reset_after', False)
    self.metric_kwargs = metric_kwargs

  def __getattr__(self, method_name):
    """Redirects all method calls to the MESSAGE_QUEUE."""
    def enqueue(*args, **kwargs):
      if not _FlushingProcessClosed():
        try:
          MESSAGE_QUEUE.put_nowait(
              MetricCall(
                  metric_name=self.metric,
                  metric_args=self.metric_args,
                  metric_kwargs=self.metric_kwargs,
                  method=method_name,
                  method_args=args,
                  method_kwargs=kwargs,
                  reset_after=self.reset_after))
        except Queue.Full:
          logging.warning(
              "Metrics queue is full; skipped sending metric '%s'",
              self.metric)
      else:
        try:
          exit_code = FLUSHING_PROCESS.exitcode
        except AttributeError:
          exit_code = None
        logging.warning(
            'Flushing process has been closed (exit code %s),'
            " skipped sending metric '%s'",
            exit_code,
            self.metric)

    return enqueue


def _Indirect(fn):
  """Decorates a function to be indirect If MESSAGE_QUEUE is set.

  If MESSAGE_QUEUE is set, the indirect function will return a proxy metrics
  object; otherwise, it behaves normally.
  """
  @wraps(fn)
  def AddToQueueIfPresent(*args, **kwargs):
    if MESSAGE_QUEUE:
      return ProxyMetric(fn.__name__, args, kwargs)
    else:
      # Whether to reset the metric after the flush; this is only used by
      # |ProxyMetric|, so remove this from the kwargs.
      kwargs.pop('reset_after', None)
      return fn(*args, **kwargs)
  return AddToQueueIfPresent


class MockMetric(object):
  """Mock metric object, to be returned if ts_mon is not set up."""

  def _mock_method(self, *args, **kwargs):
    pass

  def __getattr__(self, _):
    return self._mock_method


def _ImportSafe(fn):
  """Decorator which causes |fn| to return MockMetric if ts_mon not imported."""
  @wraps(fn)
  def wrapper(*args, **kwargs):
    if ts_mon:
      return fn(*args, **kwargs)
    else:
      return MockMetric()

  return wrapper


class FieldSpecAdapter(object):
  """Infers the types of fields values to work around field_spec requirement.

  See: https://chromium-review.googlesource.com/c/432120/ for the change
  which added a required field_spec argument. This class is a temporary
  workaround to allow inferring the field_spec if is not provided.
  """
  FIELD_CLASSES = {} if ts_mon is None else {
      bool: ts_mon.BooleanField,
      int: ts_mon.IntegerField,
      str: ts_mon.StringField,
      six.text_type: ts_mon.StringField,
  }

  def __init__(self, metric_cls, *args, **kwargs):
    self._metric_cls = metric_cls
    self._args = args
    self._kwargs = kwargs
    self._instance = _MISSING

  def __getattr__(self, prop):
    """Return a wrapper which constructs the metric object on demand.

    Args:
      prop: The property name

    Returns:
      If self._instance has been created, the instance's .|prop| property,
      otherwise, a wrapper function which creates the ._instance and then
      calls the |prop| method on the instance.
    """
    if self._instance is not _MISSING:
      return getattr(self._instance, prop)

    def func(*args, **kwargs):
      if self._instance is not _MISSING:
        return getattr(self._instance, prop)(*args, **kwargs)
      fields = FieldSpecAdapter._InferFields(prop, args, kwargs)
      self._kwargs['field_spec'] = FieldSpecAdapter._InferFieldSpec(fields)
      self._instance = self._metric_cls(*self._args, **self._kwargs)
      return getattr(self._instance, prop)(*args, **kwargs)

    func.__name__ = prop
    return func

  @staticmethod
  def _InferFields(method_name, args, kwargs):
    """Infers the fields argument.

    Args:
      method_name: The method called.
      args: The args list
      kwargs: The keyword args
    """
    if 'fields' in kwargs:
      return kwargs['fields']

    if method_name == 'increment' and args:
      return args[0]

    if len(args) >= 2:
      return args[1]

  @staticmethod
  def _InferFieldSpec(fields):
    """Infers the fields types from the given fields.

    Args:
      fields: A dictionary with metric fields.
    """
    if not fields or not ts_mon:
      return None

    return [FieldSpecAdapter.FIELD_CLASSES[type(v)](field)
            for (field, v) in sorted(fields.items())]


def _OptionalFieldSpec(fn):
  """Decorates a function to allow an optional description and field_spec."""
  @wraps(fn)
  def wrapper(*args, **kwargs):
    kwargs = dict(**kwargs)  # It's bad practice to mutate **kwargs
    # Slightly different than .setdefault, this line sets a default even when
    # the key is present (as long as the value is not truthy). Empty or None is
    # not allowed for descriptions.
    kwargs['description'] = kwargs.get('description') or 'No description.'
    if 'field_spec' in kwargs and kwargs['field_spec'] is not _MISSING:
      return fn(*args, **kwargs)
    else:
      return FieldSpecAdapter(fn, *args, **kwargs)
  return wrapper


def _Metric(fn):
  """A pipeline of decorators to apply to our metric constructors."""
  return _OptionalFieldSpec(_ImportSafe(_Indirect(fn)))


# This is needed for the reset_after flag used by @Indirect.
# pylint: disable=unused-argument

@_Metric
def CounterMetric(name, reset_after=False, description=None,
                  field_spec=_MISSING, start_time=None):
  """Returns a metric handle for a counter named |name|."""
  return ts_mon.CounterMetric(name,
                              description=description, field_spec=field_spec,
                              start_time=start_time)
Counter = CounterMetric


@_Metric
def GaugeMetric(name, reset_after=False, description=None, field_spec=_MISSING):
  """Returns a metric handle for a gauge named |name|."""
  return ts_mon.GaugeMetric(name, description=description,
                            field_spec=field_spec)
Gauge = GaugeMetric


@_Metric
def CumulativeMetric(name, reset_after=False, description=None,
                     field_spec=_MISSING):
  """Returns a metric handle for a cumulative float named |name|."""
  return ts_mon.CumulativeMetric(name, description=description,
                                 field_spec=field_spec)


@_Metric
def StringMetric(name, reset_after=False, description=None,
                 field_spec=_MISSING):
  """Returns a metric handle for a string named |name|."""
  return ts_mon.StringMetric(name, description=description,
                             field_spec=field_spec)
String = StringMetric


@_Metric
def BooleanMetric(name, reset_after=False, description=None,
                  field_spec=_MISSING):
  """Returns a metric handle for a boolean named |name|."""
  return ts_mon.BooleanMetric(name, description=description,
                              field_spec=field_spec)
Boolean = BooleanMetric


@_Metric
def FloatMetric(name, reset_after=False, description=None, field_spec=_MISSING):
  """Returns a metric handle for a float named |name|."""
  return ts_mon.FloatMetric(name, description=description,
                            field_spec=field_spec)
Float = FloatMetric


@_Metric
def CumulativeDistributionMetric(name, reset_after=False, description=None,
                                 bucketer=None, field_spec=_MISSING):
  """Returns a metric handle for a cumulative distribution named |name|."""
  return ts_mon.CumulativeDistributionMetric(
      name, description=description, bucketer=bucketer, field_spec=field_spec)
CumulativeDistribution = CumulativeDistributionMetric


@_Metric
def DistributionMetric(name, reset_after=False, description=None,
                       bucketer=None, field_spec=_MISSING):
  """Returns a metric handle for a distribution named |name|."""
  return ts_mon.NonCumulativeDistributionMetric(
      name, description=description, bucketer=bucketer, field_spec=field_spec)
Distribution = DistributionMetric


@_Metric
def CumulativeSmallIntegerDistribution(name, reset_after=False,
                                       description=None, field_spec=_MISSING):
  """Returns a metric handle for a cumulative distribution named |name|.

  This differs slightly from CumulativeDistribution, in that the underlying
  metric uses a uniform bucketer rather than a geometric one.

  This metric type is suitable for holding a distribution of numbers that are
  nonnegative integers in the range of 0 to 100.
  """
  return ts_mon.CumulativeDistributionMetric(
      name,
      bucketer=ts_mon.FixedWidthBucketer(1),
      description=description,
      field_spec=field_spec)


@_Metric
def CumulativeSecondsDistribution(name, scale=1, reset_after=False,
                                  description=None, field_spec=_MISSING):
  """Returns a metric handle for a cumulative distribution named |name|.

  The distribution handle returned by this method is better suited than the
  default one for recording handling times, in seconds.

  This metric handle has bucketing that is optimized for time intervals
  (in seconds) in the range of 1 second to 32 days. Use |scale| to adjust this
  (e.g. scale=0.1 covers a range from .1 seconds to 3.2 days).

  Args:
    name: string name of metric
    scale: scaling factor of buckets, and size of the first bucket. default: 1
    reset_after: Should the metric be reset after reporting.
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
  """
  b = ts_mon.GeometricBucketer(growth_factor=_SECONDS_BUCKET_FACTOR,
                               scale=scale)
  return ts_mon.CumulativeDistributionMetric(
      name, bucketer=b, units=ts_mon.MetricsDataUnits.SECONDS,
      description=description, field_spec=field_spec)

SecondsDistribution = CumulativeSecondsDistribution


@_Metric
def PercentageDistribution(
    name, num_buckets=1000, reset_after=False,
    description=None, field_spec=_MISSING):
  """Returns a metric handle for a cumulative distribution for percentage.

  The distribution handle returned by this method is better suited for reporting
  percentage values than the default one. The bucketing is optimized for values
  in [0,100].

  Args:
    name: The name of this metric.
    num_buckets: This metric buckets the percentage values before
        reporting. This argument controls the number of the bucket the range
        [0,100] is divided in. The default gives you 0.1% resolution.
    reset_after: Should the metric be reset after reporting.
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
  """
  # The last bucket actually covers [100, 100 + 1.0/num_buckets), so it
  # corresponds to values that exactly match 100%.
  bucket_width = 100 / num_buckets
  b = ts_mon.FixedWidthBucketer(bucket_width, num_buckets)
  return ts_mon.CumulativeDistributionMetric(
      name, bucketer=b,
      description=description, field_spec=field_spec)


@contextlib.contextmanager
def SecondsTimer(name, fields=None, description=None, field_spec=_MISSING,
                 scale=1, record_on_exception=True, add_exception_field=False):
  """Record the time of an operation to a CumulativeSecondsDistributionMetric.

  Records the time taken inside of the context block, to the
  CumulativeSecondsDistribution named |name|, with the given fields.

  Examples:
    # Time the doSomething() call, with field values that are independent of the
    # results of the operation.
    with SecondsTimer('timer/name', fields={'foo': 'bar'},
                      description='My timer',
                      field_spec=[ts_mon.StringField('foo'),
                                  ts_mon.BooleanField('success')]):
      doSomething()

    # Time the doSomethingElse call, with field values that depend on the
    # results of that operation. Note that it is important that a default value
    # is specified for these fields, in case an exception is thrown by
    # doSomethingElse()
    f = {'success': False, 'foo': 'bar'}
    with SecondsTimer('timer/name', fields=f, description='My timer',
                      field_spec=[ts_mon.StringField('foo')]) as c:
      doSomethingElse()
      c['success'] = True

    # Incorrect Usage!
    with SecondsTimer('timer/name', description='My timer') as c:
      doSomething()
      c['foo'] = bar # 'foo' is not a valid field, because no default
                     # value for it was specified in the context constructor.
                     # It will be silently ignored.

  Args:
    name: The name of the metric to create
    fields: The fields of the metric to create.
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
    scale: A float to scale the CumulativeSecondsDistribution buckets by.
    record_on_exception: Whether to record metrics if an exception is raised.
    add_exception_field: Whether to add a BooleanField('encountered_exception')
        to the FieldSpec provided, and set its value to True iff an exception
        was raised in the context.
  """
  if field_spec is not None and field_spec is not _MISSING:
    field_spec.append(ts_mon.BooleanField('encountered_exception'))

  m = CumulativeSecondsDistribution(
      name, scale=scale, description=description, field_spec=field_spec)
  f = fields or {}
  f = dict(f)
  keys = list(f)
  t0 = _GetSystemClock()

  error = True
  try:
    yield f
    error = False
  finally:
    if record_on_exception and add_exception_field:
      keys.append('encountered_exception')
      f.setdefault('encountered_exception', error)
    # Filter out keys that were not part of the initial key set. This is to
    # avoid inconsistent fields.
    # TODO(akeshet): Doing this filtering isn't super efficient. Would be better
    # to implement some key-restricted subclass or wrapper around dict, and just
    # yield that above rather than yielding a regular dict.
    if record_on_exception or not error:
      dt = _GetSystemClock() - t0
      # TODO(ayatane): Handle backward clock jumps.  See _GetSystemClock.
      if dt >= 0:
        m.add(dt, fields={k: f[k] for k in keys})


def SecondsTimerDecorator(name, fields=None, description=None,
                          field_spec=_MISSING, scale=1,
                          record_on_exception=True, add_exception_field=False):
  """Decorator to time the duration of function calls.

  Examples:
    @SecondsTimerDecorator('timer/name', fields={'foo': 'bar'},
                           description='My timer',
                           field_spec=[ts_mon.StringField('foo')])
    def Foo(bar):
      return doStuff()

    is equivalent to

    def Foo(bar):
      with SecondsTimer('timer/name', fields={'foo': 'bar'},
                        description='My timer',
                        field_spec=[ts_mon.StringField('foo')])
        return doStuff()

  Args:
    name: The name of the metric to create
    fields: The fields of the metric to create
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
    scale: A float to scale the distrubtion by
    record_on_exception: Whether to record metrics if an exception is raised.
    add_exception_field: Whether to add a BooleanField('encountered_exception')
        to the FieldSpec provided, and set its value to True iff an exception
        was raised in the context.
  """
  def decorator(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
      with SecondsTimer(name, fields=fields, description=description,
                        field_spec=field_spec, scale=scale,
                        record_on_exception=record_on_exception,
                        add_exception_field=add_exception_field):
        return fn(*args, **kwargs)

    return wrapper

  return decorator


@contextlib.contextmanager
def SecondsInstanceTimer(name, fields=None, description=None,
                         field_spec=_MISSING, record_on_exception=True,
                         add_exception_field=False):
  """Record the time of an operation to a FloatMetric.

  Records the time taken inside of the context block, to the
  Float metric named |name|, with the given fields.  This is
  a non-cumulative metric; this represents the absolute time
  taken for a specific block.  The duration is stored in a float
  to provide flexibility in the future for higher accuracy.

  Examples:
    # Time the doSomething() call, with field values that are independent of the
    # results of the operation.
    with SecondsInstanceTimer('timer/name', fields={'foo': 'bar'},
                              description='My timer',
                              field_spec=[ts_mon.StringField('foo'),
                                          ts_mon.BooleanField('success')]):
      doSomething()

    # Time the doSomethingElse call, with field values that depend on the
    # results of that operation. Note that it is important that a default value
    # is specified for these fields, in case an exception is thrown by
    # doSomethingElse()
    f = {'success': False, 'foo': 'bar'}
    with SecondsInstanceTimer('timer/name', fields=f, description='My timer',
                              field_spec=[ts_mon.StringField('foo')]) as c:
      doSomethingElse()
      c['success'] = True

    # Incorrect Usage!
    with SecondsInstanceTimer('timer/name', description='My timer') as c:
      doSomething()
      c['foo'] = bar # 'foo' is not a valid field, because no default
                     # value for it was specified in the context constructor.
                     # It will be silently ignored.

  Args:
    name: The name of the metric to create
    fields: The fields of the metric to create.
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
    record_on_exception: Whether to record metrics if an exception is raised.
    add_exception_field: Whether to add a BooleanField('encountered_exception')
        to the FieldSpec provided, and set its value to True iff an exception
        was raised in the context.

  Yields:
    Float based metric measing the duration of execution.
  """
  if field_spec is not None and field_spec is not _MISSING:
    field_spec.append(ts_mon.BooleanField('encountered_exception'))

  m = FloatMetric(name, description=description, field_spec=field_spec)
  f = dict(fields or {})
  keys = list(f)
  t0 = _GetSystemClock()

  error = True
  try:
    yield f
    error = False
  finally:
    if record_on_exception and add_exception_field:
      keys.append('encountered_exception')
      f.setdefault('encountered_exception', error)
    # Filter out keys that were not part of the initial key set. This is to
    # avoid inconsistent fields.
    # TODO(akeshet): Doing this filtering isn't super efficient. Would be better
    # to implement some key-restricted subclass or wrapper around dict, and just
    # yield that above rather than yielding a regular dict.
    if record_on_exception or not error:
      dt = _GetSystemClock() - t0
      m.set(dt, fields={k: f[k] for k in keys})


def SecondsInstanceTimerDecorator(name, fields=None, description=None,
                                  field_spec=_MISSING,
                                  record_on_exception=True,
                                  add_exception_field=False):
  """Decorator to time the gauge duration of function calls.

  Examples:
    @SecondsInstanceTimerDecorator('timer/name', fields={'foo': 'bar'},
                                   description='My timer',
                                   field_spec=[ts_mon.StringField('foo'),
                                               ts_mon.BooleanField('success')]):

    def Foo(bar):
      return doStuff()

    is equivalent to

    def Foo(bar):
      with SecondsInstanceTimer('timer/name', fields={'foo': 'bar'},
                                description='My timer',
                                field_spec=[ts_mon.StringField('foo'),
                                            ts_mon.BooleanField('success')]):
        return doStuff()

  Args:
    name: The name of the metric to create
    fields: The fields of the metric to create
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
    record_on_exception: Whether to record metrics if an exception is raised.
    add_exception_field: Whether to add a BooleanField('encountered_exception')
        to the FieldSpec provided, and set its value to True iff an exception
        was raised in the context.

  Returns:
    A SecondsInstanceTimer metric decorator.
  """
  def decorator(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
      with SecondsInstanceTimer(name, fields=fields, description=description,
                                field_spec=field_spec,
                                record_on_exception=record_on_exception,
                                add_exception_field=add_exception_field):
        return fn(*args, **kwargs)

    return wrapper

  return decorator


@contextlib.contextmanager
def SuccessCounter(name, fields=None, description=None, field_spec=_MISSING):
  """Create a counter that tracks if something succeeds.

  Args:
    name: The name of the metric to create
    fields: The fields of the metric
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
  """
  c = Counter(name)
  f = fields or {}
  f = f.copy()
  # We add in the additional field success.
  keys = list(f) + ['success']
  success = False
  try:
    yield f
    success = True
  finally:
    f.setdefault('success', success)
    f = {k: f[k] for k in keys}
    c.increment(fields=f)


@contextlib.contextmanager
def Presence(name, fields=None, description=None, field_spec=_MISSING):
  """A counter of 'active' things.

  This keeps track of how many name's are active at any given time. However,
  it's only suitable for long running tasks, since the initial true value may
  never be written out if the task doesn't run for at least a minute.
  """
  b = Boolean(name, description=None, field_spec=field_spec)
  b.set(True, fields=fields)
  try:
    yield
  finally:
    b.set(False, fields=fields)


class RuntimeBreakdownTimer(object):
  """Record the time of an operation and the breakdown into sub-steps.

  Examples:
    with RuntimeBreakdownTimer('timer/name', fields={'foo':'bar'},
                               description='My timer',
                               field_spec=[ts_mon.StringField('foo')]) as timer:
      with timer.Step('first_step'):
        doFirstStep()
      with timer.Step('second_step'):
        doSecondStep()
      # The time spent next will show up under .../timer/name/breakdown_no_step
      doSomeNonStepWork()

  This will emit the following metrics:
  - .../timer/name/total_duration - A CumulativeSecondsDistribution metric for
        the time spent inside the outer with block.
  - .../timer/name/breakdown/first_step and
    .../timer/name/breakdown/second_step - PercentageDistribution metrics for
        the fraction of time devoted to each substep.
  - .../timer/name/breakdown_unaccounted - PercentageDistribution metric for the
        fraction of time that is not accounted for in any of the substeps.
  - .../timer/name/bucketing_loss - PercentageDistribution metric buckets values
        before reporting them as distributions. This causes small errors in the
        reported values because they are rounded to the reported buckets lower
        bound. This is a CumulativeMetric measuring the total rounding error
        accrued in reporting all the percentages. The worst case bucketing loss
        for x steps is (x+1)/10. So, if you time across 9 steps, you should
        expect no more than 1% rounding error.
  [experimental]
  - .../timer/name/duration_breakdown - A Float metric, with one stream per Step
        indicating the ratio of time spent in that step. The different steps are
        differentiated via a field with key 'step_name'. Since some of the time
        can be spent outside any steps, these ratios will sum to <= 1.

  NB: This helper can only be used if the field values are known at the
  beginning of the outer context and do not change as a result of any of the
  operations timed.
  """

  PERCENT_BUCKET_COUNT = 1000

  _StepMetrics = collections.namedtuple('_StepMetrics', ['name', 'time_s'])

  def __init__(self, name, fields=None, description=None, field_spec=_MISSING):
    self._name = name
    self._fields = fields
    self._field_spec = field_spec
    self._description = description
    self._outer_t0 = None
    self._total_time_s = 0
    self._inside_step = False
    self._step_metrics = []

  def __enter__(self):
    self._outer_t0 = _GetSystemClock()
    return self

  def __exit__(self, _type, _value, _traceback):
    self._RecordTotalTime()

    outer_timer = CumulativeSecondsDistribution(
        '%s/total_duration' % (self._name,),
        field_spec=self._field_spec,
        description=self._description)
    outer_timer.add(self._total_time_s, fields=self._fields)

    for name, percent in self._GetStepBreakdowns().items():
      step_metric = PercentageDistribution(
          '%s/breakdown/%s' % (self._name, name),
          num_buckets=self.PERCENT_BUCKET_COUNT,
          field_spec=self._field_spec,
          description=self._description)
      step_metric.add(percent, fields=self._fields)

      fields = dict(self._fields) if self._fields is not None else dict()
      fields['step_name'] = name
      # TODO(pprabhu): Convert _GetStepBreakdowns() to return ratios instead of
      # percentage when the old PercentageDistribution reporting is deleted.
      Float('%s/duration_breakdown' % self._name).set(percent / 100,
                                                      fields=fields)

    unaccounted_metric = PercentageDistribution(
        '%s/breakdown_unaccounted' % self._name,
        num_buckets=self.PERCENT_BUCKET_COUNT,
        field_spec=self._field_spec,
        description=self._description)
    unaccounted_metric.add(self._GetUnaccountedBreakdown(), fields=self._fields)

    bucketing_loss_metric = CumulativeMetric(
        '%s/bucketing_loss' % self._name,
        field_spec=self._field_spec,
        description=self._description)
    bucketing_loss_metric.increment_by(self._GetBucketingLoss(),
                                       fields=self._fields)

  @contextlib.contextmanager
  def Step(self, step_name):
    """Start a new step named step_name in the timed operation.

    Note that it is not possible to start a step inside a step. i.e.,

    with RuntimeBreakdownTimer('timer') as timer:
      with timer.Step('outer_step'):
        with timer.Step('inner_step'):
          # will by design raise an exception.

    Args:
      step_name: The name of the step being timed.
    """
    if self._inside_step:
      logging.error('RuntimeBreakdownTimer.Step is not reentrant. '
                    'Dropping step: %s', step_name)
      yield
      return

    self._inside_step = True
    t0 = _GetSystemClock()
    try:
      yield
    finally:
      self._inside_step = False
      step_time_s = _GetSystemClock() - t0
      # TODO(ayatane): Handle backward clock jumps.  See _GetSystemClock.
      step_time_s = max(0, step_time_s)
      self._step_metrics.append(self._StepMetrics(step_name, step_time_s))

  def _GetStepBreakdowns(self):
    """Returns percentage of time spent in each step.

    Must be called after |_RecordTotalTime|.
    """
    if not self._total_time_s:
      return {}
    return {x.name: (x.time_s * 100) / self._total_time_s
            for x in self._step_metrics}

  def _GetUnaccountedBreakdown(self):
    """Returns the percentage time spent outside of all steps.

    Must be called after |_RecordTotalTime|.
    """
    breakdown_percentages = sum(self._GetStepBreakdowns().values())
    return max(0, 100 - breakdown_percentages)

  def _GetBucketingLoss(self):
    """Compute the actual loss in reported percentages due to bucketing.

    Must be called after |_RecordTotalTime|.
    """
    reported = list(self._GetStepBreakdowns().values())
    reported.append(self._GetUnaccountedBreakdown())
    bucket_width = 100 / self.PERCENT_BUCKET_COUNT
    return sum(x % bucket_width for x in reported)

  def _RecordTotalTime(self):
    self._total_time_s = _GetSystemClock() - self._outer_t0
    # TODO(ayatane): Handle backward clock jumps.  See _GetSystemClock.
    self._total_time_s = max(0, self._total_time_s)


def _GetSystemClock():
  """Return a clock time.

  The only thing that the return value can be used for is to subtract from
  other instances to determine time elapsed.
  """
  # TODO(ayatane): We should use a monotonic clock to measure this,
  # but Python 2 does not have one.
  return time.time()


def Flush(reset_after=()):
  """Flushes metrics, but warns on transient errors.

  Args:
    reset_after: A list of metrics to reset after flushing.
  """
  if not ts_mon:
    return

  try:
    ts_mon.flush()
    while reset_after:
      reset_after.pop().reset()
  except ssl.SSLError as e:
    logging.warning('Caught transient network error while flushing: %s', e)
  except Exception as e:
    logging.error('Caught exception while flushing: %s', e)
