/* * Copyright (c) 2018 The WebRTC project authors. All Rights Reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source * tree. An additional intellectual property rights grant can be found * in the file PATENTS. All contributing project authors may * be found in the AUTHORS file in the root of the source tree. */ #include "test/network/network_emulation.h" #include #include #include #include "api/units/data_size.h" #include "rtc_base/bind.h" #include "rtc_base/logging.h" namespace webrtc { void LinkEmulation::OnPacketReceived(EmulatedIpPacket packet) { task_queue_->PostTask([this, packet = std::move(packet)]() mutable { RTC_DCHECK_RUN_ON(task_queue_); uint64_t packet_id = next_packet_id_++; bool sent = network_behavior_->EnqueuePacket(PacketInFlightInfo( packet.ip_packet_size(), packet.arrival_time.us(), packet_id)); if (sent) { packets_.emplace_back(StoredPacket{packet_id, std::move(packet), false}); } if (process_task_.Running()) return; absl::optional next_time_us = network_behavior_->NextDeliveryTimeUs(); if (!next_time_us) return; Timestamp current_time = clock_->CurrentTime(); process_task_ = RepeatingTaskHandle::DelayedStart( task_queue_->Get(), std::max(TimeDelta::Zero(), Timestamp::Micros(*next_time_us) - current_time), [this]() { RTC_DCHECK_RUN_ON(task_queue_); Timestamp current_time = clock_->CurrentTime(); Process(current_time); absl::optional next_time_us = network_behavior_->NextDeliveryTimeUs(); if (!next_time_us) { process_task_.Stop(); return TimeDelta::Zero(); // This is ignored. } RTC_DCHECK_GE(*next_time_us, current_time.us()); return Timestamp::Micros(*next_time_us) - current_time; }); }); } void LinkEmulation::Process(Timestamp at_time) { std::vector delivery_infos = network_behavior_->DequeueDeliverablePackets(at_time.us()); for (PacketDeliveryInfo& delivery_info : delivery_infos) { StoredPacket* packet = nullptr; for (auto& stored_packet : packets_) { if (stored_packet.id == delivery_info.packet_id) { packet = &stored_packet; break; } } RTC_CHECK(packet); RTC_DCHECK(!packet->removed); packet->removed = true; if (delivery_info.receive_time_us != PacketDeliveryInfo::kNotReceived) { packet->packet.arrival_time = Timestamp::Micros(delivery_info.receive_time_us); receiver_->OnPacketReceived(std::move(packet->packet)); } while (!packets_.empty() && packets_.front().removed) { packets_.pop_front(); } } } NetworkRouterNode::NetworkRouterNode(rtc::TaskQueue* task_queue) : task_queue_(task_queue) {} void NetworkRouterNode::OnPacketReceived(EmulatedIpPacket packet) { RTC_DCHECK_RUN_ON(task_queue_); if (watcher_) { watcher_(packet); } if (filter_) { if (!filter_(packet)) return; } auto receiver_it = routing_.find(packet.to.ipaddr()); if (receiver_it == routing_.end()) { return; } RTC_CHECK(receiver_it != routing_.end()); receiver_it->second->OnPacketReceived(std::move(packet)); } void NetworkRouterNode::SetReceiver( const rtc::IPAddress& dest_ip, EmulatedNetworkReceiverInterface* receiver) { task_queue_->PostTask([=] { RTC_DCHECK_RUN_ON(task_queue_); EmulatedNetworkReceiverInterface* cur_receiver = routing_[dest_ip]; RTC_CHECK(cur_receiver == nullptr || cur_receiver == receiver) << "Routing for dest_ip=" << dest_ip.ToString() << " already exists"; routing_[dest_ip] = receiver; }); } void NetworkRouterNode::RemoveReceiver(const rtc::IPAddress& dest_ip) { RTC_DCHECK_RUN_ON(task_queue_); routing_.erase(dest_ip); } void NetworkRouterNode::SetWatcher( std::function watcher) { task_queue_->PostTask([=] { RTC_DCHECK_RUN_ON(task_queue_); watcher_ = watcher; }); } void NetworkRouterNode::SetFilter( std::function filter) { task_queue_->PostTask([=] { RTC_DCHECK_RUN_ON(task_queue_); filter_ = filter; }); } EmulatedNetworkNode::EmulatedNetworkNode( Clock* clock, rtc::TaskQueue* task_queue, std::unique_ptr network_behavior) : router_(task_queue), link_(clock, task_queue, std::move(network_behavior), &router_) {} void EmulatedNetworkNode::OnPacketReceived(EmulatedIpPacket packet) { link_.OnPacketReceived(std::move(packet)); } void EmulatedNetworkNode::CreateRoute( const rtc::IPAddress& receiver_ip, std::vector nodes, EmulatedNetworkReceiverInterface* receiver) { RTC_CHECK(!nodes.empty()); for (size_t i = 0; i + 1 < nodes.size(); ++i) nodes[i]->router()->SetReceiver(receiver_ip, nodes[i + 1]); nodes.back()->router()->SetReceiver(receiver_ip, receiver); } void EmulatedNetworkNode::ClearRoute(const rtc::IPAddress& receiver_ip, std::vector nodes) { for (EmulatedNetworkNode* node : nodes) node->router()->RemoveReceiver(receiver_ip); } EmulatedNetworkNode::~EmulatedNetworkNode() = default; EmulatedEndpointImpl::EmulatedEndpointImpl(uint64_t id, const rtc::IPAddress& ip, bool is_enabled, rtc::AdapterType type, rtc::TaskQueue* task_queue, Clock* clock) : id_(id), peer_local_addr_(ip), is_enabled_(is_enabled), type_(type), clock_(clock), task_queue_(task_queue), router_(task_queue_), next_port_(kFirstEphemeralPort) { constexpr int kIPv4NetworkPrefixLength = 24; constexpr int kIPv6NetworkPrefixLength = 64; int prefix_length = 0; if (ip.family() == AF_INET) { prefix_length = kIPv4NetworkPrefixLength; } else if (ip.family() == AF_INET6) { prefix_length = kIPv6NetworkPrefixLength; } rtc::IPAddress prefix = TruncateIP(ip, prefix_length); network_ = std::make_unique( ip.ToString(), "Endpoint id=" + std::to_string(id_), prefix, prefix_length, type_); network_->AddIP(ip); enabled_state_checker_.Detach(); stats_.local_addresses.push_back(peer_local_addr_); } EmulatedEndpointImpl::~EmulatedEndpointImpl() = default; uint64_t EmulatedEndpointImpl::GetId() const { return id_; } void EmulatedEndpointImpl::SendPacket(const rtc::SocketAddress& from, const rtc::SocketAddress& to, rtc::CopyOnWriteBuffer packet_data, uint16_t application_overhead) { RTC_CHECK(from.ipaddr() == peer_local_addr_); EmulatedIpPacket packet(from, to, std::move(packet_data), clock_->CurrentTime(), application_overhead); task_queue_->PostTask([this, packet = std::move(packet)]() mutable { RTC_DCHECK_RUN_ON(task_queue_); Timestamp current_time = clock_->CurrentTime(); if (stats_.first_packet_sent_time.IsInfinite()) { stats_.first_packet_sent_time = current_time; stats_.first_sent_packet_size = DataSize::Bytes(packet.ip_packet_size()); } stats_.last_packet_sent_time = current_time; stats_.packets_sent++; stats_.bytes_sent += DataSize::Bytes(packet.ip_packet_size()); router_.OnPacketReceived(std::move(packet)); }); } absl::optional EmulatedEndpointImpl::BindReceiver( uint16_t desired_port, EmulatedNetworkReceiverInterface* receiver) { rtc::CritScope crit(&receiver_lock_); uint16_t port = desired_port; if (port == 0) { // Because client can specify its own port, next_port_ can be already in // use, so we need to find next available port. int ports_pool_size = std::numeric_limits::max() - kFirstEphemeralPort + 1; for (int i = 0; i < ports_pool_size; ++i) { uint16_t next_port = NextPort(); if (port_to_receiver_.find(next_port) == port_to_receiver_.end()) { port = next_port; break; } } } RTC_CHECK(port != 0) << "Can't find free port for receiver in endpoint " << id_; bool result = port_to_receiver_.insert({port, receiver}).second; if (!result) { RTC_LOG(INFO) << "Can't bind receiver to used port " << desired_port << " in endpoint " << id_; return absl::nullopt; } RTC_LOG(INFO) << "New receiver is binded to endpoint " << id_ << " on port " << port; return port; } uint16_t EmulatedEndpointImpl::NextPort() { uint16_t out = next_port_; if (next_port_ == std::numeric_limits::max()) { next_port_ = kFirstEphemeralPort; } else { next_port_++; } return out; } void EmulatedEndpointImpl::UnbindReceiver(uint16_t port) { rtc::CritScope crit(&receiver_lock_); port_to_receiver_.erase(port); } rtc::IPAddress EmulatedEndpointImpl::GetPeerLocalAddress() const { return peer_local_addr_; } void EmulatedEndpointImpl::OnPacketReceived(EmulatedIpPacket packet) { RTC_DCHECK_RUN_ON(task_queue_); RTC_CHECK(packet.to.ipaddr() == peer_local_addr_) << "Routing error: wrong destination endpoint. Packet.to.ipaddr()=: " << packet.to.ipaddr().ToString() << "; Receiver peer_local_addr_=" << peer_local_addr_.ToString(); rtc::CritScope crit(&receiver_lock_); UpdateReceiveStats(packet); auto it = port_to_receiver_.find(packet.to.port()); if (it == port_to_receiver_.end()) { // It can happen, that remote peer closed connection, but there still some // packets, that are going to it. It can happen during peer connection close // process: one peer closed connection, second still sending data. RTC_LOG(INFO) << "Drop packet: no receiver registered in " << id_ << " on port " << packet.to.port(); stats_.incoming_stats_per_source[packet.from.ipaddr()].packets_dropped++; stats_.incoming_stats_per_source[packet.from.ipaddr()].bytes_dropped += DataSize::Bytes(packet.ip_packet_size()); return; } // Endpoint assumes frequent calls to bind and unbind methods, so it holds // lock during packet processing to ensure that receiver won't be deleted // before call to OnPacketReceived. it->second->OnPacketReceived(std::move(packet)); } void EmulatedEndpointImpl::Enable() { RTC_DCHECK_RUN_ON(&enabled_state_checker_); RTC_CHECK(!is_enabled_); is_enabled_ = true; } void EmulatedEndpointImpl::Disable() { RTC_DCHECK_RUN_ON(&enabled_state_checker_); RTC_CHECK(is_enabled_); is_enabled_ = false; } bool EmulatedEndpointImpl::Enabled() const { RTC_DCHECK_RUN_ON(&enabled_state_checker_); return is_enabled_; } EmulatedNetworkStats EmulatedEndpointImpl::stats() { RTC_DCHECK_RUN_ON(task_queue_); return stats_; } void EmulatedEndpointImpl::UpdateReceiveStats(const EmulatedIpPacket& packet) { RTC_DCHECK_RUN_ON(task_queue_); Timestamp current_time = clock_->CurrentTime(); if (stats_.incoming_stats_per_source[packet.from.ipaddr()] .first_packet_received_time.IsInfinite()) { stats_.incoming_stats_per_source[packet.from.ipaddr()] .first_packet_received_time = current_time; stats_.incoming_stats_per_source[packet.from.ipaddr()] .first_received_packet_size = DataSize::Bytes(packet.ip_packet_size()); } stats_.incoming_stats_per_source[packet.from.ipaddr()] .last_packet_received_time = current_time; stats_.incoming_stats_per_source[packet.from.ipaddr()].packets_received++; stats_.incoming_stats_per_source[packet.from.ipaddr()].bytes_received += DataSize::Bytes(packet.ip_packet_size()); } EndpointsContainer::EndpointsContainer( const std::vector& endpoints) : endpoints_(endpoints) {} EmulatedEndpointImpl* EndpointsContainer::LookupByLocalAddress( const rtc::IPAddress& local_ip) const { for (auto* endpoint : endpoints_) { rtc::IPAddress peer_local_address = endpoint->GetPeerLocalAddress(); if (peer_local_address == local_ip) { return endpoint; } } RTC_CHECK(false) << "No network found for address" << local_ip.ToString(); } bool EndpointsContainer::HasEndpoint(EmulatedEndpointImpl* endpoint) const { for (auto* e : endpoints_) { if (e->GetId() == endpoint->GetId()) { return true; } } return false; } std::vector> EndpointsContainer::GetEnabledNetworks() const { std::vector> networks; for (auto* endpoint : endpoints_) { if (endpoint->Enabled()) { networks.emplace_back( std::make_unique(endpoint->network())); } } return networks; } EmulatedNetworkStats EndpointsContainer::GetStats() const { EmulatedNetworkStats stats; for (auto* endpoint : endpoints_) { EmulatedNetworkStats endpoint_stats = endpoint->stats(); stats.packets_sent += endpoint_stats.packets_sent; stats.bytes_sent += endpoint_stats.bytes_sent; if (stats.first_packet_sent_time > endpoint_stats.first_packet_sent_time) { stats.first_packet_sent_time = endpoint_stats.first_packet_sent_time; stats.first_sent_packet_size = endpoint_stats.first_sent_packet_size; } if (stats.last_packet_sent_time < endpoint_stats.last_packet_sent_time) { stats.last_packet_sent_time = endpoint_stats.last_packet_sent_time; } for (const rtc::IPAddress& addr : endpoint_stats.local_addresses) { stats.local_addresses.push_back(addr); } for (auto& entry : endpoint_stats.incoming_stats_per_source) { const EmulatedNetworkIncomingStats& source = entry.second; EmulatedNetworkIncomingStats& in_stats = stats.incoming_stats_per_source[entry.first]; in_stats.packets_received += source.packets_received; in_stats.bytes_received += source.bytes_received; in_stats.packets_dropped += source.packets_dropped; in_stats.bytes_dropped += source.bytes_dropped; if (in_stats.first_packet_received_time > source.first_packet_received_time) { in_stats.first_packet_received_time = source.first_packet_received_time; in_stats.first_received_packet_size = source.first_received_packet_size; } if (in_stats.last_packet_received_time < source.last_packet_received_time) { in_stats.last_packet_received_time = source.last_packet_received_time; } } } return stats; } } // namespace webrtc