自学内容网 自学内容网

通过 MQDescriptorSync 实现 HIDL 大数据传递的最佳实践

以下内容来自 Audio HIDL 播放流程,经过了部分修改,但尚未经过测试。

HIDL

    /**
     * Status message sent back over the status queue after each write.
     */
    struct WriteStatus {
        /** Result of the write attempt. */
        Result retval;
        union Reply {
            uint64_t written;  // WRITE command, amount of bytes written, >= 0.
        } reply;
    };

    /**
     * Sets up the fast message queues used for writing.
     *
     * The method is named prepareForWriting so that it matches the
     * prepareForWriting / prepareForWriting_cb symbols used by the C++
     * implementation and by the client code.
     *
     * @param frameSize size of one frame; the data queue holds
     *                  frameSize * framesCount elements.
     * @param framesCount number of frames the data queue must hold.
     * @return retval OK on success, an error code otherwise.
     * @return dataMQ descriptor of the synchronized byte queue carrying the payload.
     * @return statusMQ descriptor of the synchronized queue carrying WriteStatus replies.
     */
    prepareForWriting(uint32_t frameSize, uint32_t framesCount)
            generates (Result retval, fmq_sync<uint8_t> dataMQ, fmq_sync<WriteStatus> statusMQ);

接收端

接收端需要创建 Data MessageQueue 和 Status MessageQueue,并通过回调将它们的描述符(MQDescriptorSync)传递给发送端。

// Synchronized (blocking-capable) queue of raw payload bytes.
using DataMQ = MessageQueue<uint8_t, kSynchronizedReadWrite>;
// Synchronized queue carrying one WriteStatus reply per write.
using StatusMQ = MessageQueue<WriteStatus, kSynchronizedReadWrite>;
// Worker thread that drains the data queue, forwards the bytes to the device
// and reports the outcome through the status queue.
class WriteThread : public Thread {
   public:
    // WriteThread's lifespan never exceeds Device's lifespan.
    // The device, both queues and the event flag are stored as raw pointers;
    // this class never deletes them.
    WriteThread(audio_hw_device_t* device, StreamOut::DataMQ* dataMQ,
                StreamOut::StatusMQ* statusMQ, EventFlag* efGroup)
        : Thread(false /*canCallJava*/),
          mDevice(device),
          mDataMQ(dataMQ),
          mStatusMQ(statusMQ),
          mEfGroup(efGroup) {}

    virtual ~WriteThread() = default;

    // Allocates the staging buffer, one byte per data queue quantum.
    // Returns false when the allocation fails.
    bool init() {
        const size_t capacity = mDataMQ->getQuantumCount();
        mBuffer.reset(new (std::nothrow) uint8_t[capacity]);
        return static_cast<bool>(mBuffer);
    }

   private:
    audio_hw_device_t* mDevice;
    StreamOut::DataMQ* mDataMQ;
    StreamOut::StatusMQ* mStatusMQ;
    EventFlag* mEfGroup;
    // Staging area between the data queue and the device; empty until init().
    std::unique_ptr<uint8_t[]> mBuffer;
    // Reply for the most recent write; filled in by doWrite().
    IDevice::WriteStatus mStatus;

    bool threadLoop() override;

    void doWrite();
};

// Moves everything currently readable in the data queue to the device and
// records the outcome in mStatus.
void WriteThread::doWrite() {
    // Default reply: success with nothing written. It is what gets reported
    // when the queue read below fails.
    mStatus.retval = Result::OK;
    mStatus.reply.written = 0;

    const size_t pendingBytes = mDataMQ->availableToRead();
    if (!mDataMQ->read(&mBuffer[0], pendingBytes)) {
        return;
    }

    const ssize_t rc = mDevice->virtualMicWrite(&mBuffer[0], pendingBytes);
    if (rc < 0) {
        // A negative result is an error; translate it into a Result value.
        mStatus.retval = Device::analyzeStatus("virtualMicWrite", rc);
        return;
    }
    mStatus.reply.written = rc;
}

// Thread body: waits for NOT_EMPTY, performs one write, posts the status and
// signals NOT_FULL.
bool WriteThread::threadLoop() {
    const uint32_t kNotEmpty = static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY);
    const uint32_t kNotFull = static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL);

    // This implementation doesn't return control back to the Thread until it
    // decides to stop,
    // as the Thread uses mutexes, and this can lead to priority inversion.
    // NOTE(review): no exit condition is visible in this loop, so the return
    // statement after it is never reached.
    for (;;) {
        uint32_t observed = 0;
        mEfGroup->wait(kNotEmpty, &observed);
        if ((observed & kNotEmpty) != 0) {
            doWrite();
            if (!mStatusMQ->write(&mStatus)) {
                ALOGE("status message queue write failed");
            }
            // Tell the sender that a status message is waiting for it.
            mEfGroup->wake(kNotFull);
        }
        // Otherwise: nothing to do, wait again.
    }

    return false;
}

// Creates the data queue, the status queue, the event flag and the writer
// thread, then hands both queue descriptors to the client through _hidl_cb.
//
// frameSize * framesCount is the capacity of the data queue. On any failure
// _hidl_cb receives an error Result together with empty descriptors and no
// member of this object is modified. The method may only succeed once: a
// second call while mDataMQ is set reports INVALID_STATE.
Return<void> Device::prepareForWriting(uint32_t frameSize, uint32_t framesCount,
                                          prepareForWriting_cb _hidl_cb) {
    // Wrap the _hidl_cb to return an error
    auto sendError = [&_hidl_cb](Result result) {
        _hidl_cb(result, DataMQ::Descriptor(), StatusMQ::Descriptor());
    };

    if (mDataMQ) {
        ALOGE("the client attempts to call prepareForWriting twice");
        sendError(Result::INVALID_STATE);
        return Void();
    }

    // Check frameSize and framesCount
    if (frameSize == 0 || framesCount == 0) {
        ALOGE("Null frameSize (%u) or framesCount (%u)", frameSize, framesCount);
        sendError(Result::INVALID_ARGUMENTS);
        return Void();
    }
    // Written as a division so that the product cannot overflow uint32_t.
    if (frameSize > Stream::MAX_BUFFER_SIZE / framesCount) {
        ALOGE("Buffer too big: %u*%u bytes > MAX_BUFFER_SIZE (%u)", frameSize, framesCount,
              Stream::MAX_BUFFER_SIZE);
        sendError(Result::INVALID_ARGUMENTS);
        return Void();
    }
    std::unique_ptr<DataMQ> tempDataMQ(new DataMQ(frameSize * framesCount, true /* EventFlag */));

    std::unique_ptr<StatusMQ> tempStatusMQ(new StatusMQ(1));
    if (!tempDataMQ->isValid() || !tempStatusMQ->isValid()) {
        ALOGE_IF(!tempDataMQ->isValid(), "data MQ is invalid");
        ALOGE_IF(!tempStatusMQ->isValid(), "status MQ is invalid");
        sendError(Result::INVALID_ARGUMENTS);
        return Void();
    }

    // The event flag is built on the data queue's flag word. The unique_ptr
    // deletes it on every early return below.
    EventFlag* tempRawEfGroup{};
    const status_t efStatus =
            EventFlag::createEventFlag(tempDataMQ->getEventFlagWord(), &tempRawEfGroup);
    std::unique_ptr<EventFlag, void (*)(EventFlag*)> tempEfGroup(
        tempRawEfGroup, [](auto* ef) { EventFlag::deleteEventFlag(&ef); });
    if (efStatus != OK || !tempEfGroup) {
        ALOGE("failed creating event flag for data MQ: %s", strerror(-efStatus));
        sendError(Result::INVALID_ARGUMENTS);
        return Void();
    }

    // Create and launch the thread.
    auto tempWriteThread = sp<WriteThread>::make(
            mDevice, tempDataMQ.get(), tempStatusMQ.get(), tempEfGroup.get());
    if (!tempWriteThread->init()) {
        // init() only allocates the staging buffer and yields no status code,
        // so there is no errno-style value to print here.
        ALOGW("failed to allocate the writer thread buffer");
        sendError(Result::INVALID_ARGUMENTS);
        return Void();
    }
    const status_t runStatus = tempWriteThread->run("writer", PRIORITY_URGENT_AUDIO);
    if (runStatus != OK) {
        ALOGW("failed to start writer thread: %s", strerror(-runStatus));
        sendError(Result::INVALID_ARGUMENTS);
        return Void();
    }

    // Everything succeeded: commit. Moving the unique_ptrs keeps the pointees
    // at the same addresses, so the raw pointers held by the thread stay valid.
    mDataMQ = std::move(tempDataMQ);
    mStatusMQ = std::move(tempStatusMQ);
    mWriteThread = tempWriteThread;
    mEfGroup = tempEfGroup.release();
    _hidl_cb(Result::OK, *mDataMQ->getDesc(), *mStatusMQ->getDesc());
    return Void();
}

发送端

这是 HIDL 自动生成的代理端(Bp 侧)回调处理代码片段:从应答 Parcel 中读出的是 MQDescriptorSync 对象。

// Fragment of generated proxy code: extracts the two queue descriptors from
// the reply parcel. Each step stores its status in _hidl_err and returns
// from the enclosing function as soon as one of them is not OK.

// Set by readBuffer() below to the descriptor data it read from the reply.
const ::android::hardware::MQDescriptorSync<uint8_t>* _hidl_out_dataMQ;
const ::android::hardware::MQDescriptorSync<::android::hardware::audio::V7_0::IStreamOut::WriteStatus>* _hidl_out_statusMQ;

// Data queue descriptor: read the top-level object first; the parent handle
// produced here is needed to resolve its embedded parts.
size_t _hidl__hidl_out_dataMQ_parent;
_hidl_err = _hidl_reply.readBuffer(sizeof(*_hidl_out_dataMQ), &_hidl__hidl_out_dataMQ_parent,  reinterpret_cast<const void **>(&_hidl_out_dataMQ));
if (_hidl_err != ::android::OK) { return; }

// Resolve the data embedded in the descriptor. The const_cast is needed
// because the descriptor pointer is declared const.
_hidl_err = ::android::hardware::readEmbeddedFromParcel(const_cast<::android::hardware::MQDescriptorSync<uint8_t> &>(*_hidl_out_dataMQ), _hidl_reply, _hidl__hidl_out_dataMQ_parent, 0 /* parentOffset */);
if (_hidl_err != ::android::OK) { return; }

// Status queue descriptor: same two steps as for the data queue.
size_t _hidl__hidl_out_statusMQ_parent;
_hidl_err = _hidl_reply.readBuffer(sizeof(*_hidl_out_statusMQ), &_hidl__hidl_out_statusMQ_parent,  reinterpret_cast<const void **>(&_hidl_out_statusMQ));
if (_hidl_err != ::android::OK) { return; }

_hidl_err = ::android::hardware::readEmbeddedFromParcel(const_cast<::android::hardware::MQDescriptorSync<::android::hardware::audio::V7_0::IStreamOut::WriteStatus> &>(*_hidl_out_statusMQ), _hidl_reply, _hidl__hidl_out_statusMQ_parent, 0 /* parentOffset */);
if (_hidl_err != ::android::OK) { return; }
// Writes `bytes` bytes from `buffer` through the data queue and stores the
// byte count reported by the writer thread in `*written`.
// Returns NO_INIT when there is no device, otherwise the status of the queue
// setup or of the write itself. The queues are created lazily on the first
// non-empty write.
status_t DeviceHalHidl::virtualMicWrite(const void *buffer, size_t bytes, size_t *written) {
    // TIME_CHECK();  // TODO(b/243839867) reenable only when optimized.
    if (mDevice == 0) return NO_INIT;
    *written = 0;

    if (!mDataMQ) {
        if (bytes == 0) {
            // Can't determine the size for the MQ buffer. Wait for a non-empty write request.
            ALOGW("First call to async write with 0 bytes");
            return OK;
        }
        // In case if playback starts close to the end of a compressed track, the bytes
        // that need to be written is less than the actual buffer size. Need to use
        // full buffer size for the MQ since otherwise after seeking back to the middle
        // data will be truncated.
        // NOTE(review): `bufferSize` is not declared anywhere in this function;
        // confirm where it comes from (presumably the stream's full buffer size).
        const status_t prepareStatus = prepareForWriting(bufferSize);
        if (prepareStatus != OK) {
            return prepareStatus;
        }
    }

    const uint8_t* const payload = static_cast<const uint8_t*>(buffer);
    return callWriterThread("write", payload, bytes,
            [&] (const WriteStatus& writeStatus) {
                *written = writeStatus.reply.written;
                // Diagnostics of the cause of b/35813113.
                ALOGE_IF(*written > bytes,
                        "hal reports more bytes written than asked for: %lld > %lld",
                        (long long)*written, (long long)bytes);
            });
}

// Pushes `data` (if any) into the data queue, wakes the writer thread and
// blocks until it posts a status message.
//
// cmdName is only used in log messages. dataSize is clamped to the space
// available in the data queue. callback is invoked with the received status
// only when its retval is Result::OK.
// Returns OK on success, the converted HAL result on a reported failure, or
// the event flag wait status when the wait fails for a non-retryable reason.
status_t DeviceHalHidl::callWriterThread(const char* cmdName,
        const uint8_t* data, size_t dataSize, StreamOutHalHidl::WriterCallback callback) {
    const uint32_t kNotEmpty = static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY);
    const uint32_t kNotFull = static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL);

    if (data != nullptr) {
        const size_t availableToWrite = mDataMQ->availableToWrite();
        if (dataSize > availableToWrite) {
            ALOGW("truncating write data from %lld to %lld due to insufficient data queue space",
                    (long long)dataSize, (long long)availableToWrite);
            dataSize = availableToWrite;
        }
        if (!mDataMQ->write(data, dataSize)) {
            ALOGE("data message queue write failed for \"%s\"", cmdName);
        }
    }
    // The writer thread is woken even when the queue write above failed.
    mEfGroup->wake(kNotEmpty);

    // TODO: Remove manual event flag handling once blocking MQ is implemented. b/33815422
    uint32_t efState = 0;
    status_t ret;
    do {
        ret = mEfGroup->wait(kNotFull, &efState);
        if ((efState & kNotFull) != 0) {
            WriteStatus writeStatus;
            // Pre-set to an error so that a failed queue read is reported as one.
            writeStatus.retval = Result::NOT_INITIALIZED;
            if (!mStatusMQ->read(&writeStatus)) {
                ALOGE("status message read failed for \"%s\"", cmdName);
            }
            if (writeStatus.retval != Result::OK) {
                return processReturn(cmdName, writeStatus.retval);
            }
            callback(writeStatus);
            return OK;
        }
        // Spurious wakeup. This normally retries no more than once.
    } while (ret == -EAGAIN || ret == -EINTR);
    return ret;
}

// Asks the HAL to create the write queues for a buffer of bufferSize bytes
// (requested as bufferSize frames of 1 byte) and attaches to them.
//
// Returns OK once mDataMQ, mStatusMQ and mEfGroup are all set; the converted
// transport/HAL error when the call itself fails; NO_INIT when a queue or the
// event flag turns out to be unusable. No member is modified on failure.
status_t DeviceHalHidl::prepareForWriting(size_t bufferSize) {
    std::unique_ptr<DataMQ> tempDataMQ;
    std::unique_ptr<StatusMQ> tempStatusMQ;
    EventFlag* rawEfGroup = nullptr;
    // Given a defined value because the callback does not run when the
    // transport fails, yet retval is still passed to processReturn() below.
    Result retval = Result::NOT_INITIALIZED;
    Return<void> ret = mDevice->prepareForWriting(
            1, bufferSize,
            [&](Result r, const DataMQ::Descriptor& dataMQ, const StatusMQ::Descriptor& statusMQ) {
                retval = r;
                if (retval == Result::OK) {
                    tempDataMQ.reset(new DataMQ(dataMQ));
                    tempStatusMQ.reset(new StatusMQ(statusMQ));
                    if (tempDataMQ->isValid() && tempDataMQ->getEventFlagWord()) {
                        EventFlag::createEventFlag(tempDataMQ->getEventFlagWord(), &rawEfGroup);
                    }
                }
            });
    // Owns the new event flag until everything has been validated, so that a
    // failed attempt neither leaks it nor leaves it behind in mEfGroup.
    std::unique_ptr<EventFlag, void (*)(EventFlag*)> tempEfGroup(
            rawEfGroup, [](EventFlag* ef) { EventFlag::deleteEventFlag(&ef); });
    if (!ret.isOk() || retval != Result::OK) {
        return processReturn("prepareForWriting", ret, retval);
    }
    if (!tempDataMQ || !tempDataMQ->isValid() ||
            !tempStatusMQ || !tempStatusMQ->isValid() || !tempEfGroup) {
        ALOGE_IF(!tempDataMQ, "Failed to obtain data message queue for writing");
        ALOGE_IF(tempDataMQ && !tempDataMQ->isValid(), "Data message queue for writing is invalid");
        ALOGE_IF(!tempStatusMQ, "Failed to obtain status message queue for writing");
        ALOGE_IF(tempStatusMQ && !tempStatusMQ->isValid(),
                "Status message queue for writing is invalid");
        ALOGE_IF(!tempEfGroup, "Event flag creation for writing failed");
        return NO_INIT;
    }
    mDataMQ = std::move(tempDataMQ);
    mStatusMQ = std::move(tempStatusMQ);
    mEfGroup = tempEfGroup.release();
    return OK;
}

https://source.android.google.cn/docs/core/architecture/hidl/fmq?hl=zh-cn


原文地址:https://blog.csdn.net/daixiang789/article/details/142388898

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!