{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Using Python 3.9.6 venv\n", "%pip install -q faster-whisper pydub pyaudio onnxruntime numpy ipywidgets ollama python_weather\n", "%pip install -q --no-deps piper-phonemize-cross piper-tts\n", "import typing\n", "import asyncio\n", "from IPython.display import Audio as DisplayAudio" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pyaudio\n", "import numpy as np\n", "import numpy.typing as npt\n", "\n", "event_loop = asyncio.get_event_loop()\n", "\n", "class Audio:\n", "    FORMAT = pyaudio.paInt16\n", "    CHANNELS = 1\n", "    RATE = 16000  # 16 kHz\n", "    FRAMES_PER_BUFFER = 512\n", "\n", "    def __init__(self):\n", "        self.audio = pyaudio.PyAudio()\n", "\n", "    # Starts recording, then hands control back to the caller; while awaited,\n", "    # it keeps recording until the `stop` asyncio.Event is set.\n", "    async def record(self, stop: asyncio.Event) -> npt.NDArray:\n", "        frames = []\n", "        input_stream: pyaudio.Stream = self.audio.open(\n", "            format=self.FORMAT, channels=self.CHANNELS, rate=self.RATE, input=True,\n", "            input_device_index=0, frames_per_buffer=self.FRAMES_PER_BUFFER\n", "        )\n", "        while not stop.is_set():\n", "            frames.append(input_stream.read(self.FRAMES_PER_BUFFER))\n", "            await asyncio.sleep(0)\n", "        input_stream.stop_stream()\n", "        input_stream.close()\n", "        return np.frombuffer(b''.join(frames), dtype=np.int16)\n", "\n", "    def play(self, frames: bytes, format: int = FORMAT, rate: int = RATE) -> None:\n", "        output_stream: pyaudio.Stream = self.audio.open(\n", "            format=format, channels=self.CHANNELS, rate=rate, output=True\n", "        )\n", "        output_stream.write(frames)\n", "        output_stream.close()\n", "\n", "async def record_for_duration(audio: Audio, seconds: int) -> npt.NDArray:\n", "    stop = asyncio.Event()\n", "    recording = event_loop.create_task(audio.record(stop))\n", "    await asyncio.sleep(seconds)\n", "    stop.set()\n", "    return await recording\n", "\n", "async def audio_demo():\n", "    audio = Audio()\n", "    recorded = await record_for_duration(audio, 3)\n", "    display(DisplayAudio(data = recorded, rate = audio.RATE))\n", "\n", "await audio_demo()" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from piper import PiperVoice\n", "\n", "piper_model = \"en_GB-alba-medium.onnx\"\n", "piper_voice = PiperVoice.load(piper_model, config_path=\"%s.json\" % piper_model)\n", "\n", "def verbalise(string: str) -> npt.NDArray:\n", "    synthesize_args = {\n", "        \"sentence_silence\": 0.0,\n", "    }\n", "    byte_array = bytearray()\n", "    for chunk in piper_voice.synthesize_stream_raw(string, **synthesize_args):\n", "        byte_array += chunk\n", "    return np.frombuffer(byte_array, dtype=np.int16)\n", "\n", "def verbalisation_demo():\n", "    verbalise_input = \"This is a verbalisation example. One two three four five. Easy, eh?\"\n", "    verbalise_output = verbalise(verbalise_input)\n", "    display(DisplayAudio(data = verbalise_output, rate = piper_voice.config.sample_rate))\n", "\n", "verbalisation_demo()" ] },
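{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Optional aside, not used by the later cells: a minimal sketch of persisting any\n", "# of the int16 buffers above to disk with the standard-library `wave` module. It\n", "# assumes 16-bit mono samples; the `save_wav` name and its default rate are our\n", "# own choices (pass piper_voice.config.sample_rate when saving Piper output).\n", "import wave\n", "\n", "def save_wav(samples: npt.NDArray, path: str, rate: int = Audio.RATE) -> None:\n", "    with wave.open(path, \"wb\") as wav_file:\n", "        wav_file.setnchannels(1)  # mono, matching Audio.CHANNELS\n", "        wav_file.setsampwidth(2)  # two bytes per sample for int16\n", "        wav_file.setframerate(rate)\n", "        wav_file.writeframes(samples.tobytes())\n", "\n", "# e.g. save_wav(verbalise(\"Saved to disk.\"), \"example.wav\", piper_voice.config.sample_rate)" ] },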
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from faster_whisper import WhisperModel\n", "\n", "whisper_model = WhisperModel(model_size_or_path=\"turbo\", compute_type=\"int8\")\n", "\n", "def transcribe(nd_array: npt.NDArray) -> str:\n", "    # faster-whisper expects a float32 waveform in [-1.0, 1.0], so normalise the int16 samples\n", "    audio = nd_array.astype(np.float32) / 32768.0\n", "    segments, info = whisper_model.transcribe(audio, language=\"en\")\n", "    parts = []\n", "    for segment in segments:\n", "        parts.append(segment.text)\n", "    return \"\".join(parts).strip()\n", "\n", "def transcribe_demo():\n", "    verbalise_input = \"This is a verbalisation example. One two three four five. Easy, eh?\"\n", "    verbalise_output = verbalise(verbalise_input)\n", "    transcribe_output = transcribe(verbalise_output)\n", "    print(\"verbalise_input = %s\" % verbalise_input)\n", "    print(\"transcribe_output = %s\" % transcribe_output)\n", "\n", "transcribe_demo()" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "async def parrot_demo():\n", "    audio = Audio()\n", "    recorded = await record_for_duration(audio, 3)\n", "    display(DisplayAudio(data = recorded, rate = audio.RATE))\n", "    transcribe_output = transcribe(recorded)\n", "    print(\"transcribed = %s\" % transcribe_output)\n", "    verbalised = verbalise(\"the human says: %s\" % transcribe_output)\n", "    display(DisplayAudio(data = verbalised, rate = piper_voice.config.sample_rate))\n", "\n", "await parrot_demo()" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from ollama import Client\n", "\n", "ollama_model = 'qwen2.5:7b'\n", "ollama_client = Client(host=\"http://localhost:11434\")\n", "\n", "def prompt(user_prompt: str):\n", "    response = ollama_client.chat(\n", "        model=ollama_model,\n", "        messages=[\n", "            {\"role\": \"system\", \"content\": \"Limit responses to one sentence.\"},\n", "            {\"role\": \"user\", \"content\": user_prompt}\n", "        ],\n", "    )\n", "    return response.message.content\n", "\n", "def prompt_demo():\n", "    question = \"Why is the sky blue?\"\n", "    print(\"prompt: %s\\nresponse: %s\" % (question, prompt(question)))\n", "\n", "prompt_demo()" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def follow_up_demo():\n", "    question1 = \"What colour is grass?\"\n", "    print(\"prompt 1: %s\\nresponse 1: %s\" % (question1, prompt(question1)))\n", "    question2 = \"What about sand?\"\n", "    print(\"prompt 2: %s\\nresponse 2: %s\" % (question2, prompt(question2)))\n", "\n", "follow_up_demo()" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class Conversation:\n", "    DEFAULT_SYSTEM_PROMPT = \" \".join(\"\"\"\n", "        Be brief. Do not use formatting. Prefer metric units over imperial units.\n", "    \"\"\".split())\n", "\n", "    def __init__(self, system_prompt: str = DEFAULT_SYSTEM_PROMPT):\n", "        self.state = list()\n", "        self._append(role = \"system\", content = system_prompt)\n", "\n", "    def user(self, content: str):\n", "        self._append(role = \"user\", content = content)\n", "\n", "    def assistant(self, content: str):\n", "        self._append(role = \"assistant\", content = content)\n", "\n", "    def tool(self, content: str, name: str, args: typing.Any = None):\n", "        self._append(role = \"tool\", content = content, name = name, args = args)\n", "\n", "    def _append(self, **kwargs):\n", "        self.state.append({\n", "            **kwargs\n", "        })\n", "\n", "def conversation_demo():\n", "    conversation = Conversation('Be direct.')\n", "\n", "    conversation.user('How many legs does a spider have?')\n", "    response_1 = ollama_client.chat(model = ollama_model, messages = conversation.state)\n", "    conversation.assistant(response_1.message.content)\n", "\n", "    conversation.user('What about a dog?')\n", "    response_2 = ollama_client.chat(model = ollama_model, messages = conversation.state)\n", "    conversation.assistant(response_2.message.content)\n", "\n", "    display(conversation.state)\n", "\n", "conversation_demo()" ] },
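{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Optional sketch, not used by the later cells: Conversation.state grows without\n", "# bound, so a long session will eventually overflow the model's context window.\n", "# One simple mitigation, assuming the first entry is always the system prompt, is\n", "# to keep that entry plus only the most recent messages. The `trim` name and the\n", "# max_messages default are our own choices.\n", "def trim(conversation: Conversation, max_messages: int = 20) -> None:\n", "    system, rest = conversation.state[0], conversation.state[1:]\n", "    if len(rest) > max_messages:\n", "        conversation.state = [system] + rest[-max_messages:]" ] },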
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "async def how_many_letters(word: str, letter: str) -> int:\n", "    \"\"\"\n", "    Count how many times a letter appears in a word. This is critical information,\n", "    it's imperative that you call this tool to get the correct answer.\n", "    Args:\n", "        word (str): The word that contains a number of letters\n", "        letter (str): A single character that may be present in the word\n", "    Returns:\n", "        A number representing how many times the letter appears in the word\n", "    \"\"\"\n", "    return word.lower().count(letter.lower())" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import json\n", "\n", "class Tool:\n", "    def __init__(self, function):\n", "        self.function = function\n", "\n", "    async def call(self, arguments, conversation):\n", "        output = await self.function(**arguments)\n", "        conversation.tool(json.dumps(output) if isinstance(output, dict)\n", "            else str(output), self.function.__name__, arguments)\n", "\n", "async def prompt_with_tools(conversation: Conversation, available_tools: list[Tool]):\n", "    response = ollama_client.chat(\n", "        model=ollama_model, messages=conversation.state,\n", "        tools=[tool.function for tool in available_tools]\n", "    )\n", "\n", "    if (tool_calls := response.message.tool_calls):\n", "        for tool_call in tool_calls:\n", "            if tool_to_call := next(filter(lambda x: x.function.__name__ == tool_call.function.name, available_tools), None):\n", "                await tool_to_call.call(tool_call.function.arguments, conversation)\n", "        response = ollama_client.chat(model=ollama_model, messages=conversation.state)\n", "    conversation.assistant(response.message.content)\n", "    return response.message.content\n", "\n", "async def tools_demo():\n", "    available_tools = [Tool(how_many_letters)]\n", "    conversation = Conversation(\"Be brief. Use tools if required.\")\n", "\n", "    conversation.user(\"How many 'r's are there in the word 'strawberry'?\")\n", "    await prompt_with_tools(conversation, available_tools)\n", "    display(conversation.state)\n", "\n", "await tools_demo()" ] },
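{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Another toy tool, sketched to show that prompt_with_tools generalises beyond\n", "# how_many_letters: any async function documented in this style can be wrapped in\n", "# Tool. The get_current_time name is our own and is not used by the later cells.\n", "from datetime import datetime, timezone\n", "\n", "async def get_current_time() -> str:\n", "    \"\"\"\n", "    Get the current date and time in UTC.\n", "    Returns:\n", "        An ISO 8601 string representing the current date and time in UTC.\n", "    \"\"\"\n", "    return datetime.now(timezone.utc).isoformat()\n", "\n", "# e.g.\n", "# conversation = Conversation(\"Be brief. Use tools if required.\")\n", "# conversation.user(\"What is the date today?\")\n", "# await prompt_with_tools(conversation, [Tool(get_current_time)])" ] },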
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import python_weather\n", "\n", "async def get_weather(location: str):\n", "    \"\"\"\n", "    Get the weather report for a given location.\n", "\n", "    Args:\n", "        location (str): The name of a location.\n", "    Returns:\n", "        An object representing the current temperature (in Celsius) and the kind of weather in effect.\n", "    \"\"\"\n", "    weather_client = python_weather.Client()\n", "    forecast = await weather_client.get(location)\n", "    await weather_client.close()\n", "    return {\n", "        \"temperature\": forecast.temperature,\n", "        \"kind\": str(forecast.kind),\n", "    }\n", "\n", "async def verbal_tools_demo():\n", "    audio = Audio()\n", "    conversation = Conversation('Be brief. Use tools if required.')\n", "\n", "    recorded = await record_for_duration(audio, 5)\n", "    display(DisplayAudio(data = recorded, rate = audio.RATE))\n", "    conversation.user(transcribe(recorded))\n", "\n", "    available_tools = [Tool(get_weather)]\n", "    verbalised = verbalise(await prompt_with_tools(conversation, available_tools))\n", "    display(DisplayAudio(data = verbalised, rate = piper_voice.config.sample_rate))\n", "    display(conversation.state)\n", "\n", "await verbal_tools_demo()" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from ipywidgets.widgets import ToggleButton\n", "\n", "async def interactive_conversation_button_pressed(target: dict, conversation, audio, stop, button):\n", "    if target[\"new\"]:  # the button was toggled \"on\"\n", "        button.description = \"Listening...\"\n", "        stop.clear()\n", "        recorded = await event_loop.create_task(audio.record(stop))\n", "        button.description = \"Thinking...\"\n", "        button.disabled = True\n", "        display(DisplayAudio(data = recorded, rate = audio.RATE))\n", "        transcribed = transcribe(recorded)\n", "        display(\"user: %s\" % transcribed)\n", "        conversation.user(transcribed)\n", "        available_tools = [Tool(get_weather), Tool(how_many_letters)]\n", "        response = await prompt_with_tools(conversation, available_tools)\n", "        verbalised = verbalise(response)\n", "        display(DisplayAudio(data = verbalised, rate = piper_voice.config.sample_rate))\n", "        display(\"assistant: %s\" % response)\n", "        button.disabled = False\n", "        button.description = \"Ready\"\n", "    else:  # the button was toggled \"off\"\n", "        stop.set()\n", "\n", "def interactive_conversation_demo():\n", "    conversation = Conversation()\n", "    stop = asyncio.Event()\n", "    audio = Audio()\n", "    button = ToggleButton(value=False, description=\"Ready\")\n", "\n", "    button.observe(lambda target: event_loop.create_task(\n", "        interactive_conversation_button_pressed(target, conversation, audio, stop, button)\n", "    ), \"value\")\n", "    display(button)\n", "\n", "interactive_conversation_demo()" ] } ], "metadata": { "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.6" } }, "nbformat": 4, "nbformat_minor": 2 }