/* * Copyright (C) 2019 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "GrpcVehicleServer.h" #include #include #include #include #include #include "GarageModeServerSideHandler.h" #include "PowerStateListener.h" #include "VehicleServer.grpc.pb.h" #include "VehicleServer.pb.h" #include "vhal_v2_0/DefaultConfig.h" #include "vhal_v2_0/ProtoMessageConverter.h" namespace android { namespace hardware { namespace automotive { namespace vehicle { namespace V2_0 { namespace impl { class GrpcVehicleServerImpl : public GrpcVehicleServer, public vhal_proto::VehicleServer::Service { public: explicit GrpcVehicleServerImpl(const VirtualizedVhalServerInfo& serverInfo) : mServiceAddr(serverInfo.getServerUri()), mGarageModeHandler(makeGarageModeServerSideHandler(this, &mValueObjectPool, serverInfo.powerStateMarkerFilePath)), mPowerStateListener(serverInfo.powerStateSocket, serverInfo.powerStateMarkerFilePath) { setValuePool(&mValueObjectPool); } // method from GrpcVehicleServer GrpcVehicleServer& Start() override; void Wait() override; GrpcVehicleServer& Stop() override; uint32_t NumOfActivePropertyValueStream() override; // methods from IVehicleServer void onPropertyValueFromCar(const VehiclePropValue& value, bool updateStatus) override; StatusCode onSetProperty(const VehiclePropValue& value, bool updateStatus) override; // methods from vhal_proto::VehicleServer::Service ::grpc::Status GetAllPropertyConfig( ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter* stream) override; ::grpc::Status SetProperty(::grpc::ServerContext* context, const vhal_proto::WrappedVehiclePropValue* wrappedPropValue, vhal_proto::VehicleHalCallStatus* status) override; ::grpc::Status SendAllPropertyValuesToStream(::grpc::ServerContext* context, const ::google::protobuf::Empty*, ::google::protobuf::Empty*) override; ::grpc::Status StartPropertyValuesStream( ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter* stream) override; private: // We keep long-lasting connection for streaming the prop values. // For us, each connection can be represented as a function to send the new value, and // an ID to identify this connection struct ConnectionDescriptor { using ValueWriterType = std::function; explicit ConnectionDescriptor(ValueWriterType&& value_writer) : mValueWriter(std::move(value_writer)), mConnectionID(CONNECTION_ID_COUNTER.fetch_add(1)) {} ConnectionDescriptor(const ConnectionDescriptor&) = delete; ConnectionDescriptor& operator=(const ConnectionDescriptor&) = delete; // This move constructor is NOT THREAD-SAFE, which means it cannot be moved // while using. Since the connection descriptors are pretected by mConnectionMutex // then we are fine here ConnectionDescriptor(ConnectionDescriptor&& cd) : mValueWriter(std::move(cd.mValueWriter)), mConnectionID(cd.mConnectionID), mIsAlive(cd.mIsAlive.load()) { cd.mIsAlive.store(false); } ValueWriterType mValueWriter; uint64_t mConnectionID; std::atomic mIsAlive{true}; static std::atomic CONNECTION_ID_COUNTER; }; std::string mServiceAddr; std::unique_ptr<::grpc::Server> mServer{nullptr}; VehiclePropValuePool mValueObjectPool; std::unique_ptr mGarageModeHandler; PowerStateListener mPowerStateListener; std::thread mPowerStateListenerThread{}; mutable std::shared_mutex mConnectionMutex; mutable std::shared_mutex mWriterMutex; std::list mValueStreamingConnections; }; std::atomic GrpcVehicleServerImpl::ConnectionDescriptor::CONNECTION_ID_COUNTER = 0; static std::shared_ptr<::grpc::ServerCredentials> getServerCredentials() { // TODO(chenhaosjtuacm): get secured credentials here return ::grpc::InsecureServerCredentials(); } GrpcVehicleServerPtr makeGrpcVehicleServer(const VirtualizedVhalServerInfo& serverInfo) { return std::make_unique(serverInfo); } GrpcVehicleServer& GrpcVehicleServerImpl::Start() { if (mServer) { LOG(WARNING) << __func__ << ": GrpcVehicleServer has already started."; return *this; } ::grpc::ServerBuilder builder; builder.RegisterService(this); builder.AddListeningPort(mServiceAddr, getServerCredentials()); mServer = builder.BuildAndStart(); CHECK(mServer) << __func__ << ": failed to create the GRPC server, " << "please make sure the configuration and permissions are correct"; mPowerStateListenerThread = std::thread([this]() { mPowerStateListener.Listen(); }); return *this; } void GrpcVehicleServerImpl::Wait() { if (mServer) { mServer->Wait(); } if (mPowerStateListenerThread.joinable()) { mPowerStateListenerThread.join(); } mPowerStateListenerThread = {}; mServer.reset(); } GrpcVehicleServer& GrpcVehicleServerImpl::Stop() { if (!mServer) { LOG(WARNING) << __func__ << ": GrpcVehicleServer has not started."; return *this; } mServer->Shutdown(); mPowerStateListener.Stop(); return *this; } uint32_t GrpcVehicleServerImpl::NumOfActivePropertyValueStream() { std::shared_lock read_lock(mConnectionMutex); return mValueStreamingConnections.size(); } void GrpcVehicleServerImpl::onPropertyValueFromCar(const VehiclePropValue& value, bool updateStatus) { vhal_proto::WrappedVehiclePropValue wrappedPropValue; proto_msg_converter::toProto(wrappedPropValue.mutable_value(), value); wrappedPropValue.set_update_status(updateStatus); std::shared_lock read_lock(mConnectionMutex); bool has_terminated_connections = 0; for (auto& connection : mValueStreamingConnections) { auto writeOK = connection.mValueWriter(wrappedPropValue); if (!writeOK) { LOG(ERROR) << __func__ << ": Server Write failed, connection lost. ID: " << connection.mConnectionID; has_terminated_connections = true; connection.mIsAlive.store(false); } } if (!has_terminated_connections) { return; } read_lock.unlock(); std::unique_lock write_lock(mConnectionMutex); for (auto itr = mValueStreamingConnections.begin(); itr != mValueStreamingConnections.end();) { if (!itr->mIsAlive.load()) { itr = mValueStreamingConnections.erase(itr); } else { ++itr; } } } StatusCode GrpcVehicleServerImpl::onSetProperty(const VehiclePropValue& value, bool updateStatus) { if (value.prop == AP_POWER_STATE_REPORT && value.value.int32Values[0] == toInt(VehicleApPowerStateReport::SHUTDOWN_POSTPONE)) { mGarageModeHandler->HandleHeartbeat(); } return GrpcVehicleServer::onSetProperty(value, updateStatus); } ::grpc::Status GrpcVehicleServerImpl::GetAllPropertyConfig( ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter* stream) { auto configs = onGetAllPropertyConfig(); for (auto& config : configs) { vhal_proto::VehiclePropConfig protoConfig; proto_msg_converter::toProto(&protoConfig, config); if (!stream->Write(protoConfig)) { return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost."); } } return ::grpc::Status::OK; } ::grpc::Status GrpcVehicleServerImpl::SetProperty( ::grpc::ServerContext* context, const vhal_proto::WrappedVehiclePropValue* wrappedPropValue, vhal_proto::VehicleHalCallStatus* status) { VehiclePropValue value; proto_msg_converter::fromProto(&value, wrappedPropValue->value()); auto set_status = static_cast(onSetProperty(value, wrappedPropValue->update_status())); if (!vhal_proto::VehicleHalStatusCode_IsValid(set_status)) { return ::grpc::Status(::grpc::StatusCode::INTERNAL, "Unknown status code"); } status->set_status_code(static_cast(set_status)); return ::grpc::Status::OK; } ::grpc::Status GrpcVehicleServerImpl::SendAllPropertyValuesToStream( ::grpc::ServerContext* context, const ::google::protobuf::Empty*, ::google::protobuf::Empty*) { sendAllValuesToClient(); return ::grpc::Status::OK; } ::grpc::Status GrpcVehicleServerImpl::StartPropertyValuesStream( ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter* stream) { std::mutex terminateMutex; std::condition_variable terminateCV; std::unique_lock terminateLock(terminateMutex); bool terminated{false}; auto callBack = [stream, &terminateMutex, &terminateCV, &terminated, this](const vhal_proto::WrappedVehiclePropValue& value) { std::unique_lock lock(mWriterMutex); if (!stream->Write(value)) { std::unique_lock terminateLock(terminateMutex); terminated = true; terminateLock.unlock(); terminateCV.notify_all(); return false; } return true; }; // Register connection std::unique_lock lock(mConnectionMutex); auto& conn = mValueStreamingConnections.emplace_back(std::move(callBack)); lock.unlock(); // Never stop until connection lost terminateCV.wait(terminateLock, [&terminated]() { return terminated; }); LOG(ERROR) << __func__ << ": Stream lost, ID : " << conn.mConnectionID; return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost."); } } // namespace impl } // namespace V2_0 } // namespace vehicle } // namespace automotive } // namespace hardware } // namespace android