I would like to know if there is a better way to convert base64-format audio files into .wav files without storage them on current directory.
The thing is that I get base64-format audio files from user uploading whith a POST request with FastAPI, then I decode them and convert them into .wav files because I need to pass the files over some functions that I created in order to preprocess and transcript the files and those functions use the wave module for .wav files. Due to I created .wav files for nothing more than transcripting them, so I don't need to store them and I finally delete them with os.unlink function.
import fastapi_server.preprocessing_f as pr
app = FastAPI()
class AudioBase64(BaseModel):
audio_name: str = Field(..., min_length=1, example="my-audio")
data_base64: str = Field(..., min_length=1)
@app.post(
path="/upload-base64-audios/",
status_code=status.HTTP_200_OK
)
async def upload_base64_audios(audios: list[AudioBase64] = Body(...)):
model: str = "~/models"
dir_name = os.path.expanduser(model)
output_graph, scorer = pr.resolve_models(dir_name)
model_retval: List[str] = pr.load_model(output_graph, scorer)
all_names: list[str] = []
all_datas: list[str] = []
all_decode: list[str] = []
aggresive = 1
transcriptions: list[str] = []
new_data: list[str] = []
final_data: list[str] = []
header: list[str] = ["audio_name", "transcriptions"]
for i in range(len(audios)):
name = audios[i].audio_name
data = audios[i].data_base64
decode = base64.b64decode(data)
all_names.append(name)
all_datas.append(data)
all_decode.append(decode)
filename = "%s.wav" % name
with open(filename, "wb") as f:
f.write(decode)
cwd = os.getcwd()
files = glob.glob(cwd + "/" + name + ".wav")
segments, sample_rate, audio_length = pr.vad_segment_generator(
files[0], aggresive
)
for k, segment in enumerate(segments):
audio = np.frombuffer(segment, dtype=np.int16)
output = pr.stt(model_retval[0], audio)
transcript = output[0]
transcriptions.append(transcript)
new_data = [all_names[i], transcriptions[i]]
final_data.append(new_data)
dir_files = glob.glob(cwd + "/*.wav")
for file in dir_files:
os.unlink(file)
new_df = pd.DataFrame(final_data, columns=header)
stream = io.StringIO()
new_df.to_csv(stream, index=False)
response: Response = StreamingResponse(
iter([stream.getvalue()]), media_type="text/csv"
)
response.headers["Content-Disposition"] = "attachment; filename=my-file.csv"
return response
As suggested by @martineau,
try writing intermediately to an io.BytesIO
, but after writing, call .seek(0)
to return the steam position to the start, rather than calling getbuffer()
(after writing, the stream position will be at the end, ready for more data)
with io.BytesIO() as buffer:
buffer.write(decode)
buffer.seek(0) # rewind stream
...
segments, sample_rate, audio_length = pr.vad_segment_generator(
buffer, aggresive)