/*
 * Copyright 2022 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.google.android.libraries.mobiledatadownload.lite;

import android.net.Uri;
import androidx.annotation.VisibleForTesting;
import com.google.android.libraries.mobiledatadownload.TimeSource;
import com.google.android.libraries.mobiledatadownload.file.spi.Monitor;
import com.google.android.libraries.mobiledatadownload.internal.logging.LogUtil;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.HashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

/** A Download Progress Monitor to support {@link DownloadListener}. */
@ThreadSafe
public class DownloadProgressMonitor implements Monitor, SingleFileDownloadProgressMonitor {

  private static final String TAG = "DownloadProgressMonitor";

  private final TimeSource timeSource;
  private final Executor sequentialControlExecutor;

  private DownloadProgressMonitor(TimeSource timeSource, Executor controlExecutor) {
    this.timeSource = timeSource;

    // We want onProgress to be executed in order otherwise clients will observe out of order
    // updates (bigger current size update appears before smaller current size update).
    // We use Sequential Executor to ensure the onProgress will be processed sequentially.
    this.sequentialControlExecutor = MoreExecutors.newSequentialExecutor(controlExecutor);
  }

  /** Constructor overload with {@link TimeSource}. */
  // NOTE: this is necessary for use by other MDD components.
  public static DownloadProgressMonitor create(TimeSource timeSource, Executor controlExecutor) {
    return new DownloadProgressMonitor(timeSource, controlExecutor);
  }

  // We will only broadcast on progress notification at most once in this time frame.
  // Currently MobStore Monitor notify every 8KB of downloaded bytes. This may be too chatty on
  // fast network.
  // 1000 was chosen arbitrarily.
  @VisibleForTesting static final long BUFFERED_TIME_MS = 1000;

  @GuardedBy("DownloadProgressMonitor.class")
  private final HashMap<Uri, DownloadedBytesCounter> uriToDownloadedBytesCounter = new HashMap<>();

  @Override
  @Nullable
  public Monitor.InputMonitor monitorRead(Uri uri) {
    return null;
  }

  @Override
  @Nullable
  public Monitor.OutputMonitor monitorWrite(Uri uri) {
    synchronized (DownloadProgressMonitor.class) {
      if (uriToDownloadedBytesCounter.get(uri) == null) {
        // All monitors for a shared FileStorage will be invoked for all file accesses through this
        // shared FileStorage. So this monitor can receive non-MDD-Lite Uri.
        return null;
      }
      return uriToDownloadedBytesCounter.get(uri);
    }
  }

  @Override
  @Nullable
  public Monitor.OutputMonitor monitorAppend(Uri uri) {
    return monitorWrite(uri);
  }

  public void pausedForConnectivity() {
    synchronized (DownloadProgressMonitor.class) {
      for (DownloadedBytesCounter downloadedBytesCounter : uriToDownloadedBytesCounter.values()) {
        downloadedBytesCounter.pausedForConnectivity();
      }
    }
  }

  @Override
  public void addDownloadListener(Uri uri, DownloadListener downloadListener) {
    synchronized (DownloadProgressMonitor.class) {
      if (!uriToDownloadedBytesCounter.containsKey(uri)) {
        uriToDownloadedBytesCounter.put(uri, new DownloadedBytesCounter(uri, downloadListener));
      }
    }
  }

  @Override
  public void removeDownloadListener(Uri uri) {
    synchronized (DownloadProgressMonitor.class) {
      uriToDownloadedBytesCounter.remove(uri);
    }
  }

  // A counter for bytes downloaded.
  private final class DownloadedBytesCounter implements Monitor.OutputMonitor {
    private final Uri uri;
    private final DownloadListener downloadListener;

    private final AtomicLong byteCounter = new AtomicLong();

    // Last timestamp that we broadcast on progress.
    private long lastBroadcastOnProgressTimestampMs;

    DownloadedBytesCounter(Uri uri, DownloadListener downloadListener) {
      this.uri = uri;
      this.downloadListener = downloadListener;
      lastBroadcastOnProgressTimestampMs = timeSource.currentTimeMillis();
    }

    @Override
    public void bytesWritten(byte[] b, int off, int len) {
      notifyProgress(len);
    }

    private void notifyProgress(long len) {
      // Only broadcast progress update every BUFFERED_TIME_MS.
      // It will be fast (no locking) when there is no need to broadcast progress.
      // When there is a need to broadcast progress, we need to obtain the lock due to 2 reasons:
      // 1- Concurrent access to uriToDownloadedBytesCounter.
      // 2- Prevent out of order progress update.
      if (timeSource.currentTimeMillis() - lastBroadcastOnProgressTimestampMs < BUFFERED_TIME_MS) {
        byteCounter.getAndAdd(len);
        LogUtil.v(
            "%s: Received data for uri = %s, len = %d, Counter = %d",
            TAG, uri, len, byteCounter.get());
      } else {
        synchronized (DownloadProgressMonitor.class) {
          // Reset timestamp.
          lastBroadcastOnProgressTimestampMs = timeSource.currentTimeMillis();

          byteCounter.getAndAdd(len);
          LogUtil.v(
              "%s: Received data for uri = %s, len = %d, Counter = %d",
              TAG, uri, len, byteCounter.get());

          if (uriToDownloadedBytesCounter.containsKey(uri)) {
            sequentialControlExecutor.execute(() -> downloadListener.onProgress(byteCounter.get()));
          }
        }
      }
    }

    public void pausedForConnectivity() {
      downloadListener.onPausedForConnectivity();
    }
  }
}
