llm/tests/test_llm.py
2025-01-24 10:52:46 -08:00

723 lines
23 KiB
Python

from click.testing import CliRunner
import datetime
import llm
from llm.cli import cli
from llm.migrations import migrate
from llm.models import Usage
import json
import os
import pathlib
import pytest
import re
import sqlite_utils
import sys
from ulid import ULID
from unittest import mock
def test_version():
    """The --version flag should print a version string and exit cleanly."""
    runner = CliRunner()
    with runner.isolated_filesystem():
        outcome = runner.invoke(cli, ["--version"])
    assert outcome.exit_code == 0
    assert outcome.output.startswith("cli, version ")
@pytest.fixture
def log_path(user_path):
    """Create a logs.db seeded with 100 responses in one conversation.

    Each row shares the same prompt/response text; datetime_utc increases
    by one second per row so ordering by time is deterministic.
    """
    path = str(user_path / "logs.db")
    db = sqlite_utils.Database(path)
    migrate(db)
    start = datetime.datetime.now(datetime.timezone.utc)
    rows = []
    for offset in range(100):
        rows.append(
            {
                "id": str(ULID()).lower(),
                "system": "system",
                "prompt": "prompt",
                "response": 'response\n```python\nprint("hello word")\n```',
                "model": "davinci",
                "datetime_utc": (
                    start + datetime.timedelta(seconds=offset)
                ).isoformat(),
                "conversation_id": "abc123",
                "input_tokens": 2,
                "output_tokens": 5,
            }
        )
    db["responses"].insert_all(rows)
    return path
# Matches ISO-8601 timestamps like 2023-08-17T20:53:58 so tests can
# replace them with a fixed placeholder before comparing CLI output.
datetime_re = re.compile(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}")
@pytest.mark.parametrize("usage", (False, True))
def test_logs_text(log_path, usage):
    """`llm logs` renders markdown entries; -u appends token usage sections."""
    runner = CliRunner()
    args = ["logs", "-p", str(log_path)]
    if usage:
        args.append("-u")
    result = runner.invoke(cli, args, catch_exceptions=False)
    assert result.exit_code == 0
    # Replace 2023-08-17T20:53:58 with YYYY-MM-DDTHH:MM:SS
    output = datetime_re.sub("YYYY-MM-DDTHH:MM:SS", result.output)
    response_md = 'response\n```python\nprint("hello word")\n```\n\n'
    usage_md = "## Token usage:\n\n2 input, 5 output\n\n" if usage else ""
    # First entry in a conversation includes the system prompt section
    first_entry = (
        "# YYYY-MM-DDTHH:MM:SS conversation: abc123\n\n"
        "Model: **davinci**\n\n"
        "## Prompt:\n\n"
        "prompt\n\n"
        "## System:\n\n"
        "system\n\n"
        "## Response:\n\n" + response_md
    )
    # Subsequent entries omit the (unchanged) system prompt
    later_entry = (
        "# YYYY-MM-DDTHH:MM:SS conversation: abc123\n\n"
        "Model: **davinci**\n\n"
        "## Prompt:\n\n"
        "prompt\n\n"
        "## Response:\n\n" + response_md
    )
    expected = (
        first_entry + usage_md + later_entry + usage_md + later_entry + usage_md
    )
    assert output == expected
@pytest.mark.parametrize("n", (None, 0, 2))
def test_logs_json(n, log_path):
    "Test that logs command correctly returns requested -n records"
    runner = CliRunner()
    args = ["logs", "-p", str(log_path), "--json"]
    if n is not None:
        args.extend(["-n", str(n)])
    result = runner.invoke(cli, args, catch_exceptions=False)
    assert result.exit_code == 0
    logs = json.loads(result.output)
    if n is None:
        expected_length = 3  # default number of records shown
    elif n == 0:
        expected_length = 100  # -n 0 means "no limit"
    else:
        expected_length = n
    assert len(logs) == expected_length
@pytest.mark.parametrize(
    "args", (["-r"], ["--response"], ["list", "-r"], ["list", "--response"])
)
def test_logs_response_only(args, log_path):
    "Test that logs -r/--response returns just the last response"
    outcome = CliRunner().invoke(cli, ["logs", *args], catch_exceptions=False)
    assert outcome.exit_code == 0
    assert outcome.output == 'response\n```python\nprint("hello word")\n```\n'
@pytest.mark.parametrize(
    "args",
    (
        ["-x"],
        ["--extract"],
        ["list", "-x"],
        ["list", "--extract"],
        # Using -xr together should have same effect as just -x
        ["-xr"],
        ["-x", "-r"],
        ["--extract", "--response"],
    ),
)
def test_logs_extract_first_code(args, log_path):
    "Test that logs -x/--extract returns the first code block"
    outcome = CliRunner().invoke(cli, ["logs", *args], catch_exceptions=False)
    assert outcome.exit_code == 0
    assert outcome.output == 'print("hello word")\n\n'
@pytest.mark.parametrize(
    "args",
    (
        ["--xl"],
        ["--extract-last"],
        ["list", "--xl"],
        ["list", "--extract-last"],
        ["--xl", "-r"],
        ["-x", "--xl"],
    ),
)
def test_logs_extract_last_code(args, log_path):
    "Test that logs --xl/--extract-last returns the last code block"
    outcome = CliRunner().invoke(cli, ["logs", *args], catch_exceptions=False)
    assert outcome.exit_code == 0
    assert outcome.output == 'print("hello word")\n\n'
@pytest.mark.xfail(sys.platform == "win32", reason="Expected to fail on Windows")
@pytest.mark.parametrize("env", ({}, {"LLM_USER_PATH": "/tmp/llm-user-path"}))
def test_logs_path(monkeypatch, env, user_path):
    """`llm logs path` should honour LLM_USER_PATH when it is set."""
    for key, value in env.items():
        monkeypatch.setenv(key, value)
    result = CliRunner().invoke(cli, ["logs", "path"])
    assert result.exit_code == 0
    base = env["LLM_USER_PATH"] if env else str(user_path)
    assert result.output.strip() == base + "/logs.db"
@pytest.mark.parametrize("model", ("davinci", "curie"))
def test_logs_filtered(user_path, model):
    """`llm logs list -m MODEL` should return only rows for that model.

    Seeds the log database with 100 responses alternating between two
    models, then checks the JSON output of the filtered listing.
    """
    log_path = str(user_path / "logs.db")
    db = sqlite_utils.Database(log_path)
    migrate(db)
    db["responses"].insert_all(
        {
            "id": str(ULID()).lower(),
            "system": "system",
            "prompt": "prompt",
            "response": "response",
            "model": "davinci" if i % 2 == 0 else "curie",
        }
        for i in range(100)
    )
    runner = CliRunner()
    result = runner.invoke(cli, ["logs", "list", "-m", model, "--json"])
    assert result.exit_code == 0
    records = json.loads(result.output.strip())
    # Guard against the all() below passing vacuously on an empty result set
    assert records
    assert all(record["model"] == model for record in records)
@pytest.mark.parametrize(
    "query,extra_args,expected",
    (
        # With no search term order should be by datetime
        ("", [], ["doc1", "doc2", "doc3"]),
        # With a search it's order by rank instead
        ("llama", [], ["doc1", "doc3"]),
        ("alpaca", [], ["doc2"]),
        # Model filter should work too
        ("llama", ["-m", "davinci"], ["doc1", "doc3"]),
        ("llama", ["-m", "davinci2"], []),
    ),
)
def test_logs_search(user_path, query, extra_args, expected):
    """`llm logs list -q` ranks matches by relevance, optionally model-filtered."""
    log_path = str(user_path / "logs.db")
    db = sqlite_utils.Database(log_path)
    migrate(db)
    documents = (("doc1", "llama"), ("doc2", "alpaca"), ("doc3", "llama llama"))
    for doc_id, prompt_text in documents:
        db["responses"].insert(
            {
                "id": doc_id,
                "system": "system",
                "prompt": prompt_text,
                "response": "response",
                "model": "davinci",
            }
        )
    result = CliRunner().invoke(
        cli, ["logs", "list", "-q", query, "--json"] + extra_args
    )
    assert result.exit_code == 0
    records = json.loads(result.output.strip())
    assert [record["id"] for record in records] == expected
def test_llm_prompt_creates_log_database(mocked_openai_chat, tmpdir, monkeypatch):
    """Running a prompt should create the user dir and a logs.db inside it."""
    user_path = tmpdir / "user"
    monkeypatch.setenv("LLM_USER_PATH", str(user_path))
    result = CliRunner().invoke(
        cli,
        ["three names \nfor a pet pelican", "--no-stream", "--key", "x"],
        catch_exceptions=False,
    )
    assert result.exit_code == 0
    assert result.output == "Bob, Alice, Eve\n"
    # Should have created user_path and put a logs.db in it
    logs_db_path = user_path / "logs.db"
    assert logs_db_path.exists()
    assert sqlite_utils.Database(str(logs_db_path))["responses"].count == 1
@mock.patch.dict(os.environ, {"OPENAI_API_KEY": "X"})
@pytest.mark.parametrize("use_stdin", (True, False, "split"))
@pytest.mark.parametrize(
    "logs_off,logs_args,should_log",
    (
        (True, [], False),
        (False, [], True),
        (False, ["--no-log"], False),
        (False, ["--log"], True),
        (True, ["-n"], False),  # Short for --no-log
        (True, ["--log"], True),
    ),
)
def test_llm_default_prompt(
    mocked_openai_chat, use_stdin, user_path, logs_off, logs_args, should_log
):
    """Exercise the default prompt across input modes and logging toggles.

    Checks the prompt output, the Authorization header sent to the mocked
    OpenAI endpoint, whether a row was logged given the combination of
    `llm logs on/off` state and --log/--no-log flags, and the shape of the
    logged entry as reported by `llm logs --json`.
    """
    # Reset the log_path database
    log_path = user_path / "logs.db"
    log_db = sqlite_utils.Database(str(log_path))
    log_db["responses"].delete_where()
    # The presence of the logs-off marker file disables logging
    logs_off_path = user_path / "logs-off"
    if logs_off:
        # Turn off logging
        assert not logs_off_path.exists()
        CliRunner().invoke(cli, ["logs", "off"])
        assert logs_off_path.exists()
    else:
        # Turn on logging
        CliRunner().invoke(cli, ["logs", "on"])
        assert not logs_off_path.exists()
    # Run the prompt
    runner = CliRunner()
    prompt = "three names \nfor a pet pelican"
    input = None
    args = ["--no-stream"]
    if use_stdin == "split":
        # Half the prompt arrives on stdin, half as an argument
        input = "three names"
        args.append("\nfor a pet pelican")
    elif use_stdin:
        input = prompt
    else:
        args.append(prompt)
    args += logs_args
    result = runner.invoke(cli, args, input=input, catch_exceptions=False)
    assert result.exit_code == 0
    assert result.output == "Bob, Alice, Eve\n"
    # The mocked endpoint should have received the patched API key
    last_request = mocked_openai_chat.get_requests()[-1]
    assert last_request.headers["Authorization"] == "Bearer X"
    # Was it logged?
    rows = list(log_db["responses"].rows)
    if not should_log:
        assert len(rows) == 0
        return
    assert len(rows) == 1
    expected = {
        "model": "gpt-4o-mini",
        "prompt": "three names \nfor a pet pelican",
        "system": None,
        "options_json": "{}",
        "response": "Bob, Alice, Eve",
    }
    row = rows[0]
    # Logged row must be a superset of the expected column values
    assert expected.items() <= row.items()
    assert isinstance(row["duration_ms"], int)
    assert isinstance(row["datetime_utc"], str)
    assert json.loads(row["prompt_json"]) == {
        "messages": [{"role": "user", "content": "three names \nfor a pet pelican"}]
    }
    assert json.loads(row["response_json"]) == {
        "model": "gpt-4o-mini",
        "choices": [{"message": {"content": "Bob, Alice, Eve"}}],
    }
    # Test "llm logs"
    log_result = runner.invoke(
        cli, ["logs", "-n", "1", "--json"], catch_exceptions=False
    )
    log_json = json.loads(log_result.output)
    # Should have logged correctly:
    assert (
        log_json[0].items()
        >= {
            "model": "gpt-4o-mini",
            "prompt": "three names \nfor a pet pelican",
            "system": None,
            "prompt_json": {
                "messages": [
                    {"role": "user", "content": "three names \nfor a pet pelican"}
                ]
            },
            "options_json": {},
            "response": "Bob, Alice, Eve",
            "response_json": {
                "model": "gpt-4o-mini",
                "choices": [{"message": {"content": "Bob, Alice, Eve"}}],
            },
            # This doesn't have the \n after three names:
            "conversation_name": "three names for a pet pelican",
            "conversation_model": "gpt-4o-mini",
        }.items()
    )
@pytest.mark.parametrize(
    "args,expect_just_code",
    (
        (["-x"], True),
        (["--extract"], True),
        (["-x", "--async"], True),
        (["--extract", "--async"], True),
        # Use --no-stream here to ensure it passes test same as -x/--extract cases
        (["--no-stream"], False),
    ),
)
def test_extract_fenced_code(
    mocked_openai_chat_returning_fenced_code, args, expect_just_code
):
    """-x/--extract strips markdown fences; without it the fences remain."""
    runner = CliRunner()
    result = runner.invoke(
        cli,
        ["-m", "gpt-4o-mini", "--key", "x", "Write code"] + args,
        catch_exceptions=False,
    )
    # Sibling tests all assert exit_code; do the same here so a failed
    # invocation cannot pass on the substring checks alone
    assert result.exit_code == 0
    output = result.output
    if expect_just_code:
        assert "```" not in output
    else:
        assert "```" in output
def test_openai_chat_stream(mocked_openai_chat_stream, user_path):
    """Streamed chat chunks should be concatenated into plain output."""
    outcome = CliRunner().invoke(cli, ["-m", "gpt-3.5-turbo", "--key", "x", "Say hi"])
    assert outcome.exit_code == 0
    assert outcome.output == "Hi.\n"
def test_openai_completion(mocked_openai_completion, user_path):
    """A completion-model prompt hits the completions API and is logged."""
    log_db = sqlite_utils.Database(str(user_path / "logs.db"))
    log_db["responses"].delete_where()
    result = CliRunner().invoke(
        cli,
        [
            "-m",
            "gpt-3.5-turbo-instruct",
            "Say this is a test",
            "--no-stream",
            "--key",
            "x",
        ],
        catch_exceptions=False,
    )
    assert result.exit_code == 0
    assert result.output == "\n\nThis is indeed a test\n"
    # Should have requested 256 tokens
    last_request = mocked_openai_completion.get_requests()[-1]
    assert json.loads(last_request.content) == {
        "model": "gpt-3.5-turbo-instruct",
        "prompt": "Say this is a test",
        "stream": False,
        "max_tokens": 256,
    }
    # Check it was logged
    rows = list(log_db["responses"].rows)
    assert len(rows) == 1
    expected_subset = {
        "model": "gpt-3.5-turbo-instruct",
        "prompt": "Say this is a test",
        "system": None,
        "prompt_json": '{"messages": ["Say this is a test"]}',
        "options_json": "{}",
        "response": "\n\nThis is indeed a test",
    }
    assert expected_subset.items() <= rows[0].items()
def test_openai_completion_system_prompt_error():
    """Completion models should reject --system prompts with a clear error."""
    args = [
        "-m",
        "gpt-3.5-turbo-instruct",
        "Say this is a test",
        "--no-stream",
        "--key",
        "x",
        "--system",
        "system prompts not allowed",
    ]
    result = CliRunner().invoke(cli, args, catch_exceptions=False)
    assert result.exit_code == 1
    expected_message = (
        "Error: System prompts are not supported for OpenAI completion models\n"
    )
    assert result.output == expected_message
def test_openai_completion_logprobs_stream(
    mocked_openai_completion_logprobs_stream, user_path
):
    """Streaming completion with -o logprobs should log per-token logprobs.

    Runs a completion with the logprobs option and checks the logged
    response_json reassembled from the streamed chunks.
    """
    log_path = user_path / "logs.db"
    log_db = sqlite_utils.Database(str(log_path))
    # Start from an empty table so the row-count assertion below is exact
    log_db["responses"].delete_where()
    runner = CliRunner()
    args = [
        "-m",
        "gpt-3.5-turbo-instruct",
        "Say hi",
        "-o",
        "logprobs",
        "2",
        "--key",
        "x",
    ]
    result = runner.invoke(cli, args, catch_exceptions=False)
    assert result.exit_code == 0
    assert result.output == "\n\nHi.\n"
    rows = list(log_db["responses"].rows)
    assert len(rows) == 1
    row = rows[0]
    # The logged response_json carries the logprobs collected during streaming
    assert json.loads(row["response_json"]) == {
        "content": "\n\nHi.",
        "logprobs": [
            {"text": "\n\n", "top_logprobs": [{"\n\n": -0.6, "\n": -1.9}]},
            {"text": "Hi", "top_logprobs": [{"Hi": -1.1, "Hello": -0.7}]},
            {"text": ".", "top_logprobs": [{".": -1.1, "!": -0.9}]},
            {"text": "", "top_logprobs": []},
        ],
        "id": "cmpl-80MdSaou7NnPuff5ZyRMysWBmgSPS",
        "object": "text_completion",
        "model": "gpt-3.5-turbo-instruct",
        "created": 1695097702,
    }
def test_openai_completion_logprobs_nostream(
    mocked_openai_completion_logprobs, user_path
):
    """Non-streaming completion with -o logprobs should log the raw API JSON."""
    log_path = user_path / "logs.db"
    log_db = sqlite_utils.Database(str(log_path))
    # Start from an empty table so the row-count assertion below is exact
    log_db["responses"].delete_where()
    runner = CliRunner()
    args = [
        "-m",
        "gpt-3.5-turbo-instruct",
        "Say hi",
        "-o",
        "logprobs",
        "2",
        "--key",
        "x",
        "--no-stream",
    ]
    result = runner.invoke(cli, args, catch_exceptions=False)
    assert result.exit_code == 0
    assert result.output == "\n\nHi.\n"
    rows = list(log_db["responses"].rows)
    assert len(rows) == 1
    row = rows[0]
    # The logged response_json matches the mocked API payload verbatim
    assert json.loads(row["response_json"]) == {
        "choices": [
            {
                "finish_reason": "stop",
                "index": 0,
                "logprobs": {
                    "text_offset": [16, 18, 20],
                    "token_logprobs": [-0.6, -1.1, -0.9],
                    "tokens": ["\n\n", "Hi", "1"],
                    "top_logprobs": [
                        {"\n": -1.9, "\n\n": -0.6},
                        {"Hello": -0.7, "Hi": -1.1},
                        {"!": -1.1, ".": -0.9},
                    ],
                },
                "text": "\n\nHi.",
            }
        ],
        "created": 1695097747,
        "id": "cmpl-80MeBfKJutM0uMNJkRrebJLeP3bxL",
        "model": "gpt-3.5-turbo-instruct",
        "object": "text_completion",
        "usage": {"completion_tokens": 3, "prompt_tokens": 5, "total_tokens": 8},
    }
EXTRA_MODELS_YAML = """
- model_id: orca
model_name: orca-mini-3b
api_base: "http://localai.localhost"
- model_id: completion-babbage
model_name: babbage
api_base: "http://localai.localhost"
completion: 1
"""
def test_openai_localai_configuration(mocked_localai, user_path):
    """Models declared in extra-openai-models.yaml should be usable by id."""
    sqlite_utils.Database(str(user_path / "logs.db"))
    # Write the configuration file
    (user_path / "extra-openai-models.yaml").write_text(EXTRA_MODELS_YAML, "utf-8")
    runner = CliRunner()
    # Chat-style model first
    prompt = "three names \nfor a pet pelican"
    result = runner.invoke(cli, ["--no-stream", "--model", "orca", prompt])
    assert result.exit_code == 0
    assert result.output == "Bob, Alice, Eve\n"
    assert json.loads(mocked_localai.get_requests()[-1].content) == {
        "model": "orca-mini-3b",
        "messages": [{"role": "user", "content": "three names \nfor a pet pelican"}],
        "stream": False,
    }
    # And check the completion model too
    result2 = runner.invoke(
        cli, ["--no-stream", "--model", "completion-babbage", "hi"]
    )
    assert result2.exit_code == 0
    assert result2.output == "Hello\n"
    assert json.loads(mocked_localai.get_requests()[-1].content) == {
        "model": "babbage",
        "prompt": "hi",
        "stream": False,
    }
EXPECTED_OPTIONS = """
OpenAI Chat: gpt-4o (aliases: 4o)
Options:
temperature: float
What sampling temperature to use, between 0 and 2. Higher values like
0.8 will make the output more random, while lower values like 0.2 will
make it more focused and deterministic.
max_tokens: int
Maximum number of tokens to generate.
top_p: float
An alternative to sampling with temperature, called nucleus sampling,
where the model considers the results of the tokens with top_p
probability mass. So 0.1 means only the tokens comprising the top 10%
probability mass are considered. Recommended to use top_p or
temperature but not both.
frequency_penalty: float
Number between -2.0 and 2.0. Positive values penalize new tokens based
on their existing frequency in the text so far, decreasing the model's
likelihood to repeat the same line verbatim.
presence_penalty: float
Number between -2.0 and 2.0. Positive values penalize new tokens based
on whether they appear in the text so far, increasing the model's
likelihood to talk about new topics.
stop: str
A string where the API will stop generating further tokens.
logit_bias: dict, str
Modify the likelihood of specified tokens appearing in the completion.
Pass a JSON string like '{"1712":-100, "892":-100, "1489":-100}'
seed: int
Integer seed to attempt to sample deterministically
json_object: boolean
Output a valid JSON object {...}. Prompt must mention JSON.
Attachment types:
image/gif, image/jpeg, image/png, image/webp
"""
def test_llm_models_options(user_path):
    """`llm models --options` lists option docs, excluding async-only models."""
    result = CliRunner().invoke(cli, ["models", "--options"], catch_exceptions=False)
    assert result.exit_code == 0
    assert EXPECTED_OPTIONS.strip() in result.output
    assert "AsyncMockModel: mock" not in result.output
def test_llm_models_async(user_path):
    """`llm models --async` should include async-capable models."""
    result = CliRunner().invoke(cli, ["models", "--async"], catch_exceptions=False)
    assert result.exit_code == 0
    assert "AsyncMockModel: mock" in result.output
@pytest.mark.parametrize("option", ("-q", "--query"))
def test_llm_models_query(user_path, option):
    """Both -q and --query should filter the models listing."""
    result = CliRunner().invoke(
        cli, ["models", option, "mockmodel"], catch_exceptions=False
    )
    assert result.exit_code == 0
    assert result.output == "MockModel: mock\n"
def test_llm_user_dir(tmpdir, monkeypatch):
    """llm.user_dir() should create the directory on first access."""
    user_dir = str(tmpdir / "u")
    monkeypatch.setenv("LLM_USER_PATH", user_dir)
    assert not os.path.exists(user_dir)
    resolved = llm.user_dir()
    assert str(resolved) == user_dir
    assert os.path.exists(user_dir)
def test_model_defaults(tmpdir, monkeypatch):
    """set_default_model() writes default_model.txt and changes get_model()."""
    user_dir = str(tmpdir / "u")
    monkeypatch.setenv("LLM_USER_PATH", user_dir)
    config_path = pathlib.Path(user_dir) / "default_model.txt"
    assert not config_path.exists()

    def assert_default(model_id):
        # Both the recorded default and the resolved model should agree
        assert llm.get_default_model() == model_id
        assert llm.get_model().model_id == model_id

    assert_default("gpt-4o-mini")
    llm.set_default_model("gpt-4o")
    assert config_path.exists()
    assert_default("gpt-4o")
def test_get_models():
    """get_models() returns Model instances with unique model_ids."""
    models = llm.get_models()
    assert all(isinstance(m, llm.Model) for m in models)
    model_ids = [m.model_id for m in models]
    assert "gpt-4o-mini" in model_ids
    # Ensure no model_ids are duplicated
    # https://github.com/simonw/llm/issues/667
    assert len(set(model_ids)) == len(model_ids)
def test_get_async_models():
    """get_async_models() returns AsyncModel instances including gpt-4o-mini."""
    async_models = llm.get_async_models()
    assert all(isinstance(m, llm.AsyncModel) for m in async_models)
    assert "gpt-4o-mini" in [m.model_id for m in async_models]
def test_mock_model(mock_model):
    """The mock model replays enqueued responses and tracks cumulative usage."""
    mock_model.enqueue(["hello world"])
    mock_model.enqueue(["second"])
    model = llm.get_model("mock")
    first = model.prompt(prompt="hello")
    assert first.text() == "hello world"
    assert str(first) == "hello world"
    assert model.history[0][0].prompt == "hello"
    assert first.usage() == Usage(input=1, output=1, details=None)
    second = model.prompt(prompt="hello again")
    assert second.text() == "second"
    # Input tokens accumulate across prompts on the mock model
    assert second.usage() == Usage(input=2, output=1, details=None)
def test_sync_on_done(mock_model):
    """on_done callbacks fire once the response has been fully consumed."""
    mock_model.enqueue(["hello world"])
    model = llm.get_model("mock")
    response = model.prompt(prompt="hello")
    seen = []
    response.on_done(seen.append)
    assert not seen
    str(response)  # forces the lazy response to resolve
    assert len(seen) == 1