2828# [START import_libraries]
2929from __future__ import division
3030
31+ import argparse
3132import collections
3233import itertools
3334import re
3435import sys
36+ import threading
37+ import time
3538
3639from google .cloud import speech
3740from google .cloud .speech import enums
4043import grpc
4144import pyaudio
4245from six .moves import queue
46+ import six
47+
48+ import transcribe_streaming_mic
4349# [END import_libraries]
4450
45- # Audio recording parameters
46- RATE = 16000
47- CHUNK = int ( RATE / 10 ) # 100ms
51+
def duration_to_secs(duration):
    """Convert a duration message into fractional seconds.

    Args:
        duration: An object exposing numeric ``seconds`` and ``nanos``
            attributes (presumably a protobuf ``Duration`` -- confirm
            against the caller).

    Returns:
        float: ``seconds`` plus ``nanos`` scaled down by 1e9.
    """
    # 1e9 is already a float literal, so the division is a true division
    # without needing an explicit float() conversion.
    return duration.seconds + duration.nanos / 1e9
4854
4955
50- class MicrophoneStream ( object ):
56+ class ResumableMicrophoneStream ( transcribe_streaming_mic . MicrophoneStream ):
5157 """Opens a recording stream as a generator yielding the audio chunks."""
5258 def __init__ (self , rate , chunk_size , max_replay_secs = 5 ):
53- self ._rate = rate
54- self ._chunk_size = chunk_size
59+ super (ResumableMicrophoneStream , self ).__init__ (rate , chunk_size )
5560 self ._max_replay_secs = max_replay_secs
5661
57- # Create a thread-safe buffer of audio data
58- self ._buff = queue .Queue ()
59- self .closed = True
60-
61- def __enter__ (self ):
62- num_channels = 1
63- self ._audio_interface = pyaudio .PyAudio ()
64- self ._audio_stream = self ._audio_interface .open (
65- format = pyaudio .paInt16 ,
66- # The API currently only supports 1-channel (mono) audio
67- # https://goo.gl/z757pE
68- channels = num_channels , rate = self ._rate ,
69- input = True , frames_per_buffer = self ._chunk_size ,
70- # Run the audio stream asynchronously to fill the buffer object.
71- # This is necessary so that the input device's buffer doesn't
72- # overflow while the calling thread makes network requests, etc.
73- stream_callback = self ._fill_buffer ,
74- )
75-
76- self .closed = False
62+ # Some useful numbers
63+ # 2 bytes in 16 bit samples
64+ self ._bytes_per_sample = 2 * self ._num_channels
65+ self ._bytes_per_second = self ._rate * self ._bytes_per_sample
7766
78- bytes_per_sample = 2 * num_channels # 2 bytes in 16 bit samples
79- self ._bytes_per_second = self ._rate * bytes_per_sample
80-
81- bytes_per_chunk = (self ._chunk_size * bytes_per_sample )
82- chunks_per_second = self ._bytes_per_second / bytes_per_chunk
67+ self ._bytes_per_chunk = (self ._chunk_size * self ._bytes_per_sample )
68+ self ._chunks_per_second = (
69+ self ._bytes_per_second / self ._bytes_per_chunk )
8370 self ._untranscribed = collections .deque (
84- maxlen = self ._max_replay_secs * chunks_per_second )
85-
86- return self
87-
88- def __exit__ (self , type , value , traceback ):
89- self ._audio_stream .stop_stream ()
90- self ._audio_stream .close ()
91- self .closed = True
92- # Signal the generator to terminate so that the client's
93- # streaming_recognize method will not block the process termination.
94- self ._buff .put (None )
95- self ._audio_interface .terminate ()
96-
97- def _fill_buffer (self , in_data , frame_count , time_info , status_flags ):
98- """Continuously collect data from the audio stream, into the buffer."""
99- self ._buff .put (in_data )
100- return None , pyaudio .paContinue
71+ maxlen = self ._max_replay_secs * self ._chunks_per_second )
10172
10273 def on_transcribe (self , end_time ):
10374 while self ._untranscribed and end_time > self ._untranscribed [0 ][1 ]:
@@ -106,145 +77,160 @@ def on_transcribe(self, end_time):
10677 def generator (self , resume = False ):
10778 total_bytes_sent = 0
10879 if resume :
80+ # Make a copy, in case on_transcribe is called while yielding them
81+ catchup = list (self ._untranscribed )
10982 # Yield all the untranscribed chunks first
110- for chunk , _ in self . _untranscribed :
83+ for chunk , _ in catchup :
11184 yield chunk
112- while not self .closed :
113- # Use a blocking get() to ensure there's at least one chunk of
114- # data, and stop iteration if the chunk is None, indicating the
115- # end of the audio stream.
116- chunk = self ._buff .get ()
117- if chunk is None :
118- return
119- data = [chunk ]
120-
121- # Now consume whatever other data's still buffered.
122- while True :
123- try :
124- chunk = self ._buff .get (block = False )
125- if chunk is None :
126- return
127- data .append (chunk )
128- except queue .Empty :
129- break
130-
131- byte_data = b'' .join (data )
13285
86+ for byte_data in super (ResumableMicrophoneStream , self ).generator ():
13387 # Populate the replay buffer of untranscribed audio bytes
13488 total_bytes_sent += len (byte_data )
13589 chunk_end_time = total_bytes_sent / self ._bytes_per_second
13690 self ._untranscribed .append ((byte_data , chunk_end_time ))
13791
13892 yield byte_data
139- # [END audio_stream]
14093
14194
142- def duration_to_secs (duration ):
143- return duration .seconds + (duration .nanos / float (1e9 ))
class SimulatedMicrophoneStream(ResumableMicrophoneStream):
    """Resumable stream whose audio comes from a file instead of a device.

    A background thread reads the file, paces the chunks so they arrive no
    faster than real-time playback, and pushes them through _fill_buffer.
    """

    def __init__(self, audio_src, *args, **kwargs):
        """Store the audio source path; other args go to the parent class.

        Args:
            audio_src: Path of the file to read audio bytes from.
            *args: Positional arguments for ResumableMicrophoneStream.
            **kwargs: Keyword arguments for ResumableMicrophoneStream.
        """
        super(SimulatedMicrophoneStream, self).__init__(*args, **kwargs)
        self._audio_src = audio_src

    def _delayed(self, get_data):
        """Yield chunks from get_data, throttled to the audio byte rate.

        Args:
            get_data: Callable taking a byte count and returning that many
                bytes (or fewer); an empty result ends the iteration.

        Yields:
            Each non-empty chunk, no earlier than the moment its last byte
            would have been recorded at self._bytes_per_second.
        """
        total_bytes_read = 0
        start_time = time.time()

        chunk = get_data(self._bytes_per_chunk)

        while chunk and not self.closed:
            total_bytes_read += len(chunk)
            expected_yield_time = start_time + (
                total_bytes_read / self._bytes_per_second)
            now = time.time()
            # Only sleep when we are ahead of real time.
            if expected_yield_time > now:
                time.sleep(expected_yield_time - now)

            yield chunk

            chunk = get_data(self._bytes_per_chunk)

    def _stream_from_file(self, audio_src):
        """Yield the paced contents of audio_src, then 10s of silence.

        Args:
            audio_src: Path of the file to read, opened in binary mode.
        """
        with open(audio_src, 'rb') as f:
            for chunk in self._delayed(f.read):
                yield chunk

        # Continue sending silence - 10s worth
        # The payload is bytes, so it needs a bytes buffer; a text StringIO
        # rejects bytes on Python 3.
        trailing_silence = six.BytesIO(
            b'\0' * self._bytes_per_second * 10)
        for chunk in self._delayed(trailing_silence.read):
            yield chunk

    def _thread(self):
        """Thread body: push every simulated chunk, then a None sentinel."""
        # NOTE(review): _fill_buffer is inherited and called here with a
        # single argument -- confirm the base class signature accepts that.
        for chunk in self._stream_from_file(self._audio_src):
            self._fill_buffer(chunk)
        self._fill_buffer(None)

    def __enter__(self):
        """Mark the stream open and start the feeder thread.

        The base class __enter__ is deliberately not called here.
        """
        self.closed = False

        threading.Thread(target=self._thread).start()

        return self

    def __exit__(self, type, value, traceback):
        """Mark the stream closed, which stops the pacing loop."""
        self.closed = True
197144
198- num_chars_printed = 0
199145
146+ def _record_keeper (responses , stream ):
147+ """Calls the stream's on_transcribe callback for each final response.
148+
149+ Args:
150+ responses - a generator of responses. The responses must already be
151+ filtered for ones with results and alternatives.
152+ stream - a ResumableMicrophoneStream.
153+ """
154+ for r in responses :
155+ result = r .results [0 ]
156+ if result .is_final :
157+ top_alternative = result .alternatives [0 ]
200158 # Keep track of what transcripts we've received, so we can resume
201159 # intelligently when we hit the deadline
202160 stream .on_transcribe (duration_to_secs (
203161 top_alternative .words [- 1 ].end_time ))
162+ yield r
163+
204164
def listen_print_loop(responses, stream):
    """Iterates through server responses and prints them.

    Same as in transcribe_streaming_mic, but keeps track of when a sent
    audio_chunk has been transcribed.

    Args:
        responses: Iterable of streaming responses from the service.
        stream: The stream whose on_transcribe callback is notified for
            final results.
    """
    # Drop responses that have no results or no alternatives, so the
    # record keeper can index results[0].alternatives[0] safely.
    with_results = (
        r for r in responses
        if r.results and r.results[0].alternatives)
    transcribe_streaming_mic.listen_print_loop(
        _record_keeper(with_results, stream))
175+
176+
def main(sample_rate, audio_src):
    """Stream audio to the Speech API, resuming when the stream times out.

    Args:
        sample_rate: Sample rate in hertz, used for both the recognition
            config and the audio stream.
        audio_src: Path of a file to simulate streaming from; when falsy,
            a ResumableMicrophoneStream is used instead.

    Raises:
        grpc.RpcError: For any RPC failure other than the two recognised
            "stream ran too long" conditions handled below.
    """
    # See http://g.co/cloud/speech/docs/languages
    # for a list of supported languages.
    language_code = 'en-US'  # a BCP-47 language tag

    client = speech.SpeechClient()
    config = types.RecognitionConfig(
        encoding=enums.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=sample_rate,
        language_code=language_code,
        max_alternatives=1,
        enable_word_time_offsets=True)
    streaming_config = types.StreamingRecognitionConfig(
        config=config,
        interim_results=True)

    # A tenth of a second worth of samples per chunk.
    chunk_size = int(sample_rate / 10)
    if audio_src:
        mic_manager = SimulatedMicrophoneStream(
            audio_src, sample_rate, chunk_size)
    else:
        mic_manager = ResumableMicrophoneStream(sample_rate, chunk_size)

    with mic_manager as stream:
        resume = False
        while True:
            audio_generator = stream.generator(resume=resume)
            requests = (types.StreamingRecognizeRequest(audio_content=content)
                        for content in audio_generator)

            responses = client.streaming_recognize(streaming_config, requests)

            try:
                # Now, put the transcription responses to use.
                listen_print_loop(responses, stream)
                break
            except grpc.RpcError as e:
                code = e.code()
                # Guard against a missing details string before searching it.
                details = e.details() or ''
                if code == grpc.StatusCode.INVALID_ARGUMENT:
                    resumable = 'deadline too short' in details
                elif code == grpc.StatusCode.OUT_OF_RANGE:
                    resumable = 'maximum allowed stream duration' in details
                else:
                    resumable = False
                if not resumable:
                    raise

                print('Resuming..')
                resume = True
248228
if __name__ == '__main__':
    # Command-line entry point: parse the sample rate and the optional
    # audio file to simulate streaming from, then hand both to main().
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('--rate', default=16000, help='Sample rate.', type=int)
    parser.add_argument('--audio_src', help='File to simulate streaming of.')
    args = parser.parse_args()
    main(args.rate, args.audio_src)
0 commit comments