首页 > 解决方案 > 在 Node.js 中通过 WebSocket 使用 Amazon Transcribe 流式转录

问题描述

我正在开发一个 WhatsApp 聊天机器人。我从 WhatsApp 收到音频文件（ogg 格式）的 URL，下载得到缓冲区后将该文件上传到 S3（sample.ogg）。现在我想使用 AWS Transcribe Streaming，所以我为该文件创建了 readStream，并通过 WebSocket 把数据发送给 AWS Transcribe。但我收到的响应是空的，有时只得到 “Mhm mm mm” 这样的结果。请问谁能告诉我，我的代码哪里做错了？

const express = require('express')
const app = express()
const fs = require('fs');
const crypto = require('crypto'); // to sign our pre-signed URL
const v4  = require('./aws-signature-v4'); // to generate our pre-signed URL
const marshaller  = require("@aws-sdk/eventstream-marshaller"); // for converting binary event stream messages to and from JSON
const util_utf8_node = require("@aws-sdk/util-utf8-node"); // UTF-8 helpers handed to the marshaller below
var WebSocket = require('ws') // for opening a web socket
// Module-level converter between binary event stream messages and JSON;
// convertAudioToBinaryMessage uses it to marshall outgoing audio frames.
const eventStreamMarshaller = new marshaller.EventStreamMarshaller(util_utf8_node.toUtf8, util_utf8_node.fromUtf8);

   // Module-level state. NOTE(review): several of these are declared but never
   // assigned or read in the code visible here — confirm before relying on them.
   let languageCode; // never assigned in the visible code; createPresignedUrl uses a literal language code
   let region = 'ap-south-1'; // not read in the visible code; createPresignedUrl uses its own 'us-east-1' value
   let sampleRate; // never assigned or read in the visible code
   let inputSampleRate; // read by convertAudioToBinaryMessage; left undefined here, so downsampleBuffer's default input rate applies
   let transcription = ""; // never updated in the visible code
   let socket; // never assigned in the visible code; the /convert handler creates its own local socket
   let micStream; // never assigned or read in the visible code
   let socketError = false; // set to true by the /convert handler's WebSocket error callback
   let transcribeException = false; // set to true by the /convert handler when a non-event message arrives
  // let languageCode = 'en-us'

// Start the HTTP server on port 8081 and log the port that was actually bound
// (the previous message announced 8080).
// NOTE(review): whether this callback ever receives an error argument depends on
// the Express major version in use — confirm; the error branch is harmless otherwise.
app.listen(8081, (error) => {
    if (error) {
        console.error('failed to start server:', error);
        return;
    }
    console.log(`running at 8081----->>>>`);
});

/**
 * Logs the transcript of the first alternative of the first result carried by
 * a Transcribe event message. Does nothing when there are no results or the
 * first result has no alternatives.
 *
 * @param {object} messageJson - parsed message body; must expose `Transcript.Results`.
 */
let handleEventStreamMessage = function (messageJson) {
    const candidates = messageJson.Transcript.Results;
    const hasResult = candidates.length > 0;
    if (!hasResult) {
        return;
    }

    const alternatives = candidates[0].Alternatives;
    const hasAlternative = alternatives.length > 0;
    if (!hasAlternative) {
        return;
    }

    // Fix encoding for accented characters: escape() percent-encodes the
    // characters and decodeURIComponent() reads those escapes back as UTF-8.
    const decoded = decodeURIComponent(escape(alternatives[0].Transcript));

    console.log(`Transcpted is----->>${decoded}`)
}

/**
 * Resamples `buffer` from `inputSampleRate` to `outputSampleRate` by averaging
 * the window of source samples that maps onto each output sample.
 *
 * @param {ArrayLike<number>} buffer - source samples.
 * @param {number} [inputSampleRate=44100] - sample rate of `buffer`.
 * @param {number} [outputSampleRate=16000] - sample rate of the result.
 * @returns {ArrayLike<number>|Float32Array} `buffer` itself when both rates are
 *   equal, otherwise a new Float32Array of round(buffer.length / ratio) samples.
 */
function downsampleBuffer (buffer, inputSampleRate = 44100, outputSampleRate = 16000){
    if (outputSampleRate === inputSampleRate) {
        return buffer;
    }

    const sampleRateRatio = inputSampleRate / outputSampleRate;
    const result = new Float32Array(Math.round(buffer.length / sampleRateRatio));

    let offsetBuffer = 0;
    // Last computed output value. It is reused whenever a window contains no
    // source samples (this happens when the input rate is lower than the output
    // rate); without it the average would be 0 / 0 and the sample would be NaN.
    let held = buffer.length > 0 ? buffer[0] : 0;

    for (let offsetResult = 0; offsetResult < result.length; offsetResult++) {
        const nextOffsetBuffer = Math.round((offsetResult + 1) * sampleRateRatio);
        // Never read past the end of the source buffer.
        const windowEnd = Math.min(nextOffsetBuffer, buffer.length);

        let accum = 0;
        for (let i = offsetBuffer; i < windowEnd; i++) {
            accum += buffer[i];
        }

        const count = windowEnd - offsetBuffer;
        if (count > 0) {
            held = accum / count;
        }

        result[offsetResult] = held;
        offsetBuffer = nextOffsetBuffer;
    }

    return result;
}


/**
 * Encodes samples as signed 16-bit little-endian PCM.
 *
 * Each sample is clamped to [-1, 1]; negative values are scaled by 0x8000 and
 * non-negative values by 0x7FFF before being written.
 *
 * @param {ArrayLike<number>} input - samples to encode.
 * @returns {ArrayBuffer} buffer holding two bytes per input sample.
 */
function pcmEncode(input) {
    const BYTES_PER_SAMPLE = 2;
    const encoded = new ArrayBuffer(input.length * BYTES_PER_SAMPLE);
    const writer = new DataView(encoded);

    for (let index = 0; index < input.length; index += 1) {
        const clamped = Math.max(-1, Math.min(1, input[index]));
        const scale = clamped < 0 ? 0x8000 : 0x7FFF;
        // `true` selects little-endian byte order.
        writer.setInt16(index * BYTES_PER_SAMPLE, clamped * scale, true);
    }

    return encoded;
}

/**
 * Wraps audio data in the envelope of an event-stream AudioEvent message.
 *
 * @param {*} buffer - audio payload; stored unchanged as the message body.
 * @returns {{headers: object, body: *}} message with string-typed
 *   ':message-type' ('event') and ':event-type' ('AudioEvent') headers.
 */
function getAudioEventMessage(buffer) {
    // Both headers are string-typed; only their value differs.
    const stringHeader = (value) => ({ type: 'string', value });

    const headers = {
        ':message-type': stringHeader('event'),
        ':event-type': stringHeader('AudioEvent'),
    };

    return { headers, body: buffer };
}


/**
 * Turns one chunk of raw audio samples into a binary event-stream AudioEvent
 * message: downsample, encode as 16-bit PCM, wrap in the message envelope and
 * marshall.
 *
 * @param {?ArrayLike<number>} raw - chunk of audio samples.
 *   NOTE(review): downsampleBuffer/pcmEncode treat the elements as numeric
 *   samples clamped to [-1, 1] — confirm callers pass decoded PCM.
 * @returns {*} the marshalled binary message, or undefined when `raw` is
 *   null/undefined.
 */
function convertAudioToBinaryMessage(raw) {
    if (raw == null) {
        return;
    }

    // Downsample and convert the raw audio to PCM. `inputSampleRate` is the
    // module-level variable; while it is undefined, downsampleBuffer falls back
    // to its default input rate.
    const downsampledBuffer = downsampleBuffer(raw, inputSampleRate);
    const pcmEncodedBuffer = pcmEncode(downsampledBuffer);

    // Add the right JSON headers and structure to the message.
    const audioEventMessage = getAudioEventMessage(Buffer.from(pcmEncodedBuffer));

    // Convert the JSON object + headers into a binary event stream message.
    return eventStreamMarshaller.marshall(audioEventMessage);
}

/**
 * Builds the pre-signed `wss` URL used to open the Amazon Transcribe streaming
 * WebSocket.
 *
 * Credentials are read from the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
 * environment variables instead of being written into the source.
 *
 * @returns {*} whatever `v4.createPresignedURL` returns for the request.
 * @throws {Error} when either credential environment variable is missing.
 */
function createPresignedUrl() {
    // Single source for the region so the endpoint host and the signature agree.
    const signingRegion = 'us-east-1';
    const endpoint = `transcribestreaming.${signingRegion}.amazonaws.com:8443`;

    // Must match the rate of the audio actually sent: downsampleBuffer is
    // called without an output rate, so it emits its 16000 Hz default.
    const streamSampleRate = 16000;

    const accessKey = process.env.AWS_ACCESS_KEY_ID;
    const secretKey = process.env.AWS_SECRET_ACCESS_KEY;
    if (!accessKey || !secretKey) {
        throw new Error('AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY must be set');
    }

    // get a preauthenticated URL that we can use to establish our WebSocket
    return v4.createPresignedURL(
        'GET',
        endpoint,
        '/stream-transcription-websocket',
        'transcribe',
        // Hash of the empty request payload.
        crypto.createHash('sha256').update('', 'utf8').digest('hex'), {
            'key': accessKey,
            'secret': secretKey,
            'protocol': 'wss',
            'expires': 15,
            'region': signingRegion,
            'query': `language-code=en-US&media-encoding=pcm&sample-rate=${streamSampleRate}`
        }
    );
}

/**
 * Writes an error message to the console, prefixed with "Error: ".
 *
 * @param {*} message - value to report.
 */
function showError(message) {
    const label = "Error: ";
    console.log(label, message);
}


/**
 * GET /convert — streams the local file 'recorded.mp3' to Amazon Transcribe
 * over a pre-signed WebSocket and logs every message that comes back.
 *
 * The HTTP response is sent once: when the WebSocket closes, or earlier if the
 * file cannot be read or the WebSocket reports an error.
 */
app.get('/convert', (req, res) => {
    const file = 'recorded.mp3';
    // Marshaller used for the frames sent and received by this handler.
    const eventStreamMarshaller = new marshaller.EventStreamMarshaller(util_utf8_node.toUtf8, util_utf8_node.fromUtf8);
    const socket = new WebSocket(createPresignedUrl());
    socket.binaryType = "arraybuffer";

    // Read the file in 8 KiB chunks.
    const readStream = fs.createReadStream(file, { highWaterMark: 32 * 256 });
    // NOTE(review): with 'binary' encoding every chunk is a string, and the file
    // is an mp3, while convertAudioToBinaryMessage treats its input as numeric
    // PCM samples. The audio probably has to be decoded to PCM before it is
    // streamed — confirm.
    readStream.setEncoding('binary');

    // True once anything went wrong during this request.
    let failed = false;

    // Sends the HTTP response at most once.
    const reply = function (status, text) {
        if (!res.headersSent) {
            res.status(status).send(text);
        }
    };

    // Send an empty frame so that Transcribe initiates a closure of the
    // WebSocket after submitting all transcripts. Only done while the socket is
    // actually open.
    const closeSocket = function () {
        if (socket.readyState === socket.OPEN) {
            const emptyMessage = getAudioEventMessage(Buffer.alloc(0));
            socket.send(eventStreamMarshaller.marshall(emptyMessage));
        }
    };

    readStream.on('end', function () {
        console.log('finished reading----->>>>');
        closeSocket();
    });

    // Without this listener a missing or unreadable file would surface as an
    // unhandled 'error' event.
    readStream.on('error', function (err) {
        failed = true;
        showError(err.message);
        socket.close();
        reply(500, 'Unable to read audio file.');
    });

    // Start consuming the file only once the socket is open; attaching the
    // 'data' listener is what starts the stream flowing.
    socket.onopen = function () {
        readStream.on('data', function (chunk) {
            // The audio stream is raw bytes. Transcribe expects PCM with
            // additional metadata, encoded as binary.
            const binary = convertAudioToBinaryMessage(chunk);
            if (socket.readyState === socket.OPEN) {
                console.log(`sending to steaming API------->>>>`);
                socket.send(binary);
            }
        });
    };

    socket.onerror = function () {
        failed = true;
        socketError = true;
        showError('WebSocket connection error. Try again.');
        readStream.destroy();
        reply(502, 'WebSocket connection error. Try again.');
    };

    // handle inbound messages from Amazon Transcribe
    socket.onmessage = function (message) {
        // convert the binary event stream message to JSON
        const messageWrapper = eventStreamMarshaller.unmarshall(Buffer.from(message.data));
        // One character per body byte; handleEventStreamMessage later repairs
        // multi-byte (accented) characters in the transcript.
        const messageBody = JSON.parse(String.fromCharCode.apply(String, messageWrapper.body));
        console.log("results:.. ", JSON.stringify(messageBody));
        if (messageWrapper.headers[":message-type"].value === "event") {
            handleEventStreamMessage(messageBody);
        } else {
            failed = true;
            transcribeException = true;
            showError(messageBody.Message);
        }
    };

    // The stream is over once the socket closes: stop reading and answer the
    // HTTP request so it does not hang.
    socket.onclose = function () {
        readStream.destroy();
        reply(failed ? 502 : 200, failed ? 'Transcription failed.' : 'Transcription finished.');
    };
});

标签: node.js, amazon-web-services, streaming, aws-transcribe

解决方案


推荐阅读