/* * Copyright (C) 2020 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ //#define LOG_NDEBUG 0 #define LOG_TAG "TranscodingSessionController" #define VALIDATE_STATE 1 #include #include #include #include #include #include #include #include namespace android { static_assert((SessionIdType)-1 < 0, "SessionIdType should be signed"); constexpr static uid_t OFFLINE_UID = -1; constexpr static size_t kSessionHistoryMax = 100; //static String8 TranscodingSessionController::sessionToString(const SessionKeyType& sessionKey) { return String8::format("{client:%lld, session:%d}", (long long)sessionKey.first, sessionKey.second); } //static const char* TranscodingSessionController::sessionStateToString(const Session::State sessionState) { switch (sessionState) { case Session::State::NOT_STARTED: return "NOT_STARTED"; case Session::State::RUNNING: return "RUNNING"; case Session::State::PAUSED: return "PAUSED"; case Session::State::FINISHED: return "FINISHED"; case Session::State::CANCELED: return "CANCELED"; case Session::State::ERROR: return "ERROR"; default: break; } return "(unknown)"; } /////////////////////////////////////////////////////////////////////////////// struct TranscodingSessionController::Watchdog { Watchdog(TranscodingSessionController* owner, int64_t timeoutUs); ~Watchdog(); // Starts monitoring the session. void start(const SessionKeyType& key); // Stops monitoring the session. void stop(); // Signals that the session is still alive. Must be sent at least every mTimeoutUs. // (Timeout will happen if no ping in mTimeoutUs since the last ping.) void keepAlive(); private: void threadLoop(); void updateTimer_l(); TranscodingSessionController* mOwner; const int64_t mTimeoutUs; mutable std::mutex mLock; std::condition_variable mCondition GUARDED_BY(mLock); // Whether watchdog is monitoring a session for timeout. bool mActive GUARDED_BY(mLock); // Whether watchdog is aborted and the monitoring thread should exit. bool mAbort GUARDED_BY(mLock); // When watchdog is active, the next timeout time point. std::chrono::steady_clock::time_point mNextTimeoutTime GUARDED_BY(mLock); // When watchdog is active, the session being watched. SessionKeyType mSessionToWatch GUARDED_BY(mLock); std::thread mThread; }; TranscodingSessionController::Watchdog::Watchdog(TranscodingSessionController* owner, int64_t timeoutUs) : mOwner(owner), mTimeoutUs(timeoutUs), mActive(false), mAbort(false), mThread(&Watchdog::threadLoop, this) { ALOGV("Watchdog CTOR: %p", this); } TranscodingSessionController::Watchdog::~Watchdog() { ALOGV("Watchdog DTOR: %p", this); { // Exit the looper thread. std::scoped_lock lock{mLock}; mAbort = true; mCondition.notify_one(); } mThread.join(); ALOGV("Watchdog DTOR: %p, done.", this); } void TranscodingSessionController::Watchdog::start(const SessionKeyType& key) { std::scoped_lock lock{mLock}; if (!mActive) { ALOGI("Watchdog start: %s", sessionToString(key).c_str()); mActive = true; mSessionToWatch = key; updateTimer_l(); mCondition.notify_one(); } } void TranscodingSessionController::Watchdog::stop() { std::scoped_lock lock{mLock}; if (mActive) { ALOGI("Watchdog stop: %s", sessionToString(mSessionToWatch).c_str()); mActive = false; mCondition.notify_one(); } } void TranscodingSessionController::Watchdog::keepAlive() { std::scoped_lock lock{mLock}; if (mActive) { ALOGI("Watchdog keepAlive: %s", sessionToString(mSessionToWatch).c_str()); updateTimer_l(); mCondition.notify_one(); } } // updateTimer_l() is only called with lock held. void TranscodingSessionController::Watchdog::updateTimer_l() NO_THREAD_SAFETY_ANALYSIS { std::chrono::microseconds timeout(mTimeoutUs); mNextTimeoutTime = std::chrono::steady_clock::now() + timeout; } // Unfortunately std::unique_lock is incompatible with -Wthread-safety. void TranscodingSessionController::Watchdog::threadLoop() NO_THREAD_SAFETY_ANALYSIS { androidSetThreadPriority(0 /*tid (0 = current) */, ANDROID_PRIORITY_BACKGROUND); std::unique_lock lock{mLock}; while (!mAbort) { if (!mActive) { mCondition.wait(lock); continue; } // Watchdog active, wait till next timeout time. if (mCondition.wait_until(lock, mNextTimeoutTime) == std::cv_status::timeout) { // If timeout happens, report timeout and deactivate watchdog. mActive = false; // Make a copy of session key, as once we unlock, it could be unprotected. SessionKeyType sessionKey = mSessionToWatch; ALOGE("Watchdog timeout: %s", sessionToString(sessionKey).c_str()); lock.unlock(); mOwner->onError(sessionKey.first, sessionKey.second, TranscodingErrorCode::kWatchdogTimeout); lock.lock(); } } } /////////////////////////////////////////////////////////////////////////////// struct TranscodingSessionController::Pacer { Pacer(const ControllerConfig& config) : mBurstThresholdMs(config.pacerBurstThresholdMs), mBurstCountQuota(config.pacerBurstCountQuota), mBurstTimeQuotaSec(config.pacerBurstTimeQuotaSeconds) {} ~Pacer() = default; bool onSessionStarted(uid_t uid, uid_t callingUid); void onSessionCompleted(uid_t uid, std::chrono::microseconds runningTime); void onSessionCancelled(uid_t uid); private: // Threshold of time between finish/start below which a back-to-back start is counted. int32_t mBurstThresholdMs; // Maximum allowed back-to-back start count. int32_t mBurstCountQuota; // Maximum allowed back-to-back running time. int32_t mBurstTimeQuotaSec; struct UidHistoryEntry { bool sessionActive = false; int32_t burstCount = 0; std::chrono::steady_clock::duration burstDuration{0}; std::chrono::steady_clock::time_point lastCompletedTime; }; std::map mUidHistoryMap; std::unordered_set mMtpUids; std::unordered_set mNonMtpUids; bool isSubjectToQuota(uid_t uid, uid_t callingUid); }; bool TranscodingSessionController::Pacer::isSubjectToQuota(uid_t uid, uid_t callingUid) { // Submitting with self uid is not limited (which can only happen if it's used as an // app-facing API). MediaProvider usage always submit on behalf of other uids. if (uid == callingUid) { return false; } if (mMtpUids.find(uid) != mMtpUids.end()) { return false; } if (mNonMtpUids.find(uid) != mNonMtpUids.end()) { return true; } // We don't have MTP permission info about this uid yet, check permission and save the result. int32_t result; if (__builtin_available(android __TRANSCODING_MIN_API__, *)) { if (APermissionManager_checkPermission("android.permission.ACCESS_MTP", -1 /*pid*/, uid, &result) == PERMISSION_MANAGER_STATUS_OK && result == PERMISSION_MANAGER_PERMISSION_GRANTED) { mMtpUids.insert(uid); return false; } } mNonMtpUids.insert(uid); return true; } bool TranscodingSessionController::Pacer::onSessionStarted(uid_t uid, uid_t callingUid) { if (!isSubjectToQuota(uid, callingUid)) { ALOGI("Pacer::onSessionStarted: uid %d (caling uid: %d): not subject to quota", uid, callingUid); return true; } // If uid doesn't exist, only insert the entry and mark session active. Skip quota checking. if (mUidHistoryMap.find(uid) == mUidHistoryMap.end()) { mUidHistoryMap.emplace(uid, UidHistoryEntry{}); mUidHistoryMap[uid].sessionActive = true; ALOGV("Pacer::onSessionStarted: uid %d: new", uid); return true; } // TODO: if Thermal throttling or resoure lost happened to occurr between this start // and the previous completion, we should deduct the paused time from the elapsed time. // (Individual session's pause time, on the other hand, doesn't need to be deducted // because it doesn't affect the gap between last completion and the start. auto timeSinceLastComplete = std::chrono::steady_clock::now() - mUidHistoryMap[uid].lastCompletedTime; if (mUidHistoryMap[uid].burstCount >= mBurstCountQuota && mUidHistoryMap[uid].burstDuration >= std::chrono::seconds(mBurstTimeQuotaSec)) { ALOGW("Pacer::onSessionStarted: uid %d: over quota, burst count %d, time %lldms", uid, mUidHistoryMap[uid].burstCount, (long long)mUidHistoryMap[uid].burstDuration.count() / 1000000); return false; } // If not over quota, allow the session, and reset as long as this is not too close // to previous completion. if (timeSinceLastComplete > std::chrono::milliseconds(mBurstThresholdMs)) { ALOGV("Pacer::onSessionStarted: uid %d: reset quota", uid); mUidHistoryMap[uid].burstCount = 0; mUidHistoryMap[uid].burstDuration = std::chrono::milliseconds(0); } else { ALOGV("Pacer::onSessionStarted: uid %d: burst count %d, time %lldms", uid, mUidHistoryMap[uid].burstCount, (long long)mUidHistoryMap[uid].burstDuration.count() / 1000000); } mUidHistoryMap[uid].sessionActive = true; return true; } void TranscodingSessionController::Pacer::onSessionCompleted( uid_t uid, std::chrono::microseconds runningTime) { // Skip quota update if this uid missed the start. (Could happen if the uid is added via // addClientUid() after the session start.) if (mUidHistoryMap.find(uid) == mUidHistoryMap.end() || !mUidHistoryMap[uid].sessionActive) { ALOGV("Pacer::onSessionCompleted: uid %d: not started", uid); return; } ALOGV("Pacer::onSessionCompleted: uid %d: runningTime %lld", uid, runningTime.count() / 1000); mUidHistoryMap[uid].sessionActive = false; mUidHistoryMap[uid].burstCount++; mUidHistoryMap[uid].burstDuration += runningTime; mUidHistoryMap[uid].lastCompletedTime = std::chrono::steady_clock::now(); } void TranscodingSessionController::Pacer::onSessionCancelled(uid_t uid) { if (mUidHistoryMap.find(uid) == mUidHistoryMap.end()) { ALOGV("Pacer::onSessionCancelled: uid %d: not present", uid); return; } // This is only called if a uid is removed from a session (due to it being killed // or the original submitting client was gone but session was kept for offline use). // Since the uid is going to miss the onSessionCompleted(), we can't track this // session, and have to check back at next onSessionStarted(). mUidHistoryMap[uid].sessionActive = false; } /////////////////////////////////////////////////////////////////////////////// TranscodingSessionController::TranscodingSessionController( const TranscoderFactoryType& transcoderFactory, const std::shared_ptr& uidPolicy, const std::shared_ptr& resourcePolicy, const std::shared_ptr& thermalPolicy, const ControllerConfig* config) : mTranscoderFactory(transcoderFactory), mUidPolicy(uidPolicy), mResourcePolicy(resourcePolicy), mThermalPolicy(thermalPolicy), mCurrentSession(nullptr), mResourceLost(false) { // Only push empty offline queue initially. Realtime queues are added when requests come in. mUidSortedList.push_back(OFFLINE_UID); mOfflineUidIterator = mUidSortedList.begin(); mSessionQueues.emplace(OFFLINE_UID, SessionQueueType()); mUidPackageNames[OFFLINE_UID] = "(offline)"; mThermalThrottling = thermalPolicy->getThrottlingStatus(); if (config != nullptr) { mConfig = *config; } mPacer.reset(new Pacer(mConfig)); ALOGD("@@@ watchdog %lld, burst count %d, burst time %d, burst threshold %d", (long long)mConfig.watchdogTimeoutUs, mConfig.pacerBurstCountQuota, mConfig.pacerBurstTimeQuotaSeconds, mConfig.pacerBurstThresholdMs); } TranscodingSessionController::~TranscodingSessionController() {} void TranscodingSessionController::dumpSession_l(const Session& session, String8& result, bool closedSession) { const size_t SIZE = 256; char buffer[SIZE]; const TranscodingRequestParcel& request = session.request; snprintf(buffer, SIZE, " Session: %s, %s, %d%%\n", sessionToString(session.key).c_str(), sessionStateToString(session.getState()), session.lastProgress); result.append(buffer); snprintf(buffer, SIZE, " pkg: %s\n", request.clientPackageName.c_str()); result.append(buffer); snprintf(buffer, SIZE, " src: %s\n", request.sourceFilePath.c_str()); result.append(buffer); snprintf(buffer, SIZE, " dst: %s\n", request.destinationFilePath.c_str()); result.append(buffer); if (closedSession) { snprintf(buffer, SIZE, " waiting: %.1fs, running: %.1fs, paused: %.1fs, paused count: %d\n", session.waitingTime.count() / 1000000.0f, session.runningTime.count() / 1000000.0f, session.pausedTime.count() / 1000000.0f, session.pauseCount); result.append(buffer); } } void TranscodingSessionController::dumpAllSessions(int fd, const Vector& args __unused) { String8 result; const size_t SIZE = 256; char buffer[SIZE]; std::scoped_lock lock{mLock}; snprintf(buffer, SIZE, "\n========== Dumping live sessions queues =========\n"); result.append(buffer); snprintf(buffer, SIZE, " Total num of Sessions: %zu\n", mSessionMap.size()); result.append(buffer); std::vector uids(mUidSortedList.begin(), mUidSortedList.end()); for (int32_t i = 0; i < uids.size(); i++) { const uid_t uid = uids[i]; if (mSessionQueues[uid].empty()) { continue; } snprintf(buffer, SIZE, " uid: %d, pkg: %s\n", uid, mUidPackageNames.count(uid) > 0 ? mUidPackageNames[uid].c_str() : "(unknown)"); result.append(buffer); snprintf(buffer, SIZE, " Num of sessions: %zu\n", mSessionQueues[uid].size()); result.append(buffer); for (auto& sessionKey : mSessionQueues[uid]) { auto sessionIt = mSessionMap.find(sessionKey); if (sessionIt == mSessionMap.end()) { snprintf(buffer, SIZE, "Failed to look up Session %s \n", sessionToString(sessionKey).c_str()); result.append(buffer); continue; } dumpSession_l(sessionIt->second, result); } } snprintf(buffer, SIZE, "\n========== Dumping past sessions =========\n"); result.append(buffer); for (auto& session : mSessionHistory) { dumpSession_l(session, result, true /*closedSession*/); } write(fd, result.string(), result.size()); } /* * Returns nullptr if there is no session, or we're paused globally (due to resource lost, * thermal throttling, etc.). Otherwise, return the session that should be run next. */ TranscodingSessionController::Session* TranscodingSessionController::getTopSession_l() { if (mSessionMap.empty()) { return nullptr; } // Return nullptr if we're paused globally due to resource lost or thermal throttling. if (((mResourcePolicy != nullptr && mResourceLost) || (mThermalPolicy != nullptr && mThermalThrottling))) { return nullptr; } uid_t topUid = *mUidSortedList.begin(); // If the current session is running, and it's in the topUid's queue, let it continue // to run even if it's not the earliest in that uid's queue. // For example, uid(B) is added to a session while it's pending in uid(A)'s queue, then // B is brought to front which caused the session to run, then user switches back to A. if (mCurrentSession != nullptr && mCurrentSession->getState() == Session::RUNNING && mCurrentSession->allClientUids.count(topUid) > 0) { return mCurrentSession; } SessionKeyType topSessionKey = *mSessionQueues[topUid].begin(); return &mSessionMap[topSessionKey]; } void TranscodingSessionController::setSessionState_l(Session* session, Session::State state) { bool wasRunning = (session->getState() == Session::RUNNING); session->setState(state); bool isRunning = (session->getState() == Session::RUNNING); if (wasRunning == isRunning) { return; } // Currently we only have 1 running session, and we always put the previous // session in non-running state before we run the new session, so it's okay // to start/stop the watchdog here. If this assumption changes, we need to // track the number of running sessions and start/stop watchdog based on that. if (isRunning) { mWatchdog->start(session->key); } else { mWatchdog->stop(); } } void TranscodingSessionController::Session::setState(Session::State newState) { if (state == newState) { return; } auto nowTime = std::chrono::steady_clock::now(); if (state != INVALID) { std::chrono::microseconds elapsedTime = std::chrono::duration_cast(nowTime - stateEnterTime); switch (state) { case PAUSED: pausedTime = pausedTime + elapsedTime; break; case RUNNING: runningTime = runningTime + elapsedTime; break; case NOT_STARTED: waitingTime = waitingTime + elapsedTime; break; default: break; } } if (newState == PAUSED) { pauseCount++; } stateEnterTime = nowTime; state = newState; } void TranscodingSessionController::updateCurrentSession_l() { Session* curSession = mCurrentSession; Session* topSession = nullptr; // Delayed init of transcoder and watchdog. if (mTranscoder == nullptr) { mTranscoder = mTranscoderFactory(shared_from_this()); mWatchdog = std::make_shared(this, mConfig.watchdogTimeoutUs); } // If we found a different top session, or the top session's running state is not // correct. Take some actions to ensure it's correct. while ((topSession = getTopSession_l()) != curSession || (topSession != nullptr && !topSession->isRunning())) { ALOGV("updateCurrentSession_l: topSession is %s, curSession is %s", topSession == nullptr ? "null" : sessionToString(topSession->key).c_str(), curSession == nullptr ? "null" : sessionToString(curSession->key).c_str()); // If current session is running, pause it first. Note this is needed for either // cases: 1) Top session is changing to another session, or 2) Top session is // changing to null (which means we should be globally paused). if (curSession != nullptr && curSession->getState() == Session::RUNNING) { mTranscoder->pause(curSession->key.first, curSession->key.second); setSessionState_l(curSession, Session::PAUSED); } if (topSession == nullptr) { // Nothing more to run (either no session or globally paused). break; } // Otherwise, ensure topSession is running. if (topSession->getState() == Session::NOT_STARTED) { // Check if at least one client has quota to start the session. bool keepForClient = false; for (uid_t uid : topSession->allClientUids) { if (mPacer->onSessionStarted(uid, topSession->callingUid)) { keepForClient = true; // DO NOT break here, because book-keeping still needs to happen // for the other uids. } } if (!keepForClient) { // Unfortunately all uids requesting this session are out of quota. // Drop this session and try the next one. { auto clientCallback = mSessionMap[topSession->key].callback.lock(); if (clientCallback != nullptr) { clientCallback->onTranscodingFailed( topSession->key.second, TranscodingErrorCode::kDroppedByService); } } removeSession_l(topSession->key, Session::DROPPED_BY_PACER); continue; } mTranscoder->start(topSession->key.first, topSession->key.second, topSession->request, topSession->callingUid, topSession->callback.lock()); setSessionState_l(topSession, Session::RUNNING); } else if (topSession->getState() == Session::PAUSED) { mTranscoder->resume(topSession->key.first, topSession->key.second, topSession->request, topSession->callingUid, topSession->callback.lock()); setSessionState_l(topSession, Session::RUNNING); } break; } mCurrentSession = topSession; } void TranscodingSessionController::addUidToSession_l(uid_t clientUid, const SessionKeyType& sessionKey) { // If it's an offline session, the queue was already added in constructor. // If it's a real-time sessions, check if a queue is already present for the uid, // and add a new queue if needed. if (clientUid != OFFLINE_UID) { if (mSessionQueues.count(clientUid) == 0) { mUidPolicy->registerMonitorUid(clientUid); if (mUidPolicy->isUidOnTop(clientUid)) { mUidSortedList.push_front(clientUid); } else { // Shouldn't be submitting real-time requests from non-top app, // put it in front of the offline queue. mUidSortedList.insert(mOfflineUidIterator, clientUid); } } else if (clientUid != *mUidSortedList.begin()) { if (mUidPolicy->isUidOnTop(clientUid)) { mUidSortedList.remove(clientUid); mUidSortedList.push_front(clientUid); } } } // Append this session to the uid's queue. mSessionQueues[clientUid].push_back(sessionKey); } void TranscodingSessionController::removeSession_l( const SessionKeyType& sessionKey, Session::State finalState, const std::shared_ptr>& keepUid) { ALOGV("%s: session %s", __FUNCTION__, sessionToString(sessionKey).c_str()); if (mSessionMap.count(sessionKey) == 0) { ALOGE("session %s doesn't exist", sessionToString(sessionKey).c_str()); return; } // Remove session from uid's queue. bool uidQueueRemoved = false; std::unordered_set remainingUids; for (uid_t uid : mSessionMap[sessionKey].allClientUids) { if (keepUid != nullptr) { if ((*keepUid)(uid)) { remainingUids.insert(uid); continue; } // If we have uids to keep, the session is not going to any final // state we can't use onSessionCompleted as the running time will // not be valid. Only notify pacer to stop tracking this session. mPacer->onSessionCancelled(uid); } SessionQueueType& sessionQueue = mSessionQueues[uid]; auto it = std::find(sessionQueue.begin(), sessionQueue.end(), sessionKey); if (it == sessionQueue.end()) { ALOGW("couldn't find session %s in queue for uid %d", sessionToString(sessionKey).c_str(), uid); continue; } sessionQueue.erase(it); // If this is the last session in a real-time queue, remove this uid's queue. if (uid != OFFLINE_UID && sessionQueue.empty()) { mUidSortedList.remove(uid); mSessionQueues.erase(uid); mUidPolicy->unregisterMonitorUid(uid); uidQueueRemoved = true; } } if (uidQueueRemoved) { std::unordered_set topUids = mUidPolicy->getTopUids(); moveUidsToTop_l(topUids, false /*preserveTopUid*/); } if (keepUid != nullptr) { mSessionMap[sessionKey].allClientUids = remainingUids; return; } // Clear current session. if (mCurrentSession == &mSessionMap[sessionKey]) { mCurrentSession = nullptr; } setSessionState_l(&mSessionMap[sessionKey], finalState); // We can use onSessionCompleted() even for CANCELLED, because runningTime is // now updated by setSessionState_l(). for (uid_t uid : mSessionMap[sessionKey].allClientUids) { mPacer->onSessionCompleted(uid, mSessionMap[sessionKey].runningTime); } mSessionHistory.push_back(mSessionMap[sessionKey]); if (mSessionHistory.size() > kSessionHistoryMax) { mSessionHistory.erase(mSessionHistory.begin()); } // Remove session from session map. mSessionMap.erase(sessionKey); } /** * Moves the set of uids to the front of mUidSortedList (which is used to pick * the next session to run). * * This is called when 1) we received a onTopUidsChanged() callback from UidPolicy, * or 2) we removed the session queue for a uid because it becomes empty. * * In case of 1), if there are multiple uids in the set, and the current front * uid in mUidSortedList is still in the set, we try to keep that uid at front * so that current session run is not interrupted. (This is not a concern for case 2) * because the queue for a uid was just removed entirely.) */ void TranscodingSessionController::moveUidsToTop_l(const std::unordered_set& uids, bool preserveTopUid) { // If uid set is empty, nothing to do. Do not change the queue status. if (uids.empty()) { return; } // Save the current top uid. uid_t curTopUid = *mUidSortedList.begin(); bool pushCurTopToFront = false; int32_t numUidsMoved = 0; // Go through the sorted uid list once, and move the ones in top set to front. for (auto it = mUidSortedList.begin(); it != mUidSortedList.end();) { uid_t uid = *it; if (uid != OFFLINE_UID && uids.count(uid) > 0) { it = mUidSortedList.erase(it); // If this is the top we're preserving, don't push it here, push // it after the for-loop. if (uid == curTopUid && preserveTopUid) { pushCurTopToFront = true; } else { mUidSortedList.push_front(uid); } // If we found all uids in the set, break out. if (++numUidsMoved == uids.size()) { break; } } else { ++it; } } if (pushCurTopToFront) { mUidSortedList.push_front(curTopUid); } } bool TranscodingSessionController::submit( ClientIdType clientId, SessionIdType sessionId, uid_t callingUid, uid_t clientUid, const TranscodingRequestParcel& request, const std::weak_ptr& callback) { SessionKeyType sessionKey = std::make_pair(clientId, sessionId); ALOGV("%s: session %s, uid %d, prioirty %d", __FUNCTION__, sessionToString(sessionKey).c_str(), clientUid, (int32_t)request.priority); std::scoped_lock lock{mLock}; if (mSessionMap.count(sessionKey) > 0) { ALOGE("session %s already exists", sessionToString(sessionKey).c_str()); return false; } // Add the uid package name to the store of package names we already know. if (mUidPackageNames.count(clientUid) == 0) { mUidPackageNames.emplace(clientUid, request.clientPackageName); } // TODO(chz): only support offline vs real-time for now. All kUnspecified sessions // go to offline queue. if (request.priority == TranscodingSessionPriority::kUnspecified) { clientUid = OFFLINE_UID; } // Add session to session map. mSessionMap[sessionKey].key = sessionKey; mSessionMap[sessionKey].callingUid = callingUid; mSessionMap[sessionKey].allClientUids.insert(clientUid); mSessionMap[sessionKey].request = request; mSessionMap[sessionKey].callback = callback; setSessionState_l(&mSessionMap[sessionKey], Session::NOT_STARTED); addUidToSession_l(clientUid, sessionKey); updateCurrentSession_l(); validateState_l(); return true; } bool TranscodingSessionController::cancel(ClientIdType clientId, SessionIdType sessionId) { SessionKeyType sessionKey = std::make_pair(clientId, sessionId); ALOGV("%s: session %s", __FUNCTION__, sessionToString(sessionKey).c_str()); std::list sessionsToRemove, sessionsForOffline; std::scoped_lock lock{mLock}; if (sessionId < 0) { for (auto it = mSessionMap.begin(); it != mSessionMap.end(); ++it) { if (it->first.first == clientId) { // If there is offline request, only keep the offline client; // otherwise remove the session. if (it->second.allClientUids.count(OFFLINE_UID) > 0) { sessionsForOffline.push_back(it->first); } else { sessionsToRemove.push_back(it->first); } } } } else { if (mSessionMap.count(sessionKey) == 0) { ALOGE("session %s doesn't exist", sessionToString(sessionKey).c_str()); return false; } sessionsToRemove.push_back(sessionKey); } for (auto it = sessionsToRemove.begin(); it != sessionsToRemove.end(); ++it) { // If the session has ever been started, stop it now. // Note that stop() is needed even if the session is currently paused. This instructs // the transcoder to discard any states for the session, otherwise the states may // never be discarded. if (mSessionMap[*it].getState() != Session::NOT_STARTED) { mTranscoder->stop(it->first, it->second); } // Remove the session. removeSession_l(*it, Session::CANCELED); } auto keepUid = std::make_shared>( [](uid_t uid) { return uid == OFFLINE_UID; }); for (auto it = sessionsForOffline.begin(); it != sessionsForOffline.end(); ++it) { removeSession_l(*it, Session::CANCELED, keepUid); } // Start next session. updateCurrentSession_l(); validateState_l(); return true; } bool TranscodingSessionController::addClientUid(ClientIdType clientId, SessionIdType sessionId, uid_t clientUid) { SessionKeyType sessionKey = std::make_pair(clientId, sessionId); std::scoped_lock lock{mLock}; if (mSessionMap.count(sessionKey) == 0) { ALOGE("session %s doesn't exist", sessionToString(sessionKey).c_str()); return false; } if (mSessionMap[sessionKey].allClientUids.count(clientUid) > 0) { ALOGE("session %s already has uid %d", sessionToString(sessionKey).c_str(), clientUid); return false; } mSessionMap[sessionKey].allClientUids.insert(clientUid); addUidToSession_l(clientUid, sessionKey); updateCurrentSession_l(); validateState_l(); return true; } bool TranscodingSessionController::getClientUids(ClientIdType clientId, SessionIdType sessionId, std::vector* out_clientUids) { SessionKeyType sessionKey = std::make_pair(clientId, sessionId); std::scoped_lock lock{mLock}; if (mSessionMap.count(sessionKey) == 0) { ALOGE("session %s doesn't exist", sessionToString(sessionKey).c_str()); return false; } out_clientUids->clear(); for (uid_t uid : mSessionMap[sessionKey].allClientUids) { if (uid != OFFLINE_UID) { out_clientUids->push_back(uid); } } return true; } bool TranscodingSessionController::getSession(ClientIdType clientId, SessionIdType sessionId, TranscodingRequestParcel* request) { SessionKeyType sessionKey = std::make_pair(clientId, sessionId); std::scoped_lock lock{mLock}; if (mSessionMap.count(sessionKey) == 0) { ALOGE("session %s doesn't exist", sessionToString(sessionKey).c_str()); return false; } *(TranscodingRequest*)request = mSessionMap[sessionKey].request; return true; } void TranscodingSessionController::notifyClient(ClientIdType clientId, SessionIdType sessionId, const char* reason, std::function func) { SessionKeyType sessionKey = std::make_pair(clientId, sessionId); std::scoped_lock lock{mLock}; if (mSessionMap.count(sessionKey) == 0) { ALOGW("%s: ignoring %s for session %s that doesn't exist", __FUNCTION__, reason, sessionToString(sessionKey).c_str()); return; } // Only ignore if session was never started. In particular, propagate the status // to client if the session is paused. Transcoder could have posted finish when // we're pausing it, and the finish arrived after we changed current session. if (mSessionMap[sessionKey].getState() == Session::NOT_STARTED) { ALOGW("%s: ignoring %s for session %s that was never started", __FUNCTION__, reason, sessionToString(sessionKey).c_str()); return; } ALOGV("%s: session %s %s", __FUNCTION__, sessionToString(sessionKey).c_str(), reason); func(sessionKey); } void TranscodingSessionController::onStarted(ClientIdType clientId, SessionIdType sessionId) { notifyClient(clientId, sessionId, "started", [=](const SessionKeyType& sessionKey) { auto callback = mSessionMap[sessionKey].callback.lock(); if (callback != nullptr) { callback->onTranscodingStarted(sessionId); } }); } void TranscodingSessionController::onPaused(ClientIdType clientId, SessionIdType sessionId) { notifyClient(clientId, sessionId, "paused", [=](const SessionKeyType& sessionKey) { auto callback = mSessionMap[sessionKey].callback.lock(); if (callback != nullptr) { callback->onTranscodingPaused(sessionId); } }); } void TranscodingSessionController::onResumed(ClientIdType clientId, SessionIdType sessionId) { notifyClient(clientId, sessionId, "resumed", [=](const SessionKeyType& sessionKey) { auto callback = mSessionMap[sessionKey].callback.lock(); if (callback != nullptr) { callback->onTranscodingResumed(sessionId); } }); } void TranscodingSessionController::onFinish(ClientIdType clientId, SessionIdType sessionId) { notifyClient(clientId, sessionId, "finish", [=](const SessionKeyType& sessionKey) { { auto clientCallback = mSessionMap[sessionKey].callback.lock(); if (clientCallback != nullptr) { clientCallback->onTranscodingFinished( sessionId, TranscodingResultParcel({sessionId, -1 /*actualBitrateBps*/, std::nullopt /*sessionStats*/})); } } // Remove the session. removeSession_l(sessionKey, Session::FINISHED); // Start next session. updateCurrentSession_l(); validateState_l(); }); } void TranscodingSessionController::onError(ClientIdType clientId, SessionIdType sessionId, TranscodingErrorCode err) { notifyClient(clientId, sessionId, "error", [=](const SessionKeyType& sessionKey) { if (err == TranscodingErrorCode::kWatchdogTimeout) { // Abandon the transcoder, as its handler thread might be stuck in some call to // MediaTranscoder altogether, and may not be able to handle any new tasks. mTranscoder->stop(clientId, sessionId, true /*abandon*/); // Clear the last ref count before we create new transcoder. mTranscoder = nullptr; mTranscoder = mTranscoderFactory(shared_from_this()); } { auto clientCallback = mSessionMap[sessionKey].callback.lock(); if (clientCallback != nullptr) { clientCallback->onTranscodingFailed(sessionId, err); } } // Remove the session. removeSession_l(sessionKey, Session::ERROR); // Start next session. updateCurrentSession_l(); validateState_l(); }); } void TranscodingSessionController::onProgressUpdate(ClientIdType clientId, SessionIdType sessionId, int32_t progress) { notifyClient(clientId, sessionId, "progress", [=](const SessionKeyType& sessionKey) { auto callback = mSessionMap[sessionKey].callback.lock(); if (callback != nullptr) { callback->onProgressUpdate(sessionId, progress); } mSessionMap[sessionKey].lastProgress = progress; }); } void TranscodingSessionController::onHeartBeat(ClientIdType clientId, SessionIdType sessionId) { notifyClient(clientId, sessionId, "heart-beat", [=](const SessionKeyType& /*sessionKey*/) { mWatchdog->keepAlive(); }); } void TranscodingSessionController::onResourceLost(ClientIdType clientId, SessionIdType sessionId) { ALOGI("%s", __FUNCTION__); notifyClient(clientId, sessionId, "resource_lost", [=](const SessionKeyType& sessionKey) { if (mResourceLost) { return; } Session* resourceLostSession = &mSessionMap[sessionKey]; if (resourceLostSession->getState() != Session::RUNNING) { ALOGW("session %s lost resource but is no longer running", sessionToString(sessionKey).c_str()); return; } // If we receive a resource loss event, the transcoder already paused the transcoding, // so we don't need to call onPaused() to pause it. However, we still need to notify // the client and update the session state here. setSessionState_l(resourceLostSession, Session::PAUSED); // Notify the client as a paused event. auto clientCallback = resourceLostSession->callback.lock(); if (clientCallback != nullptr) { clientCallback->onTranscodingPaused(sessionKey.second); } if (mResourcePolicy != nullptr) { mResourcePolicy->setPidResourceLost(resourceLostSession->request.clientPid); } mResourceLost = true; validateState_l(); }); } void TranscodingSessionController::onTopUidsChanged(const std::unordered_set& uids) { if (uids.empty()) { ALOGW("%s: ignoring empty uids", __FUNCTION__); return; } std::string uidStr; for (auto it = uids.begin(); it != uids.end(); it++) { if (!uidStr.empty()) { uidStr += ", "; } uidStr += std::to_string(*it); } ALOGD("%s: topUids: size %zu, uids: %s", __FUNCTION__, uids.size(), uidStr.c_str()); std::scoped_lock lock{mLock}; moveUidsToTop_l(uids, true /*preserveTopUid*/); updateCurrentSession_l(); validateState_l(); } void TranscodingSessionController::onUidGone(uid_t goneUid) { ALOGD("%s: gone uid %u", __FUNCTION__, goneUid); std::list sessionsToRemove, sessionsForOtherUids; std::scoped_lock lock{mLock}; for (auto it = mSessionMap.begin(); it != mSessionMap.end(); ++it) { if (it->second.allClientUids.count(goneUid) > 0) { // If goneUid is the only uid, remove the session; otherwise, only // remove the uid from the session. if (it->second.allClientUids.size() > 1) { sessionsForOtherUids.push_back(it->first); } else { sessionsToRemove.push_back(it->first); } } } for (auto it = sessionsToRemove.begin(); it != sessionsToRemove.end(); ++it) { // If the session has ever been started, stop it now. // Note that stop() is needed even if the session is currently paused. This instructs // the transcoder to discard any states for the session, otherwise the states may // never be discarded. if (mSessionMap[*it].getState() != Session::NOT_STARTED) { mTranscoder->stop(it->first, it->second); } { auto clientCallback = mSessionMap[*it].callback.lock(); if (clientCallback != nullptr) { clientCallback->onTranscodingFailed(it->second, TranscodingErrorCode::kUidGoneCancelled); } } // Remove the session. removeSession_l(*it, Session::CANCELED); } auto keepUid = std::make_shared>( [goneUid](uid_t uid) { return uid != goneUid; }); for (auto it = sessionsForOtherUids.begin(); it != sessionsForOtherUids.end(); ++it) { removeSession_l(*it, Session::CANCELED, keepUid); } // Start next session. updateCurrentSession_l(); validateState_l(); } void TranscodingSessionController::onResourceAvailable() { std::scoped_lock lock{mLock}; if (!mResourceLost) { return; } ALOGI("%s", __FUNCTION__); mResourceLost = false; updateCurrentSession_l(); validateState_l(); } void TranscodingSessionController::onThrottlingStarted() { std::scoped_lock lock{mLock}; if (mThermalThrottling) { return; } ALOGI("%s", __FUNCTION__); mThermalThrottling = true; updateCurrentSession_l(); validateState_l(); } void TranscodingSessionController::onThrottlingStopped() { std::scoped_lock lock{mLock}; if (!mThermalThrottling) { return; } ALOGI("%s", __FUNCTION__); mThermalThrottling = false; updateCurrentSession_l(); validateState_l(); } void TranscodingSessionController::validateState_l() { #ifdef VALIDATE_STATE LOG_ALWAYS_FATAL_IF(mSessionQueues.count(OFFLINE_UID) != 1, "mSessionQueues offline queue number is not 1"); LOG_ALWAYS_FATAL_IF(*mOfflineUidIterator != OFFLINE_UID, "mOfflineUidIterator not pointing to offline uid"); LOG_ALWAYS_FATAL_IF(mUidSortedList.size() != mSessionQueues.size(), "mUidSortedList and mSessionQueues size mismatch, %zu vs %zu", mUidSortedList.size(), mSessionQueues.size()); int32_t totalSessions = 0; for (auto uid : mUidSortedList) { LOG_ALWAYS_FATAL_IF(mSessionQueues.count(uid) != 1, "mSessionQueues count for uid %d is not 1", uid); for (auto& sessionKey : mSessionQueues[uid]) { LOG_ALWAYS_FATAL_IF(mSessionMap.count(sessionKey) != 1, "mSessions count for session %s is not 1", sessionToString(sessionKey).c_str()); } totalSessions += mSessionQueues[uid].size(); } int32_t totalSessionsAlternative = 0; for (auto const& s : mSessionMap) { totalSessionsAlternative += s.second.allClientUids.size(); } LOG_ALWAYS_FATAL_IF(totalSessions != totalSessionsAlternative, "session count (including dup) from mSessionQueues doesn't match that from " "mSessionMap, %d vs %d", totalSessions, totalSessionsAlternative); #endif // VALIDATE_STATE } } // namespace android