通过 MQDescriptorSync 实现 HIDL 大数据传递的最佳实践
以下内容来自 Audio HIDL 播放流程,经过了部分修改,但尚未经过测试。
HIDL
struct WriteStatus {
Result retval;
union Reply {
uint64_t written; // WRITE command, amount of bytes written, >= 0.
} reply;
};
prepareWriting(uint32_t frameSize, uint32_t framesCount)
generates (Result retval, fmq_sync<uint8_t> dataMQ, fmq_sync<WriteStatus> statusMQ);
接收端
接收端需要创建 Data MessageQueue和 Status MessageQueue,并将它们通过回调传递给发送端。
typedef MessageQueue<uint8_t, kSynchronizedReadWrite> DataMQ;
typedef MessageQueue<WriteStatus, kSynchronizedReadWrite> StatusMQ;
class WriteThread : public Thread {
public:
// WriteThread's lifespan never exceeds Device's lifespan.
WriteThread(audio_hw_device_t* device, StreamOut::DataMQ* dataMQ,
StreamOut::StatusMQ* statusMQ, EventFlag* efGroup)
: Thread(false /*canCallJava*/),
mDevice(device),
mDataMQ(dataMQ),
mStatusMQ(statusMQ),
mEfGroup(efGroup),
mBuffer(nullptr) {}
bool init() {
mBuffer.reset(new (std::nothrow) uint8_t[mDataMQ->getQuantumCount()]);
return mBuffer != nullptr;
}
virtual ~WriteThread() {}
private:
audio_hw_device_t* mDevice;
StreamOut::DataMQ* mDataMQ;
StreamOut::StatusMQ* mStatusMQ;
EventFlag* mEfGroup;
std::unique_ptr<uint8_t[]> mBuffer;
IDevice::WriteStatus mStatus;
bool threadLoop() override;
void doWrite();
};
void WriteThread::doWrite() {
const size_t availToRead = mDataMQ->availableToRead();
mStatus.retval = Result::OK;
mStatus.reply.written = 0;
if (mDataMQ->read(&mBuffer[0], availToRead)) {
ssize_t writeResult = mDevice->virtualMicWrite(&mBuffer[0], availToRead);
if (writeResult >= 0) {
mStatus.reply.written = writeResult;
} else {
mStatus.retval = Device::analyzeStatus("virtualMicWrite", writeResult);
}
}
}
bool WriteThread::threadLoop() {
// This implementation doesn't return control back to the Thread until it
// decides to stop,
// as the Thread uses mutexes, and this can lead to priority inversion.
while (true) {
uint32_t efState = 0;
mEfGroup->wait(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY), &efState);
if (!(efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY))) {
continue; // Nothing to do.
}
doWrite();
if (!mStatusMQ->write(&mStatus)) {
ALOGE("status message queue write failed");
}
mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));
}
return false;
}
Return<void> Device::prepareForWriting(uint32_t frameSize, uint32_t framesCount,
prepareForWriting_cb _hidl_cb) {
status_t status;
// Wrap the _hidl_cb to return an error
auto sendError = [&_hidl_cb] (Result result) {
_hidl_cb(result, DataMQ::Descriptor(), StatusMQ::Descriptor()); };
if (mDataMQ) {
ALOGE("the client attempts to call prepareForWriting twice");
sendError(Result::INVALID_STATE);
return Void();
}
// Check frameSize and framesCount
if (frameSize == 0 || framesCount == 0) {
ALOGE("Null frameSize (%u) or framesCount (%u)", frameSize, framesCount);
sendError(Result::INVALID_ARGUMENTS);
return Void();
}
if (frameSize > Stream::MAX_BUFFER_SIZE / framesCount) {
ALOGE("Buffer too big: %u*%u bytes > MAX_BUFFER_SIZE (%u)", frameSize, framesCount,
Stream::MAX_BUFFER_SIZE);
sendError(Result::INVALID_ARGUMENTS);
return Void();
}
std::unique_ptr<DataMQ> tempDataMQ(new DataMQ(frameSize * framesCount, true /* EventFlag */));
std::unique_ptr<StatusMQ> tempStatusMQ(new StatusMQ(1));
if (!tempDataMQ->isValid() || !tempStatusMQ->isValid()) {
ALOGE_IF(!tempDataMQ->isValid(), "data MQ is invalid");
ALOGE_IF(!tempStatusMQ->isValid(), "status MQ is invalid");
sendError(Result::INVALID_ARGUMENTS);
return Void();
}
EventFlag* tempRawEfGroup{};
status = EventFlag::createEventFlag(tempDataMQ->getEventFlagWord(), &tempRawEfGroup);
std::unique_ptr<EventFlag, void (*)(EventFlag*)> tempElfGroup(
tempRawEfGroup, [](auto* ef) { EventFlag::deleteEventFlag(&ef); });
if (status != OK || !tempElfGroup) {
ALOGE("failed creating event flag for data MQ: %s", strerror(-status));
sendError(Result::INVALID_ARGUMENTS);
return Void();
}
// Create and launch the thread.
auto tempWriteThread = sp<WriteThread>::make(
mDevice, tempDataMQ.get(), tempStatusMQ.get(), tempElfGroup.get());
if (!tempWriteThread->init()) {
ALOGW("failed to start writer thread: %s", strerror(-status));
sendError(Result::INVALID_ARGUMENTS);
return Void();
}
status = tempWriteThread->run("writer", PRIORITY_URGENT_AUDIO);
if (status != OK) {
ALOGW("failed to start writer thread: %s", strerror(-status));
sendError(Result::INVALID_ARGUMENTS);
return Void();
}
mDataMQ = std::move(tempDataMQ);
mStatusMQ = std::move(tempStatusMQ);
mWriteThread = tempWriteThread;
mEfGroup = tempElfGroup.release();
_hidl_cb(Result::OK, *mDataMQ->getDesc(), *mStatusMQ->getDesc());
return Void();
}
发送端
这是 BpBinder 的部分回调代码,返回的是 MQDescriptorSync 对象。
const ::android::hardware::MQDescriptorSync<uint8_t>* _hidl_out_dataMQ;
const ::android::hardware::MQDescriptorSync<::android::hardware::audio::V7_0::IStreamOut::WriteStatus>* _hidl_out_statusMQ;
size_t _hidl__hidl_out_dataMQ_parent;
_hidl_err = _hidl_reply.readBuffer(sizeof(*_hidl_out_dataMQ), &_hidl__hidl_out_dataMQ_parent, reinterpret_cast<const void **>(&_hidl_out_dataMQ));
if (_hidl_err != ::android::OK) { return; }
_hidl_err = ::android::hardware::readEmbeddedFromParcel(const_cast<::android::hardware::MQDescriptorSync<uint8_t> &>(*_hidl_out_dataMQ), _hidl_reply, _hidl__hidl_out_dataMQ_parent, 0 /* parentOffset */);
if (_hidl_err != ::android::OK) { return; }
size_t _hidl__hidl_out_statusMQ_parent;
_hidl_err = _hidl_reply.readBuffer(sizeof(*_hidl_out_statusMQ), &_hidl__hidl_out_statusMQ_parent, reinterpret_cast<const void **>(&_hidl_out_statusMQ));
if (_hidl_err != ::android::OK) { return; }
_hidl_err = ::android::hardware::readEmbeddedFromParcel(const_cast<::android::hardware::MQDescriptorSync<::android::hardware::audio::V7_0::IStreamOut::WriteStatus> &>(*_hidl_out_statusMQ), _hidl_reply, _hidl__hidl_out_statusMQ_parent, 0 /* parentOffset */);
if (_hidl_err != ::android::OK) { return; }
status_t DeviceHalHidl::virtualMicWrite(const void *buffer, size_t bytes, size_t *written) {
// TIME_CHECK(); // TODO(b/243839867) reenable only when optimized.
if (mDevice == 0) return NO_INIT;
*written = 0;
if (bytes == 0 && !mDataMQ) {
// Can't determine the size for the MQ buffer. Wait for a non-empty write request.
ALOGW("First call to async write with 0 bytes");
return OK;
}
status_t status;
if (!mDataMQ) {
// In case if playback starts close to the end of a compressed track, the bytes
// that need to be written is less than the actual buffer size. Need to use
// full buffer size for the MQ since otherwise after seeking back to the middle
// data will be truncated.
if ((status = prepareForWriting(bufferSize)) != OK) {
return status;
}
}
status = callWriterThread("write", static_cast<const uint8_t*>(buffer), bytes,
[&] (const WriteStatus& writeStatus) {
*written = writeStatus.reply.written;
// Diagnostics of the cause of b/35813113.
ALOGE_IF(*written > bytes,
"hal reports more bytes written than asked for: %lld > %lld",
(long long)*written, (long long)bytes);
});
return status;
}
status_t DeviceHalHidl::callWriterThread(const char* cmdName,
const uint8_t* data, size_t dataSize, StreamOutHalHidl::WriterCallback callback) {
if (data != nullptr) {
size_t availableToWrite = mDataMQ->availableToWrite();
if (dataSize > availableToWrite) {
ALOGW("truncating write data from %lld to %lld due to insufficient data queue space",
(long long)dataSize, (long long)availableToWrite);
dataSize = availableToWrite;
}
if (!mDataMQ->write(data, dataSize)) {
ALOGE("data message queue write failed for \"%s\"", cmdName);
}
}
mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY));
// TODO: Remove manual event flag handling once blocking MQ is implemented. b/33815422
uint32_t efState = 0;
retry:
status_t ret = mEfGroup->wait(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL), &efState);
if (efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL)) {
WriteStatus writeStatus;
writeStatus.retval = Result::NOT_INITIALIZED;
if (!mStatusMQ->read(&writeStatus)) {
ALOGE("status message read failed for \"%s\"", cmdName);
}
if (writeStatus.retval == Result::OK) {
ret = OK;
callback(writeStatus);
} else {
ret = processReturn(cmdName, writeStatus.retval);
}
return ret;
}
if (ret == -EAGAIN || ret == -EINTR) {
// Spurious wakeup. This normally retries no more than once.
goto retry;
}
return ret;
}
status_t DeviceHalHidl::prepareForWriting(size_t bufferSize) {
std::unique_ptr<DataMQ> tempDataMQ;
std::unique_ptr<StatusMQ> tempStatusMQ;
Result retval;
Return<void> ret = mDevice->prepareForWriting(
1, bufferSize,
[&](Result r, const DataMQ::Descriptor& dataMQ, const StatusMQ::Descriptor& statusMQ) {
retval = r;
if (retval == Result::OK) {
tempDataMQ.reset(new DataMQ(dataMQ));
tempStatusMQ.reset(new StatusMQ(statusMQ));
if (tempDataMQ->isValid() && tempDataMQ->getEventFlagWord()) {
EventFlag::createEventFlag(tempDataMQ->getEventFlagWord(), &mEfGroup);
}
}
});
if (!ret.isOk() || retval != Result::OK) {
return processReturn("prepareForWriting", ret, retval);
}
if (!tempDataMQ || !tempDataMQ->isValid() ||
!tempStatusMQ || !tempStatusMQ->isValid() || !mEfGroup) {
ALOGE_IF(!tempDataMQ, "Failed to obtain data message queue for writing");
ALOGE_IF(tempDataMQ && !tempDataMQ->isValid(), "Data message queue for writing is invalid");
ALOGE_IF(!tempStatusMQ, "Failed to obtain status message queue for writing");
ALOGE_IF(tempStatusMQ && !tempStatusMQ->isValid(),
"Status message queue for writing is invalid");
ALOGE_IF(!mEfGroup, "Event flag creation for writing failed");
return NO_INIT;
}
mDataMQ = std::move(tempDataMQ);
mStatusMQ = std::move(tempStatusMQ);
return OK;
}
https://source.android.google.cn/docs/core/architecture/hidl/fmq?hl=zh-cn
原文地址:https://blog.csdn.net/daixiang789/article/details/142388898
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!