# Send the multipart POST request. `host`, `port`, `uri`, `body` and
# `boundary` are expected to be defined before this snippet runs.
h = httplib.HTTPSConnection(host, port)
h.set_debuglevel(0)
headers = {
    "Content-Type": "multipart/form-data; boundary=%s" % (boundary,),
    "Connection": "Keep-Alive",
}
h.request('POST', uri, body, headers)
res = h.getresponse()

# Prepend MIME headers to the raw response body so that the `email` package
# can parse it as a multipart message.
# NOTE(review): the boundary is hard-coded; confirm the server always uses
# this exact value.
data = """MIME-Version: 1.0
Content-Type: multipart/mixed; boundary=--Nuance_NMSP_vutc5w1XobDdefsYG3wq
""" + res.read()
msg = email.message_from_string(data)

# The HTTP response headers are the same for every part, so print them once
# instead of once per part.
print(res.getheaders())

for index, part in enumerate(msg.walk(), start=1):
    content_type = part.get_content_type()
    payload = part.get_payload()
    if content_type == "audio/x-wav" and payload:
        # Put the part index in the file name: the original format string had
        # no placeholder, so every audio part overwrote the same output.pcm.
        with open('output_{}.pcm'.format(index), 'wb') as f_pcm:
            f_pcm.write(payload)
I am sending a request to the server, and the server sends a response back to the client, handled as shown above, which I save as a .txt file.
The .txt file contains an information header at the top and another header at the bottom, both in text format; the rest is binary.
How can I parse the text and write it to a separate .txt file, and write the binary data to a .pcm file?
The recommended approach is to use Python's email library to decode the MIME response, as follows:
import ssl
import os
import json
import email
import uuid
from io import BytesIO
import httplib
# Folder that contains this script; all output is written below it.
_script_path = os.path.abspath(__file__)
input_folder = os.path.split(_script_path)[0]
# Root folder for the generated files.
output_folder = os.path.join(input_folder, 'output')
def get_filename(ext, base, sub_folder):
    """Build the path of an output file.

    The file is named ``<base>.<ext>`` and is located in ``sub_folder``
    below the module-level ``output_folder``. Only the path is computed;
    nothing is created on disk.
    """
    leaf_name = '{}.{}'.format(base, ext)
    target_folder = os.path.join(output_folder, sub_folder)
    return os.path.join(target_folder, leaf_name)
def compare_files(file1, file2):
    """Print whether two files have byte-for-byte identical contents.

    Both files are read completely in binary mode. A report starting with
    ``Same:`` or ``Different:`` followed by the two paths is printed.
    """
    with open(file1, 'rb') as first:
        with open(file2, 'rb') as second:
            identical = first.read() == second.read()

    if identical:
        report = 'Same:\n {}\n {}'
    else:
        report = 'Different:\n {}\n {}'
    # A single parenthesised argument prints the same text under the
    # Python 2 print statement and the print() function.
    print(report.format(file1, file2))
class Part(object):
    """Represent a part in a multipart message.

    A part consists of a ``Content-Disposition`` header, a ``Content-Type``
    header, an empty line and the payload.
    """

    def __init__(self, name, contentType, data, paramName=None):
        """Store the fields of the part.

        name        -- value of the ``name`` attribute of Content-Disposition
        contentType -- value of the Content-Type header
        data        -- payload; either text or raw bytes
        paramName   -- optional ``paramName`` attribute of Content-Disposition
        """
        super(Part, self).__init__()
        self.name = name
        self.paramName = paramName
        self.contentType = contentType
        self.data = data

    @staticmethod
    def _to_bytes(value):
        """Return text encoded as UTF-8; pass any other value through as is."""
        # type(u"") is `unicode` on Python 2 and `str` on Python 3, i.e. the
        # text type that BytesIO.write() refuses to accept.
        if isinstance(value, type(u"")):
            return value.encode("utf-8")
        return value

    def encode(self):
        """Return the serialized part (headers, blank line, payload) as bytes."""
        if self.paramName:
            disposition = 'Content-Disposition: form-data; name="%s"; paramName="%s"\r\n' % (self.name, self.paramName)
        else:
            disposition = 'Content-Disposition: form-data; name="%s"\r\n' % (self.name,)
        content_type = "Content-Type: %s\r\n" % (self.contentType,)

        body = BytesIO()
        # Every chunk goes through _to_bytes() so that text headers and text
        # payloads no longer make BytesIO.write() raise TypeError.
        for chunk in (disposition, content_type, "\r\n", self.data):
            body.write(self._to_bytes(chunk))
        return body.getvalue()
class Request(object):
    """A handy class for creating a multipart request.

    Parameters are collected as ``Part`` objects in ``self.parameters`` and
    serialized together by ``encode()``.
    """

    def __init__(self):
        super(Request, self).__init__()
        # Parts in the order they were added.
        self.parameters = []

    def add_json_parameter(self, name, paramName, data):
        """Append a part with content type ``application/json; charset=utf-8``."""
        self.parameters.append(Part(name=name, paramName=paramName, contentType="application/json; charset=utf-8", data=data))

    def add_audio_parameter(self, name, paramName, data):
        """Append a part with content type ``audio/x-wav;codec=pcm;bit=16;rate=16000``."""
        self.parameters.append(Part(name=name, paramName=paramName, contentType="audio/x-wav;codec=pcm;bit=16;rate=16000", data=data))

    @staticmethod
    def _to_bytes(value):
        """Return text encoded as UTF-8; pass any other value through as is."""
        # type(u"") is `unicode` on Python 2 and `str` on Python 3.
        if isinstance(value, type(u"")):
            return value.encode("utf-8")
        return value

    def encode(self):
        """Serialize all parts.

        Returns a ``(body, boundary)`` tuple: the request body as bytes and
        the freshly generated boundary string that separates the parts.
        """
        boundary = uuid.uuid4().hex
        # Delimiters are converted to bytes explicitly so that writing them
        # into BytesIO also works where the native string type is text.
        delimiter = self._to_bytes("--%s\r\n" % (boundary,))
        closing_delimiter = self._to_bytes("--%s--\r\n" % (boundary,))

        body = BytesIO()
        for parameter in self.parameters:
            body.write(delimiter)
            body.write(parameter.encode())
            body.write(b"\r\n")
        body.write(closing_delimiter)
        return body.getvalue(), boundary
# Request settings for each supported language code.
# NOTE(review): 'eng_GBR' uses an underscore while 'deu-DEU' uses a hyphen;
# confirm which spelling the service expects.
_TTS_LANGUAGE_PARAMETERS = {
    "ENG": {'lang': 'eng_GBR', 'location': '47.4925, 19.0513'},
    "GED": {'lang': 'deu-DEU', 'location': '48.396231, 9.972909'},
}

# JSON template of the "RequestData" part; %(lang)s and %(location)s are
# filled in from _TTS_LANGUAGE_PARAMETERS.
# NOTE(review): appKey/appId are hard-coded credentials; consider loading
# them from configuration instead of keeping them in the source.
_REQUEST_DATA_TEMPLATE = """{
"appKey": "9c9fa7201e90d3d96718bc3f36ce4cfe1781f2e82f4e5792996623b3b474fee2c77699eb5354f2136063e1ff19c378f0f6dd984471a38ca5c393801bffb062d6",
"appId": "NMDPTRIAL_AutomotiveTesting_NCS61HTTP",
"uId": "Alexander",
"inCodec": "PCM_16_8K",
"outCodec": "PCM_16_8K",
"cmdName": "NVC_TTS_CMD",
"appName": "Python",
"appVersion": "1",
"language": "%(lang)s",
"carrier": "carrier",
"deviceModel": "deviceModel",
"cmdDict": {
"tts_voice": "Serena",
"tts_language": "%(lang)s",
"locale": "canada",
"application_name": "Testing Python Script",
"organization_id": "NUANCE",
"phone_OS": "4.0",
"phone_network": "wifi",
"audio_source": "SpeakerAndMicrophone",
"location": "%(location)s",
"application_session_id": "1234567890",
"utterance_number": "5",
"ui_langugage": "en",
"phone_submodel": "nmPhone2,1",
"application_state_id": "45"
}
}"""

# MIME headers prepended to the raw response body so that the `email`
# package can parse it as a multipart message.
# NOTE(review): the boundary is hard-coded; confirm the server always uses
# this exact value.
_RESPONSE_MIME_PREFIX = """MIME-Version: 1.0
Content-Type: multipart/mixed; boundary=--Nuance_NMSP_vutc5w1XobDdefsYG3wq
"""


def _ensure_parent_folder(path):
    """Create the folder that will contain *path* if it does not exist yet."""
    folder = os.path.dirname(path)
    if folder and not os.path.isdir(folder):
        os.makedirs(folder)


def get_tts(required_text, LNG):
    """Request speech for *required_text* and store the response parts.

    required_text -- text to synthesize; surrounding whitespace is stripped
    LNG           -- language code, one of the keys of
                     _TTS_LANGUAGE_PARAMETERS ("ENG" or "GED")

    Each ``audio/x-wav`` part of the response is written to
    ``<name>.pcm`` (and to ``<name>_ref.pcm`` if that file does not exist
    yet), after which both files are compared. Each ``application/json``
    part is written to ``<name>.json``. ``<name>`` is derived from the first
    80 characters of the text; the files are located via ``get_filename``
    using *LNG* as the sub-folder.

    Raises ValueError if *LNG* is not a supported language code.
    """
    # Validate the language first: previously an unknown code left
    # `parameters` unbound and failed later with an UnboundLocalError.
    try:
        parameters = _TTS_LANGUAGE_PARAMETERS[LNG]
    except KeyError:
        raise ValueError("Unsupported language code: {!r}".format(LNG))

    required_text = required_text.strip()
    # Replace every non-alphanumeric character to get a safe file name.
    output_filename = "".join(x if x.isalnum() else "_" for x in required_text[:80])

    host = "mtldev08.nuance.com"
    port = 443
    uri = "/NmspServlet/"

    RequestData = _REQUEST_DATA_TEMPLATE % parameters
    TEXT_TO_READ = json.dumps({"tts_type": "text", "tts_input": required_text})

    request = Request()
    request.add_json_parameter("RequestData", None, RequestData)
    request.add_json_parameter("TtsParameter", "TEXT_TO_READ", TEXT_TO_READ)
    body, boundary = request.encode()

    headers = {
        "Content-Type": "multipart/form-data; boundary=%s" % (boundary,),
        "Connection": "Keep-Alive",
    }

    # The original author's workaround for certificate problems was
    # `ssl._create_default_https_context = ssl._create_unverified_context`;
    # it disables verification, so it is deliberately not applied here.
    h = httplib.HTTPSConnection(host, port)
    try:
        h.request('POST', uri, body, headers)
        res = h.getresponse()
        data = _RESPONSE_MIME_PREFIX + res.read()
    finally:
        # Always release the connection, even if the request fails.
        h.close()

    msg = email.message_from_string(data)

    for part in msg.walk():
        content_type = part.get_content_type()
        payload = part.get_payload()

        if content_type == "audio/x-wav" and payload:
            ref_filename = get_filename('pcm', output_filename + '_ref', LNG)
            cur_filename = get_filename('pcm', output_filename, LNG)
            # The output sub-folder may not exist on the first run.
            _ensure_parent_folder(ref_filename)
            _ensure_parent_folder(cur_filename)

            # The first result for a text becomes the reference recording.
            if not os.path.exists(ref_filename):
                with open(ref_filename, 'wb') as f_pcm:
                    f_pcm.write(payload)

            with open(cur_filename, 'wb') as f_pcm:
                f_pcm.write(payload)

            compare_files(ref_filename, cur_filename)
        elif content_type == "application/json":
            json_filename = get_filename('json', output_filename, LNG)
            _ensure_parent_folder(json_filename)
            with open(json_filename, 'w') as f_json:
                f_json.write(payload)
# Request speech for every "LANGUAGE|text" line of the input file.
filename = r'input.txt'
with open(filename) as f_input:
    for line in f_input:
        line = line.strip()
        if not line:
            # Blank lines (e.g. at the end of the file) carry no request and
            # previously crashed the tuple unpacking below.
            continue
        # Split on the first '|' only, so the text itself may contain '|'.
        # A line without any '|' still raises ValueError.
        LNG, text = line.split('|', 1)
        print("Getting {}: {}".format(LNG, text))
        get_tts(text, LNG)
This assumes your input.txt file has the following format, with one language code and text per line, separated by `|`:
ENG|I am tired
GED|Ich gehe nach hause
This will produce one output .pcm file and one .json file per line of text. It works with multiple files/languages.