node.js - 使用 WebSocket 在 Node.js 中进行 Amazon Transcribe 流式转录
问题描述
我正在开发一个 WhatsApp 聊天机器人。我从 WhatsApp 收到音频文件(ogg 格式)的 URL,获取其缓冲区并将该文件上传到 S3(sample.ogg)。现在我想使用 AWS Transcribe Streaming,所以我为该文件创建了一个 readStream,并通过 WebSocket 发送给 AWS Transcribe。但我收到的是空响应,有时只得到 "Mhm mm mm" 这样的响应。请问谁能告诉我,我的代码哪里做错了?
const express = require('express')
const app = express()
const fs = require('fs');
const crypto = require('crypto'); // tot sign our pre-signed URL
const v4 = require('./aws-signature-v4'); // to generate our pre-signed URL
const marshaller = require("@aws-sdk/eventstream-marshaller"); // for converting binary event stream messages to and from JSON
const util_utf8_node = require("@aws-sdk/util-utf8-node");
var WebSocket = require('ws') //for opening a web socket
// Converter between binary event-stream messages and their JSON form.
const eventStreamMarshaller = new marshaller.EventStreamMarshaller(util_utf8_node.toUtf8, util_utf8_node.fromUtf8);

// Module-level state.
// NOTE(review): in the visible code languageCode, region, sampleRate,
// transcription, socket and micStream are never read; they look like leftovers
// from a browser sample - confirm before removing.
let languageCode;
// NOTE(review): createPresignedUrl hard-codes "us-east-1" and does not read this value.
let region = 'ap-south-1';
let sampleRate;
// Read by convertAudioToBinaryMessage; it is never assigned in the visible code,
// so downsampleBuffer falls back to its default input rate.
let inputSampleRate;
let transcription = "";
let socket;
let micStream;
// Set to true by the WebSocket error handler.
let socketError = false;
// Set to true when Transcribe answers with a non-"event" message.
let transcribeException = false;

// Single source of truth for the port so the log line cannot drift from the
// port the server is actually bound to (it used to print 8080 while listening on 8081).
const port = 8081;
app.listen(port, (error) => {
    if (!error) {
        console.log(`running at ${port}----->>>>`);
    }
});
/**
 * Logs the first transcript alternative of the first result carried by a
 * Transcribe event body.
 *
 * Bodies that carry no `Transcript.Results` / `Alternatives` entries are
 * ignored instead of raising a TypeError.
 *
 * @param {object} messageJson - Parsed event body; the text is read from
 *   `messageJson.Transcript.Results[0].Alternatives[0].Transcript`.
 * @returns {void} Nothing; the transcript is written with console.log.
 */
let handleEventStreamMessage = function (messageJson) {
    const results = messageJson && messageJson.Transcript && messageJson.Transcript.Results;
    if (!Array.isArray(results) || results.length === 0) {
        return;
    }
    const alternatives = results[0].Alternatives;
    if (!Array.isArray(alternatives) || alternatives.length === 0) {
        return;
    }
    let transcript = alternatives[0].Transcript;
    try {
        // Fix encoding for accented characters: re-read a byte-per-character
        // string as UTF-8.
        transcript = decodeURIComponent(escape(transcript));
    } catch (err) {
        if (!(err instanceof URIError)) {
            throw err;
        }
        // The text was not a byte-per-character UTF-8 sequence (for example it
        // was already decoded correctly); keep it exactly as received.
    }
    console.log(`Transcpted is----->>${transcript}`);
};
/**
 * Resamples a buffer of numeric samples by averaging each window of input
 * samples that maps onto one output sample.
 *
 * @param {ArrayLike<number>} buffer - Input samples.
 * @param {number} [inputSampleRate=44100] - Sample rate of `buffer`.
 * @param {number} [outputSampleRate=16000] - Desired sample rate.
 * @returns {ArrayLike<number>} `buffer` itself when both rates are equal,
 *   otherwise a new Float32Array of length
 *   `Math.round(buffer.length / (inputSampleRate / outputSampleRate))`.
 */
function downsampleBuffer (buffer, inputSampleRate = 44100, outputSampleRate = 16000) {
    if (outputSampleRate === inputSampleRate) {
        return buffer;
    }
    const sampleRateRatio = inputSampleRate / outputSampleRate;
    const result = new Float32Array(Math.round(buffer.length / sampleRateRatio));
    let windowStart = 0;
    for (let outIndex = 0; outIndex < result.length; outIndex++) {
        const windowEnd = Math.min(Math.round((outIndex + 1) * sampleRateRatio), buffer.length);
        if (windowEnd > windowStart) {
            let sum = 0;
            for (let i = windowStart; i < windowEnd; i++) {
                sum += buffer[i];
            }
            result[outIndex] = sum / (windowEnd - windowStart);
        } else {
            // Empty window: happens when the output rate is higher than the
            // input rate. Repeat the nearest input sample instead of storing
            // 0 / 0 (NaN).
            result[outIndex] = buffer[Math.min(windowStart, buffer.length - 1)];
        }
        windowStart = windowEnd;
    }
    return result;
}
/**
 * Encodes samples as signed 16-bit little-endian PCM.
 *
 * Each sample is clamped to [-1, 1]; negative values are scaled by 0x8000 and
 * non-negative values by 0x7FFF before being written.
 *
 * @param {ArrayLike<number>} input - Samples to encode.
 * @returns {ArrayBuffer} A buffer holding two bytes per input sample.
 */
function pcmEncode(input) {
    const BYTES_PER_SAMPLE = 2;
    const pcmBytes = new ArrayBuffer(input.length * BYTES_PER_SAMPLE);
    const writer = new DataView(pcmBytes);
    for (let index = 0; index < input.length; index += 1) {
        const clamped = Math.max(-1, Math.min(1, input[index]));
        const scale = clamped < 0 ? 0x8000 : 0x7FFF;
        // `true` selects little-endian byte order.
        writer.setInt16(index * BYTES_PER_SAMPLE, clamped * scale, true);
    }
    return pcmBytes;
}
/**
 * Wraps audio data in the envelope object used for an "AudioEvent" message.
 *
 * @param {*} buffer - Audio payload; stored unchanged as the message body.
 * @returns {{headers: object, body: *}} Message with string-typed
 *   `:message-type` ("event") and `:event-type` ("AudioEvent") headers.
 */
function getAudioEventMessage(buffer) {
    // Every header in this envelope is a string-typed value.
    const stringHeader = (value) => ({ type: 'string', value });
    const headers = {
        ':message-type': stringHeader('event'),
        ':event-type': stringHeader('AudioEvent'),
    };
    return { headers, body: buffer };
}
/**
 * Converts a chunk of raw audio samples into a binary event-stream message.
 *
 * The chunk is resampled with downsampleBuffer (using the module-level
 * `inputSampleRate` as the input rate), PCM-encoded, wrapped as an AudioEvent
 * and marshalled with the module-level event stream marshaller.
 *
 * @param {ArrayLike<number>|null|undefined} raw - Samples to send.
 * @returns {*} The marshalled message, or `undefined` when `raw` is null or
 *   undefined.
 */
function convertAudioToBinaryMessage(raw) {
    if (raw == null) {
        return undefined;
    }
    // Downsample and convert the raw audio samples to PCM.
    // (The no-op 1 ms timer that used to be scheduled here on every chunk has
    // been removed; it did nothing but create timers.)
    const downsampledBuffer = downsampleBuffer(raw, inputSampleRate);
    const pcmEncodedBuffer = pcmEncode(downsampledBuffer);
    // Add the right JSON headers and structure to the message.
    const audioEventMessage = getAudioEventMessage(Buffer.from(pcmEncodedBuffer));
    // Convert the JSON object + headers into a binary event stream message.
    return eventStreamMarshaller.marshall(audioEventMessage);
}
/**
 * Builds the pre-signed `wss` URL used to open the Transcribe streaming
 * WebSocket.
 *
 * Credentials are read from the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
 * environment variables so that no secret has to be written into the source.
 *
 * @returns {string} Whatever `v4.createPresignedURL` returns for the request
 *   (presumably the signed URL string - confirm against ./aws-signature-v4).
 * @throws {Error} When either credential environment variable is missing.
 */
function createPresignedUrl() {
    const accessKeyId = process.env.AWS_ACCESS_KEY_ID;
    const secretAccessKey = process.env.AWS_SECRET_ACCESS_KEY;
    if (!accessKeyId || !secretAccessKey) {
        throw new Error('AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY must be set');
    }
    // Used for both the endpoint host and the signature so the two cannot diverge.
    // NOTE(review): the module-level `region` variable holds a different value
    // and is not used here - confirm which region is intended.
    const signingRegion = "us-east-1";
    const endpoint = "transcribestreaming." + signingRegion + ".amazonaws.com:8443";
    // Must equal the rate of the PCM that is actually sent: downsampleBuffer
    // produces 16000 Hz by default, so declaring 8000 here made the service
    // interpret the audio at the wrong speed.
    const declaredSampleRate = 16000;
    // Get a pre-authenticated URL that we can use to establish our WebSocket.
    return v4.createPresignedURL(
        'GET',
        endpoint,
        '/stream-transcription-websocket',
        'transcribe',
        // SHA-256 of the empty payload.
        crypto.createHash('sha256').update('', 'utf8').digest('hex'), {
            'key': accessKeyId,
            'secret': secretAccessKey,
            'protocol': 'wss',
            'expires': 15,
            'region': signingRegion,
            'query': "language-code=" + 'en-US' + "&media-encoding=pcm&sample-rate=" + declaredSampleRate
        }
    );
}
/**
 * Writes an error message to the console behind an "Error: " prefix.
 *
 * @param {*} message - Value to report.
 * @returns {void}
 */
function showError(message) {
    const prefix = "Error: ";
    console.log(prefix, message);
}
/**
 * GET /convert
 *
 * Streams the local audio file to Amazon Transcribe over a WebSocket and
 * answers the HTTP request once the WebSocket closes or fails:
 *   - 200 `{ transcript }` with the non-partial transcripts joined by spaces;
 *   - 502 `{ error }` when the file, the socket or Transcribe reported a problem.
 */
app.get('/convert', (req, res) => {
    const file = 'recorded.mp3';
    const socket = new WebSocket(createPresignedUrl());
    socket.binaryType = "arraybuffer";

    const readStream = fs.createReadStream(file, { highWaterMark: 32 * 256 });
    // NOTE(review): this is the likely cause of the empty / "Mhm" transcripts.
    // The file is compressed audio and 'binary' encoding turns every chunk into
    // a string, yet the chunk is handed to convertAudioToBinaryMessage, whose
    // downsampling does arithmetic on buffer[i] as if it were a numeric sample.
    // The audio has to be decoded to numeric PCM samples before this pipeline
    // can produce meaningful audio (or be sent in an encoding the service
    // accepts directly - confirm against the Transcribe streaming docs).
    readStream.setEncoding('binary');

    const finalTranscripts = [];
    let failure = null;

    // Remember only the first failure; later ones are usually consequences of it.
    const fail = function (reason) {
        if (failure === null) {
            failure = reason;
        }
    };

    // Release the file and answer the HTTP request exactly once.
    const finish = function () {
        readStream.destroy();
        if (res.headersSent) {
            return;
        }
        if (failure !== null) {
            res.status(502).json({ error: failure });
        } else {
            res.json({ transcript: finalTranscripts.join(' ') });
        }
    };

    // Send an empty frame so that Transcribe initiates a closure of the
    // WebSocket after submitting all transcripts.
    const sendEndOfStream = function () {
        if (socket.readyState !== socket.OPEN) {
            return;
        }
        const emptyMessage = getAudioEventMessage(Buffer.alloc(0));
        socket.send(eventStreamMarshaller.marshall(emptyMessage));
    };

    readStream.on('end', function () {
        console.log('finished reading----->>>>');
        sendEndOfStream();
    });

    // Without this listener a missing/unreadable file raises an unhandled
    // 'error' event and takes the whole process down.
    readStream.on('error', function (err) {
        fail(`Could not read ${file}: ${err.message}`);
        showError(failure);
        socket.close();
        finish();
    });

    // Start reading only once the socket is open so no chunk is dropped.
    socket.onopen = function () {
        readStream.on('data', function (chunk) {
            const binary = convertAudioToBinaryMessage(chunk);
            if (binary != null && socket.readyState === socket.OPEN) {
                console.log(`sending to steaming API------->>>>`);
                socket.send(binary);
            }
        });
    };

    socket.onerror = function () {
        socketError = true;
        showError('WebSocket connection error. Try again.');
        fail('WebSocket connection error. Try again.');
        finish();
    };

    socket.onclose = finish;

    // Handle inbound messages from Amazon Transcribe.
    socket.onmessage = function (message) {
        try {
            // Convert the binary event stream message to JSON.
            const messageWrapper = eventStreamMarshaller.unmarshall(Buffer.from(message.data));
            const bodyBytes = Buffer.from(messageWrapper.body);
            // One character per byte, exactly what String.fromCharCode.apply
            // produced, but without the argument-count limit on large bodies.
            const messageBody = JSON.parse(bodyBytes.toString('latin1'));
            console.log("results:.. ", JSON.stringify(messageBody));
            if (messageWrapper.headers[":message-type"].value === "event") {
                handleEventStreamMessage(messageBody);
                // Properly decoded copy, used only to collect the text that is
                // returned to the HTTP client.
                const decoded = JSON.parse(bodyBytes.toString('utf8'));
                const results = (decoded.Transcript && decoded.Transcript.Results) || [];
                for (const result of results) {
                    const alternatives = result.Alternatives || [];
                    // Partial results are superseded by later messages; keep finals only.
                    if (!result.IsPartial && alternatives.length > 0) {
                        finalTranscripts.push(alternatives[0].Transcript);
                    }
                }
            } else {
                transcribeException = true;
                showError(messageBody.Message);
                fail(String(messageBody.Message));
            }
        } catch (err) {
            // A malformed frame must not crash the server.
            showError(err.message);
            fail(`Could not process Transcribe message: ${err.message}`);
        }
    };
});
解决方案
推荐阅读
- cookies - 神秘的 Cookie 有效期`1969-12-31T23:59:59.000Z`
- python - 如何discord.Invite(查看给定邀请中有多少成员)?
- angularjs - 引导模式。关闭模式时屏幕变灰
- javascript - 单击下拉会打开错误的 drop down
- reactjs - react-router 如何在页面刷新时保持位置状态?
- caffe - Caffe: Check failed: error == cudaSuccess (48 vs. 0) no kernel image is available for execution on the device error on Jetson TX1
- git - git reset --soft 用大文件挽回面子
- asynchronous - 重建读取端 DB 时事件乱序
- android - 错误:包 android.v7.app 不存在
- ms-access - 如何根据 MS Access 中的记录输出多个 PDF 文件?