You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
306 lines
11 KiB
306 lines
11 KiB
/*
|
|
* Copyright (C) 2019 The Android Open Source Project
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
#include "GrpcVehicleServer.h"
|
|
|
|
#include <condition_variable>
|
|
#include <mutex>
|
|
#include <shared_mutex>
|
|
|
|
#include <android-base/logging.h>
|
|
#include <grpc++/grpc++.h>
|
|
|
|
#include "GarageModeServerSideHandler.h"
|
|
#include "PowerStateListener.h"
|
|
#include "VehicleServer.grpc.pb.h"
|
|
#include "VehicleServer.pb.h"
|
|
#include "vhal_v2_0/DefaultConfig.h"
|
|
#include "vhal_v2_0/ProtoMessageConverter.h"
|
|
|
|
namespace android {
|
|
namespace hardware {
|
|
namespace automotive {
|
|
namespace vehicle {
|
|
namespace V2_0 {
|
|
|
|
namespace impl {
|
|
|
|
class GrpcVehicleServerImpl : public GrpcVehicleServer, public vhal_proto::VehicleServer::Service {
|
|
public:
|
|
explicit GrpcVehicleServerImpl(const VirtualizedVhalServerInfo& serverInfo)
|
|
: mServiceAddr(serverInfo.getServerUri()),
|
|
mGarageModeHandler(makeGarageModeServerSideHandler(this, &mValueObjectPool,
|
|
serverInfo.powerStateMarkerFilePath)),
|
|
mPowerStateListener(serverInfo.powerStateSocket, serverInfo.powerStateMarkerFilePath) {
|
|
setValuePool(&mValueObjectPool);
|
|
}
|
|
|
|
// method from GrpcVehicleServer
|
|
GrpcVehicleServer& Start() override;
|
|
|
|
void Wait() override;
|
|
|
|
GrpcVehicleServer& Stop() override;
|
|
|
|
uint32_t NumOfActivePropertyValueStream() override;
|
|
|
|
// methods from IVehicleServer
|
|
void onPropertyValueFromCar(const VehiclePropValue& value, bool updateStatus) override;
|
|
|
|
StatusCode onSetProperty(const VehiclePropValue& value, bool updateStatus) override;
|
|
|
|
// methods from vhal_proto::VehicleServer::Service
|
|
|
|
::grpc::Status GetAllPropertyConfig(
|
|
::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
|
|
::grpc::ServerWriter<vhal_proto::VehiclePropConfig>* stream) override;
|
|
|
|
::grpc::Status SetProperty(::grpc::ServerContext* context,
|
|
const vhal_proto::WrappedVehiclePropValue* wrappedPropValue,
|
|
vhal_proto::VehicleHalCallStatus* status) override;
|
|
|
|
::grpc::Status SendAllPropertyValuesToStream(::grpc::ServerContext* context,
|
|
const ::google::protobuf::Empty*,
|
|
::google::protobuf::Empty*) override;
|
|
|
|
::grpc::Status StartPropertyValuesStream(
|
|
::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
|
|
::grpc::ServerWriter<vhal_proto::WrappedVehiclePropValue>* stream) override;
|
|
|
|
private:
|
|
// We keep long-lasting connection for streaming the prop values.
|
|
// For us, each connection can be represented as a function to send the new value, and
|
|
// an ID to identify this connection
|
|
struct ConnectionDescriptor {
|
|
using ValueWriterType = std::function<bool(const vhal_proto::WrappedVehiclePropValue&)>;
|
|
|
|
explicit ConnectionDescriptor(ValueWriterType&& value_writer)
|
|
: mValueWriter(std::move(value_writer)),
|
|
mConnectionID(CONNECTION_ID_COUNTER.fetch_add(1)) {}
|
|
|
|
ConnectionDescriptor(const ConnectionDescriptor&) = delete;
|
|
|
|
ConnectionDescriptor& operator=(const ConnectionDescriptor&) = delete;
|
|
|
|
// This move constructor is NOT THREAD-SAFE, which means it cannot be moved
|
|
// while using. Since the connection descriptors are pretected by mConnectionMutex
|
|
// then we are fine here
|
|
ConnectionDescriptor(ConnectionDescriptor&& cd)
|
|
: mValueWriter(std::move(cd.mValueWriter)),
|
|
mConnectionID(cd.mConnectionID),
|
|
mIsAlive(cd.mIsAlive.load()) {
|
|
cd.mIsAlive.store(false);
|
|
}
|
|
|
|
ValueWriterType mValueWriter;
|
|
uint64_t mConnectionID;
|
|
std::atomic<bool> mIsAlive{true};
|
|
|
|
static std::atomic<uint64_t> CONNECTION_ID_COUNTER;
|
|
};
|
|
|
|
std::string mServiceAddr;
|
|
std::unique_ptr<::grpc::Server> mServer{nullptr};
|
|
VehiclePropValuePool mValueObjectPool;
|
|
std::unique_ptr<GarageModeServerSideHandler> mGarageModeHandler;
|
|
PowerStateListener mPowerStateListener;
|
|
std::thread mPowerStateListenerThread{};
|
|
mutable std::shared_mutex mConnectionMutex;
|
|
mutable std::shared_mutex mWriterMutex;
|
|
std::list<ConnectionDescriptor> mValueStreamingConnections;
|
|
};
|
|
|
|
std::atomic<uint64_t> GrpcVehicleServerImpl::ConnectionDescriptor::CONNECTION_ID_COUNTER = 0;
|
|
|
|
// Returns the credentials the gRPC server listens with.
// Currently these are always insecure (unauthenticated, unencrypted) credentials.
static std::shared_ptr<::grpc::ServerCredentials> getServerCredentials() {
    // TODO(chenhaosjtuacm): get secured credentials here
    std::shared_ptr<::grpc::ServerCredentials> credentials = ::grpc::InsecureServerCredentials();
    return credentials;
}
|
|
|
|
// Factory for the gRPC vehicle server: builds a GrpcVehicleServerImpl from
// |serverInfo| and hands ownership to the caller. The server is not started.
GrpcVehicleServerPtr makeGrpcVehicleServer(const VirtualizedVhalServerInfo& serverInfo) {
    return std::make_unique<GrpcVehicleServerImpl>(serverInfo);
}
|
|
|
|
// Builds and starts the gRPC server on mServiceAddr and launches the power state
// listener thread. Calling Start() while a server already exists only logs a
// warning. Aborts (CHECK) if the server cannot be created.
// Returns *this so calls can be chained.
GrpcVehicleServer& GrpcVehicleServerImpl::Start() {
    if (mServer != nullptr) {
        LOG(WARNING) << __func__ << ": GrpcVehicleServer has already started.";
        return *this;
    }

    {
        ::grpc::ServerBuilder serverBuilder;
        serverBuilder.RegisterService(this);
        serverBuilder.AddListeningPort(mServiceAddr, getServerCredentials());
        mServer = serverBuilder.BuildAndStart();
    }

    CHECK(mServer) << __func__ << ": failed to create the GRPC server, "
                   << "please make sure the configuration and permissions are correct";

    // The listener blocks in Listen(), so it gets a thread of its own.
    std::thread listenerThread([this]() { mPowerStateListener.Listen(); });
    mPowerStateListenerThread = std::move(listenerThread);
    return *this;
}
|
|
|
|
// Blocks until the gRPC server has shut down and the power state listener thread
// has finished, then releases both so that the server can be started again.
void GrpcVehicleServerImpl::Wait() {
    if (mServer != nullptr) {
        mServer->Wait();
    }

    if (mPowerStateListenerThread.joinable()) {
        mPowerStateListenerThread.join();
    }

    // Drop the finished thread handle and the server instance.
    mPowerStateListenerThread = std::thread{};
    mServer = nullptr;
}
|
|
|
|
// Requests shutdown of the gRPC server and stops the power state listener.
// Logs a warning and does nothing else when the server was never started.
// Returns *this so calls can be chained.
GrpcVehicleServer& GrpcVehicleServerImpl::Stop() {
    if (mServer != nullptr) {
        mServer->Shutdown();
        mPowerStateListener.Stop();
    } else {
        LOG(WARNING) << __func__ << ": GrpcVehicleServer has not started.";
    }
    return *this;
}
|
|
|
|
uint32_t GrpcVehicleServerImpl::NumOfActivePropertyValueStream() {
|
|
std::shared_lock read_lock(mConnectionMutex);
|
|
return mValueStreamingConnections.size();
|
|
}
|
|
|
|
void GrpcVehicleServerImpl::onPropertyValueFromCar(const VehiclePropValue& value,
|
|
bool updateStatus) {
|
|
vhal_proto::WrappedVehiclePropValue wrappedPropValue;
|
|
proto_msg_converter::toProto(wrappedPropValue.mutable_value(), value);
|
|
wrappedPropValue.set_update_status(updateStatus);
|
|
std::shared_lock read_lock(mConnectionMutex);
|
|
|
|
bool has_terminated_connections = 0;
|
|
|
|
for (auto& connection : mValueStreamingConnections) {
|
|
auto writeOK = connection.mValueWriter(wrappedPropValue);
|
|
if (!writeOK) {
|
|
LOG(ERROR) << __func__ << ": Server Write failed, connection lost. ID: "
|
|
<< connection.mConnectionID;
|
|
has_terminated_connections = true;
|
|
connection.mIsAlive.store(false);
|
|
}
|
|
}
|
|
|
|
if (!has_terminated_connections) {
|
|
return;
|
|
}
|
|
|
|
read_lock.unlock();
|
|
|
|
std::unique_lock write_lock(mConnectionMutex);
|
|
|
|
for (auto itr = mValueStreamingConnections.begin(); itr != mValueStreamingConnections.end();) {
|
|
if (!itr->mIsAlive.load()) {
|
|
itr = mValueStreamingConnections.erase(itr);
|
|
} else {
|
|
++itr;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Intercepts property sets before delegating to the base implementation.
//
// An AP_POWER_STATE_REPORT carrying SHUTDOWN_POSTPONE is forwarded to the garage
// mode handler as a heartbeat. The value may originate from a remote SetProperty
// RPC, so the int32 payload is checked for emptiness before its first element is
// read.
// Returns whatever GrpcVehicleServer::onSetProperty returns for |value|.
StatusCode GrpcVehicleServerImpl::onSetProperty(const VehiclePropValue& value, bool updateStatus) {
    const bool isShutdownPostpone =
            value.prop == AP_POWER_STATE_REPORT && value.value.int32Values.size() > 0 &&
            value.value.int32Values[0] == toInt(VehicleApPowerStateReport::SHUTDOWN_POSTPONE);
    if (isShutdownPostpone) {
        mGarageModeHandler->HandleHeartbeat();
    }
    return GrpcVehicleServer::onSetProperty(value, updateStatus);
}
|
|
|
|
// RPC handler: streams every property config to the client, one message each.
// Returns OK once all configs were written, or ABORTED as soon as a write fails.
::grpc::Status GrpcVehicleServerImpl::GetAllPropertyConfig(
        ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
        ::grpc::ServerWriter<vhal_proto::VehiclePropConfig>* stream) {
    auto propConfigs = onGetAllPropertyConfig();
    for (auto& propConfig : propConfigs) {
        vhal_proto::VehiclePropConfig outgoing;
        proto_msg_converter::toProto(&outgoing, propConfig);

        const bool delivered = stream->Write(outgoing);
        if (delivered) {
            continue;
        }
        return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
    }

    return ::grpc::Status::OK;
}
|
|
|
|
// RPC handler: applies the property value sent by the client.
// The proto value is converted and passed to onSetProperty(); the resulting status
// code is reported back through |status|. Returns INTERNAL when that code has no
// valid vhal_proto::VehicleHalStatusCode counterpart, OK otherwise.
::grpc::Status GrpcVehicleServerImpl::SetProperty(
        ::grpc::ServerContext* context, const vhal_proto::WrappedVehiclePropValue* wrappedPropValue,
        vhal_proto::VehicleHalCallStatus* status) {
    VehiclePropValue halValue;
    proto_msg_converter::fromProto(&halValue, wrappedPropValue->value());

    const StatusCode halStatus = onSetProperty(halValue, wrappedPropValue->update_status());
    const auto rawStatus = static_cast<int32_t>(halStatus);

    if (vhal_proto::VehicleHalStatusCode_IsValid(rawStatus)) {
        status->set_status_code(static_cast<vhal_proto::VehicleHalStatusCode>(rawStatus));
        return ::grpc::Status::OK;
    }

    return ::grpc::Status(::grpc::StatusCode::INTERNAL, "Unknown status code");
}
|
|
|
|
// RPC handler: asks the server to send all property values to the client via
// sendAllValuesToClient(). Neither the request nor the response carries data;
// always returns OK.
::grpc::Status GrpcVehicleServerImpl::SendAllPropertyValuesToStream(
        ::grpc::ServerContext* /*context*/, const ::google::protobuf::Empty* /*request*/,
        ::google::protobuf::Empty* /*response*/) {
    sendAllValuesToClient();
    return ::grpc::Status::OK;
}
|
|
|
|
// RPC handler: opens a long-lived stream of property values to the client.
//
// A writer callback bound to |stream| is registered in mValueStreamingConnections;
// onPropertyValueFromCar() invokes it for every new value. The handler then blocks
// until one of those writes fails, at which point the stream is considered lost.
//
// The callback captures |stream| and this function's local variables, so the
// connection is removed from the list, under the exclusive lock, before the
// function returns. Once that lock has been obtained no callback invocation can be
// in flight, and none can start afterwards.
//
// Always returns ABORTED ("Connection lost."), since the only way out is a failed
// write.
::grpc::Status GrpcVehicleServerImpl::StartPropertyValuesStream(
        ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
        ::grpc::ServerWriter<vhal_proto::WrappedVehiclePropValue>* stream) {
    std::mutex terminateMutex;
    std::condition_variable terminateCV;
    std::unique_lock<std::mutex> terminateLock(terminateMutex);
    bool terminated{false};

    auto callBack = [stream, &terminateMutex, &terminateCV, &terminated,
                     this](const vhal_proto::WrappedVehiclePropValue& value) {
        // Stream writes are serialized through mWriterMutex.
        std::unique_lock lock(mWriterMutex);
        if (!stream->Write(value)) {
            {
                std::lock_guard<std::mutex> guard(terminateMutex);
                terminated = true;
            }
            terminateCV.notify_all();
            return false;
        }
        return true;
    };

    // Register connection. The ID is copied out while the lock is held: the list
    // node itself may be erased by onPropertyValueFromCar() at any later point.
    uint64_t connectionId = 0;
    {
        std::unique_lock lock(mConnectionMutex);
        connectionId = mValueStreamingConnections.emplace_back(std::move(callBack)).mConnectionID;
    }

    // Never stop until connection lost
    terminateCV.wait(terminateLock, [&terminated]() { return terminated; });

    // Release the local mutex first: a callback running under the shared connection
    // lock may still want it, and we are about to wait for the exclusive one.
    terminateLock.unlock();

    // Deregister (a no-op if onPropertyValueFromCar() already pruned this entry).
    {
        std::unique_lock lock(mConnectionMutex);
        mValueStreamingConnections.remove_if([connectionId](const ConnectionDescriptor& cd) {
            return cd.mConnectionID == connectionId;
        });
    }

    LOG(ERROR) << __func__ << ": Stream lost, ID : " << connectionId;

    return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
}
|
|
|
|
} // namespace impl
|
|
|
|
} // namespace V2_0
|
|
} // namespace vehicle
|
|
} // namespace automotive
|
|
} // namespace hardware
|
|
} // namespace android
|