You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

402 lines
16 KiB

/*
* Copyright 2020 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "call/adaptation/resource_adaptation_processor.h"
#include <algorithm>
#include <string>
#include <utility>
#include "absl/algorithm/container.h"
#include "api/video/video_adaptation_counters.h"
#include "call/adaptation/video_stream_adapter.h"
#include "rtc_base/logging.h"
#include "rtc_base/ref_counted_object.h"
#include "rtc_base/strings/string_builder.h"
#include "rtc_base/synchronization/sequence_checker.h"
#include "rtc_base/task_utils/to_queued_task.h"
namespace webrtc {
ResourceAdaptationProcessor::ResourceListenerDelegate::ResourceListenerDelegate(
ResourceAdaptationProcessor* processor)
: resource_adaptation_queue_(nullptr), processor_(processor) {}
void ResourceAdaptationProcessor::ResourceListenerDelegate::
SetResourceAdaptationQueue(TaskQueueBase* resource_adaptation_queue) {
RTC_DCHECK(!resource_adaptation_queue_);
RTC_DCHECK(resource_adaptation_queue);
resource_adaptation_queue_ = resource_adaptation_queue;
RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
}
void ResourceAdaptationProcessor::ResourceListenerDelegate::
OnProcessorDestroyed() {
RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
processor_ = nullptr;
}
void ResourceAdaptationProcessor::ResourceListenerDelegate::
OnResourceUsageStateMeasured(rtc::scoped_refptr<Resource> resource,
ResourceUsageState usage_state) {
if (!resource_adaptation_queue_->IsCurrent()) {
resource_adaptation_queue_->PostTask(ToQueuedTask(
[this_ref = rtc::scoped_refptr<ResourceListenerDelegate>(this),
resource, usage_state] {
this_ref->OnResourceUsageStateMeasured(resource, usage_state);
}));
return;
}
RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
if (processor_) {
processor_->OnResourceUsageStateMeasured(resource, usage_state);
}
}
ResourceAdaptationProcessor::MitigationResultAndLogMessage::
MitigationResultAndLogMessage()
: result(MitigationResult::kAdaptationApplied), message() {}
ResourceAdaptationProcessor::MitigationResultAndLogMessage::
MitigationResultAndLogMessage(MitigationResult result, std::string message)
: result(result), message(std::move(message)) {}
ResourceAdaptationProcessor::ResourceAdaptationProcessor(
VideoStreamEncoderObserver* encoder_stats_observer,
VideoStreamAdapter* stream_adapter)
: resource_adaptation_queue_(nullptr),
resource_listener_delegate_(
new rtc::RefCountedObject<ResourceListenerDelegate>(this)),
encoder_stats_observer_(encoder_stats_observer),
resources_(),
stream_adapter_(stream_adapter),
last_reported_source_restrictions_(),
previous_mitigation_results_(),
processing_in_progress_(false) {
RTC_DCHECK(stream_adapter_);
}
ResourceAdaptationProcessor::~ResourceAdaptationProcessor() {
RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
RTC_DCHECK(resources_.empty())
<< "There are resource(s) attached to a ResourceAdaptationProcessor "
<< "being destroyed.";
stream_adapter_->RemoveRestrictionsListener(this);
resource_listener_delegate_->OnProcessorDestroyed();
}
void ResourceAdaptationProcessor::SetResourceAdaptationQueue(
TaskQueueBase* resource_adaptation_queue) {
RTC_DCHECK(!resource_adaptation_queue_);
RTC_DCHECK(resource_adaptation_queue);
resource_adaptation_queue_ = resource_adaptation_queue;
resource_listener_delegate_->SetResourceAdaptationQueue(
resource_adaptation_queue);
RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
// Now that we have the adaptation queue we can attach as adaptation listener.
stream_adapter_->AddRestrictionsListener(this);
}
void ResourceAdaptationProcessor::AddResourceLimitationsListener(
ResourceLimitationsListener* limitations_listener) {
RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
RTC_DCHECK(std::find(resource_limitations_listeners_.begin(),
resource_limitations_listeners_.end(),
limitations_listener) ==
resource_limitations_listeners_.end());
resource_limitations_listeners_.push_back(limitations_listener);
}
void ResourceAdaptationProcessor::RemoveResourceLimitationsListener(
ResourceLimitationsListener* limitations_listener) {
RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
auto it =
std::find(resource_limitations_listeners_.begin(),
resource_limitations_listeners_.end(), limitations_listener);
RTC_DCHECK(it != resource_limitations_listeners_.end());
resource_limitations_listeners_.erase(it);
}
void ResourceAdaptationProcessor::AddResource(
rtc::scoped_refptr<Resource> resource) {
RTC_DCHECK(resource);
{
MutexLock crit(&resources_lock_);
RTC_DCHECK(absl::c_find(resources_, resource) == resources_.end())
<< "Resource \"" << resource->Name() << "\" was already registered.";
resources_.push_back(resource);
}
resource->SetResourceListener(resource_listener_delegate_);
}
std::vector<rtc::scoped_refptr<Resource>>
ResourceAdaptationProcessor::GetResources() const {
MutexLock crit(&resources_lock_);
return resources_;
}
void ResourceAdaptationProcessor::RemoveResource(
rtc::scoped_refptr<Resource> resource) {
RTC_DCHECK(resource);
RTC_LOG(INFO) << "Removing resource \"" << resource->Name() << "\".";
resource->SetResourceListener(nullptr);
{
MutexLock crit(&resources_lock_);
auto it = absl::c_find(resources_, resource);
RTC_DCHECK(it != resources_.end()) << "Resource \"" << resource->Name()
<< "\" was not a registered resource.";
resources_.erase(it);
}
RemoveLimitationsImposedByResource(std::move(resource));
}
void ResourceAdaptationProcessor::RemoveLimitationsImposedByResource(
rtc::scoped_refptr<Resource> resource) {
if (!resource_adaptation_queue_->IsCurrent()) {
resource_adaptation_queue_->PostTask(ToQueuedTask(
[this, resource]() { RemoveLimitationsImposedByResource(resource); }));
return;
}
RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
auto resource_adaptation_limits =
adaptation_limits_by_resources_.find(resource);
if (resource_adaptation_limits != adaptation_limits_by_resources_.end()) {
VideoStreamAdapter::RestrictionsWithCounters adaptation_limits =
resource_adaptation_limits->second;
adaptation_limits_by_resources_.erase(resource_adaptation_limits);
if (adaptation_limits_by_resources_.empty()) {
// Only the resource being removed was adapted so clear restrictions.
stream_adapter_->ClearRestrictions();
return;
}
VideoStreamAdapter::RestrictionsWithCounters most_limited =
FindMostLimitedResources().second;
if (adaptation_limits.counters.Total() <= most_limited.counters.Total()) {
// The removed limitations were less limited than the most limited
// resource. Don't change the current restrictions.
return;
}
// Apply the new most limited resource as the next restrictions.
Adaptation adapt_to = stream_adapter_->GetAdaptationTo(
most_limited.counters, most_limited.restrictions);
RTC_DCHECK_EQ(adapt_to.status(), Adaptation::Status::kValid);
stream_adapter_->ApplyAdaptation(adapt_to, nullptr);
RTC_LOG(INFO) << "Most limited resource removed. Restoring restrictions to "
"next most limited restrictions: "
<< most_limited.restrictions.ToString() << " with counters "
<< most_limited.counters.ToString();
}
}
void ResourceAdaptationProcessor::OnResourceUsageStateMeasured(
rtc::scoped_refptr<Resource> resource,
ResourceUsageState usage_state) {
RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
RTC_DCHECK(resource);
// |resource| could have been removed after signalling.
{
MutexLock crit(&resources_lock_);
if (absl::c_find(resources_, resource) == resources_.end()) {
RTC_LOG(INFO) << "Ignoring signal from removed resource \""
<< resource->Name() << "\".";
return;
}
}
MitigationResultAndLogMessage result_and_message;
switch (usage_state) {
case ResourceUsageState::kOveruse:
result_and_message = OnResourceOveruse(resource);
break;
case ResourceUsageState::kUnderuse:
result_and_message = OnResourceUnderuse(resource);
break;
}
// Maybe log the result of the operation.
auto it = previous_mitigation_results_.find(resource.get());
if (it != previous_mitigation_results_.end() &&
it->second == result_and_message.result) {
// This resource has previously reported the same result and we haven't
// successfully adapted since - don't log to avoid spam.
return;
}
RTC_LOG(INFO) << "Resource \"" << resource->Name() << "\" signalled "
<< ResourceUsageStateToString(usage_state) << ". "
<< result_and_message.message;
if (result_and_message.result == MitigationResult::kAdaptationApplied) {
previous_mitigation_results_.clear();
} else {
previous_mitigation_results_.insert(
std::make_pair(resource.get(), result_and_message.result));
}
}
ResourceAdaptationProcessor::MitigationResultAndLogMessage
ResourceAdaptationProcessor::OnResourceUnderuse(
rtc::scoped_refptr<Resource> reason_resource) {
RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
RTC_DCHECK(!processing_in_progress_);
processing_in_progress_ = true;
// How can this stream be adapted up?
Adaptation adaptation = stream_adapter_->GetAdaptationUp(reason_resource);
if (adaptation.status() != Adaptation::Status::kValid) {
processing_in_progress_ = false;
rtc::StringBuilder message;
message << "Not adapting up because VideoStreamAdapter returned "
<< Adaptation::StatusToString(adaptation.status());
return MitigationResultAndLogMessage(MitigationResult::kRejectedByAdapter,
message.Release());
}
// Check that resource is most limited.
std::vector<rtc::scoped_refptr<Resource>> most_limited_resources;
VideoStreamAdapter::RestrictionsWithCounters most_limited_restrictions;
std::tie(most_limited_resources, most_limited_restrictions) =
FindMostLimitedResources();
// If the most restricted resource is less limited than current restrictions
// then proceed with adapting up.
if (!most_limited_resources.empty() &&
most_limited_restrictions.counters.Total() >=
stream_adapter_->adaptation_counters().Total()) {
// If |reason_resource| is not one of the most limiting resources then abort
// adaptation.
if (absl::c_find(most_limited_resources, reason_resource) ==
most_limited_resources.end()) {
processing_in_progress_ = false;
rtc::StringBuilder message;
message << "Resource \"" << reason_resource->Name()
<< "\" was not the most limited resource.";
return MitigationResultAndLogMessage(
MitigationResult::kNotMostLimitedResource, message.Release());
}
if (most_limited_resources.size() > 1) {
// If there are multiple most limited resources, all must signal underuse
// before the adaptation is applied.
UpdateResourceLimitations(reason_resource, adaptation.restrictions(),
adaptation.counters());
processing_in_progress_ = false;
rtc::StringBuilder message;
message << "Resource \"" << reason_resource->Name()
<< "\" was not the only most limited resource.";
return MitigationResultAndLogMessage(
MitigationResult::kSharedMostLimitedResource, message.Release());
}
}
// Apply adaptation.
stream_adapter_->ApplyAdaptation(adaptation, reason_resource);
processing_in_progress_ = false;
rtc::StringBuilder message;
message << "Adapted up successfully. Unfiltered adaptations: "
<< stream_adapter_->adaptation_counters().ToString();
return MitigationResultAndLogMessage(MitigationResult::kAdaptationApplied,
message.Release());
}
ResourceAdaptationProcessor::MitigationResultAndLogMessage
ResourceAdaptationProcessor::OnResourceOveruse(
rtc::scoped_refptr<Resource> reason_resource) {
RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
RTC_DCHECK(!processing_in_progress_);
processing_in_progress_ = true;
// How can this stream be adapted up?
Adaptation adaptation = stream_adapter_->GetAdaptationDown();
if (adaptation.min_pixel_limit_reached()) {
encoder_stats_observer_->OnMinPixelLimitReached();
}
if (adaptation.status() != Adaptation::Status::kValid) {
processing_in_progress_ = false;
rtc::StringBuilder message;
message << "Not adapting down because VideoStreamAdapter returned "
<< Adaptation::StatusToString(adaptation.status());
return MitigationResultAndLogMessage(MitigationResult::kRejectedByAdapter,
message.Release());
}
// Apply adaptation.
UpdateResourceLimitations(reason_resource, adaptation.restrictions(),
adaptation.counters());
stream_adapter_->ApplyAdaptation(adaptation, reason_resource);
processing_in_progress_ = false;
rtc::StringBuilder message;
message << "Adapted down successfully. Unfiltered adaptations: "
<< stream_adapter_->adaptation_counters().ToString();
return MitigationResultAndLogMessage(MitigationResult::kAdaptationApplied,
message.Release());
}
std::pair<std::vector<rtc::scoped_refptr<Resource>>,
VideoStreamAdapter::RestrictionsWithCounters>
ResourceAdaptationProcessor::FindMostLimitedResources() const {
std::vector<rtc::scoped_refptr<Resource>> most_limited_resources;
VideoStreamAdapter::RestrictionsWithCounters most_limited_restrictions{
VideoSourceRestrictions(), VideoAdaptationCounters()};
for (const auto& resource_and_adaptation_limit_ :
adaptation_limits_by_resources_) {
const auto& restrictions_with_counters =
resource_and_adaptation_limit_.second;
if (restrictions_with_counters.counters.Total() >
most_limited_restrictions.counters.Total()) {
most_limited_restrictions = restrictions_with_counters;
most_limited_resources.clear();
most_limited_resources.push_back(resource_and_adaptation_limit_.first);
} else if (most_limited_restrictions.counters ==
restrictions_with_counters.counters) {
most_limited_resources.push_back(resource_and_adaptation_limit_.first);
}
}
return std::make_pair(std::move(most_limited_resources),
most_limited_restrictions);
}
void ResourceAdaptationProcessor::UpdateResourceLimitations(
rtc::scoped_refptr<Resource> reason_resource,
const VideoSourceRestrictions& restrictions,
const VideoAdaptationCounters& counters) {
auto& adaptation_limits = adaptation_limits_by_resources_[reason_resource];
if (adaptation_limits.restrictions == restrictions &&
adaptation_limits.counters == counters) {
return;
}
adaptation_limits = {restrictions, counters};
std::map<rtc::scoped_refptr<Resource>, VideoAdaptationCounters> limitations;
for (const auto& p : adaptation_limits_by_resources_) {
limitations.insert(std::make_pair(p.first, p.second.counters));
}
for (auto limitations_listener : resource_limitations_listeners_) {
limitations_listener->OnResourceLimitationChanged(reason_resource,
limitations);
}
}
void ResourceAdaptationProcessor::OnVideoSourceRestrictionsUpdated(
VideoSourceRestrictions restrictions,
const VideoAdaptationCounters& adaptation_counters,
rtc::scoped_refptr<Resource> reason,
const VideoSourceRestrictions& unfiltered_restrictions) {
RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
if (reason) {
UpdateResourceLimitations(reason, unfiltered_restrictions,
adaptation_counters);
} else if (adaptation_counters.Total() == 0) {
// Adaptations are cleared.
adaptation_limits_by_resources_.clear();
previous_mitigation_results_.clear();
for (auto limitations_listener : resource_limitations_listeners_) {
limitations_listener->OnResourceLimitationChanged(nullptr, {});
}
}
}
} // namespace webrtc