Streaming from an HTTP response stream to an Azure Storage blob

Problem description

Summary

I need to stream text-to-speech (TTS) output from an Azure Function (serverless) into an Azure Storage blob. The code works, but it doesn't complete in the order I expect.

Function description

The code converts text into an audio (.mp3) file using streams. The text-to-speech (TTS) conversion goes through a REST API in the TextToSpeech class, which works with streams. The TTS call writes the audio stream into a writable stream, which is the Azure Storage blob.

Expected

I thought the call to the function that creates the blob should complete only after the blob has finished being written, but that isn't the case. Is this code correct, and if not, where is the problem?

I expected the blob write to finish first, so that `The end!` (logged by the stream's finish handler) appears before the calling code's `then`.

Received

The output actually received is shown in the Console output section at the bottom: `then` and the blob properties are logged before `The end!`.

Code for the serverless function:

require('dotenv').config();
const TextToSpeech = require("./textToSpeech");
const azure = require('azure-storage');

const fn = async () => {

    const blobService = azure.createBlobService(process.env.STORAGECONNECTIONSTRING);

    const textToSpeech = new TextToSpeech({
        accessTokenHost: process.env.SPEECHACCESSTOKENHOST,
        ttsHost: process.env.SPEECHRESOURCETTSHOST,
        ttsKey: process.env.SPEECHRESOURCETTSKEY
    });

    const userName = "diberry";
    const container = "function-blob-tts";
    const directory = userName;
    const transformConfig = {"filenameandpath": 'test.mp3'};

    const blobName = directory + "/" + transformConfig.filenameandpath;

    // DOCS: https://azure.github.io/azure-storage-node/BlobService.html#createWriteStreamToBlockBlob__anchor
    const writableStream = blobService.createWriteStreamToBlockBlob(container, blobName, { blockIdPrefix: 'block' });

    await textToSpeech.transform(transformConfig, "This is a brand new world.", writableStream);

    // blob properties
    return await blobService.getBlobProperties(container, blobName, (err, results)=>{
        if (err) throw err;
        console.log(results);
        if (results) return results;
    });

}

Code that calls the serverless fn:

fn().then(results => {
    console.log("then");
}).catch(err => {
    console.log("err received");
    console.log(err);
})

The TextToSpeech class:

const rp = require('requestretry');

class TextToSpeech {

    /**
     *
     * @param config - {key:"",endpoint:""}
     */
    constructor(config) {
        this.config = config;
        this.delayMS = 500;
        this.retry = 5;
    }

    // retry request if error or 429 received
    retryStrategy(err, response) {
        let shouldRetry = err || response.statusCode === 429;

        return shouldRetry;
    };

    // Gets an access token.
    async getAccessToken() {
        const options = {
            method: 'POST',
            uri: `https://${this.config.accessTokenHost}/sts/v1.0/issueToken`,
            headers: {
                'Ocp-Apim-Subscription-Key': this.config.ttsKey,
            },
        };
        const response = await rp(options);

        return response.body;
    };
    // Make sure to update User-Agent with the name of your resource.
    // You can also change the voice and output formats. See:
    // https://docs.microsoft.com/azure/cognitive-services/speech-service/language-support#text-to-speech
    /**
     *
     * @param accessToken - good for 10 minutes, used immediately
     * @param transformConfig - ttsConfigs
     * @param text
     * @param writableStream
     */
    async textToSpeech(accessToken, transformConfig, text, writableStream) {
        try {
            transformConfig.selectedVoice = {
                gender: 'female',
                locale: 'en-us',
                code: 'Jessa24KRUS',
            };

            // Create the SSML request.
            let body = `<?xml version="1.0"?><speak version="1.0" xml:lang="en-us"><voice xml:lang="en-us" name="Microsoft Server Speech Text to Speech Voice (${transformConfig.selectedVoice.locale}, ${transformConfig.selectedVoice.code})"><prosody rate="-20.00%">${text}</prosody></voice></speak>`;

            let options = {
                method: 'POST',
                baseUrl: `https://${this.config.ttsHost}/`,
                url: '/cognitiveservices/v1',
                headers: {
                    Authorization: 'Bearer ' + accessToken,
                    'cache-control': 'no-cache',
                    'User-Agent': 'YOUR_RESOURCE_NAME',
                    'X-Microsoft-OutputFormat': 'audio-24khz-48kbitrate-mono-mp3',
                    'Content-Type': 'application/ssml+xml',
                },
                //timeout: 120000,
                body: body,
                maxAttempts: this.retry,
                retryDelay: this.delayMS,
                retryStrategy: this.retryStrategy,
            };

            // request has binary audio file
            await rp(options)
                .on('response', async (response) => {
                    if (response.statusCode === 200) {
                        writableStream.on('finish', () => {
                            console.log('The end!');
                        });
                        response.pipe(writableStream);
                    } else {
                        throw Error('Response statusCode ' + response.statusCode);
                    }
                })
                .on('error', err => {
                    throw err;
                });
        } catch (err) {
            throw err;
        }
    }

    /**
     *
     * @param transformConfig
     * @param text
     */
    async transform(transformConfig, text, writableStream) {
        try {
            // get token - access token is good for 10 minutes
            const accessToken = await this.getAccessToken();

            // get binary audio and return it via the in/out writableStream
            await this.textToSpeech(accessToken, transformConfig, text, writableStream);
        } catch (err) {
            throw err;
        }
    }
}


module.exports = TextToSpeech;

Console output

When run from the console:

then
BlobResult {
  container: 'function-blob-tts',
  name: 'diberry/test.mp3',
  metadata: {},
  lastModified: 'Sun, 25 Aug 2019 13:06:25 GMT',
  creationTime: 'Sun, 25 Aug 2019 12:38:50 GMT',
  etag: '"0x8D7295D08E34C0E"',
  blobType: 'BlockBlob',
  contentLength: '19008',
  serverEncrypted: 'true',
  requestId: 'caa7abc9-701e-00ff-0b47-5b694c000000',
  contentSettings:
   { contentType: 'application/octet-stream',
     contentMD5: 'FN99sCq5XC3DOnAucPHtCA==' },
  lease: { status: 'unlocked', state: 'available' } }
The end!

Tags: node.js, asynchronous, stream, azure-blob-storage, azure-cognitive-services

Solution


I had to change the code to get the order to work the way I wanted.

The Transform stream isn't required; I only added it to see what was happening.

The pipe into the writable stream now sits inside a promise, so the async function actually waits for the stream to finish instead of moving on. The Transform shows the stream's progress while it is processed.

The blob-properties call takes a callback, which also threw the ordering off, so I converted it to a promise as well. Don't mix callbacks and promises.
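
In other words, two patterns fix the ordering. A minimal sketch of both follows; pipeToBlob and getBlobProps are illustrative helper names, not part of the SDK, and blobService is the instance created in the code below.

// Pattern 1: resolve only once the writable stream has flushed everything,
// and surface stream errors to the awaiting caller.
function pipeToBlob(readable, writableStream) {
    return new Promise((resolve, reject) => {
        readable
            .pipe(writableStream)
            .on('finish', resolve)
            .on('error', reject);
    });
}

// Pattern 2: promisify the callback-style SDK call instead of mixing
// callbacks with async/await (azure-storage uses error-first callbacks).
const { promisify } = require('util');
const getBlobProps = promisify(blobService.getBlobProperties).bind(blobService);
// usage: const props = await getBlobProps(container, blobName);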

Code for the serverless function:

require('dotenv').config();
const TextToSpeech = require("./textToSpeech");
const azure = require('azure-storage');

const getBlobProperties = async(blobService, container, blobName) => {
    return new Promise((resolve, reject) => {
        try {

            // blob properties
            blobService.getBlobProperties(container, blobName, (err, results)=>{
                if (err) return reject(err); // reject so the error reaches the awaiting caller; a throw inside this callback would be lost
                console.log(`getBlobProperties - ${JSON.stringify(results)}`);
                if (results) {
                    console.log(`getBlobProperties - done`);
                    resolve(results);
                }
            });

        } catch (err) {
            reject(err);
        }
    });
}

const fn = async () => {

    try{

        const blobService = azure.createBlobService(process.env.STORAGECONNECTIONSTRING);

        const textToSpeech = new TextToSpeech({
            accessTokenHost: process.env.SPEECHACCESSTOKENHOST,
            ttsHost: process.env.SPEECHRESOURCETTSHOST,
            ttsKey: process.env.SPEECHRESOURCETTSKEY
        });

        const userName = "diberry";
        const container = "function-blob-tts";
        const directory = userName;
        const transformConfig = {"filenameandpath": '6-test.mp3'};

        const blobName = directory + "/" + transformConfig.filenameandpath;

        // DOCS: https://azure.github.io/azure-storage-node/BlobService.html#createWriteStreamToBlockBlob__anchor
        const writableStream = blobService.createWriteStreamToBlockBlob(container, blobName, { blockIdPrefix: 'block' });

        await textToSpeech.transform(transformConfig, "This is a brand new world.", writableStream);

        console.log(`N-2 textToSpeech.transform done`);

        await getBlobProperties(blobService, container, blobName);
        console.log(`N-1 blob properties done`);

    }catch(err){
        console.log(`function error - ${err}`);
    }

}

Code that calls the serverless fn:

fn().then(results => {
    console.log("N function done");
}).catch(err => {
    console.log("function err received");
    console.log(err);
})

The TextToSpeech class:

const rp = require('requestretry');

class TextToSpeech {

    /**
     *
     * @param config - {key:"",endpoint:""}
     */
    constructor(config) {
        this.config = config;
        this.delayMS = 500;
        this.retry = 5;
    }

    // retry request if error or 429 received
    retryStrategy(err, response) {
        let shouldRetry = err || response.statusCode === 429;

        return shouldRetry;
    };

    // Gets an access token.
    async getAccessToken() {
        const options = {
            method: 'POST',
            uri: `https://${this.config.accessTokenHost}/sts/v1.0/issueToken`,
            headers: {
                'Ocp-Apim-Subscription-Key': this.config.ttsKey,
            },
        };
        const response = await rp(options);

        return response.body;
    };
    // Make sure to update User-Agent with the name of your resource.
    // You can also change the voice and output formats. See:
    // https://docs.microsoft.com/azure/cognitive-services/speech-service/language-support#text-to-speech
    /**
     *
     * @param accessToken - good for 10 minutes, used immediately
     * @param transformConfig - ttsConfigs
     * @param text
     * @param writableStream
     */
    async textToSpeech(accessToken, transformConfig, text, writableStream) {

        return new Promise((resolve, reject) => {
            try {
                transformConfig.selectedVoice = {
                    gender: 'female',
                    locale: 'en-us',
                    code: 'Jessa24KRUS',
                };

                // Create the SSML request.
                let body = `<?xml version="1.0"?><speak version="1.0" xml:lang="en-us"><voice xml:lang="en-us" name="Microsoft Server Speech Text to Speech Voice (${transformConfig.selectedVoice.locale}, ${transformConfig.selectedVoice.code})"><prosody rate="-20.00%">${text}</prosody></voice></speak>`;

                let options = {
                    method: 'POST',
                    baseUrl: `https://${this.config.ttsHost}/`,
                    url: '/cognitiveservices/v1',
                    headers: {
                        Authorization: 'Bearer ' + accessToken,
                        'cache-control': 'no-cache',
                        'User-Agent': 'YOUR_RESOURCE_NAME',
                        'X-Microsoft-OutputFormat': 'audio-24khz-48kbitrate-mono-mp3',
                        'Content-Type': 'application/ssml+xml',
                    },
                    //timeout: 120000,
                    body: body,
                    maxAttempts: this.retry,
                    retryDelay: this.delayMS,
                    retryStrategy: this.retryStrategy,
                };

                const { Transform } = require('stream');

                const reportProgress = new Transform({
                    transform(chunk, encoding, callback) {
                        process.stdout.write('.');
                        callback(null, chunk);
                    }
                });

                // request has binary audio file
                rp(options)
                .pipe(reportProgress)
                .pipe(writableStream)
                .on('finish', () => {
                    console.log('Done');
                    resolve();
                });

            } catch (err) {
                reject(err);
            }
        });
    }


    /**
     *
     * @param transformConfig
     * @param text
     */
    async transform(transformConfig, text, writableStream) {
        try {
            // get token - access token is good for 10 minutes
            const accessToken = await this.getAccessToken();

            // get binary audio and return it via the in/out writableStream
            await this.textToSpeech(accessToken, transformConfig, text, writableStream);

            console.log("transform done");
        } catch (err) {
            console.log(`transform error - ${err}`);
            throw err;
        }
    }
}


module.exports = TextToSpeech;
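
As a design note, the manual 'finish' promise could likely be replaced with Node's built-in stream.pipeline (Node 10+), which also destroys every stream in the chain and forwards the first error. This is a sketch only, meant for the body of textToSpeech, and it assumes rp(options) can be used as a readable stream here, as the .pipe() call above suggests:

const { pipeline } = require('stream');
const { promisify } = require('util');
const pipelineAsync = promisify(pipeline);

// Resolves when writableStream finishes; rejects if any stream in the chain errors.
await pipelineAsync(rp(options), reportProgress, writableStream);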
