You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

306 lines
11 KiB

/*
* Copyright (C) 2019 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "GrpcVehicleServer.h"
#include <condition_variable>
#include <mutex>
#include <shared_mutex>
#include <android-base/logging.h>
#include <grpc++/grpc++.h>
#include "GarageModeServerSideHandler.h"
#include "PowerStateListener.h"
#include "VehicleServer.grpc.pb.h"
#include "VehicleServer.pb.h"
#include "vhal_v2_0/DefaultConfig.h"
#include "vhal_v2_0/ProtoMessageConverter.h"
namespace android {
namespace hardware {
namespace automotive {
namespace vehicle {
namespace V2_0 {
namespace impl {
class GrpcVehicleServerImpl : public GrpcVehicleServer, public vhal_proto::VehicleServer::Service {
public:
explicit GrpcVehicleServerImpl(const VirtualizedVhalServerInfo& serverInfo)
: mServiceAddr(serverInfo.getServerUri()),
mGarageModeHandler(makeGarageModeServerSideHandler(this, &mValueObjectPool,
serverInfo.powerStateMarkerFilePath)),
mPowerStateListener(serverInfo.powerStateSocket, serverInfo.powerStateMarkerFilePath) {
setValuePool(&mValueObjectPool);
}
// method from GrpcVehicleServer
GrpcVehicleServer& Start() override;
void Wait() override;
GrpcVehicleServer& Stop() override;
uint32_t NumOfActivePropertyValueStream() override;
// methods from IVehicleServer
void onPropertyValueFromCar(const VehiclePropValue& value, bool updateStatus) override;
StatusCode onSetProperty(const VehiclePropValue& value, bool updateStatus) override;
// methods from vhal_proto::VehicleServer::Service
::grpc::Status GetAllPropertyConfig(
::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
::grpc::ServerWriter<vhal_proto::VehiclePropConfig>* stream) override;
::grpc::Status SetProperty(::grpc::ServerContext* context,
const vhal_proto::WrappedVehiclePropValue* wrappedPropValue,
vhal_proto::VehicleHalCallStatus* status) override;
::grpc::Status SendAllPropertyValuesToStream(::grpc::ServerContext* context,
const ::google::protobuf::Empty*,
::google::protobuf::Empty*) override;
::grpc::Status StartPropertyValuesStream(
::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
::grpc::ServerWriter<vhal_proto::WrappedVehiclePropValue>* stream) override;
private:
// We keep long-lasting connection for streaming the prop values.
// For us, each connection can be represented as a function to send the new value, and
// an ID to identify this connection
struct ConnectionDescriptor {
using ValueWriterType = std::function<bool(const vhal_proto::WrappedVehiclePropValue&)>;
explicit ConnectionDescriptor(ValueWriterType&& value_writer)
: mValueWriter(std::move(value_writer)),
mConnectionID(CONNECTION_ID_COUNTER.fetch_add(1)) {}
ConnectionDescriptor(const ConnectionDescriptor&) = delete;
ConnectionDescriptor& operator=(const ConnectionDescriptor&) = delete;
// This move constructor is NOT THREAD-SAFE, which means it cannot be moved
// while using. Since the connection descriptors are pretected by mConnectionMutex
// then we are fine here
ConnectionDescriptor(ConnectionDescriptor&& cd)
: mValueWriter(std::move(cd.mValueWriter)),
mConnectionID(cd.mConnectionID),
mIsAlive(cd.mIsAlive.load()) {
cd.mIsAlive.store(false);
}
ValueWriterType mValueWriter;
uint64_t mConnectionID;
std::atomic<bool> mIsAlive{true};
static std::atomic<uint64_t> CONNECTION_ID_COUNTER;
};
std::string mServiceAddr;
std::unique_ptr<::grpc::Server> mServer{nullptr};
VehiclePropValuePool mValueObjectPool;
std::unique_ptr<GarageModeServerSideHandler> mGarageModeHandler;
PowerStateListener mPowerStateListener;
std::thread mPowerStateListenerThread{};
mutable std::shared_mutex mConnectionMutex;
mutable std::shared_mutex mWriterMutex;
std::list<ConnectionDescriptor> mValueStreamingConnections;
};
std::atomic<uint64_t> GrpcVehicleServerImpl::ConnectionDescriptor::CONNECTION_ID_COUNTER = 0;
static std::shared_ptr<::grpc::ServerCredentials> getServerCredentials() {
// TODO(chenhaosjtuacm): get secured credentials here
return ::grpc::InsecureServerCredentials();
}
GrpcVehicleServerPtr makeGrpcVehicleServer(const VirtualizedVhalServerInfo& serverInfo) {
return std::make_unique<GrpcVehicleServerImpl>(serverInfo);
}
GrpcVehicleServer& GrpcVehicleServerImpl::Start() {
if (mServer) {
LOG(WARNING) << __func__ << ": GrpcVehicleServer has already started.";
return *this;
}
::grpc::ServerBuilder builder;
builder.RegisterService(this);
builder.AddListeningPort(mServiceAddr, getServerCredentials());
mServer = builder.BuildAndStart();
CHECK(mServer) << __func__ << ": failed to create the GRPC server, "
<< "please make sure the configuration and permissions are correct";
mPowerStateListenerThread = std::thread([this]() { mPowerStateListener.Listen(); });
return *this;
}
void GrpcVehicleServerImpl::Wait() {
if (mServer) {
mServer->Wait();
}
if (mPowerStateListenerThread.joinable()) {
mPowerStateListenerThread.join();
}
mPowerStateListenerThread = {};
mServer.reset();
}
GrpcVehicleServer& GrpcVehicleServerImpl::Stop() {
if (!mServer) {
LOG(WARNING) << __func__ << ": GrpcVehicleServer has not started.";
return *this;
}
mServer->Shutdown();
mPowerStateListener.Stop();
return *this;
}
uint32_t GrpcVehicleServerImpl::NumOfActivePropertyValueStream() {
std::shared_lock read_lock(mConnectionMutex);
return mValueStreamingConnections.size();
}
void GrpcVehicleServerImpl::onPropertyValueFromCar(const VehiclePropValue& value,
bool updateStatus) {
vhal_proto::WrappedVehiclePropValue wrappedPropValue;
proto_msg_converter::toProto(wrappedPropValue.mutable_value(), value);
wrappedPropValue.set_update_status(updateStatus);
std::shared_lock read_lock(mConnectionMutex);
bool has_terminated_connections = 0;
for (auto& connection : mValueStreamingConnections) {
auto writeOK = connection.mValueWriter(wrappedPropValue);
if (!writeOK) {
LOG(ERROR) << __func__ << ": Server Write failed, connection lost. ID: "
<< connection.mConnectionID;
has_terminated_connections = true;
connection.mIsAlive.store(false);
}
}
if (!has_terminated_connections) {
return;
}
read_lock.unlock();
std::unique_lock write_lock(mConnectionMutex);
for (auto itr = mValueStreamingConnections.begin(); itr != mValueStreamingConnections.end();) {
if (!itr->mIsAlive.load()) {
itr = mValueStreamingConnections.erase(itr);
} else {
++itr;
}
}
}
StatusCode GrpcVehicleServerImpl::onSetProperty(const VehiclePropValue& value, bool updateStatus) {
if (value.prop == AP_POWER_STATE_REPORT &&
value.value.int32Values[0] == toInt(VehicleApPowerStateReport::SHUTDOWN_POSTPONE)) {
mGarageModeHandler->HandleHeartbeat();
}
return GrpcVehicleServer::onSetProperty(value, updateStatus);
}
::grpc::Status GrpcVehicleServerImpl::GetAllPropertyConfig(
::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
::grpc::ServerWriter<vhal_proto::VehiclePropConfig>* stream) {
auto configs = onGetAllPropertyConfig();
for (auto& config : configs) {
vhal_proto::VehiclePropConfig protoConfig;
proto_msg_converter::toProto(&protoConfig, config);
if (!stream->Write(protoConfig)) {
return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
}
}
return ::grpc::Status::OK;
}
::grpc::Status GrpcVehicleServerImpl::SetProperty(
::grpc::ServerContext* context, const vhal_proto::WrappedVehiclePropValue* wrappedPropValue,
vhal_proto::VehicleHalCallStatus* status) {
VehiclePropValue value;
proto_msg_converter::fromProto(&value, wrappedPropValue->value());
auto set_status = static_cast<int32_t>(onSetProperty(value, wrappedPropValue->update_status()));
if (!vhal_proto::VehicleHalStatusCode_IsValid(set_status)) {
return ::grpc::Status(::grpc::StatusCode::INTERNAL, "Unknown status code");
}
status->set_status_code(static_cast<vhal_proto::VehicleHalStatusCode>(set_status));
return ::grpc::Status::OK;
}
::grpc::Status GrpcVehicleServerImpl::SendAllPropertyValuesToStream(
::grpc::ServerContext* context, const ::google::protobuf::Empty*,
::google::protobuf::Empty*) {
sendAllValuesToClient();
return ::grpc::Status::OK;
}
::grpc::Status GrpcVehicleServerImpl::StartPropertyValuesStream(
::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
::grpc::ServerWriter<vhal_proto::WrappedVehiclePropValue>* stream) {
std::mutex terminateMutex;
std::condition_variable terminateCV;
std::unique_lock<std::mutex> terminateLock(terminateMutex);
bool terminated{false};
auto callBack = [stream, &terminateMutex, &terminateCV, &terminated,
this](const vhal_proto::WrappedVehiclePropValue& value) {
std::unique_lock lock(mWriterMutex);
if (!stream->Write(value)) {
std::unique_lock<std::mutex> terminateLock(terminateMutex);
terminated = true;
terminateLock.unlock();
terminateCV.notify_all();
return false;
}
return true;
};
// Register connection
std::unique_lock lock(mConnectionMutex);
auto& conn = mValueStreamingConnections.emplace_back(std::move(callBack));
lock.unlock();
// Never stop until connection lost
terminateCV.wait(terminateLock, [&terminated]() { return terminated; });
LOG(ERROR) << __func__ << ": Stream lost, ID : " << conn.mConnectionID;
return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
}
} // namespace impl
} // namespace V2_0
} // namespace vehicle
} // namespace automotive
} // namespace hardware
} // namespace android