# -*- encoding:utf-8 -*-
import pyaudio
import hashlib
from hashlib import sha1
import hmac
import base64
from socket import *
import json, time, threading
from websocket import create_connection
import websocket
from urllib.parse import quote
import logging
# Install the default root logging handler so library log records are shown.
logging.basicConfig()
# WebSocket endpoint that Client connects to.
base_url = "ws://rtasr.xfyun.cn/v1/ws"
# Credentials used to build the handshake signature. The placeholder text
# means "obtain from the iFlytek console"; replace both before running.
app_id = "在讯飞控制台获取"
api_key = "在讯飞控制台获取"
# Path to a PCM sample file; not referenced by the code visible in this file.
file_path = "./test_1.pcm"
# JSON end marker; not referenced by the code visible in this file.
# NOTE(review): presumably the end-of-audio message for the service - confirm.
end_tag = "{\"end\": true}"
class Client():
    """Stream microphone audio to the real-time ASR service and print results.

    Constructing a Client opens the WebSocket connection and starts a
    background thread running ``recv``; ``start`` then captures audio and
    uploads it until the connection goes away.
    """

    def __init__(self):
        """Open the signed WebSocket connection and start the receiver thread."""
        # Build the handshake signature:
        # base64(HMAC-SHA1(api_key, md5_hex(app_id + ts))).
        ts = str(int(time.time()))
        md5_hex = hashlib.md5((app_id + ts).encode("utf-8")).hexdigest()
        mac = hmac.new(api_key.encode("utf-8"), md5_hex.encode("utf-8"), sha1).digest()
        signa = base64.b64encode(mac)
        # quote() percent-encodes the base64 characters that are unsafe in a URL.
        self.ws = create_connection(
            base_url + "?appid=" + app_id + "&ts=" + ts + "&signa=" + quote(signa)
        )
        self.trecv = threading.Thread(target=self.recv)
        self.trecv.start()

    def start(self):
        """Record from the default input device and upload the audio.

        Blocks until the WebSocket is no longer connected. The audio stream
        and the PyAudio instance are always released on exit, including when
        an exception (such as KeyboardInterrupt) propagates out.
        """
        FORMAT = pyaudio.paInt16
        CHANNELS = 1
        RATE = 16000  # sample rate in Hz
        CHUNK = 640  # frames per read: 40 ms of audio at 16 kHz
        p = pyaudio.PyAudio()
        stream = None
        try:
            stream = p.open(format=FORMAT,
                            channels=CHANNELS,
                            rate=RATE,
                            input=True,
                            frames_per_buffer=CHUNK)
            print("开始录音...")
            # stream.read() blocks until CHUNK frames have been captured, so
            # the loop is already paced at real time. An additional sleep
            # would make capture fall steadily behind the microphone and
            # eventually overflow the input buffer.
            while self.ws.connected:
                data = stream.read(CHUNK)
                try:
                    # Raw PCM is binary data, so send it as a binary frame.
                    self.ws.send_binary(data)
                except websocket.WebSocketConnectionClosedException:
                    # The connection was closed between the check and the send.
                    break
        finally:
            if stream is not None:
                stream.stop_stream()
                stream.close()
            p.terminate()

    def recv(self):
        """Receive server messages and print them until the connection ends.

        Prints the handshake acknowledgement, each transcription result, and
        any error message. Closes the connection and returns on an "error"
        action; stops on an empty message or when the connection is closed.
        """
        try:
            while self.ws.connected:
                raw = self.ws.recv()
                # A binary frame arrives as bytes; str() on bytes would yield
                # "b'...'", which is not valid JSON, so decode it instead.
                if isinstance(raw, bytes):
                    raw = raw.decode("utf-8")
                result = str(raw)
                if not result:
                    print("receive result end")
                    break
                result_dict = json.loads(result)
                # .get() so a message without "action" is ignored, not fatal.
                action = result_dict.get("action")
                if action == "started":
                    print("handshake success, result: " + result)
                elif action == "result":
                    print(self._extract_text(result_dict['data']))
                elif action == "error":
                    print("rtasr error: " + result)
                    self.ws.close()
                    return
        except websocket.WebSocketConnectionClosedException:
            print("receive result end")

    @staticmethod
    def _extract_text(data):
        """Return the first candidate word of every entry in a result payload, joined."""
        payload = json.loads(data)
        return ''.join(
            word['cw'][0]['w']
            for segment in payload['cn']['st']['rt']
            for word in segment['ws']
        )

    def close(self):
        """Close the WebSocket connection."""
        self.ws.close()
        print("connection closed")
# Script entry point: connect, then stream microphone audio until interrupted.
if __name__ == '__main__':
    client = Client()
    try:
        client.start()
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop recording; exit quietly.
        pass
    finally:
        # Always close the socket so the receiver thread can finish and the
        # process can exit instead of waiting on an open connection.
        client.close()
# Original article: https://www.cnblogs.com/qq752059037/p/11890918.html