/* * Copyright (C) 2019 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "GrpcVehicleClient.h" #include #include #include #include #include #include "VehicleServer.grpc.pb.h" #include "VehicleServer.pb.h" #include "vhal_v2_0/DefaultConfig.h" #include "vhal_v2_0/ProtoMessageConverter.h" namespace android { namespace hardware { namespace automotive { namespace vehicle { namespace V2_0 { namespace impl { static std::shared_ptr<::grpc::ChannelCredentials> getChannelCredentials() { // TODO(chenhaosjtuacm): get secured credentials here return ::grpc::InsecureChannelCredentials(); } class GrpcVehicleClientImpl : public VehicleHalClient { public: explicit GrpcVehicleClientImpl(const std::string& addr) : mServiceAddr(addr), mGrpcChannel(::grpc::CreateChannel(mServiceAddr, getChannelCredentials())), mGrpcStub(vhal_proto::VehicleServer::NewStub(mGrpcChannel)) { StartValuePollingThread(); } ~GrpcVehicleClientImpl() { mShuttingDownFlag.store(true); mShutdownCV.notify_all(); if (mPollingThread.joinable()) { mPollingThread.join(); } } // methods from IVehicleClient std::vector getAllPropertyConfig() const override; StatusCode setProperty(const VehiclePropValue& value, bool updateStatus) override; // methods from VehicleHalClient void triggerSendAllValues() override; private: void StartValuePollingThread(); // private data members std::string mServiceAddr; std::shared_ptr<::grpc::Channel> mGrpcChannel; std::unique_ptr mGrpcStub; std::thread mPollingThread; std::mutex mShutdownMutex; std::condition_variable mShutdownCV; std::atomic mShuttingDownFlag{false}; }; std::unique_ptr makeGrpcVehicleClient(const std::string& addr) { return std::make_unique(addr); } std::vector GrpcVehicleClientImpl::getAllPropertyConfig() const { std::vector configs; ::grpc::ClientContext context; auto config_stream = mGrpcStub->GetAllPropertyConfig(&context, ::google::protobuf::Empty()); vhal_proto::VehiclePropConfig protoConfig; while (config_stream->Read(&protoConfig)) { VehiclePropConfig config; proto_msg_converter::fromProto(&config, protoConfig); configs.emplace_back(std::move(config)); } auto grpc_status = config_stream->Finish(); if (!grpc_status.ok()) { LOG(ERROR) << __func__ << ": GRPC GetAllPropertyConfig Failed: " << grpc_status.error_message(); configs.clear(); } return configs; } StatusCode GrpcVehicleClientImpl::setProperty(const VehiclePropValue& value, bool updateStatus) { ::grpc::ClientContext context; vhal_proto::WrappedVehiclePropValue wrappedProtoValue; vhal_proto::VehicleHalCallStatus vhal_status; proto_msg_converter::toProto(wrappedProtoValue.mutable_value(), value); wrappedProtoValue.set_update_status(updateStatus); auto grpc_status = mGrpcStub->SetProperty(&context, wrappedProtoValue, &vhal_status); if (!grpc_status.ok()) { LOG(ERROR) << __func__ << ": GRPC SetProperty Failed: " << grpc_status.error_message(); return StatusCode::INTERNAL_ERROR; } return static_cast(vhal_status.status_code()); } void GrpcVehicleClientImpl::triggerSendAllValues() { ::grpc::ClientContext context; ::google::protobuf::Empty empty_response; auto grpc_status = mGrpcStub->SendAllPropertyValuesToStream( &context, ::google::protobuf::Empty(), &empty_response); if (!grpc_status.ok()) { LOG(ERROR) << __func__ << ": GRPC SendAllPropertyValuesToStream Failed: " << grpc_status.error_message(); } } void GrpcVehicleClientImpl::StartValuePollingThread() { mPollingThread = std::thread([this]() { while (!mShuttingDownFlag.load()) { ::grpc::ClientContext context; std::atomic rpc_ok{true}; std::thread shuttingdown_watcher([this, &rpc_ok, &context]() { std::unique_lock shutdownLock(mShutdownMutex); mShutdownCV.wait(shutdownLock, [this, &rpc_ok]() { return !rpc_ok.load() || mShuttingDownFlag.load(); }); context.TryCancel(); }); auto value_stream = mGrpcStub->StartPropertyValuesStream(&context, ::google::protobuf::Empty()); LOG(INFO) << __func__ << ": GRPC Value Streaming Started"; vhal_proto::WrappedVehiclePropValue wrappedProtoValue; while (!mShuttingDownFlag.load() && value_stream->Read(&wrappedProtoValue)) { VehiclePropValue value; proto_msg_converter::fromProto(&value, wrappedProtoValue.value()); onPropertyValue(value, wrappedProtoValue.update_status()); } rpc_ok.store(false); mShutdownCV.notify_all(); shuttingdown_watcher.join(); auto grpc_status = value_stream->Finish(); // never reach here until connection lost LOG(ERROR) << __func__ << ": GRPC Value Streaming Failed: " << grpc_status.error_message(); // try to reconnect } }); } } // namespace impl } // namespace V2_0 } // namespace vehicle } // namespace automotive } // namespace hardware } // namespace android