|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function
import argparse
import numpy as np
import shlex
import subprocess
import sys
import wave
import json
import os
from bottle import route, run, template, request, response
from deepspeech import Model, printVersions
from timeit import default_timer as timer
# shlex.quote is the shell-quoting helper on Python 3; fall back to
# pipes.quote only on interpreters where shlex does not provide it.
try:
    from shlex import quote
except ImportError:
    from pipes import quote

# Relative paths to the acoustic model, language model and trie files.
model = "deepspeech-0.6.0-models/output_graph.pbmm"
lm = "deepspeech-0.6.0-models/lm.binary"
trie = "deepspeech-0.6.0-models/trie"

# Decoder settings handed to Model() and enableDecoderWithLM() in start().
beam_width = 500
lm_alpha = 0.75
lm_beta = 1.85

# Sample rate (Hz) that process() compares uploaded audio against and
# resamples to when it differs.
desired_sample_rate = 16000

# Placeholder for the loaded model; start() rebinds this global.
ds = {}
def convert_samplerate(audio_path, desired_sample_rate):
    """Resample an audio file to raw 16-bit mono PCM with the SoX CLI.

    Args:
        audio_path: Path of the audio file handed to ``sox``.
        desired_sample_rate: Target sample rate in Hz.

    Returns:
        A ``(desired_sample_rate, samples)`` tuple, where ``samples`` is a
        NumPy ``int16`` array decoded from SoX's raw output.

    Raises:
        RuntimeError: If SoX exits with a non-zero status.
        OSError: If the ``sox`` executable could not be started.
    """
    # Build the argument vector directly instead of formatting a shell string
    # and splitting it again: the path is passed through verbatim and no
    # quoting helper is needed.
    sox_cmd = [
        'sox', audio_path,
        '--type', 'raw',
        '--bits', '16',
        '--channels', '1',
        '--rate', str(desired_sample_rate),
        '--encoding', 'signed-integer',
        '--endian', 'little',
        '--compression', '0.0',
        '--no-dither',
        '-',
    ]
    try:
        output = subprocess.check_output(sox_cmd, stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        # Decode captured stderr so the message is readable text rather than
        # a bytes repr.
        stderr = getattr(e, 'stderr', None)
        if isinstance(stderr, bytes):
            stderr = stderr.decode('utf-8', 'replace')
        raise RuntimeError('SoX returned non-zero status: {}'.format(stderr))
    except OSError as e:
        raise OSError(e.errno, 'SoX not found, use {}hz files or install it: {}'.format(desired_sample_rate, e.strerror))
    return desired_sample_rate, np.frombuffer(output, np.int16)
def metadata_to_string(metadata):
    """Return the transcript text by concatenating each item's character."""
    characters = [entry.character for entry in metadata.items]
    return ''.join(characters)
def words_from_metadata(metadata):
    """Group per-character metadata into words with timing information.

    Args:
        metadata: Object exposing ``num_items`` and an indexable ``items``
            sequence whose elements have ``character`` and ``start_time``.

    Returns:
        A list of dicts, one per word boundary (a space, or the final
        item), with keys ``"word"``, ``"start_time "`` and ``"duration"``;
        both times are rounded to 4 decimal places.
    """
    word = ""
    word_list = []
    word_start_time = 0
    last_index = metadata.num_items - 1

    for i in range(metadata.num_items):
        item = metadata.items[i]

        if item.character != " ":
            if not word:
                # Record the start time as the first character is appended,
                # so a single-character word at the very end of the
                # transcript is timed correctly too.
                word_start_time = item.start_time
            word += item.character

        # A word boundary is either a space or the last item in the array.
        if item.character == " " or i == last_index:
            word_duration = item.start_time - word_start_time
            if word_duration < 0:
                word_duration = 0

            word_list.append({
                "word": word,
                # NOTE(review): the key has a trailing space. It is kept
                # because it is part of the emitted JSON; confirm whether
                # consumers rely on it before renaming.
                "start_time ": round(word_start_time, 4),
                "duration": round(word_duration, 4),
            })

            # Reset for the next word.
            word = ""
            word_start_time = 0

    return word_list
def metadata_json_output(metadata):
    """Serialize the word timings and confidence of *metadata* as JSON text."""
    payload = {
        "words": words_from_metadata(metadata),
        "confidence": metadata.confidence,
    }
    return json.dumps(payload)
class VersionAction(argparse.Action):
    """Argparse action that calls ``printVersions()`` and exits with status 0."""

    def __init__(self, *args, **kwargs):
        # nargs=0 makes the option a flag that consumes no arguments.
        super(VersionAction, self).__init__(nargs=0, *args, **kwargs)

    def __call__(self, *args, **kwargs):
        printVersions()
        # sys.exit rather than the site-provided exit() helper, which is not
        # available when the interpreter runs without the site module.
        sys.exit(0)
@route('/upload', method='POST')
def do_upload():
    """Handle ``POST /upload``: store the uploaded WAV file and transcribe it.

    The audio is read from the multipart form field named ``file`` and saved
    under the ``temp_audio`` directory, overwriting any file of the same
    name. The saved file is not removed after processing.

    Returns:
        The JSON text produced by ``process`` on success. When the field is
        missing or the file does not have a ``.wav`` extension, a plain
        error message is returned and the response status is set to 400.
    """
    print('received a request')
    response.content_type = 'application/json'

    upload = request.files.get('file')
    if upload is None:
        response.status = 400
        return "No file uploaded."

    _, ext = os.path.splitext(upload.filename)
    # Exact comparison: a membership test against the bare string '.wav'
    # would be a substring check and also accept '' or '.wa'.
    if ext.lower() != '.wav':
        response.status = 400
        return "File extension not allowed."

    save_path = "temp_audio"
    try:
        os.makedirs(save_path)
    except OSError:
        # Already existing is fine (possibly created by a concurrent
        # request); anything else is a real failure.
        if not os.path.isdir(save_path):
            raise

    file_path = os.path.join(save_path, upload.filename)
    print(file_path)
    upload.save(file_path, overwrite=True)
    return process(file_path)
def start():
    """Load the acoustic model into the global ``ds`` and enable the LM decoder.

    Progress and load timings are written to stderr. The language model is
    enabled only when both ``lm`` and ``trie`` are set.
    """
    global ds

    print('Loading model from file {}'.format(model), file=sys.stderr)
    began = timer()
    ds = Model(model, beam_width)
    elapsed = timer() - began
    print('Loaded model in {:.3}s.'.format(elapsed), file=sys.stderr)

    # The sample rate is left at the module-level default rather than being
    # queried through ds.sampleRate().

    if not (lm and trie):
        return

    print('Loading language model from files {} {}'.format(lm, trie), file=sys.stderr)
    began = timer()
    ds.enableDecoderWithLM(lm, trie, lm_alpha, lm_beta)
    elapsed = timer() - began
    print('Loaded language model in {:.3}s.'.format(elapsed), file=sys.stderr)
def process(audio):
    """Run speech-to-text on a WAV file and return the result as JSON text.

    Args:
        audio: Path of the WAV file to transcribe.

    Returns:
        The JSON string built by ``metadata_json_output`` from the model's
        metadata for this audio.

    Raises:
        RuntimeError, OSError: Propagated from ``convert_samplerate`` when
            the file has to be resampled and SoX fails or is missing.
    """
    fin = wave.open(audio, 'rb')
    try:
        original_rate = fin.getframerate()
        frame_count = fin.getnframes()
        if original_rate != desired_sample_rate:
            print('Warning: original sample rate ({}) is different than {}hz. Resampling might produce erratic speech recognition.'.format(original_rate, desired_sample_rate), file=sys.stderr)
            _, samples = convert_samplerate(audio, desired_sample_rate)
        else:
            samples = np.frombuffer(fin.readframes(frame_count), np.int16)
    finally:
        # Close the file even when resampling or reading raises.
        fin.close()

    # Duration comes from the file's own frame count and frame rate; using
    # the post-resampling rate here would misreport resampled files.
    audio_length = frame_count / float(original_rate)

    print('Running inference.', file=sys.stderr)
    inference_start = timer()
    data = metadata_json_output(ds.sttWithMetadata(samples))
    inference_end = timer() - inference_start
    print('Inference took %0.3fs for %0.3fs audio file.' % (inference_end, audio_length), file=sys.stderr)
    print(data)
    return data
if __name__ == '__main__':
    # Load the model once, then serve upload requests on localhost:8050.
    # A second guard that called an undefined main() was dropped: it raised
    # NameError as soon as run() returned.
    start()
    run(host='localhost', port=8050)
|