/*
 * Copyright (C) 2010 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

//#define LOG_NDEBUG 0
#define LOG_TAG "ARTPConnection"
#define INET_ECN_NOT_ECT    0x00    /* ECN was not enabled */
#define INET_ECN_ECT_1      0x01    /* ECN capable packet */
#define INET_ECN_ECT_0      0x02    /* ECN capable packet */
#define INET_ECN_CE         0x03    /* ECN congestion */
#define INET_ECN_MASK       0x03    /* Mask of ECN bits */

#include <utils/Log.h>

#include <media/stagefright/rtsp/ARTPAssembler.h>
#include <media/stagefright/rtsp/ARTPConnection.h>
#include <media/stagefright/rtsp/ARTPSource.h>
#include <media/stagefright/rtsp/ASessionDescription.h>

#include <media/stagefright/foundation/ABuffer.h>
#include <media/stagefright/foundation/ADebug.h>
#include <media/stagefright/foundation/AMessage.h>
#include <media/stagefright/foundation/AString.h>
#include <media/stagefright/foundation/hexdump.h>

#include <android/multinetwork.h>

#include <arpa/inet.h>
#include <sys/socket.h>

namespace android {

static const size_t kMaxUDPSize = 1500;

static uint16_t u16at(const uint8_t *data) {
    return data[0] << 8 | data[1];
}

static uint32_t u24at(const uint8_t *data) {
    return u16at(data) << 16 | data[2];
}

static uint32_t u32at(const uint8_t *data) {
    return u16at(data) << 16 | u16at(&data[2]);
}

static uint64_t u64at(const uint8_t *data) {
    return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]);
}

// static
const int64_t ARTPConnection::kSelectTimeoutUs = 1000LL;
const int64_t ARTPConnection::kMinOneSecondNotifyDelayUs = 100000ll;

struct ARTPConnection::StreamInfo {
    bool isIPv6;
    int mRTPSocket;
    int mRTCPSocket;
    sp<ASessionDescription> mSessionDesc;
    size_t mIndex;
    sp<AMessage> mNotifyMsg;
    KeyedVector<uint32_t, sp<ARTPSource> > mSources;

    int64_t mNumRTCPPacketsReceived;
    int64_t mNumRTPPacketsReceived;
    struct sockaddr_in mRemoteRTCPAddr;
    struct sockaddr_in6 mRemoteRTCPAddr6;

    bool mIsInjected;

    // A place to save time when it polls
    int64_t mLastPollTimeUs;
    // RTCP Extension for CVO
    int mCVOExtMap; // will be set to 0 if cvo is not negotiated in sdp
};

ARTPConnection::ARTPConnection(uint32_t flags)
    : mFlags(flags),
      mPollEventPending(false),
      mLastReceiverReportTimeUs(-1),
      mLastBitrateReportTimeUs(-1),
      mLastCongestionNotifyTimeUs(-1),
      mTargetBitrate(-1),
      mRtpSockOptEcn(0),
      mIsIPv6(false),
      mStaticJitterTimeMs(kStaticJitterTimeMs) {
}

ARTPConnection::~ARTPConnection() {
}

void ARTPConnection::addStream(
        int rtpSocket, int rtcpSocket,
        const sp<ASessionDescription> &sessionDesc,
        size_t index,
        const sp<AMessage> &notify,
        bool injected) {
    sp<AMessage> msg = new AMessage(kWhatAddStream, this);
    msg->setInt32("rtp-socket", rtpSocket);
    msg->setInt32("rtcp-socket", rtcpSocket);
    msg->setObject("session-desc", sessionDesc);
    msg->setSize("index", index);
    msg->setMessage("notify", notify);
    msg->setInt32("injected", injected);
    msg->post();
}

void ARTPConnection::seekStream() {
    sp<AMessage> msg = new AMessage(kWhatSeekStream, this);
    msg->post();
}

void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) {
    sp<AMessage> msg = new AMessage(kWhatRemoveStream, this);
    msg->setInt32("rtp-socket", rtpSocket);
    msg->setInt32("rtcp-socket", rtcpSocket);
    msg->post();
}

static void bumpSocketBufferSize(int s) {
    int size = 256 * 1024;
    CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0);
}

// static
void ARTPConnection::MakePortPair(
        int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {
    *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
    CHECK_GE(*rtpSocket, 0);

    bumpSocketBufferSize(*rtpSocket);

    *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0);
    CHECK_GE(*rtcpSocket, 0);

    bumpSocketBufferSize(*rtcpSocket);

    /* rand() * 1000 may overflow int type, use long long */
    unsigned start = (unsigned)((rand()* 1000LL)/RAND_MAX) + 15550;
    start &= ~1;

    for (unsigned port = start; port < 65535; port += 2) {
        struct sockaddr_in addr;
        memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
        addr.sin_family = AF_INET;
        addr.sin_addr.s_addr = htonl(INADDR_ANY);
        addr.sin_port = htons(port);

        if (bind(*rtpSocket,
                 (const struct sockaddr *)&addr, sizeof(addr)) < 0) {
            continue;
        }

        addr.sin_port = htons(port + 1);

        if (bind(*rtcpSocket,
                 (const struct sockaddr *)&addr, sizeof(addr)) == 0) {
            *rtpPort = port;
            return;
        } else {
            // we should recreate a RTP socket to avoid bind other port in same RTP socket
            close(*rtpSocket);

            *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
            CHECK_GE(*rtpSocket, 0);
            bumpSocketBufferSize(*rtpSocket);
        }
    }

    TRESPASS();
}

// static
void ARTPConnection::MakeRTPSocketPair(
        int *rtpSocket, int *rtcpSocket, const char *localIp, const char *remoteIp,
        unsigned localPort, unsigned remotePort, int64_t socketNetwork, int32_t sockOptEcn) {
    bool isIPv6 = false;
    if (strchr(localIp, ':') != NULL)
        isIPv6 = true;

    *rtpSocket = socket(isIPv6 ? AF_INET6 : AF_INET, SOCK_DGRAM, 0);
    CHECK_GE(*rtpSocket, 0);

    bumpSocketBufferSize(*rtpSocket);

    *rtcpSocket = socket(isIPv6 ? AF_INET6 : AF_INET, SOCK_DGRAM, 0);
    CHECK_GE(*rtcpSocket, 0);

    if (socketNetwork != 0) {
        ALOGD("trying to bind rtp socket(%d) to network(%llu).",
                *rtpSocket, (unsigned long long)socketNetwork);

        int result = android_setsocknetwork((net_handle_t)socketNetwork, *rtpSocket);
        if (result != 0) {
            ALOGW("failed(%d) to bind rtp socket(%d) to network(%llu)",
                    result, *rtpSocket, (unsigned long long)socketNetwork);
        }
        result = android_setsocknetwork((net_handle_t)socketNetwork, *rtcpSocket);
        if (result != 0) {
            ALOGW("failed(%d) to bind rtcp socket(%d) to network(%llu)",
                    result, *rtcpSocket, (unsigned long long)socketNetwork);
        }
    }

    if (sockOptEcn != 0) {
        int sockOptForTOS = 1;
        if (setsockopt(*rtpSocket, isIPv6 ? IPPROTO_IPV6 : IPPROTO_IP,
               isIPv6 ? IPV6_RECVTCLASS : IP_RECVTOS,
               (int *)&sockOptForTOS, sizeof(sockOptForTOS)) < 0) {
            ALOGE("failed to set recv sockopt TOS on rtpsock(%d). err=%s", *rtpSocket,
                strerror(errno));
        } else {
            ALOGD("successfully set recv sockopt TOS on rtpsock(%d)", *rtpSocket);
            int result = setsockopt(*rtcpSocket, isIPv6 ? IPPROTO_IPV6 : IPPROTO_IP,
                isIPv6 ? IPV6_RECVTCLASS : IP_RECVTOS,
                (int *)&sockOptForTOS, sizeof(sockOptForTOS));
            if (result >= 0) {
                ALOGD("successfully set recv sockopt TOS on rtcpsock(%d).", *rtcpSocket);
            }
        }
    }

    bumpSocketBufferSize(*rtcpSocket);

    struct sockaddr *addr;
    struct sockaddr_in addr4;
    struct sockaddr_in6 addr6;

    if (isIPv6) {
        addr = (struct sockaddr *)&addr6;
        memset(&addr6, 0, sizeof(addr6));
        addr6.sin6_family = AF_INET6;
        inet_pton(AF_INET6, localIp, &addr6.sin6_addr);
        addr6.sin6_port = htons((uint16_t)localPort);
    } else {
        addr = (struct sockaddr *)&addr4;
        memset(&addr4, 0, sizeof(addr4));
        addr4.sin_family = AF_INET;
        addr4.sin_addr.s_addr = inet_addr(localIp);
        addr4.sin_port = htons((uint16_t)localPort);
    }

    int sockopt = 1;
    setsockopt(*rtpSocket, SOL_SOCKET, SO_REUSEADDR, (int *)&sockopt, sizeof(sockopt));
    setsockopt(*rtcpSocket, SOL_SOCKET, SO_REUSEADDR, (int *)&sockopt, sizeof(sockopt));

    int sizeSockSt = isIPv6 ? sizeof(addr6) : sizeof(addr4);

    if (bind(*rtpSocket, addr, sizeSockSt) == 0) {
        ALOGI("rtp socket successfully binded. addr=%s:%d", localIp, localPort);
    } else {
        ALOGE("failed to bind rtp socket addr=%s:%d err=%s", localIp, localPort, strerror(errno));
        return;
    }

    if (isIPv6)
        addr6.sin6_port = htons(localPort + 1);
    else
        addr4.sin_port = htons(localPort + 1);

    if (bind(*rtcpSocket, addr, sizeSockSt) == 0) {
        ALOGI("rtcp socket successfully binded. addr=%s:%d", localIp, localPort + 1);
    } else {
        ALOGE("failed to bind rtcp socket addr=%s:%d err=%s", localIp,
                localPort + 1, strerror(errno));
    }

    // Re uses addr variable as remote addr.
    if (isIPv6) {
        memset(&addr6, 0, sizeof(addr6));
        addr6.sin6_family = AF_INET6;
        inet_pton(AF_INET6, remoteIp, &addr6.sin6_addr);
        addr6.sin6_port = htons((uint16_t)remotePort);
    } else {
        memset(&addr4, 0, sizeof(addr4));
        addr4.sin_family = AF_INET;
        addr4.sin_addr.s_addr = inet_addr(remoteIp);
        addr4.sin_port = htons((uint16_t)remotePort);
    }
    if (connect(*rtpSocket, addr, sizeSockSt) == 0) {
        ALOGI("rtp socket successfully connected to remote=%s:%d", remoteIp, remotePort);
    } else {
        ALOGE("failed to connect rtp socket to remote addr=%s:%d err=%s", remoteIp,
                remotePort, strerror(errno));
        return;
    }

    if (isIPv6)
        addr6.sin6_port = htons(remotePort + 1);
    else
        addr4.sin_port = htons(remotePort + 1);

    if (connect(*rtcpSocket, addr, sizeSockSt) == 0) {
        ALOGI("rtcp socket successfully connected to remote=%s:%d", remoteIp, remotePort + 1);
    } else {
        ALOGE("failed to connect rtcp socket addr=%s:%d err=%s", remoteIp,
                remotePort + 1, strerror(errno));
        return;
    }
}

void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
    switch (msg->what()) {
        case kWhatAddStream:
        {
            onAddStream(msg);
            break;
        }

        case kWhatSeekStream:
        {
            onSeekStream(msg);
            break;
        }

        case kWhatRemoveStream:
        {
            onRemoveStream(msg);
            break;
        }

        case kWhatPollStreams:
        {
            onPollStreams();
            break;
        }

        case kWhatAlarmStream:
        {
            onAlarmStream(msg);
            break;
        }

        case kWhatInjectPacket:
        {
            onInjectPacket(msg);
            break;
        }

        default:
        {
            TRESPASS();
            break;
        }
    }
}

void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
    mStreams.push_back(StreamInfo());
    StreamInfo *info = &*--mStreams.end();

    int32_t s;
    CHECK(msg->findInt32("rtp-socket", &s));
    info->mRTPSocket = s;
    CHECK(msg->findInt32("rtcp-socket", &s));
    info->mRTCPSocket = s;

    int32_t injected;
    CHECK(msg->findInt32("injected", &injected));

    info->mIsInjected = injected;

    sp<RefBase> obj;
    CHECK(msg->findObject("session-desc", &obj));
    info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());

    CHECK(msg->findSize("index", &info->mIndex));
    CHECK(msg->findMessage("notify", &info->mNotifyMsg));

    info->mNumRTCPPacketsReceived = 0;
    info->mNumRTPPacketsReceived = 0;
    memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));
    memset(&info->mRemoteRTCPAddr6, 0, sizeof(info->mRemoteRTCPAddr6));

    sp<ASessionDescription> sessionDesc = info->mSessionDesc;
    info->mCVOExtMap = 0;
    for (size_t i = 1; i < sessionDesc->countTracks(); ++i) {
        int32_t cvoExtMap;
        if (sessionDesc->getCvoExtMap(i, &cvoExtMap)) {
            info->mCVOExtMap = cvoExtMap;
            ALOGI("urn:3gpp:video-orientation(cvo) found as extmap:%d", info->mCVOExtMap);
        } else {
            ALOGI("urn:3gpp:video-orientation(cvo) not found :%d", info->mCVOExtMap);
        }
    }

    if (!injected) {
        postPollEvent();
    }
}

void ARTPConnection::onSeekStream(const sp<AMessage> &msg) {
    (void)msg; // unused param as of now.
    List<StreamInfo>::iterator it = mStreams.begin();
    while (it != mStreams.end()) {
        for (size_t i = 0; i < it->mSources.size(); ++i) {
            sp<ARTPSource> source = it->mSources.valueAt(i);
            source->timeReset();
        }
        ++it;
    }
}

void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
    int32_t rtpSocket, rtcpSocket;
    CHECK(msg->findInt32("rtp-socket", &rtpSocket));
    CHECK(msg->findInt32("rtcp-socket", &rtcpSocket));

    List<StreamInfo>::iterator it = mStreams.begin();
    while (it != mStreams.end()
           && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) {
        ++it;
    }

    if (it == mStreams.end()) {
        return;
    }

    mStreams.erase(it);
}

void ARTPConnection::postPollEvent() {
    if (mPollEventPending) {
        return;
    }

    sp<AMessage> msg = new AMessage(kWhatPollStreams, this);
    msg->post();

    mPollEventPending = true;
}

void ARTPConnection::onPollStreams() {
    mPollEventPending = false;

    if (mStreams.empty()) {
        return;
    }

    struct timeval tv;
    tv.tv_sec = 0;
    tv.tv_usec = kSelectTimeoutUs;

    fd_set rs;
    FD_ZERO(&rs);

    int maxSocket = -1;
    for (List<StreamInfo>::iterator it = mStreams.begin();
         it != mStreams.end(); ++it) {
        if ((*it).mIsInjected) {
            continue;
        }

        FD_SET(it->mRTPSocket, &rs);
        FD_SET(it->mRTCPSocket, &rs);

        if (it->mRTPSocket > maxSocket) {
            maxSocket = it->mRTPSocket;
        }
        if (it->mRTCPSocket > maxSocket) {
            maxSocket = it->mRTCPSocket;
        }
    }

    if (maxSocket == -1) {
        return;
    }

    int64_t nowUs = ALooper::GetNowUs();
    int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);

    if (res > 0) {
        List<StreamInfo>::iterator it = mStreams.begin();
        while (it != mStreams.end()) {
            if ((*it).mIsInjected) {
                ++it;
                continue;
            }
            it->mLastPollTimeUs = nowUs;

            status_t err = OK;
            if (FD_ISSET(it->mRTPSocket, &rs)) {
                err = receive(&*it, true);
            }
            if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) {
                err = receive(&*it, false);
            }

            if (err == -ECONNRESET) {
                // socket failure, this stream is dead, Jim.
                for (size_t i = 0; i < it->mSources.size(); ++i) {
                    sp<AMessage> notify = it->mNotifyMsg->dup();
                    notify->setInt32("rtcp-event", 1);
                    notify->setInt32("payload-type", 400);
                    notify->setInt32("feedback-type", 1);
                    notify->setInt32("sender", it->mSources.valueAt(i)->getSelfID());
                    notify->post();

                    ALOGW("failed to receive RTP/RTCP datagram.");
                }
                it = mStreams.erase(it);
                continue;
            }

            // add NACK and FIR that needs to be sent immediately.
            sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
            for (size_t i = 0; i < it->mSources.size(); ++i) {
                buffer->setRange(0, 0);
                int cnt = it->mSources.valueAt(i)->addNACK(buffer);
                if (cnt > 0) {
                    ALOGV("Send NACK for lost %d Packets", cnt);
                    send(&*it, buffer);
                }

                buffer->setRange(0, 0);
                it->mSources.valueAt(i)->addFIR(buffer);
                if (buffer->size() > 0) {
                    ALOGD("Send FIR immediately for lost Packets");
                    send(&*it, buffer);
                }

                buffer->setRange(0, 0);
                it->mSources.valueAt(i)->addTMMBR(buffer, mTargetBitrate);
                mTargetBitrate = -1;
                if (buffer->size() > 0) {
                    ALOGV("Sending TMMBR...");
                    ssize_t n = send(&*it, buffer);

                    if (n != (ssize_t)buffer->size()) {
                        ALOGW("failed to send RTCP TMMBR (%s).",
                                n >= 0 ? "connection gone" : strerror(errno));
                        continue;
                    }
                }
            }

            ++it;
        }
    }

    checkRxBitrate(nowUs);

    if (mLastReceiverReportTimeUs <= 0
            || mLastReceiverReportTimeUs + 5000000LL <= nowUs) {
        sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
        List<StreamInfo>::iterator it = mStreams.begin();
        while (it != mStreams.end()) {
            StreamInfo *s = &*it;

            if (s->mIsInjected) {
                ++it;
                continue;
            }

            if (s->mNumRTCPPacketsReceived == 0) {
                // We have never received any RTCP packets on this stream,
                // we don't even know where to send a report.
                ++it;
                continue;
            }

            buffer->setRange(0, 0);

            for (size_t i = 0; i < s->mSources.size(); ++i) {
                sp<ARTPSource> source = s->mSources.valueAt(i);

                source->addReceiverReport(buffer);

                if (mFlags & kRegularlyRequestFIR) {
                    source->addFIR(buffer);
                }
            }

            if (buffer->size() > 0) {
                ALOGV("Sending RR...");

                ssize_t n = send(s, buffer);

                if (n != (ssize_t)buffer->size()) {
                    ALOGW("failed to send RTCP receiver report (%s).",
                            n >= 0 ? "connection gone" : strerror(errno));
                    ++it;
                    continue;
                }

                mLastReceiverReportTimeUs = nowUs;
            }

            ++it;
        }
    }

    if (!mStreams.empty()) {
        postPollEvent();
    }
}

void ARTPConnection::onAlarmStream(const sp<AMessage> msg) {
    sp<ARTPSource> source = nullptr;
    if (msg->findObject("source", (sp<android::RefBase>*)&source)) {
        source->processRTPPacket();
    }
}

status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
    ALOGV("receiving %s", receiveRTP ? "RTP" : "RTCP");

    CHECK(!s->mIsInjected);

    sp<ABuffer> buffer = new ABuffer(65536);

    struct msghdr sMsg = {};
    struct iovec sIov[1] = {};

    sIov[0].iov_base = (char *) buffer->data();
    sIov[0].iov_len = buffer->capacity();

    sMsg.msg_iov = sIov;
    sMsg.msg_iovlen = 1;

    int cMsgSize = sizeof(struct cmsghdr) + sizeof(uint8_t);
    char buf[CMSG_SPACE(cMsgSize)];
    sMsg.msg_control = buf;
    sMsg.msg_controllen = sizeof(buf);
    sMsg.msg_flags = 0;

    ssize_t nbytes;
    do {
        // Used recvmsg to get the TOS header of incoming packet
        nbytes = recvmsg(receiveRTP ? s->mRTPSocket : s->mRTCPSocket, &sMsg, 0);
        mCumulativeBytes += nbytes;
    } while (nbytes < 0 && errno == EINTR);

    if (nbytes <= 0) {
        ALOGW("failed to recv rtp packet. cause=%s", strerror(errno));
        // ECONNREFUSED may happen in next recvfrom() calling if one of
        // outgoing packet can not be delivered to remote by using sendto()
        if (errno == ECONNREFUSED) {
            return -ECONNREFUSED;
        } else {
            return -ECONNRESET;
        }
    }

    if (nbytes > 0) {
        handleIpHeadersIfReceived(s, sMsg);
    }

    buffer->setRange(0, nbytes);

    // ALOGI("received %d bytes.", buffer->size());

    status_t err;
    if (receiveRTP) {
        err = parseRTP(s, buffer);
    } else {
        err = parseRTCP(s, buffer);
    }

    return err;
}

/* This function will check if TOS is present or not in received IP packet.
 * After that if it is present then it will notify about congestion to upper
 * layer if CE bit is set in TOS header.
 **/
void ARTPConnection::handleIpHeadersIfReceived(StreamInfo *s, struct msghdr sMsg) {
    struct cmsghdr *cMsg;
    cMsg = CMSG_FIRSTHDR(&sMsg);

    if (cMsg == NULL) {
        ALOGV("cmsg is null");
    }

    for (; cMsg != NULL; cMsg = CMSG_NXTHDR(&sMsg, cMsg)) {
        bool isTOSHeader = ((cMsg->cmsg_level == (mIsIPv6 ? IPPROTO_IPV6 : IPPROTO_IP))
                              && (cMsg->cmsg_type == (mIsIPv6 ? IPV6_TCLASS : IP_TOS))
                              && (cMsg->cmsg_len));
        if (isTOSHeader) {
            uint8_t receivedTOS;
            receivedTOS = *((uint8_t *) CMSG_DATA(cMsg));
            // checking CE bit is set
            bool isCEBitMarked = ((receivedTOS & INET_ECN_MASK) == INET_ECN_CE);

            ALOGV("receivedTos(value -> %d)", receivedTOS);

            if (isCEBitMarked) {
                ALOGD("receivedTos(value -> %d), is ECN CE marked = %d",
                    receivedTOS, isCEBitMarked);
                notifyCongestionToUpperLayerIfNeeded(s);
            }
            break;
        }
    }
}

/* this function will be use to notify congestion in video call to upper layer */
void ARTPConnection::notifyCongestionToUpperLayerIfNeeded(StreamInfo *s) {
    int64_t nowUs = ALooper::GetNowUs();

    if (mLastCongestionNotifyTimeUs <= 0) {
        mLastCongestionNotifyTimeUs = nowUs;
    }

    bool isNeedToUpdate = (mLastCongestionNotifyTimeUs + kMinOneSecondNotifyDelayUs <= nowUs);
    ALOGD("ECN info set by upper layer=%d, isNeedToUpdate=%d", mRtpSockOptEcn, isNeedToUpdate);

    if ((mRtpSockOptEcn != 0) && (isNeedToUpdate)) {
        sp<AMessage> notify = s->mNotifyMsg->dup();
        notify->setInt32("rtcp-event", 1);
        notify->setInt32("payload-type", ARTPSource::RTP_QUALITY_CD);
        notify->post();
        mLastCongestionNotifyTimeUs = nowUs;
        ALOGD("Congestion detected in n/w, Notify upper layer");
    }
}

ssize_t ARTPConnection::send(const StreamInfo *info, const sp<ABuffer> buffer) {
        struct sockaddr* pRemoteRTCPAddr;
        int sizeSockSt;

        /* It seems this isIPv6 variable is useless.
         * We should remove it to prevent confusion */
        if (mIsIPv6) {
            pRemoteRTCPAddr = (struct sockaddr *)&info->mRemoteRTCPAddr6;
            sizeSockSt = sizeof(struct sockaddr_in6);
        } else {
            pRemoteRTCPAddr = (struct sockaddr *)&info->mRemoteRTCPAddr;
            sizeSockSt = sizeof(struct sockaddr_in);
        }

        if (mFlags & kViLTEConnection) {
            ALOGV("ViLTE RTCP");
            pRemoteRTCPAddr = NULL;
            sizeSockSt = 0;
        }

        ssize_t n;
        do {
            n = sendto(
                    info->mRTCPSocket, buffer->data(), buffer->size(), 0,
                    pRemoteRTCPAddr, sizeSockSt);
        } while (n < 0 && errno == EINTR);

        if (n < 0) {
            ALOGW("failed to send rtcp packet. cause=%s", strerror(errno));
        }

        return n;
}

status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {
    size_t size = buffer->size();

    if (size < 12) {
        // Too short to be a valid RTP header.
        return -1;
    }

    const uint8_t *data = buffer->data();

    if ((data[0] >> 6) != 2) {
        // Unsupported version.
        return -1;
    }

    if ((data[1] & 0x7f) == 20 /* decimal */) {
        // Unassigned payload type
        return -1;
    }

    if (data[0] & 0x20) {
        // Padding present.

        size_t paddingLength = data[size - 1];

        if (paddingLength + 12 > size) {
            // If we removed this much padding we'd end up with something
            // that's too short to be a valid RTP header.
            return -1;
        }

        size -= paddingLength;
    }

    int numCSRCs = data[0] & 0x0f;

    size_t payloadOffset = 12 + 4 * numCSRCs;

    if (size < payloadOffset) {
        // Not enough data to fit the basic header and all the CSRC entries.
        return -1;
    }

    int32_t cvoDegrees = -1;
    if (data[0] & 0x10) {
        // Header eXtension present.

        if (size < payloadOffset + 4) {
            // Not enough data to fit the basic header, all CSRC entries
            // and the first 4 bytes of the extension header.

            return -1;
        }

        const uint8_t *extensionData = &data[payloadOffset];

        size_t extensionLength =
            (4 * (extensionData[2] << 8 | extensionData[3])) + 4;

        if (size < payloadOffset + extensionLength) {
            return -1;
        }

        parseRTPExt(s, (const uint8_t *)extensionData, extensionLength, &cvoDegrees);
        payloadOffset += extensionLength;
    }

    uint32_t srcId = u32at(&data[8]);

    sp<ARTPSource> source = findSource(s, srcId);

    uint32_t rtpTime = u32at(&data[4]);

    sp<AMessage> meta = buffer->meta();
    meta->setInt32("ssrc", srcId);
    meta->setInt32("rtp-time", rtpTime);
    meta->setInt32("PT", data[1] & 0x7f);
    meta->setInt32("M", data[1] >> 7);
    if (cvoDegrees >= 0) {
        meta->setInt32("cvo", cvoDegrees);
    }

    int32_t seq = u16at(&data[2]);
    buffer->setInt32Data(seq);
    buffer->setRange(payloadOffset, size - payloadOffset);

    if (s->mNumRTPPacketsReceived++ == 0) {
        sp<AMessage> notify = s->mNotifyMsg->dup();
        notify->setInt32("first-rtp", true);
        notify->setInt32("rtcp-event", 1);
        notify->setInt32("payload-type", ARTPSource::RTP_FIRST_PACKET);
        notify->setInt32("rtp-time", (int32_t)rtpTime);
        notify->setInt32("rtp-seq-num", seq);
        notify->setInt64("recv-time-us", ALooper::GetNowUs());
        notify->post();

        ALOGD("send first-rtp event to upper layer");
    }

    source->processRTPPacket(buffer);

    return OK;
}

status_t ARTPConnection::parseRTPExt(StreamInfo *s,
        const uint8_t *extHeader, size_t extLen, int32_t *cvoDegrees) {
    if (extLen < 4)
        return -1;

    uint16_t header = (extHeader[0] << 8) | (extHeader[1]);
    bool isOnebyteHeader = false;

    if (header == 0xBEDE) {
        isOnebyteHeader = true;
    } else if (header == 0x1000) {
        ALOGW("parseRTPExt: two-byte header is not implemented yet");
        return -1;
    } else {
        ALOGW("parseRTPExt: can not recognize header");
        return -1;
    }

    const uint8_t *extPayload = extHeader + 4;
    extLen -= 4;
    size_t offset = 0; //start from first payload of rtp extension.
    // one-byte header parser
    while (isOnebyteHeader && offset < extLen) {
        uint8_t extmapId = extPayload[offset] >> 4;
        uint8_t length = (extPayload[offset] & 0xF) + 1;
        offset++;

        // padding case
        if (extmapId == 0)
            continue;

        uint8_t data[16]; // maximum length value
        for (uint8_t j = 0; offset + j <= extLen && j < length; j++) {
            data[j] = extPayload[offset + j];
        }

        offset += length;

        if (extmapId == s->mCVOExtMap) {
            *cvoDegrees = (int32_t)data[0];
            return OK;
        }
    }

    return BAD_VALUE;
}

status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) {
    if (s->mNumRTCPPacketsReceived++ == 0) {
        sp<AMessage> notify = s->mNotifyMsg->dup();
        notify->setInt32("first-rtcp", true);
        notify->setInt32("rtcp-event", 1);
        notify->setInt32("payload-type", ARTPSource::RTCP_FIRST_PACKET);
        notify->setInt64("recv-time-us", ALooper::GetNowUs());
        notify->post();

        ALOGD("send first-rtcp event to upper layer");
    }

    const uint8_t *data = buffer->data();
    size_t size = buffer->size();

    while (size > 0) {
        if (size < 8) {
            // Too short to be a valid RTCP header
            return -1;
        }

        if ((data[0] >> 6) != 2) {
            // Unsupported version.
            return -1;
        }

        if (data[0] & 0x20) {
            // Padding present.

            size_t paddingLength = data[size - 1];

            if (paddingLength + 12 > size) {
                // If we removed this much padding we'd end up with something
                // that's too short to be a valid RTP header.
                return -1;
            }

            size -= paddingLength;
        }

        size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;

        if (size < headerLength) {
            // Only received a partial packet?
            return -1;
        }

        switch (data[1]) {
            case 200:
            {
                parseSenderReport(s, data, headerLength);
                break;
            }

            case 201:  // RR
            {
                parseReceiverReport(s, data, headerLength);
                break;
            }
            case 202:  // SDES
            case 204:  // APP
                break;

            case 205:  // TSFB (transport layer specific feedback)
                parseTSFB(s, data, headerLength);
                break;
            case 206:  // PSFB (payload specific feedback)
                // hexdump(data, headerLength);
                parsePSFB(s, data, headerLength);
                ALOGI("RTCP packet type %u of size %zu", (unsigned)data[1], headerLength);
                break;

            case 203:
            {
                parseBYE(s, data, headerLength);
                break;
            }

            default:
            {
                ALOGW("Unknown RTCP packet type %u of size %zu",
                     (unsigned)data[1], headerLength);
                break;
            }
        }

        data += headerLength;
        size -= headerLength;
    }

    return OK;
}

status_t ARTPConnection::parseBYE(
        StreamInfo *s, const uint8_t *data, size_t size) {
    size_t SC = data[0] & 0x3f;

    if (SC == 0 || size < (4 + SC * 4)) {
        // Packet too short for the minimal BYE header.
        return -1;
    }

    uint32_t id = u32at(&data[4]);

    sp<ARTPSource> source = findSource(s, id);

    // Report a final stastics to be used for rtp data usage.
    int64_t nowUs = ALooper::GetNowUs();
    int32_t timeDiff = (nowUs - mLastBitrateReportTimeUs) / 1000000ll;
    int32_t bitrate = mCumulativeBytes * 8 / timeDiff;
    source->notifyPktInfo(bitrate, nowUs, true /* isRegular */);

    source->byeReceived();

    return OK;
}

status_t ARTPConnection::parseSenderReport(
        StreamInfo *s, const uint8_t *data, size_t size) {
    ALOG_ASSERT(size >= 1, "parseSenderReport: invalid packet size.");
    size_t receptionReportCount = data[0] & 0x1f;
    if (size < (7 + (receptionReportCount * 6)) * 4) {
        // Packet too short for the minimal sender report header.
        return -1;
    }

    int64_t recvTimeUs = ALooper::GetNowUs();
    uint32_t senderId = u32at(&data[4]);
    uint64_t ntpTime = u64at(&data[8]);
    uint32_t rtpTime = u32at(&data[16]);
    uint32_t pktCount = u32at(&data[20]);
    uint32_t octCount = u32at(&data[24]);

    ALOGD("SR received: ssrc=0x%08x, rtpTime%u == ntpTime %llu, pkt=%u, oct=%u",
            senderId, rtpTime, (unsigned long long)ntpTime, pktCount, octCount);

    sp<ARTPSource> source = findSource(s, senderId);
    source->timeUpdate(recvTimeUs, rtpTime, ntpTime);

    for (int32_t i = 0; i < receptionReportCount; i++) {
        int32_t offset = 28 + (i * 24);
        parseReceptionReportBlock(s, recvTimeUs, senderId, data + offset, size - offset);
    }

    return 0;
}

status_t ARTPConnection::parseReceiverReport(
        StreamInfo *s, const uint8_t *data, size_t size) {
    ALOG_ASSERT(size >= 1, "parseReceiverReport: invalid packet size.");
    size_t receptionReportCount = data[0] & 0x1f;
    if (size < (2 + (receptionReportCount * 6)) * 4) {
        // Packet too short for the minimal receiver report header.
        return -1;
    }

#if 0
    ALOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f",
         id,
         rtpTime,
         (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32));
#endif
    int64_t recvTimeUs = ALooper::GetNowUs();
    uint32_t senderId = u32at(&data[4]);

    for (int i = 0; i < receptionReportCount; i++) {
        int32_t offset = 8 + (i * 24);
        parseReceptionReportBlock(s, recvTimeUs, senderId, data + offset, size - offset);
    }

    return 0;
}

status_t ARTPConnection::parseReceptionReportBlock(
        StreamInfo *s, int64_t recvTimeUs, uint32_t senderId, const uint8_t *data, size_t size) {
    ALOG_ASSERT(size >= 24, "parseReceptionReportBlock: invalid packet size.");
    if (size < 24) {
        // remaining size is smaller than reception report block size.
        return -1;
    }

    uint32_t rbId = u32at(&data[0]);
    uint32_t fLost = data[4];
    int32_t cumLost = u24at(&data[5]);
    uint32_t ehSeq = u32at(&data[8]);
    uint32_t jitter = u32at(&data[12]);
    uint32_t lsr = u32at(&data[16]);
    uint32_t dlsr = u32at(&data[20]);

    ALOGD("Reception Report Block: t:%llu sid:%u rid:%u fl:%u cl:%u hs:%u jt:%u lsr:%u dlsr:%u",
            (unsigned long long)recvTimeUs, senderId, rbId, fLost, cumLost,
            ehSeq, jitter, lsr, dlsr);
    sp<ARTPSource> source = findSource(s, senderId);
    sp<ReceptionReportBlock> rrb = new ReceptionReportBlock(
            rbId, fLost, cumLost, ehSeq, jitter, lsr, dlsr);
    source->processReceptionReportBlock(recvTimeUs, senderId, rrb);

    return 0;
}

status_t ARTPConnection::parseTSFB(
        StreamInfo *s, const uint8_t *data, size_t size) {
    if (size < 12) {
        // broken packet
        return -1;
    }

    uint8_t msgType = data[0] & 0x1f;
    uint32_t id = u32at(&data[4]);

    const uint8_t *ptr = &data[12];
    size -= 12;

    using namespace std;
    size_t FCISize;
    switch(msgType) {
        case 1:     // Generic NACK
        {
            FCISize = 4;
            while (size >= FCISize) {
                uint16_t PID = u16at(&ptr[0]);  // lost packet RTP number
                uint16_t BLP = u16at(&ptr[2]);  // Bitmask of following Lost Packets

                size -= FCISize;
                ptr += FCISize;

                AString list_of_losts;
                list_of_losts.append(PID);
                for (int i=0 ; i<16 ; i++) {
                    bool is_lost = BLP & (0x1 << i);
                    if (is_lost) {
                        list_of_losts.append(", ");
                        list_of_losts.append(PID + i);
                    }
                }
                ALOGI("Opponent losts packet of RTP %s", list_of_losts.c_str());
            }
            break;
        }
        case 3:     // TMMBR
        case 4:     // TMMBN
        {
            FCISize = 8;
            while (size >= FCISize) {
                uint32_t MxTBR = u32at(&ptr[4]);
                uint32_t MxTBRExp = MxTBR >> 26;
                uint32_t MxTBRMantissa = (MxTBR >> 9) & 0x01FFFF;
                uint32_t overhead = MxTBR & 0x01FF;

                size -= FCISize;
                ptr += FCISize;

                uint32_t bitRate = (1 << MxTBRExp) * MxTBRMantissa;

                if (msgType == 3)
                    ALOGI("Op -> UE Req Tx bitrate : %d X 2^%d = %d",
                        MxTBRMantissa, MxTBRExp, bitRate);
                else if (msgType == 4)
                    ALOGI("OP -> UE Noti Rx bitrate : %d X 2^%d = %d",
                        MxTBRMantissa, MxTBRExp, bitRate);

                sp<AMessage> notify = s->mNotifyMsg->dup();
                notify->setInt32("rtcp-event", 1);
                notify->setInt32("payload-type", 205);
                notify->setInt32("feedback-type", msgType);
                notify->setInt32("sender", id);
                notify->setInt32("bit-rate", bitRate);
                notify->post();
                ALOGI("overhead : %d", overhead);
            }
            break;
        }
        default:
        {
            ALOGI("Not supported TSFB type %d", msgType);
            break;
        }
    }

    return 0;
}

status_t ARTPConnection::parsePSFB(
        StreamInfo *s, const uint8_t *data, size_t size) {
    if (size < 12) {
        // broken packet
        return -1;
    }

    uint8_t msgType = data[0] & 0x1f;
    uint32_t id = u32at(&data[4]);

    const uint8_t *ptr = &data[12];
    size -= 12;

    using namespace std;
    switch(msgType) {
        case 1:     // Picture Loss Indication (PLI)
        {
            if (size > 0) {
                // PLI does not need parameters
                break;
            };
            sp<AMessage> notify = s->mNotifyMsg->dup();
            notify->setInt32("rtcp-event", 1);
            notify->setInt32("payload-type", 206);
            notify->setInt32("feedback-type", msgType);
            notify->setInt32("sender", id);
            notify->post();
            ALOGI("PLI detected.");
            break;
        }
        case 4:     // Full Intra Request (FIR)
        {
            if (size < 4) {
                break;
            }
            uint32_t requestedId = u32at(&ptr[0]);
            if (requestedId == (uint32_t)mSelfID) {
                sp<AMessage> notify = s->mNotifyMsg->dup();
                notify->setInt32("rtcp-event", 1);
                notify->setInt32("payload-type", 206);
                notify->setInt32("feedback-type", msgType);
                notify->setInt32("sender", id);
                notify->post();
                ALOGI("FIR detected.");
            }
            break;
        }
        default:
        {
            ALOGI("Not supported PSFB type %d", msgType);
            break;
        }
    }

    return 0;
}
sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
    sp<ARTPSource> source;
    ssize_t index = info->mSources.indexOfKey(srcId);
    if (index < 0) {
        index = info->mSources.size();

        source = new ARTPSource(
                srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg);

        if (mFlags & kViLTEConnection) {
            setStaticJitterTimeMs(50);
            source->setPeriodicFIR(false);
        }

        source->setSelfID(mSelfID);
        source->setStaticJitterTimeMs(mStaticJitterTimeMs);
        sp<AMessage> timer = new AMessage(kWhatAlarmStream, this);
        source->setJbTimer(timer);
        info->mSources.add(srcId, source);
    } else {
        source = info->mSources.valueAt(index);
    }

    return source;
}

void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) {
    sp<AMessage> msg = new AMessage(kWhatInjectPacket, this);
    msg->setInt32("index", index);
    msg->setBuffer("buffer", buffer);
    msg->post();
}

void ARTPConnection::setSelfID(const uint32_t selfID) {
    mSelfID = selfID;
}

void ARTPConnection::setStaticJitterTimeMs(const uint32_t jbTimeMs) {
    mStaticJitterTimeMs = jbTimeMs;
}

void ARTPConnection::setTargetBitrate(int32_t targetBitrate) {
    mTargetBitrate = targetBitrate;
}

void ARTPConnection::setRtpSockOptEcn(int32_t sockOptEcn) {
    mRtpSockOptEcn = sockOptEcn;
}

void ARTPConnection::setIsIPv6(const char *localIp) {
    mIsIPv6 = (strchr(localIp, ':') != nullptr);
}

void ARTPConnection::checkRxBitrate(int64_t nowUs) {
    if (mLastBitrateReportTimeUs <= 0) {
        mCumulativeBytes = 0;
        mLastBitrateReportTimeUs = nowUs;
    }
    else if (mLastEarlyNotifyTimeUs + kMinOneSecondNotifyDelayUs <= nowUs) {
        int32_t timeDiff = (nowUs - mLastBitrateReportTimeUs) / 1000000ll;
        int32_t bitrate = mCumulativeBytes * 8 / timeDiff;
        mLastEarlyNotifyTimeUs = nowUs;

        List<StreamInfo>::iterator it = mStreams.begin();
        while (it != mStreams.end()) {
            StreamInfo *s = &*it;
            if (s->mIsInjected) {
                ++it;
                continue;
            }
            for (size_t i = 0; i < s->mSources.size(); ++i) {
                sp<ARTPSource> source = s->mSources.valueAt(i);
                if (source->isNeedToEarlyNotify()) {
                    source->notifyPktInfo(bitrate, nowUs, false /* isRegular */);
                    mLastEarlyNotifyTimeUs = nowUs + (1000000ll * 3600 * 24); // after 1 day
                }
            }
            ++it;
        }
    }
    else if (mLastBitrateReportTimeUs + 1000000ll <= nowUs) {
        int32_t timeDiff = (nowUs - mLastBitrateReportTimeUs) / 1000000ll;
        int32_t bitrate = mCumulativeBytes * 8 / timeDiff;
        ALOGI("Actual Rx bitrate : %d bits/sec", bitrate);

        sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
        List<StreamInfo>::iterator it = mStreams.begin();
        while (it != mStreams.end()) {
            StreamInfo *s = &*it;
            if (s->mIsInjected) {
                ++it;
                continue;
            }

            if (s->mNumRTCPPacketsReceived == 0) {
                // We have never received any RTCP packets on this stream,
                // we don't even know where to send a report.
                ++it;
                continue;
            }

            buffer->setRange(0, 0);
            for (size_t i = 0; i < s->mSources.size(); ++i) {
                sp<ARTPSource> source = s->mSources.valueAt(i);
                source->notifyPktInfo(bitrate, nowUs, true /* isRegular */);
            }
            ++it;
        }
        mCumulativeBytes = 0;
        mLastBitrateReportTimeUs = nowUs;
        mLastEarlyNotifyTimeUs = nowUs;
    }
}
void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) {
    int32_t index;
    CHECK(msg->findInt32("index", &index));

    sp<ABuffer> buffer;
    CHECK(msg->findBuffer("buffer", &buffer));

    List<StreamInfo>::iterator it = mStreams.begin();
    while (it != mStreams.end()
           && it->mRTPSocket != index && it->mRTCPSocket != index) {
        ++it;
    }

    if (it == mStreams.end()) {
        TRESPASS();
    }

    StreamInfo *s = &*it;

    if (it->mRTPSocket == index) {
        parseRTP(s, buffer);
    } else {
        parseRTCP(s, buffer);
    }
}

}  // namespace android
