Compare commits

..

1 commit

Author SHA1 Message Date
a033871771
remove unneeded enumerate 2023-09-29 07:53:55 -05:00
34 changed files with 621 additions and 1810 deletions

48
.github/workflows/codeql.yml vendored Normal file
View file

@ -0,0 +1,48 @@
# SPDX-FileCopyrightText: 2022 Jeff Epler
#
# SPDX-License-Identifier: CC0-1.0
name: "CodeQL"
on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]
schedule:
- cron: "53 3 * * 5"
jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
language: [ python ]
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Install Dependencies (python)
run: pip3 install -r requirements-dev.txt
- name: Initialize CodeQL
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
queries: +security-and-quality
- name: Autobuild
uses: github/codeql-action/autobuild@v2
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v2
with:
category: "/language:${{ matrix.language }}"

View file

@ -18,12 +18,12 @@ jobs:
GITHUB_CONTEXT: ${{ toJson(github) }}
run: echo "$GITHUB_CONTEXT"
- uses: actions/checkout@v4
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v5
uses: actions/setup-python@v4
with:
python-version: 3.11
python-version: 3.9
- name: Install deps
run: |

View file

@ -17,10 +17,10 @@ jobs:
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v3
- name: pre-commit
uses: pre-commit/action@v3.0.1
uses: pre-commit/action@v3.0.0
- name: Make patch
if: failure()
@ -28,7 +28,7 @@ jobs:
- name: Upload patch
if: failure()
uses: actions/upload-artifact@v4
uses: actions/upload-artifact@v3
with:
name: patch
path: ~/pre-commit.patch
@ -36,24 +36,21 @@ jobs:
test-release:
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v5
uses: actions/setup-python@v4
with:
python-version: 3.11
python-version: 3.9
- name: install deps
run: pip install -r requirements-dev.txt
- name: check types with mypy
run: make
- name: Build release
run: python -mbuild
- name: Upload artifacts
uses: actions/upload-artifact@v4
uses: actions/upload-artifact@v3
with:
name: dist
path: dist/*

2
.gitignore vendored
View file

@ -7,5 +7,3 @@ __pycache__
/build
/dist
/src/chap/__version__.py
/venv
/keys.log

View file

@ -6,29 +6,31 @@ default_language_version:
python: python3
repos:
- repo: https://github.com/psf/black
rev: 23.1.0
hooks:
- id: black
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
rev: v4.4.0
hooks:
- id: check-yaml
- id: end-of-file-fixer
exclude: tests
- id: trailing-whitespace
exclude: tests
- repo: https://github.com/codespell-project/codespell
rev: v2.2.6
hooks:
- id: codespell
args: [-w]
- repo: https://github.com/fsfe/reuse-tool
rev: v2.1.0
rev: v1.1.2
hooks:
- id: reuse
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.1.6
- repo: https://github.com/pycqa/isort
rev: 5.12.0
hooks:
# Run the linter.
- id: ruff
args: [ --fix, --preview ]
# Run the formatter.
- id: ruff-format
- id: isort
name: isort (python)
args: ['--profile', 'black']
- repo: https://github.com/pycqa/pylint
rev: v2.17.0
hooks:
- id: pylint
additional_dependencies: [click,dataclasses_json,httpx,lorem-text,simple-parsing,'textual>=0.18.0',tiktoken,websockets]
args: ['--source-roots', 'src']

12
.pylintrc Normal file
View file

@ -0,0 +1,12 @@
# SPDX-FileCopyrightText: 2023 Jeff Epler <jepler@gmail.com>
#
# SPDX-License-Identifier: MIT
[MESSAGES CONTROL]
disable=
duplicate-code,
invalid-name,
line-too-long,
missing-class-docstring,
missing-function-docstring,
missing-module-docstring,

View file

@ -1,153 +0,0 @@
<!--
SPDX-FileCopyrightText: 2023 Organization for Ethical Source
SPDX-FileContributor: Alva <alva@alva.email>
SPDX-FileContributor: Bantik <corey@idolhands.com>
SPDX-FileContributor: CoralineAda <coraline@idolhands.com>
SPDX-FileContributor: Dan Gillean <fiver-watson@users.noreply.github.com>
SPDX-FileContributor: Drew Gates <aranaur@users.noreply.github.com>
SPDX-FileContributor: Frederik Dudzik <4004blog@gmail.com>
SPDX-FileContributor: Ilya Vassilevsky <vassilevsky@gmail.com>
SPDX-FileContributor: Jeff Epler <jepler@gmail.com>
SPDX-FileContributor: Juanito Fatas <katehuang0320@gmail.com>
SPDX-FileContributor: Kerri Miller <kerrizor@kerrizor.com>
SPDX-FileContributor: Kurtis Rainbolt-Greene <me@kurtisrainboltgreene.name>
SPDX-FileContributor: Oliver Galvin <odg@riseup.net>
SPDX-FileContributor: Strand McCutchen <smccutchen@instructure.com>
SPDX-FileContributor: Strand McCutchen <strand@bettermistak.es>
SPDX-FileContributor: Vítor Galvão <info@vitorgalvao.com>
SPDX-License-Identifier: CC-BY-4.0
-->
# unpythonic.net Code of Conduct
## Our Pledge
We as members, contributors, and leaders pledge to make participation in our
community a harassment-free experience for everyone, regardless of age, body
size, visible or invisible disability, ethnicity, sex characteristics, gender
identity and expression, level of experience, education, socio-economic status,
nationality, personal appearance, race, caste, color, religion, or sexual identity
and orientation.
We pledge to act and interact in ways that contribute to an open, welcoming,
diverse, inclusive, and healthy community.
## Our Standards
Examples of behavior that contributes to a positive environment for our
community include:
* Demonstrating empathy and kindness toward other people
* Being respectful of differing opinions, viewpoints, and experiences
* Giving and gracefully accepting constructive feedback
* Accepting responsibility and apologizing to those affected by our mistakes,
and learning from the experience
* Focusing on what is best not just for us as individuals, but for the
overall community
Examples of unacceptable behavior include:
* The use of sexualized language or imagery, and sexual attention or
advances of any kind
* Trolling, insulting or derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or email
address, without their explicit permission
* Other conduct which could reasonably be considered inappropriate in a
professional setting
## Enforcement Responsibilities
Community leaders are responsible for clarifying and enforcing our standards of
acceptable behavior and will take appropriate and fair corrective action in
response to any behavior that they deem inappropriate, threatening, offensive,
or harmful.
Community leaders have the right and responsibility to remove, edit, or reject
comments, commits, code, wiki edits, issues, and other contributions that are
not aligned to this Code of Conduct, and will communicate reasons for moderation
decisions when appropriate.
## Scope
This Code of Conduct applies within all community spaces, and also applies when
an individual is officially representing the community in public spaces.
Examples of representing our community include using an official e-mail address,
posting via an official social media account, or acting as an appointed
representative at an online or offline event.
## Enforcement
Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported to the community leaders responsible for enforcement at
*jepler@gmail.com*.
All complaints will be reviewed and investigated promptly and fairly.
All community leaders are obligated to respect the privacy and security of the
reporter of any incident.
## Enforcement Guidelines
Community leaders will follow these Community Impact Guidelines in determining
the consequences for any action they deem in violation of this Code of Conduct:
### 1. Correction
**Community Impact**: Use of inappropriate language or other behavior deemed
unprofessional or unwelcome in the community.
**Consequence**: A private, written warning from community leaders, providing
clarity around the nature of the violation and an explanation of why the
behavior was inappropriate. A public apology may be requested.
### 2. Warning
**Community Impact**: A violation through a single incident or series
of actions.
**Consequence**: A warning with consequences for continued behavior. No
interaction with the people involved, including unsolicited interaction with
those enforcing the Code of Conduct, for a specified period of time. This
includes avoiding interactions in community spaces as well as external channels
like social media. Violating these terms may lead to a temporary or
permanent ban.
### 3. Temporary Ban
**Community Impact**: A serious violation of community standards, including
sustained inappropriate behavior.
**Consequence**: A temporary ban from any sort of interaction or public
communication with the community for a specified period of time. No public or
private interaction with the people involved, including unsolicited interaction
with those enforcing the Code of Conduct, is allowed during this period.
Violating these terms may lead to a permanent ban.
### 4. Permanent Ban
**Community Impact**: Demonstrating a pattern of violation of community
standards, including sustained inappropriate behavior, harassment of an
individual, or aggression toward or disparagement of classes of individuals.
**Consequence**: A permanent ban from any sort of public interaction within
the community.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant][homepage],
version 2.1, available at
[https://www.contributor-covenant.org/version/2/1/code_of_conduct.html][v2.1].
Community Impact Guidelines were inspired by
[Mozilla's code of conduct enforcement ladder][Mozilla CoC].
For answers to common questions about this code of conduct, see the FAQ at
[https://www.contributor-covenant.org/faq][FAQ]. Translations are available
at [https://www.contributor-covenant.org/translations][translations].
[homepage]: https://www.contributor-covenant.org
[v2.1]: https://www.contributor-covenant.org/version/2/1/code_of_conduct.html
[Mozilla CoC]: https://github.com/mozilla/diversity
[FAQ]: https://www.contributor-covenant.org/faq
[translations]: https://www.contributor-covenant.org/translations

View file

@ -1,41 +0,0 @@
<!--
SPDX-FileCopyrightText: 2023 Jeff Epler <jepler@gmail.com>
SPDX-License-Identifier: MIT
-->
# Contributing
Please feel free to submit pull requests or open issues to improve this project.
There's no particular roadmap for chap, it was created to scratch the original developer's itch but has been useful to others as well.
If you're looking for a way to contribute, check out the open issues.
If you want to discuss a possible enhancement before beginning work, opening a fresh issue is a good way to start a dialog about your idea.
## Code style & linting with pre-commit
This project uses [pre-commit](https://pre-commit.com/) to maintain the code's style and perform some quality checks.
First, install pre-commit: `pip install pre-commit`.
Then, enable pre-commit via git "hooks" so that (most of the time) pre-commit checks are performed at the time a git commit is made: `pre-commit install`.
If necessary, you can run pre-commit checks manually for one or more files: `pre-commit run --all` or `pre-commit run --files src/chap/main.py`.
Some tools (e.g., black) will automatically update the working files with whatever changes they recommend.
Other tools (e.g., pylint) will just tell you what is wrong and require your intervention to fix it.
It is acceptable to use hints like `# pylint: ignore=diagnostic-kind` when it's preferable to actually resolving the reported problem
(e.g., if the report is spurious or a non-standard construct is used for well-considered reasons)
When you create a pull request, `pre-commit run --all` is run in a standardized environment, which occasionally catches things that were not seen locally.
That green checkmark in github actions is the final arbiter of whether the code is pre-commit clean.
## Type checking with mypy
This project uses [mypy](https://www.mypy-lang.org/) for type-checking.
If your system has `make`, you can type-check the project with: `make`.
Otherwise, you need to perform several setup steps:
* create a virtual environment at "venv": `python -mvenv venv`
* install dependencies in the virtual environment: `venv/bin/pip install -r requirements.txt 'mypy!=1.7.0'`
Then, type check with: `venv/bin/mypy`
When you create a pull request, mypy is run in a standardized environment, which occasionally catches things that were not seen locally.
That green checkmark in github actions is the final arbiter of whether the code is mypy clean.

View file

@ -1,156 +0,0 @@
Creative Commons Attribution 4.0 International
Creative Commons Corporation (“Creative Commons”) is not a law firm and does not provide legal services or legal advice. Distribution of Creative Commons public licenses does not create a lawyer-client or other relationship. Creative Commons makes its licenses and related information available on an “as-is” basis. Creative Commons gives no warranties regarding its licenses, any material licensed under their terms and conditions, or any related information. Creative Commons disclaims all liability for damages resulting from their use to the fullest extent possible.
Using Creative Commons Public Licenses
Creative Commons public licenses provide a standard set of terms and conditions that creators and other rights holders may use to share original works of authorship and other material subject to copyright and certain other rights specified in the public license below. The following considerations are for informational purposes only, are not exhaustive, and do not form part of our licenses.
Considerations for licensors: Our public licenses are intended for use by those authorized to give the public permission to use material in ways otherwise restricted by copyright and certain other rights. Our licenses are irrevocable. Licensors should read and understand the terms and conditions of the license they choose before applying it. Licensors should also secure all rights necessary before applying our licenses so that the public can reuse the material as expected. Licensors should clearly mark any material not subject to the license. This includes other CC-licensed material, or material used under an exception or limitation to copyright. More considerations for licensors.
Considerations for the public: By using one of our public licenses, a licensor grants the public permission to use the licensed material under specified terms and conditions. If the licensor's permission is not necessary for any reason—for example, because of any applicable exception or limitation to copyright—then that use is not regulated by the license. Our licenses grant only permissions under copyright and certain other rights that a licensor has authority to grant. Use of the licensed material may still be restricted for other reasons, including because others have copyright or other rights in the material. A licensor may make special requests, such as asking that all changes be marked or described. Although not required by our licenses, you are encouraged to respect those requests where reasonable. More considerations for the public.
Creative Commons Attribution 4.0 International Public License
By exercising the Licensed Rights (defined below), You accept and agree to be bound by the terms and conditions of this Creative Commons Attribution 4.0 International Public License ("Public License"). To the extent this Public License may be interpreted as a contract, You are granted the Licensed Rights in consideration of Your acceptance of these terms and conditions, and the Licensor grants You such rights in consideration of benefits the Licensor receives from making the Licensed Material available under these terms and conditions.
Section 1 – Definitions.
a. Adapted Material means material subject to Copyright and Similar Rights that is derived from or based upon the Licensed Material and in which the Licensed Material is translated, altered, arranged, transformed, or otherwise modified in a manner requiring permission under the Copyright and Similar Rights held by the Licensor. For purposes of this Public License, where the Licensed Material is a musical work, performance, or sound recording, Adapted Material is always produced where the Licensed Material is synched in timed relation with a moving image.
b. Adapter's License means the license You apply to Your Copyright and Similar Rights in Your contributions to Adapted Material in accordance with the terms and conditions of this Public License.
c. Copyright and Similar Rights means copyright and/or similar rights closely related to copyright including, without limitation, performance, broadcast, sound recording, and Sui Generis Database Rights, without regard to how the rights are labeled or categorized. For purposes of this Public License, the rights specified in Section 2(b)(1)-(2) are not Copyright and Similar Rights.
d. Effective Technological Measures means those measures that, in the absence of proper authority, may not be circumvented under laws fulfilling obligations under Article 11 of the WIPO Copyright Treaty adopted on December 20, 1996, and/or similar international agreements.
e. Exceptions and Limitations means fair use, fair dealing, and/or any other exception or limitation to Copyright and Similar Rights that applies to Your use of the Licensed Material.
f. Licensed Material means the artistic or literary work, database, or other material to which the Licensor applied this Public License.
g. Licensed Rights means the rights granted to You subject to the terms and conditions of this Public License, which are limited to all Copyright and Similar Rights that apply to Your use of the Licensed Material and that the Licensor has authority to license.
h. Licensor means the individual(s) or entity(ies) granting rights under this Public License.
i. Share means to provide material to the public by any means or process that requires permission under the Licensed Rights, such as reproduction, public display, public performance, distribution, dissemination, communication, or importation, and to make material available to the public including in ways that members of the public may access the material from a place and at a time individually chosen by them.
j. Sui Generis Database Rights means rights other than copyright resulting from Directive 96/9/EC of the European Parliament and of the Council of 11 March 1996 on the legal protection of databases, as amended and/or succeeded, as well as other essentially equivalent rights anywhere in the world.
k. You means the individual or entity exercising the Licensed Rights under this Public License. Your has a corresponding meaning.
Section 2 – Scope.
a. License grant.
1. Subject to the terms and conditions of this Public License, the Licensor hereby grants You a worldwide, royalty-free, non-sublicensable, non-exclusive, irrevocable license to exercise the Licensed Rights in the Licensed Material to:
A. reproduce and Share the Licensed Material, in whole or in part; and
B. produce, reproduce, and Share Adapted Material.
2. Exceptions and Limitations. For the avoidance of doubt, where Exceptions and Limitations apply to Your use, this Public License does not apply, and You do not need to comply with its terms and conditions.
3. Term. The term of this Public License is specified in Section 6(a).
4. Media and formats; technical modifications allowed. The Licensor authorizes You to exercise the Licensed Rights in all media and formats whether now known or hereafter created, and to make technical modifications necessary to do so. The Licensor waives and/or agrees not to assert any right or authority to forbid You from making technical modifications necessary to exercise the Licensed Rights, including technical modifications necessary to circumvent Effective Technological Measures. For purposes of this Public License, simply making modifications authorized by this Section 2(a)(4) never produces Adapted Material.
5. Downstream recipients.
A. Offer from the Licensor – Licensed Material. Every recipient of the Licensed Material automatically receives an offer from the Licensor to exercise the Licensed Rights under the terms and conditions of this Public License.
B. No downstream restrictions. You may not offer or impose any additional or different terms or conditions on, or apply any Effective Technological Measures to, the Licensed Material if doing so restricts exercise of the Licensed Rights by any recipient of the Licensed Material.
6. No endorsement. Nothing in this Public License constitutes or may be construed as permission to assert or imply that You are, or that Your use of the Licensed Material is, connected with, or sponsored, endorsed, or granted official status by, the Licensor or others designated to receive attribution as provided in Section 3(a)(1)(A)(i).
b. Other rights.
1. Moral rights, such as the right of integrity, are not licensed under this Public License, nor are publicity, privacy, and/or other similar personality rights; however, to the extent possible, the Licensor waives and/or agrees not to assert any such rights held by the Licensor to the limited extent necessary to allow You to exercise the Licensed Rights, but not otherwise.
2. Patent and trademark rights are not licensed under this Public License.
3. To the extent possible, the Licensor waives any right to collect royalties from You for the exercise of the Licensed Rights, whether directly or through a collecting society under any voluntary or waivable statutory or compulsory licensing scheme. In all other cases the Licensor expressly reserves any right to collect such royalties.
Section 3 – License Conditions.
Your exercise of the Licensed Rights is expressly made subject to the following conditions.
a. Attribution.
1. If You Share the Licensed Material (including in modified form), You must:
A. retain the following if it is supplied by the Licensor with the Licensed Material:
i. identification of the creator(s) of the Licensed Material and any others designated to receive attribution, in any reasonable manner requested by the Licensor (including by pseudonym if designated);
ii. a copyright notice;
iii. a notice that refers to this Public License;
iv. a notice that refers to the disclaimer of warranties;
v. a URI or hyperlink to the Licensed Material to the extent reasonably practicable;
B. indicate if You modified the Licensed Material and retain an indication of any previous modifications; and
C. indicate the Licensed Material is licensed under this Public License, and include the text of, or the URI or hyperlink to, this Public License.
2. You may satisfy the conditions in Section 3(a)(1) in any reasonable manner based on the medium, means, and context in which You Share the Licensed Material. For example, it may be reasonable to satisfy the conditions by providing a URI or hyperlink to a resource that includes the required information.
3. If requested by the Licensor, You must remove any of the information required by Section 3(a)(1)(A) to the extent reasonably practicable.
4. If You Share Adapted Material You produce, the Adapter's License You apply must not prevent recipients of the Adapted Material from complying with this Public License.
Section 4 – Sui Generis Database Rights.
Where the Licensed Rights include Sui Generis Database Rights that apply to Your use of the Licensed Material:
a. for the avoidance of doubt, Section 2(a)(1) grants You the right to extract, reuse, reproduce, and Share all or a substantial portion of the contents of the database;
b. if You include all or a substantial portion of the database contents in a database in which You have Sui Generis Database Rights, then the database in which You have Sui Generis Database Rights (but not its individual contents) is Adapted Material; and
c. You must comply with the conditions in Section 3(a) if You Share all or a substantial portion of the contents of the database.
For the avoidance of doubt, this Section 4 supplements and does not replace Your obligations under this Public License where the Licensed Rights include other Copyright and Similar Rights.
Section 5 – Disclaimer of Warranties and Limitation of Liability.
a. Unless otherwise separately undertaken by the Licensor, to the extent possible, the Licensor offers the Licensed Material as-is and as-available, and makes no representations or warranties of any kind concerning the Licensed Material, whether express, implied, statutory, or other. This includes, without limitation, warranties of title, merchantability, fitness for a particular purpose, non-infringement, absence of latent or other defects, accuracy, or the presence or absence of errors, whether or not known or discoverable. Where disclaimers of warranties are not allowed in full or in part, this disclaimer may not apply to You.
b. To the extent possible, in no event will the Licensor be liable to You on any legal theory (including, without limitation, negligence) or otherwise for any direct, special, indirect, incidental, consequential, punitive, exemplary, or other losses, costs, expenses, or damages arising out of this Public License or use of the Licensed Material, even if the Licensor has been advised of the possibility of such losses, costs, expenses, or damages. Where a limitation of liability is not allowed in full or in part, this limitation may not apply to You.
c. The disclaimer of warranties and limitation of liability provided above shall be interpreted in a manner that, to the extent possible, most closely approximates an absolute disclaimer and waiver of all liability.
Section 6 – Term and Termination.
a. This Public License applies for the term of the Copyright and Similar Rights licensed here. However, if You fail to comply with this Public License, then Your rights under this Public License terminate automatically.
b. Where Your right to use the Licensed Material has terminated under Section 6(a), it reinstates:
1. automatically as of the date the violation is cured, provided it is cured within 30 days of Your discovery of the violation; or
2. upon express reinstatement by the Licensor.
c. For the avoidance of doubt, this Section 6(b) does not affect any right the Licensor may have to seek remedies for Your violations of this Public License.
d. For the avoidance of doubt, the Licensor may also offer the Licensed Material under separate terms or conditions or stop distributing the Licensed Material at any time; however, doing so will not terminate this Public License.
e. Sections 1, 5, 6, 7, and 8 survive termination of this Public License.
Section 7 – Other Terms and Conditions.
a. The Licensor shall not be bound by any additional or different terms or conditions communicated by You unless expressly agreed.
b. Any arrangements, understandings, or agreements regarding the Licensed Material not stated herein are separate from and independent of the terms and conditions of this Public License.
Section 8 – Interpretation.
a. For the avoidance of doubt, this Public License does not, and shall not be interpreted to, reduce, limit, restrict, or impose conditions on any use of the Licensed Material that could lawfully be made without permission under this Public License.
b. To the extent possible, if any provision of this Public License is deemed unenforceable, it shall be automatically reformed to the minimum extent necessary to make it enforceable. If the provision cannot be reformed, it shall be severed from this Public License without affecting the enforceability of the remaining terms and conditions.
c. No term or condition of this Public License will be waived and no failure to comply consented to unless expressly agreed to by the Licensor.
d. Nothing in this Public License constitutes or may be interpreted as a limitation upon, or waiver of, any privileges and immunities that apply to the Licensor or You, including from the legal processes of any jurisdiction or authority.
Creative Commons is not a party to its public licenses. Notwithstanding, Creative Commons may elect to apply one of its public licenses to material it publishes and in those instances will be considered the “Licensor.” Except for the limited purpose of indicating that material is shared under a Creative Commons public license or as otherwise permitted by the Creative Commons policies published at creativecommons.org/policies, Creative Commons does not authorize the use of the trademark “Creative Commons” or any other trademark or logo of Creative Commons without its prior written consent including, without limitation, in connection with any unauthorized modifications to any of its public licenses or any other arrangements, understandings, or agreements concerning use of licensed material. For the avoidance of doubt, this paragraph does not form part of the public licenses.
Creative Commons may be contacted at creativecommons.org.

121
LICENSES/CC0-1.0.txt Normal file
View file

@ -0,0 +1,121 @@
Creative Commons Legal Code
CC0 1.0 Universal
CREATIVE COMMONS CORPORATION IS NOT A LAW FIRM AND DOES NOT PROVIDE
LEGAL SERVICES. DISTRIBUTION OF THIS DOCUMENT DOES NOT CREATE AN
ATTORNEY-CLIENT RELATIONSHIP. CREATIVE COMMONS PROVIDES THIS
INFORMATION ON AN "AS-IS" BASIS. CREATIVE COMMONS MAKES NO WARRANTIES
REGARDING THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS
PROVIDED HEREUNDER, AND DISCLAIMS LIABILITY FOR DAMAGES RESULTING FROM
THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS PROVIDED
HEREUNDER.
Statement of Purpose
The laws of most jurisdictions throughout the world automatically confer
exclusive Copyright and Related Rights (defined below) upon the creator
and subsequent owner(s) (each and all, an "owner") of an original work of
authorship and/or a database (each, a "Work").
Certain owners wish to permanently relinquish those rights to a Work for
the purpose of contributing to a commons of creative, cultural and
scientific works ("Commons") that the public can reliably and without fear
of later claims of infringement build upon, modify, incorporate in other
works, reuse and redistribute as freely as possible in any form whatsoever
and for any purposes, including without limitation commercial purposes.
These owners may contribute to the Commons to promote the ideal of a free
culture and the further production of creative, cultural and scientific
works, or to gain reputation or greater distribution for their Work in
part through the use and efforts of others.
For these and/or other purposes and motivations, and without any
expectation of additional consideration or compensation, the person
associating CC0 with a Work (the "Affirmer"), to the extent that he or she
is an owner of Copyright and Related Rights in the Work, voluntarily
elects to apply CC0 to the Work and publicly distribute the Work under its
terms, with knowledge of his or her Copyright and Related Rights in the
Work and the meaning and intended legal effect of CC0 on those rights.
1. Copyright and Related Rights. A Work made available under CC0 may be
protected by copyright and related or neighboring rights ("Copyright and
Related Rights"). Copyright and Related Rights include, but are not
limited to, the following:
i. the right to reproduce, adapt, distribute, perform, display,
communicate, and translate a Work;
ii. moral rights retained by the original author(s) and/or performer(s);
iii. publicity and privacy rights pertaining to a person's image or
likeness depicted in a Work;
iv. rights protecting against unfair competition in regards to a Work,
subject to the limitations in paragraph 4(a), below;
v. rights protecting the extraction, dissemination, use and reuse of data
in a Work;
vi. database rights (such as those arising under Directive 96/9/EC of the
European Parliament and of the Council of 11 March 1996 on the legal
protection of databases, and under any national implementation
thereof, including any amended or successor version of such
directive); and
vii. other similar, equivalent or corresponding rights throughout the
world based on applicable law or treaty, and any national
implementations thereof.
2. Waiver. To the greatest extent permitted by, but not in contravention
of, applicable law, Affirmer hereby overtly, fully, permanently,
irrevocably and unconditionally waives, abandons, and surrenders all of
Affirmer's Copyright and Related Rights and associated claims and causes
of action, whether now known or unknown (including existing as well as
future claims and causes of action), in the Work (i) in all territories
worldwide, (ii) for the maximum duration provided by applicable law or
treaty (including future time extensions), (iii) in any current or future
medium and for any number of copies, and (iv) for any purpose whatsoever,
including without limitation commercial, advertising or promotional
purposes (the "Waiver"). Affirmer makes the Waiver for the benefit of each
member of the public at large and to the detriment of Affirmer's heirs and
successors, fully intending that such Waiver shall not be subject to
revocation, rescission, cancellation, termination, or any other legal or
equitable action to disrupt the quiet enjoyment of the Work by the public
as contemplated by Affirmer's express Statement of Purpose.
3. Public License Fallback. Should any part of the Waiver for any reason
be judged legally invalid or ineffective under applicable law, then the
Waiver shall be preserved to the maximum extent permitted taking into
account Affirmer's express Statement of Purpose. In addition, to the
extent the Waiver is so judged Affirmer hereby grants to each affected
person a royalty-free, non transferable, non sublicensable, non exclusive,
irrevocable and unconditional license to exercise Affirmer's Copyright and
Related Rights in the Work (i) in all territories worldwide, (ii) for the
maximum duration provided by applicable law or treaty (including future
time extensions), (iii) in any current or future medium and for any number
of copies, and (iv) for any purpose whatsoever, including without
limitation commercial, advertising or promotional purposes (the
"License"). The License shall be deemed effective as of the date CC0 was
applied by Affirmer to the Work. Should any part of the License for any
reason be judged legally invalid or ineffective under applicable law, such
partial invalidity or ineffectiveness shall not invalidate the remainder
of the License, and in such case Affirmer hereby affirms that he or she
will not (i) exercise any of his or her remaining Copyright and Related
Rights in the Work or (ii) assert any associated claims and causes of
action with respect to the Work, in either case contrary to Affirmer's
express Statement of Purpose.
4. Limitations and Disclaimers.
a. No trademark or patent rights held by Affirmer are waived, abandoned,
surrendered, licensed or otherwise affected by this document.
b. Affirmer offers the Work as-is and makes no representations or
warranties of any kind concerning the Work, express, implied,
statutory or otherwise, including without limitation warranties of
title, merchantability, fitness for a particular purpose, non
infringement, or the absence of latent or other defects, accuracy, or
the present or absence of errors, whether or not discoverable, all to
the greatest extent permissible under applicable law.
c. Affirmer disclaims responsibility for clearing rights of other persons
that may apply to the Work or any use thereof, including without
limitation any person's Copyright and Related Rights in the Work.
Further, Affirmer disclaims responsibility for obtaining any necessary
consents, permissions or other rights required for any use of the
Work.
d. Affirmer understands and acknowledges that Creative Commons is not a
party to this document and has no duty or obligation with respect to
this CC0 or use of the Work.

View file

@ -1,16 +0,0 @@
# SPDX-FileCopyrightText: 2023 Jeff Epler <jepler@gmail.com>
#
# SPDX-License-Identifier: MIT
.PHONY: mypy
mypy: venv/bin/mypy
venv/bin/mypy
# Update CONTRIBUTING.md if these commands change
venv/bin/mypy:
python -mvenv venv
venv/bin/pip install -r requirements.txt 'mypy!=1.7.0'
.PHONY: clean
clean:
rm -rf venv

164
README.md
View file

@ -7,132 +7,36 @@ SPDX-License-Identifier: MIT
[![Release chap](https://github.com/jepler/chap/actions/workflows/release.yml/badge.svg?event=release)](https://github.com/jepler/chap/actions/workflows/release.yml)
[![PyPI](https://img.shields.io/pypi/v/chap)](https://pypi.org/project/chap/)
# chap - A Python interface to chatgpt and other LLMs, including a terminal user interface (tui)
# chap - A Python interface to chatgpt, including a terminal user interface (tui)
![Chap screencast](https://raw.githubusercontent.com/jepler/chap/main/chap.gif)
![Chap screencast](https://github.com/jepler/chap/blob/main/chap.gif)
## System requirements
Chap is primarily developed on Linux with Python 3.11. Moderate effort will be made to support versions back to Python 3.9 (Debian oldstable).
Chap is developed on Linux with Python 3.11. Due to use of the `list[int]` style of type hints, it is known not to work on 3.8 and older; the target minimum Python version is 3.9 (debian oldstable).
## Installation
## installation
If you want `chap` available as a command, just install with `pipx install chap` or `pip install chap`.
Install with e.g., `pipx install chap`
Use a virtual environment unless you want it installed globally.
## Installation for development
Use one of the following two methods to run `chap` as a command, with the ability to edit the source files. You are welcome to submit valuable changes as [a pull request](https://github.com/jepler/chap/pulls).
### Via `pip install --editable .`
This is an "editable install", as [recommended by the Python Packaging Authority](https://setuptools.pypa.io/en/latest/userguide/development_mode.html).
Change directory to the root of the `chap` project.
Activate your virtual environment, then install `chap` in development mode:
```shell
pip install --editable .
```
In this mode, you get the `chap` command-line program installed, but you are able to edit the source files in the `src` directory in place.
### Via `chap-dev.py`
A simple shim script called `chap-dev.py` is included to demonstrate how to load and run the `chap` library without installing `chap` in development mode. This method may be more familiar to some developers.
Change directory to the root of the `chap` project.
Activate your virtual environment, then install requirements:
```shell
pip install -r requirements.txt
```
Run the shim script (with optional command flags as appropriate):
```shell
./chap-dev.py
```
In this mode, you can edit the source files in the `src` directory in place, and the shim script will pick up the changes via the `import` directive.
## Contributing
See [CONTRIBUTING.md](CONTRIBUTING.md).
## Code of Conduct
See [CODE\_OF\_CONDUCT.md](CODE_OF_CONDUCT.md).
## Configuration
## configuration
Put your OpenAI API key in the platform configuration directory for chap, e.g., on linux/unix systems at `~/.config/chap/openai_api_key`
## Command-line usage
## commandline usage
* `chap ask "What advice would you give a 20th century human visiting the 21st century for the first time?"`
* `chap render --last` / `chap cat --last`
* `chap render --last`
* `chap import chatgpt-style-chatlog.json` (for files from pionxzh/chatgpt-exporter)
* `chap grep needle`
## interactive terminal usage
* chap tui
## `@FILE` arguments
## Sessions & Commandline Parameters
It's useful to set a bunch of related arguments together, for instance to fully
configure a back-end. This functionality is implemented via `@FILE` arguments.
Before any other command-line argument parsing is performed, `@FILE` arguments are expanded:
* An `@FILE` argument is searched relative to the current directory
* An `@:FILE` argument is searched relative to the configuration directory (e.g., $HOME/.config/chap/presets)
* If an argument starts with a literal `@`, double it: `@@`
* `@.` stops processing any further `@FILE` arguments and leaves them unchanged.
The contents of an `@FILE` are parsed according to `shlex.split(comments=True)`.
Comments are supported.
A typical content might look like this:
```
# cfg/gpt-4o: Use more expensive gpt 4o and custom prompt
--backend openai-chatgpt
-B model:gpt-4o
-s :my-custom-system-message.txt
```
and you might use it with
```
chap @:cfg/gpt-4o ask what version of gpt is this
```
## Interactive terminal usage
The interactive terminal mode is accessed via `chap tui`.
There are a variety of keyboard shortcuts to be aware of:
* tab/shift-tab to move between the entry field and the conversation, or between conversation items
* While in the text box, F9 or (if supported by your terminal) alt+enter to submit multiline text
* while on a conversation item:
* ctrl+x to re-draft the message. This
* saves a copy of the session in an auto-named file in the conversations folder
* removes the conversation from this message to the end
* puts the user's message in the text box to edit
* ctrl+x to re-submit the message. This
* saves a copy of the session in an auto-named file in the conversations folder
* removes the conversation from this message to the end
* puts the user's message in the text box
* and submits it immediately
* ctrl+y to yank the message. This places the response part of the current
interaction in the operating system clipboard to be pasted (e.g., with
ctrl+v or command+v in other software)
* ctrl+q to toggle whether this message may be included in the contextual history for a future query.
The exact way history is submitted is determined by the back-end, often by
counting messages or tokens, but the ctrl+q toggle ensures this message (both the user
and assistant message parts) are not considered.
## Sessions & Command-line Parameters
Details of session handling & command-line arguments are in flux.
Details of session handling & commandline arguments are in flux.
By default, a new session is created. It is saved to the user's state directory
(e.g., `~/.local/state/chap` on linux/unix systems).
@ -144,47 +48,19 @@ an existing session with `-s`. Or, you can continue the last session with
You can set the "system message" with the `-S` flag.
You can select the text generating backend with the `-b` flag:
* openai-chatgpt: the default, paid API, best quality results. Also works with compatible API implementations including llama-cpp when the correct backend URL is specified.
* llama-cpp: Works with [llama.cpp's http server](https://github.com/ggerganov/llama.cpp/blob/master/examples/server/README.md) and can run locally with various models,
though it is [optimized for models that use the llama2-style prompting](https://huggingface.co/blog/llama2#how-to-prompt-llama-2). Consider using llama.cpp's OpenAI compatible API with the openai-chatgpt backend instead, in which case the server can apply the chat template.
* textgen: Works with https://github.com/oobabooga/text-generation-webui and can run locally with various models.
Needs the server URL in *$configuration_directory/textgen\_url*.
* mistral: Works with the [mistral paid API](https://docs.mistral.ai/).
* anthropic: Works with the [anthropic paid API](https://docs.anthropic.com/en/home).
* huggingface: Works with the [huggingface API](https://huggingface.co/docs/api-inference/index), which includes a free tier.
* openai\_chatgpt: the default, paid API, best quality results
* llama_cpp: Works with [llama.cpp's http server](https://github.com/ggerganov/llama.cpp/blob/master/examples/server/README.md) and can run locally with various models. Set the server URL with `-B url:...`.
* textgen: Works with https://github.com/oobabooga/text-generation-webui and can run locally with various models. Needs the server URL in *$configuration_directory/textgen\_url*.
* lorem: local non-AI lorem generator for testing
Backends have settings such as URLs and where API keys are stored. use `chap --backend
<BACKEND> --help` to list settings for a particular backend.
## Environment variables
The backend can be set with the `CHAP_BACKEND` environment variable.
The backend can be set with `CHAP_BACKEND`.
Backend settings can be set with `CHAP_<backend_name>_<parameter_name>`, with `backend_name` and `parameter_name` all in caps.
For instance, `CHAP_LLAMA_CPP_URL=http://server.local:8080/completion` changes the default server URL for the llama-cpp backend.
For instance, `CHAP_LLAMA_CPP_URL=http://server.local:8080/completion` changes the default server URL for the llama_cpp back-end.
## Importing from ChatGPT
The userscript https://github.com/pionxzh/chatgpt-exporter can export chat logs from chat.openai.com in a JSON format.
This format is different than chap's, especially since `chap` currently only represents a single branch of conversation in one log.
You can use the `chap import` command to import all the branches of a chatgpt-style chatlog in JSON format into a series of `chap`-style chat logs.
## Plug-ins
Chap supports back-end and command plug-ins.
"Back-ends" add additional text generators.
"Commands" add new ways to interact with text generators, session data, and so forth.
Install a plugin with `pip install` or `pipx inject` (depending how you installed chap) and then use it as normal.
[chap-backend-replay](https://pypi.org/project/chap-backend-replay/) is an example back-end plug-in. It replays answers from a previous session.
[chap-command-explain](https://pypi.org/project/chap-command-explain/) is an example command plug-in. It is similar to `chap ask`.
At this time, there is no stability guarantee for the API of commands or backends.
The userscript https://github.com/pionxzh/chatgpt-exporter can export chat logs from chat.openai.com in a json format.
This format is different than chap's, especially since chap currently only represents a single branch of conversation in one log.
You can use the `chap import` command to import all the branches of a chatgpt-style chatlog in json format into a series of chap-style chat logs.

View file

@ -1,21 +0,0 @@
#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2023 Jeff Epler <jepler@gmail.com>
#
# SPDX-License-Identifier: MIT
# chap-dev.py - Simple shim script to demonstrate how to use the chap library.
# Development shim: run the in-tree 'chap' package without installing it.
# NOTE(review): leading indentation was lost in this rendering; structure
# inferred from syntax.
import pathlib
import sys
# Ensure the 'src' directory is on the system path so we can import 'chap'
project_root = pathlib.Path(__file__).parent
src_path = project_root / "src"
sys.path.insert(0, str(src_path))
if __name__ == "__main__":
# Import deferred until after sys.path is adjusted so the in-tree copy wins
# over any installed 'chap'.
from chap.core import main
main()
else:
# Fail loudly if someone tries to import this shim as a module.
raise ImportError("'chap-dev.py' is meant for direct execution, not for import.")

View file

@ -4,7 +4,7 @@
[build-system]
requires = [
"setuptools>=68.2.2",
"setuptools>=61",
"setuptools_scm[toml]>=6.0",
]
build-backend = "setuptools.build_meta"
@ -19,20 +19,25 @@ where = ["src"]
name="chap"
authors = [{name = "Jeff Epler", email = "jepler@gmail.com"}]
description = "Interact with the OpenAI ChatGPT API (and other text generators)"
dynamic = ["readme","version","dependencies"]
requires-python = ">=3.9"
keywords = ["llm", "tui", "chatgpt"]
dynamic = ["readme","version"]
dependencies = [
"click",
"dataclasses_json",
"httpx",
"lorem-text",
"platformdirs",
"simple_parsing",
"textual>=0.18.0",
"tiktoken",
"websockets",
]
classifiers = [
"Development Status :: 3 - Alpha",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
"Programming Language :: Python :: Implementation :: CPython",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
[project.urls]
homepage = "https://github.com/jepler/chap"
@ -45,12 +50,3 @@ chap = "chap.__main__:main"
write_to = "src/chap/__version__.py"
[tool.setuptools.dynamic]
readme = {file = ["README.md"], content-type="text/markdown"}
dependencies = {file = "requirements.txt"}
[tool.setuptools.package-data]
"pkgname" = ["py.typed"]
[tool.mypy]
mypy_path = ["src"]
warn_unused_ignores = false
warn_redundant_casts = true
strict = true
packages = ["chap"]

View file

@ -3,6 +3,6 @@
# SPDX-License-Identifier: MIT
build
setuptools>=68.2.2
setuptools>=45
twine
wheel

View file

@ -1,13 +0,0 @@
# SPDX-FileCopyrightText: 2023 Jeff Epler
#
# SPDX-License-Identifier: Unlicense
click
httpx
lorem-text
platformdirs
pyperclip
simple_parsing
textual[syntax] >= 4
tiktoken
websockets

View file

@ -1,95 +0,0 @@
# SPDX-FileCopyrightText: 2024 Jeff Epler <jepler@gmail.com>
#
# SPDX-License-Identifier: MIT
import json
from dataclasses import dataclass
from typing import AsyncGenerator, Any
import httpx
from ..core import AutoAskMixin, Backend
from ..key import UsesKeyMixin
from ..session import Assistant, Role, Session, User
# Back-end for the Anthropic messages API (api.anthropic.com). Streams the
# assistant reply and records the full exchange in the session afterwards.
# NOTE(review): leading indentation was lost in this rendering; structure
# inferred from syntax.
class Anthropic(AutoAskMixin, UsesKeyMixin):
# User-tunable back-end settings (exposed via chap's -B flag mechanism,
# judging by the Parameters pattern shared with the other back-ends).
@dataclass
class Parameters:
url: str = "https://api.anthropic.com"
model: str = "claude-3-5-sonnet-20240620"
max_new_tokens: int = 1000
# Key-file name consumed by UsesKeyMixin's get_key().
api_key_name = "anthropic_api_key"
def __init__(self) -> None:
super().__init__()
self.parameters = self.Parameters()
# Default system prompt for new sessions.
system_message = """\
Answer each question accurately and thoroughly.
"""
# Build the JSON payload for /v1/messages. Anthropic takes the system
# prompt as a top-level "system" field, not as a chat message, so system
# messages are split out; empty messages are dropped, and only the
# max_query_size most recent messages are kept.
def make_full_query(self, messages: Session, max_query_size: int) -> dict[str, Any]:
system = [m.content for m in messages if m.role == Role.SYSTEM]
messages = [m for m in messages if m.role != Role.SYSTEM and m.content]
del messages[:-max_query_size]
result = dict(
model=self.parameters.model,
max_tokens=self.parameters.max_new_tokens,
messages=[dict(role=str(m.role), content=m.content) for m in messages],
stream=True,
)
# Only the first system message is sent; later ones are discarded.
if system and system[0]:
result["system"] = system[0]
return result
# Yield streamed response fragments for *query*. Errors are reported
# in-band as yielded text rather than raised, so callers always get
# something to display.
async def aask(
self,
session: Session,
query: str,
*,
max_query_size: int = 5,
timeout: float = 180,
) -> AsyncGenerator[str, None]:
new_content: list[str] = []
params = self.make_full_query(session + [User(query)], max_query_size)
try:
async with httpx.AsyncClient(timeout=timeout) as client:
async with client.stream(
"POST",
f"{self.parameters.url}/v1/messages",
json=params,
headers={
"x-api-key": self.get_key(),
"content-type": "application/json",
"anthropic-version": "2023-06-01",
"anthropic-beta": "messages-2023-12-15",
},
) as response:
if response.status_code == 200:
# Server-sent events: each payload line starts with "data:".
async for line in response.aiter_lines():
if line.startswith("data:"):
data = line.removeprefix("data:").strip()
j = json.loads(data)
content = j.get("delta", {}).get("text", "")
if content:
new_content.append(content)
yield content
else:
# Non-200: surface the status and the error body verbatim.
content = f"\nFailed with {response=!r}"
new_content.append(content)
yield content
async for line in response.aiter_lines():
new_content.append(line)
yield line
except httpx.HTTPError as e:
content = f"\nException: {e!r}"
new_content.append(content)
yield content
# Persist both sides of the exchange (including any error text).
session.extend([User(query), Assistant("".join(new_content))])
# Plug-in entry point: constructs the back-end instance.
# NOTE(review): the docstring appears to double as the user-facing back-end
# description -- confirm before rewording it.
def factory() -> Backend:
"""Uses the anthropic text-generation-interface web API"""
return Anthropic()

View file

@ -1,117 +0,0 @@
# SPDX-FileCopyrightText: 2023 Jeff Epler <jepler@gmail.com>
#
# SPDX-License-Identifier: MIT
import json
from dataclasses import dataclass
from typing import Any, AsyncGenerator
import httpx
from ..core import AutoAskMixin, Backend
from ..key import UsesKeyMixin
from ..session import Assistant, Role, Session, User
# Back-end for the Hugging Face inference API, using llama2-style prompt
# templating ([INST] / <<SYS>> markers) rather than a chat-message schema.
# NOTE(review): leading indentation was lost in this rendering; structure
# inferred from syntax.
class HuggingFace(AutoAskMixin, UsesKeyMixin):
@dataclass
class Parameters:
url: str = "https://api-inference.huggingface.co"
model: str = "mistralai/Mistral-7B-Instruct-v0.1"
max_new_tokens: int = 250
# Prompt-template fragments emitted around each role's content below.
start_prompt: str = """<s>[INST] <<SYS>>\n"""
after_system: str = "\n<</SYS>>\n\n"
after_user: str = """ [/INST] """
after_assistant: str = """ </s><s>[INST] """
# Token id that terminates generation (2 = </s> in the llama tokenizer --
# TODO confirm for the configured model).
stop_token_id = 2
# Key-file name consumed by UsesKeyMixin's get_key().
api_key_name = "huggingface_api_token"
def __init__(self) -> None:
super().__init__()
self.parameters = self.Parameters()
# Default system prompt for new sessions.
system_message = """\
A dialog, where USER interacts with AI. AI is helpful, kind, obedient, honest, and knows its own limits.
"""
# Flatten the session into a single templated prompt string. Index 0 (the
# system message) is always kept; otherwise only the max_query_size most
# recent messages survive. Empty messages are skipped.
def make_full_query(self, messages: Session, max_query_size: int) -> str:
del messages[1:-max_query_size]
result = [self.parameters.start_prompt]
for m in messages:
content = (m.content or "").strip()
if not content:
continue
result.append(content)
if m.role == Role.SYSTEM:
result.append(self.parameters.after_system)
elif m.role == Role.ASSISTANT:
result.append(self.parameters.after_assistant)
elif m.role == Role.USER:
result.append(self.parameters.after_user)
full_query = "".join(result)
return full_query
# Stream tokens, re-POSTing with the accumulated "generated_text" as the
# new input until the server stops returning one (chained continuation).
async def chained_query(
self, inputs: Any, timeout: float
) -> AsyncGenerator[str, None]:
async with httpx.AsyncClient(timeout=timeout) as client:
while inputs:
params = {
"inputs": inputs,
"stream": True,
}
# Cleared so the loop ends unless the server supplies a continuation.
inputs = None
async with client.stream(
"POST",
f"{self.parameters.url}/models/{self.parameters.model}",
json=params,
headers={
"Authorization": f"Bearer {self.get_key()}",
},
) as response:
if response.status_code == 200:
async for line in response.aiter_lines():
if line.startswith("data:"):
data = line.removeprefix("data:").strip()
j = json.loads(data)
token = j.get("token", {})
inputs = j.get("generated_text", inputs)
# Stop token ends the whole chained stream.
if token.get("id") == self.parameters.stop_token_id:
return
yield token.get("text", "")
else:
yield f"\nFailed with {response=!r}"
return
# Yield streamed response fragments for *query*; the first fragment is
# left-stripped. Errors are reported in-band as yielded text.
async def aask(
self,
session: Session,
query: str,
*,
max_query_size: int = 5,
timeout: float = 180,
) -> AsyncGenerator[str, None]:
new_content: list[str] = []
inputs = self.make_full_query(session + [User(query)], max_query_size)
try:
async for content in self.chained_query(inputs, timeout=timeout):
if not new_content:
content = content.lstrip()
if content:
# NOTE(review): the next four lines repeat the preceding
# lstrip/if pair verbatim. Harmless if nested as written, but it
# looks like a diff-rendering artifact -- confirm against the
# repository history before relying on this text.
if not new_content:
content = content.lstrip()
if content:
new_content.append(content)
yield content
except httpx.HTTPError as e:
content = f"\nException: {e!r}"
new_content.append(content)
yield content
# Persist both sides of the exchange (including any error text).
session.extend([User(query), Assistant("".join(new_content))])
# Plug-in entry point: constructs the back-end instance.
# NOTE(review): the docstring appears to double as the user-facing back-end
# description -- confirm before rewording it.
def factory() -> Backend:
"""Uses the huggingface text-generation-interface web API"""
return HuggingFace()

View file

@ -2,71 +2,55 @@
#
# SPDX-License-Identifier: MIT
import asyncio
import json
from dataclasses import dataclass
from typing import AsyncGenerator
import httpx
from ..core import AutoAskMixin, Backend
from ..session import Assistant, Role, Session, User
from ..session import Assistant, User
class LlamaCpp(AutoAskMixin):
class LlamaCpp:
@dataclass
class Parameters:
url: str = "http://localhost:8080/completion"
"""The URL of a llama.cpp server's completion endpoint."""
start_prompt: str = "<|begin_of_text|>"
system_format: str = (
"<|start_header_id|>system<|end_header_id|>\n\n{}<|eot_id|>"
)
user_format: str = "<|start_header_id|>user<|end_header_id|>\n\n{}<|eot_id|>"
assistant_format: str = (
"<|start_header_id|>assistant<|end_header_id|>\n\n{}<|eot_id|>"
)
end_prompt: str = "<|start_header_id|>assistant<|end_header_id|>\n\n"
stop: str | None = None
def __init__(self) -> None:
super().__init__()
def __init__(self):
self.parameters = self.Parameters()
system_message = """\
A dialog, where USER interacts with AI. AI is helpful, kind, obedient, honest, and knows its own limits.
"""
def make_full_query(self, messages: Session, max_query_size: int) -> str:
def make_full_query(self, messages, max_query_size):
del messages[1:-max_query_size]
result = [self.parameters.start_prompt]
formats = {
Role.SYSTEM: self.parameters.system_format,
Role.USER: self.parameters.user_format,
Role.ASSISTANT: self.parameters.assistant_format,
}
rows = []
for m in messages:
content = (m.content or "").strip()
if not content:
continue
result.append(formats[m.role].format(content))
full_query = "".join(result)
if m.role == "system":
rows.append(f"ASSISTANT'S RULE: {content}\n")
elif m.role == "assistant":
rows.append(f"ASSISTANT: {content}\n")
elif m.role == "user":
rows.append(f"USER: {content}")
rows.append("ASSISTANT: ")
full_query = ("\n".join(rows)).rstrip()
return full_query
async def aask(
self,
session: Session,
query: str,
*,
max_query_size: int = 5,
timeout: float = 180,
) -> AsyncGenerator[str, None]:
self, session, query, *, max_query_size=5, timeout=60
): # pylint: disable=unused-argument,too-many-locals,too-many-branches
params = {
"prompt": self.make_full_query(session + [User(query)], max_query_size),
"prompt": self.make_full_query(
session.session + [User(query)], max_query_size
),
"stream": True,
"stop": ["</s>", "<s>", "[INST]", "<|eot_id|>"],
}
new_content: list[str] = []
new_content = []
try:
async with httpx.AsyncClient(timeout=timeout) as client:
async with client.stream(
@ -97,14 +81,15 @@ A dialog, where USER interacts with AI. AI is helpful, kind, obedient, honest, a
new_content.append(content)
yield content
session.extend([User(query), Assistant("".join(new_content))])
session.session.extend([User(query), Assistant("".join(new_content))])
def ask(self, session, query, *, max_query_size=5, timeout=60):
asyncio.run(
self.aask(session, query, max_query_size=max_query_size, timeout=timeout)
)
return session.session[-1].message
def factory() -> Backend:
"""Uses the llama.cpp completion web API
Note: Consider using the openai-chatgpt backend with a custom URL instead.
The llama.cpp server will automatically apply common chat templates with the
openai-chatgpt backend, while chat templates must be manually configured client side
with this backend."""
def factory():
"""Uses the llama.cpp completion web API"""
return LlamaCpp()

View file

@ -5,16 +5,13 @@
import asyncio
import random
from dataclasses import dataclass
from typing import AsyncGenerator, Iterable, cast
# lorem is not type annotated
from lorem_text import lorem # type: ignore
from lorem_text import lorem
from ..core import Backend
from ..session import Assistant, Session, User
from ..session import Assistant, User
def ipartition(s: str, sep: str = " ") -> Iterable[tuple[str, str]]:
def ipartition(s, sep=" "):
rest = s
while rest:
first, opt_sep, rest = rest.partition(sep)
@ -33,19 +30,15 @@ class Lorem:
paragraph_hi: int = 5
"""Maximum response paragraph count (inclusive)"""
def __init__(self) -> None:
def __init__(self):
self.parameters = self.Parameters()
system_message = (
"(It doesn't matter what you ask, this backend will respond with lorem)"
)
async def aask(
self,
session: Session,
query: str,
) -> AsyncGenerator[str, None]:
data = self.ask(session, query)
async def aask(self, session, query, *, max_query_size=5, timeout=60):
data = self.ask(session, query, max_query_size=max_query_size, timeout=timeout)
for word, opt_sep in ipartition(data):
yield word + opt_sep
await asyncio.sleep(
@ -53,22 +46,15 @@ class Lorem:
)
def ask(
self,
session: Session,
query: str,
) -> str:
new_content = cast(
str,
lorem.paragraphs(
random.randint(
self.parameters.paragraph_lo, self.parameters.paragraph_hi
)
),
self, session, query, *, max_query_size=5, timeout=60
): # pylint: disable=unused-argument
new_content = lorem.paragraphs(
random.randint(self.parameters.paragraph_lo, self.parameters.paragraph_hi)
).replace("\n", "\n\n")
session.extend([User(query), Assistant("".join(new_content))])
return session[-1].content
session.session.extend([User(query), Assistant("".join(new_content))])
return new_content
def factory() -> Backend:
def factory():
"""That just prints 'lorem' text. Useful for testing."""
return Lorem()

View file

@ -1,96 +0,0 @@
# SPDX-FileCopyrightText: 2024 Jeff Epler <jepler@gmail.com>
#
# SPDX-License-Identifier: MIT
import json
from dataclasses import dataclass
from typing import AsyncGenerator, Any
import httpx
from ..core import AutoAskMixin
from ..key import UsesKeyMixin
from ..session import Assistant, Session, User
class Mistral(AutoAskMixin, UsesKeyMixin):
    """Back-end for the Mistral paid chat-completions API (api.mistral.ai).

    Streams the assistant reply via server-sent events and appends the full
    User/Assistant exchange to the session when the stream ends.
    """

    @dataclass
    class Parameters:
        # Base URL of the Mistral API server.
        url: str = "https://api.mistral.ai"
        # Model name sent in the request body.
        model: str = "open-mistral-7b"
        # Upper bound on generated tokens per reply.
        max_new_tokens: int = 1000

    # Key-file name consumed by UsesKeyMixin's get_key().
    api_key_name = "mistral_api_key"

    def __init__(self) -> None:
        super().__init__()
        self.parameters = self.Parameters()

    # Default system prompt for new sessions.
    system_message = """\
Answer each question accurately and thoroughly.
"""

    def make_full_query(self, messages: Session, max_query_size: int) -> dict[str, Any]:
        """Build the JSON payload for /v1/chat/completions.

        Empty messages are dropped; index 0 (the system message) is always
        kept and otherwise only the ``max_query_size`` most recent messages
        survive, bounding the request size.
        """
        messages = [m for m in messages if m.content]
        del messages[1:-max_query_size]
        result = dict(
            model=self.parameters.model,
            max_tokens=self.parameters.max_new_tokens,
            messages=[dict(role=str(m.role), content=m.content) for m in messages],
            stream=True,
        )
        return result

    async def aask(
        self,
        session: Session,
        query: str,
        *,
        max_query_size: int = 5,
        timeout: float = 180,
    ) -> AsyncGenerator[str, None]:
        """Yield streamed response fragments for *query*.

        Network failures and non-200 responses are reported in-band as
        yielded text rather than raised, so callers always get something to
        display. The full exchange (including any error text) is appended to
        *session* when the generator finishes.
        """
        new_content: list[str] = []
        params = self.make_full_query(session + [User(query)], max_query_size)
        try:
            async with httpx.AsyncClient(timeout=timeout) as client:
                async with client.stream(
                    "POST",
                    f"{self.parameters.url}/v1/chat/completions",
                    json=params,
                    # Fix: a bogus '"model": "application/json"' header was
                    # removed here; it was a copy-paste artifact of "accept",
                    # and the model is selected in the JSON body instead.
                    headers={
                        "Authorization": f"Bearer {self.get_key()}",
                        "content-type": "application/json",
                        "accept": "application/json",
                    },
                ) as response:
                    if response.status_code == 200:
                        # Server-sent events: payload lines start with "data:"
                        # and the stream ends with a literal "[DONE]".
                        async for line in response.aiter_lines():
                            if line.startswith("data:"):
                                data = line.removeprefix("data:").strip()
                                if data == "[DONE]":
                                    break
                                j = json.loads(data)
                                content = (
                                    j.get("choices", [{}])[0]
                                    .get("delta", {})
                                    .get("content", "")
                                )
                                if content:
                                    new_content.append(content)
                                    yield content
                    else:
                        # Non-200: surface the status and error body verbatim.
                        content = f"\nFailed with {response=!r}"
                        new_content.append(content)
                        yield content
                        async for line in response.aiter_lines():
                            new_content.append(line)
                            yield line
        except httpx.HTTPError as e:
            content = f"\nException: {e!r}"
            new_content.append(content)
            yield content
        session.extend([User(query), Assistant("".join(new_content))])


# Plug-in entry point: the class itself serves as the factory; its docstring
# provides the user-facing back-end description.
factory = Mistral

View file

@ -4,16 +4,13 @@
import functools
import json
import warnings
from dataclasses import dataclass
from typing import AsyncGenerator, cast
import httpx
import tiktoken
from ..core import Backend
from ..key import UsesKeyMixin
from ..session import Assistant, Message, Session, User, session_to_list
from ..key import get_key
from ..session import Assistant, Session, User
@dataclass(frozen=True)
@ -21,77 +18,71 @@ class EncodingMeta:
encoding: tiktoken.Encoding
tokens_per_message: int
tokens_per_name: int
tokens_overhead: int
@functools.lru_cache()
def encode(self, s: str) -> list[int]:
def encode(self, s):
return self.encoding.encode(s)
def num_tokens_for_message(self, message: Message) -> int:
def num_tokens_for_message(self, message):
# n.b. chap doesn't use message.name yet
return (
len(self.encode(message.role))
+ len(self.encode(message.content))
+ self.tokens_per_message
)
return len(self.encode(message.role)) + len(self.encode(message.content))
def num_tokens_for_messages(self, messages: Session) -> int:
return (
sum(self.num_tokens_for_message(message) for message in messages)
+ self.tokens_overhead
)
def num_tokens_for_messages(self, messages):
return sum(self.num_tokens_for_message(message) for message in messages) + 3
@classmethod
@functools.cache
def from_model(cls, model: str) -> "EncodingMeta":
def from_model(cls, model):
if model == "gpt-3.5-turbo":
# print("Warning: gpt-3.5-turbo may update over time. Returning num tokens assuming gpt-3.5-turbo-0613.")
model = "gpt-3.5-turbo-0613"
if model == "gpt-4":
# print("Warning: gpt-4 may update over time. Returning num tokens assuming gpt-4-0613.")
model = "gpt-4-0613"
try:
encoding = tiktoken.encoding_for_model(model)
except KeyError:
warnings.warn("Warning: model not found. Using cl100k_base encoding.")
print("Warning: model not found. Using cl100k_base encoding.")
encoding = tiktoken.get_encoding("cl100k_base")
tokens_per_message = 3
tokens_per_name = 1
tokens_overhead = 3
if model == "gpt-3.5-turbo-0301":
if model in {
"gpt-3.5-turbo-0613",
"gpt-3.5-turbo-16k-0613",
"gpt-4-0314",
"gpt-4-32k-0314",
"gpt-4-0613",
"gpt-4-32k-0613",
}:
tokens_per_message = 3
tokens_per_name = 1
elif model == "gpt-3.5-turbo-0301":
tokens_per_message = (
4 # every message follows <|start|>{role/name}\n{content}<|end|>\n
)
tokens_per_name = -1 # if there's a name, the role is omitted
return cls(encoding, tokens_per_message, tokens_per_name, tokens_overhead)
else:
raise NotImplementedError(
f"""EncodingMeta is not implemented for model {model}. See https://github.com/openai/openai-python/blob/main/chatml.md for information on how messages are converted to tokens."""
)
return cls(encoding, tokens_per_message, tokens_per_name)
class ChatGPT(UsesKeyMixin):
class ChatGPT:
@dataclass
class Parameters:
model: str = "gpt-4o-mini"
"""The model to use. The most common alternative value is 'gpt-4o'."""
model: str = "gpt-3.5-turbo"
"""The model to use. The most common alternative value is 'gpt-4'."""
max_request_tokens: int = 1024
"""The approximate greatest number of tokens to send in a request. When the session is long, the system prompt and 1 or more of the most recent interaction steps are sent."""
url: str = "https://api.openai.com/v1/chat/completions"
"""The URL of a chatgpt-compatible server's completion endpoint. Notably, llama.cpp's server is compatible with this backend, and can automatically apply common chat templates too."""
temperature: float | None = None
"""The model temperature for sampling"""
top_p: float | None = None
"""The model temperature for sampling"""
api_key_name: str = "openai_api_key"
"""The OpenAI API key"""
parameters: Parameters
def __init__(self) -> None:
def __init__(self):
self.parameters = self.Parameters()
system_message: str = "You're named Chap (they/them). Chap strives to answer truthfully and frankly. You have opinions, preferences, and emotions, you can say when you don't know, and you prefer to speak in plain language."
def make_full_prompt(self, all_history: Session) -> Session:
def make_full_prompt(self, all_history):
encoding = EncodingMeta.from_model(self.parameters.model)
result = [all_history[0]] # Assumed to be system prompt
left = self.parameters.max_request_tokens - encoding.num_tokens_for_messages(
@ -106,50 +97,52 @@ class ChatGPT(UsesKeyMixin):
else:
break
result.extend(reversed(parts))
return result
return Session(result)
def ask(self, session: Session, query: str, *, timeout: float = 60) -> str:
full_prompt = self.make_full_prompt(session + [User(query)])
def ask(self, session, query, *, timeout=60):
full_prompt = self.make_full_prompt(session.session + [User(query)])
response = httpx.post(
self.parameters.url,
"https://api.openai.com/v1/chat/completions",
json={
"model": self.parameters.model,
"messages": session_to_list(full_prompt),
},
"messages": full_prompt.to_dict()[ # pylint: disable=no-member
"session"
],
}, # pylint: disable=no-member
headers={
"Authorization": f"Bearer {self.get_key()}",
},
timeout=timeout,
)
if response.status_code != 200:
return f"Failure {response.text} ({response.status_code})"
print("Failure", response.status_code, response.text)
return None
try:
j = response.json()
result = cast(str, j["choices"][0]["message"]["content"])
result = j["choices"][0]["message"]["content"]
except (KeyError, IndexError, json.decoder.JSONDecodeError):
return f"Failure {response.text} ({response.status_code})"
print("Failure", response.status_code, response.text)
return None
session.extend([User(query), Assistant(result)])
session.session.extend([User(query), Assistant(result)])
return result
async def aask(
self, session: Session, query: str, *, timeout: float = 60
) -> AsyncGenerator[str, None]:
full_prompt = self.make_full_prompt(session + [User(query)])
async def aask(self, session, query, *, timeout=60):
full_prompt = self.make_full_prompt(session.session + [User(query)])
new_content = []
try:
async with httpx.AsyncClient(timeout=timeout) as client:
async with client.stream(
"POST",
self.parameters.url,
"https://api.openai.com/v1/chat/completions",
headers={"authorization": f"Bearer {self.get_key()}"},
json={
"model": self.parameters.model,
"temperature": self.parameters.temperature,
"top_p": self.parameters.top_p,
"stream": True,
"messages": session_to_list(full_prompt),
"messages": full_prompt.to_dict()[ # pylint: disable=no-member
"session"
], # pylint: disable=no-member
},
) as response:
if response.status_code == 200:
@ -174,9 +167,13 @@ class ChatGPT(UsesKeyMixin):
new_content.append(content)
yield content
session.extend([User(query), Assistant("".join(new_content))])
session.session.extend([User(query), Assistant("".join(new_content))])
@classmethod
def get_key(cls):
return get_key("openai_api_key")
def factory() -> Backend:
def factory():
"""Uses the OpenAI chat completion API"""
return ChatGPT()

View file

@ -2,25 +2,19 @@
#
# SPDX-License-Identifier: MIT
import asyncio
import json
import uuid
from dataclasses import dataclass
from typing import AsyncGenerator
import websockets
from ..core import AutoAskMixin, Backend
from ..session import Assistant, Role, Session, User
from ..key import get_key
from ..session import Assistant, Session, User
class Textgen(AutoAskMixin):
@dataclass
class Parameters:
server_hostname: str = "localhost"
def __init__(self) -> None:
super().__init__()
self.parameters = self.Parameters()
class Textgen:
def __init__(self):
self.server = get_key("textgen_url", "textgen server URL")
system_message = """\
A dialog, where USER interacts with AI. AI is helpful, kind, obedient, honest, and knows its own limits.
@ -30,13 +24,8 @@ USER: Hello, AI.
AI: Hello! How can I assist you today?"""
async def aask(
self,
session: Session,
query: str,
*,
max_query_size: int = 5,
timeout: float = 60,
) -> AsyncGenerator[str, None]:
self, session, query, *, max_query_size=5, timeout=60
): # pylint: disable=unused-argument,too-many-locals,too-many-branches
params = {
"max_new_tokens": 200,
"do_sample": True,
@ -55,17 +44,20 @@ AI: Hello! How can I assist you today?"""
session_hash = str(uuid.uuid4())
role_map = {
Role.USER: "USER: ",
Role.ASSISTANT: "AI: ",
"user": "USER: ",
"assistant": "AI: ",
}
full_prompt = session + [User(query)]
del full_prompt[1:-max_query_size]
new_data = old_data = full_query = "\n".join(
f"{role_map.get(q.role,'')}{q.content}\n" for q in full_prompt
) + f"\n{role_map.get('assistant')}"
full_prompt = Session(session.session + [User(query)])
del full_prompt.session[1:-max_query_size]
new_data = old_data = full_query = (
"\n".join(
f"{role_map.get(q.role,'')}{q.content}\n" for q in full_prompt.session
)
+ f"\n{role_map.get('assistant')}"
)
try:
async with websockets.connect(
f"ws://{self.parameters.server_hostname}:7860/queue/join"
async with websockets.connect( # pylint: disable=no-member
f"ws://{self.server}:7860/queue/join"
) as websocket:
while content := json.loads(await websocket.recv()):
if content["msg"] == "send_hash":
@ -126,15 +118,21 @@ AI: Hello! How can I assist you today?"""
# stop generation by closing the websocket here
if content["msg"] == "process_completed":
break
except Exception as e:
except Exception as e: # pylint: disable=broad-exception-caught
content = f"\nException: {e!r}"
new_data += content
yield content
all_response = new_data[len(full_query) :]
session.extend([User(query), Assistant(all_response)])
session.session.extend([User(query), Assistant(all_response)])
def ask(self, session, query, *, max_query_size=5, timeout=60):
asyncio.run(
self.aask(session, query, max_query_size=max_query_size, timeout=timeout)
)
return session.session[-1].message
def factory() -> Backend:
def factory():
"""Uses the textgen completion API"""
return Textgen()

View file

@ -4,52 +4,42 @@
import asyncio
import sys
from typing import Iterable, Optional, Protocol
import click
import rich
from ..core import Backend, Obj, command_uses_new_session
from ..session import Session, session_to_file
from ..core import command_uses_new_session
bold = "\033[1m"
nobold = "\033[m"
def ipartition(s: str, sep: str) -> Iterable[tuple[str, str]]:
def ipartition(s, sep):
rest = s
while rest:
first, opt_sep, rest = rest.partition(sep)
yield (first, opt_sep)
class Printable(Protocol):
def raw(self, s: str) -> None:
"""Print a raw escape code"""
def add(self, s: str) -> None:
"""Add text to the output"""
class DumbPrinter:
def raw(self, s: str) -> None:
def raw(self, s):
pass
def add(self, s: str) -> None:
def add(self, s):
print(s, end="")
class WrappingPrinter:
def __init__(self, width: Optional[int] = None) -> None:
def __init__(self, width=None):
self._width = width or rich.get_console().width
self._column = 0
self._line = ""
self._sp = ""
def raw(self, s: str) -> None:
def raw(self, s):
print(s, end="")
def add(self, s: str) -> None:
def add(self, s):
for line, opt_nl in ipartition(s, "\n"):
for word, opt_sp in ipartition(line, " "):
newlen = len(self._line) + len(self._sp) + len(word)
@ -73,16 +63,15 @@ class WrappingPrinter:
self._sp = ""
def verbose_ask(api: Backend, session: Session, q: str, print_prompt: bool) -> str:
printer: Printable
def verbose_ask(api, session, q, print_prompt, **kw):
if sys.stdout.isatty():
printer = WrappingPrinter()
else:
printer = DumbPrinter()
tokens: list[str] = []
tokens = []
async def work() -> None:
async for token in api.aask(session, q):
async def work():
async for token in api.aask(session, q, **kw):
printer.add(token)
if print_prompt:
@ -100,34 +89,22 @@ def verbose_ask(api: Backend, session: Session, q: str, print_prompt: bool) -> s
@command_uses_new_session
@click.option("--print-prompt/--no-print-prompt", default=True)
@click.option("--stdin/--no-stdin", "use_stdin", default=False)
@click.argument("prompt", nargs=-1)
def main(obj: Obj, prompt: list[str], use_stdin: bool, print_prompt: bool) -> None:
@click.argument("prompt", nargs=-1, required=True)
def main(obj, prompt, print_prompt):
"""Ask a question (command-line argument is passed as prompt)"""
session = obj.session
assert session is not None
session_filename = obj.session_filename
assert session_filename is not None
api = obj.api
assert api is not None
if use_stdin:
if prompt:
raise click.UsageError("Can't use 'prompt' together with --stdin")
joined_prompt = sys.stdin.read()
else:
joined_prompt = " ".join(prompt)
# symlink_session_filename(session_filename)
response = verbose_ask(api, session, joined_prompt, print_prompt=print_prompt)
response = verbose_ask(api, session, " ".join(prompt), print_prompt=print_prompt)
print(f"Saving session to {session_filename}", file=sys.stderr)
if response is not None:
session_to_file(session, session_filename)
with open(session_filename, "w", encoding="utf-8") as f:
f.write(session.to_json())
if __name__ == "__main__":
main()
main() # pylint: disable=no-value-for-parameter

View file

@ -4,26 +4,23 @@
import click
from ..core import Obj, command_uses_existing_session
from ..session import Role
from ..core import command_uses_existing_session
@command_uses_existing_session
@click.option("--no-system", is_flag=True)
def main(obj: Obj, no_system: bool) -> None:
def main(obj, no_system):
"""Print session in plaintext"""
session = obj.session
if not session:
return
first = True
for row in session:
for row in session.session:
if not first:
print()
first = False
if row.role == Role.USER:
if row.role == "user":
decoration = "**"
elif row.role == Role.SYSTEM:
elif row.role == "system":
if no_system:
continue
decoration = "_"
@ -38,4 +35,4 @@ def main(obj: Obj, no_system: bool) -> None:
if __name__ == "__main__":
main()
main() # pylint: disable=no-value-for-parameter

View file

@ -6,32 +6,27 @@ from __future__ import annotations
import pathlib
import re
import sys
from typing import Iterable, Optional, Tuple
import click
import rich
from ..core import conversations_path as default_conversations_path
from ..session import Message, session_from_file
from ..session import Message, Session
from .render import to_markdown
def list_files_matching_rx(
rx: re.Pattern[str], conversations_path: Optional[pathlib.Path] = None
rx: re.Pattern, conversations_path: Optional[str] = None
) -> Iterable[Tuple[pathlib.Path, Message]]:
for conversation in (conversations_path or default_conversations_path).glob(
"*.json"
):
try:
session = session_from_file(conversation)
except Exception as e:
print(f"Failed to read {conversation}: {e}", file=sys.stderr)
continue
for message in session:
if isinstance(message.content, str) and rx.search(message.content):
yield conversation, message
with open(conversation, "r", encoding="utf-8") as f:
session = Session.from_json(f.read()) # pylint: disable=no-member
for message in session.session:
if isinstance(message.content, str) and rx.search(message.content):
yield conversation, message
@click.command
@ -39,9 +34,7 @@ def list_files_matching_rx(
@click.option("--files-with-matches", "-l", is_flag=True)
@click.option("--fixed-strings", "--literal", "-F", is_flag=True)
@click.argument("pattern", nargs=1, required=True)
def main(
ignore_case: bool, files_with_matches: bool, fixed_strings: bool, pattern: str
) -> None:
def main(ignore_case, files_with_matches, fixed_strings, pattern):
"""Search sessions for pattern"""
console = rich.get_console()
if fixed_strings:
@ -49,7 +42,7 @@ def main(
rx = re.compile(pattern, re.I if ignore_case else 0)
last_file = None
for f, m in list_files_matching_rx(rx, None):
for f, m in list_files_matching_rx(rx, ignore_case):
if f != last_file:
if files_with_matches:
print(f)
@ -67,4 +60,4 @@ def main(
if __name__ == "__main__":
main()
main() # pylint: disable=no-value-for-parameter

View file

@ -6,44 +6,41 @@ from __future__ import annotations
import json
import pathlib
from typing import Any, Iterator, TextIO
import click
import rich
from ..core import conversations_path, new_session_path
from ..session import Message, Role, Session, new_session, session_to_file
from ..session import Message, Session
console = rich.get_console()
def iter_sessions(
name: str, content: Any, session_in: Session, node_id: str
) -> Iterator[tuple[str, Session]]:
def iter_sessions(name, content, session_in, node_id):
node = content["mapping"][node_id]
session = session_in[:]
session = Session(session_in.session[:])
if "message" in node:
role = node["message"]["author"]["role"]
text_content = "".join(node["message"]["content"]["parts"])
session.append(Message(role=role, content=text_content))
session.session.append(Message(role=role, content=text_content))
if children := node.get("children"):
for c in children:
yield from iter_sessions(name, content, session, c)
else:
title = content.get("title") or "Untitled"
session[0] = Message(
Role.SYSTEM,
session.session[0] = Message(
"system",
f"# {title}\nChatGPT session imported from {name}, branch {node_id}.\n\n",
)
yield node_id, session
def do_import(output_directory: pathlib.Path, f: TextIO) -> None:
def do_import(output_directory, f):
stem = pathlib.Path(f.name).stem
content = json.load(f)
session = new_session()
session = Session.new_session()
default_branch = content["current_node"]
console.print(f"Importing [bold]{f.name}[nobold]")
@ -55,7 +52,8 @@ def do_import(output_directory: pathlib.Path, f: TextIO) -> None:
session_filename = new_session_path(
output_directory / (f"{stem}_{branch}.json")
)
session_to_file(session, session_filename)
with open(session_filename, "w", encoding="utf-8") as f_out:
f_out.write(session.to_json()) # pylint: disable=no-member
console.print(f" -> {session_filename}")
@ -69,7 +67,7 @@ def do_import(output_directory: pathlib.Path, f: TextIO) -> None:
@click.argument(
"files", nargs=-1, required=True, type=click.File("r", encoding="utf-8")
)
def main(output_directory: pathlib.Path, files: list[TextIO]) -> None:
def main(output_directory, files):
"""Import files from the ChatGPT webui
This understands the format produced by
@ -82,4 +80,4 @@ def main(output_directory: pathlib.Path, files: list[TextIO]) -> None:
if __name__ == "__main__":
main()
main() # pylint: disable=no-value-for-parameter

View file

@ -7,15 +7,14 @@ import rich
from markdown_it import MarkdownIt
from rich.markdown import Markdown
from ..core import Obj, command_uses_existing_session
from ..session import Message, Role
from ..core import command_uses_existing_session
def to_markdown(message: Message) -> Markdown:
def to_markdown(message):
role = message.role
if role == Role.USER:
if role == "user":
style = "bold"
elif role == Role.SYSTEM:
elif role == "system":
style = "italic"
else:
style = "none"
@ -28,21 +27,20 @@ def to_markdown(message: Message) -> Markdown:
@command_uses_existing_session
@click.option("--no-system", is_flag=True)
def main(obj: Obj, no_system: bool) -> None:
def main(obj, no_system):
"""Print session with formatting"""
session = obj.session
assert session is not None
console = rich.get_console()
first = True
for row in session:
for row in session.session:
if not first:
console.print()
first = False
if no_system and row.role == Role.SYSTEM:
if no_system and row.role == "system":
continue
console.print(to_markdown(row))
if __name__ == "__main__":
main()
main() # pylint: disable=no-value-for-parameter

View file

@ -4,15 +4,6 @@
* SPDX-License-Identifier: MIT
*/
.role_user.history_exclude, .role_assistant.history_exclude {
color: $text-disabled;
border-left: dashed $primary;
}
.role_assistant.history_exclude:focus-within {
color: $text-disabled;
border-left: dashed $secondary;
}
.role_system {
text-style: italic;
color: $text-muted;
@ -36,26 +27,13 @@ Markdown:focus-within {
Markdown {
border-left: heavy transparent;
}
MarkdownFence {
max-height: 9999;
}
Footer {
dock: top;
}
Input, #wait {
Input {
dock: bottom;
}
Input {
height: 3;
}
#wait { display: none; height: 3 }
#wait CancelButton { dock: right; border: none; margin: 0; height: 3; text-style: none}
Markdown {
margin: 0 1 0 0;
}
SubmittableTextArea { height: auto; min-height: 5; margin: 0; border: none; border-left: heavy $primary }

View file

@ -3,304 +3,180 @@
# SPDX-License-Identifier: MIT
import asyncio
import subprocess
import sys
from typing import Any, Optional, cast, TYPE_CHECKING
import click
from markdown_it import MarkdownIt
from textual import work
from textual._ansi_sequences import ANSI_SEQUENCES_KEYS
from textual.app import App, ComposeResult
from textual.app import App
from textual.binding import Binding
from textual.containers import Container, Horizontal, VerticalScroll
from textual.keys import Keys
from textual.widgets import Button, Footer, LoadingIndicator, Markdown, TextArea
from textual.containers import Container, VerticalScroll
from textual.widgets import Footer, Input, Markdown
from ..core import Backend, Obj, command_uses_new_session, get_api, new_session_path
from ..session import Assistant, Message, Session, User, new_session, session_to_file
from ..core import command_uses_new_session, get_api, new_session_path
from ..session import Assistant, Session, User
# workaround for pyperclip being un-typed
if TYPE_CHECKING:
def pyperclip_copy(data: str) -> None:
...
else:
from pyperclip import copy as pyperclip_copy
# Monkeypatch alt+enter as meaning "F9", WFM
# ignore typing here because ANSI_SEQUENCES_KEYS is a Mapping[] which is read-only as
# far as mypy is concerned.
ANSI_SEQUENCES_KEYS["\x1b\r"] = (Keys.F9,) # type: ignore
ANSI_SEQUENCES_KEYS["\x1b\n"] = (Keys.F9,) # type: ignore
class SubmittableTextArea(TextArea):
BINDINGS = [
Binding("f9", "app.submit", "Submit", show=True),
Binding("tab", "focus_next", show=False, priority=True), # no inserting tabs
]
def parser_factory() -> MarkdownIt:
def parser_factory():
parser = MarkdownIt()
parser.options["html"] = False
return parser
class ChapMarkdown(Markdown, can_focus=True, can_focus_children=False):
class Markdown(
Markdown, can_focus=True, can_focus_children=False
): # pylint: disable=function-redefined
BINDINGS = [
Binding("ctrl+c", "app.yank", "Copy text", show=True),
Binding("ctrl+r", "app.resubmit", "resubmit", show=True),
Binding("ctrl+x", "app.redraft", "redraft", show=True),
Binding("ctrl+q", "app.toggle_history", "history toggle", show=True),
Binding("ctrl+y", "yank", "Yank text", show=True),
Binding("ctrl+r", "resubmit", "resubmit", show=True),
Binding("ctrl+x", "delete", "delete to end", show=True),
Binding("ctrl+q", "toggle_history", "history toggle", show=True),
]
def markdown_for_step(step: Message) -> ChapMarkdown:
return ChapMarkdown(
def markdown_for_step(step):
return Markdown(
step.content.strip() or "",
classes="role_" + step.role,
parser_factory=parser_factory,
)
class CancelButton(Button):
BINDINGS = [
Binding("escape", "stop_generating", "Stop Generating", show=True),
]
class Tui(App[None]):
class Tui(App):
CSS_PATH = "tui.css"
BINDINGS = [
Binding("ctrl+q", "quit", "Quit", show=True, priority=True),
Binding("ctrl+c", "app.quit", "Quit", show=True, priority=True),
]
def __init__(
self, api: Optional[Backend] = None, session: Optional[Session] = None
) -> None:
def __init__(self, api=None, session=None):
super().__init__()
self.api = api or get_api(click.Context(click.Command("chap tui")), "lorem")
self.session = (
new_session(self.api.system_message) if session is None else session
)
self.api = api or get_api("lorem")
self.session = session or Session.new_session(self.api.system_message)
@property
def spinner(self) -> LoadingIndicator:
return self.query_one(LoadingIndicator)
def input(self):
return self.query_one(Input)
@property
def wait(self) -> VerticalScroll:
return cast(VerticalScroll, self.query_one("#wait"))
def container(self):
return self.query_one("#content")
@property
def input(self) -> SubmittableTextArea:
return self.query_one(SubmittableTextArea)
@property
def cancel_button(self) -> CancelButton:
return self.query_one(CancelButton)
@property
def container(self) -> VerticalScroll:
return cast(VerticalScroll, self.query_one("#content"))
def compose(self) -> ComposeResult:
def compose(self):
yield Footer()
yield VerticalScroll(
*[markdown_for_step(step) for step in self.session],
# The pad container helps reduce flickering when rendering fresh
# content and scrolling. (it's not clear why this makes a
# difference and it'd be nice to be rid of the workaround)
Container(id="pad"),
id="content",
)
s = SubmittableTextArea(language="markdown")
s.show_line_numbers = False
yield s
with Horizontal(id="wait"):
yield LoadingIndicator()
yield CancelButton(label="❌ Stop Generation", id="cancel", disabled=True)
yield Input(placeholder="Prompt")
yield VerticalScroll(Container(id="pad"), id="content")
async def on_mount(self) -> None:
self.container.scroll_end(animate=False)
await self.container.mount_all(
[markdown_for_step(step) for step in self.session.session], before="#pad"
)
# self.scrollview.scroll_y = self.scrollview.get_content_height()
self.scroll_end()
self.input.focus()
async def action_submit(self) -> None:
self.get_completion(self.input.text)
@work(exclusive=True)
async def get_completion(self, query: str) -> None:
async def on_input_submitted(self, event) -> None:
self.scroll_end()
self.input.styles.display = "none"
self.wait.styles.display = "block"
self.input.disabled = True
self.cancel_button.disabled = False
self.cancel_button.focus()
output = markdown_for_step(Assistant("*query sent*"))
await self.container.mount_all(
[markdown_for_step(User(query)), output], before="#pad"
[markdown_for_step(User(event.value)), output], before="#pad"
)
update: asyncio.Queue[bool] = asyncio.Queue(1)
for markdown in self.container.children:
markdown.disabled = True
tokens = []
update = asyncio.Queue(1)
# Construct a fake session with only select items
session = []
for si, wi in zip(self.session, self.container.children):
session = Session()
for si, wi in zip(self.session.session, self.container.children):
if not wi.has_class("history_exclude"):
session.append(si)
session.session.append(si)
message = Assistant("")
self.session.extend(
[
User(query),
message,
]
)
async def render_fun() -> None:
old_len = 0
async def render_fun():
while await update.get():
content = message.content
new_len = len(content)
new_content = content[old_len:new_len]
if new_content:
if old_len:
await output.append(new_content)
else:
output.update(content)
self.container.scroll_end()
old_len = new_len
await asyncio.sleep(0.01)
if tokens:
output.update("".join(tokens).strip())
self.container.scroll_end()
await asyncio.sleep(0.1)
async def get_token_fun() -> None:
async for token in self.api.aask(session, query):
message.content += token
async def get_token_fun():
async for token in self.api.aask(session, event.value):
tokens.append(token)
try:
update.put_nowait(True)
except asyncio.QueueFull:
# QueueFull exception is expected. If something's in the
# queue then render_fun will run soon.
pass
await update.put(False)
try:
await asyncio.gather(render_fun(), get_token_fun())
self.input.value = ""
finally:
self.input.clear()
all_output = self.session[-1].content
self.session.session.extend(session.session[-2:])
all_output = self.session.session[-1].content
output.update(all_output)
output._markdown = all_output
output._markdown = all_output # pylint: disable=protected-access
self.container.scroll_end()
for markdown in self.container.children:
markdown.disabled = False
self.input.styles.display = "block"
self.wait.styles.display = "none"
self.input.disabled = False
self.cancel_button.disabled = True
self.input.focus()
def scroll_end(self) -> None:
def scroll_end(self):
self.call_after_refresh(self.container.scroll_end)
def action_yank(self) -> None:
def action_yank(self):
widget = self.focused
if isinstance(widget, ChapMarkdown):
content = widget._markdown or ""
pyperclip_copy(content)
if isinstance(widget, Markdown):
content = widget._markdown # pylint: disable=protected-access
subprocess.run(["xsel", "-ib"], input=content.encode("utf-8"), check=False)
def action_toggle_history(self) -> None:
def action_toggle_history(self):
widget = self.focused
if not isinstance(widget, ChapMarkdown):
if not isinstance(widget, Markdown):
return
children = self.container.children
idx = children.index(widget)
if idx == 0:
return
while idx > 1 and not children[idx].has_class("role_user"):
while idx > 1 and not "role_user" in children[idx].classes:
idx -= 1
widget = children[idx]
for m in children[idx : idx + 2]:
m.toggle_class("history_exclude")
children[idx].toggle_class("history_exclude")
children[idx + 1].toggle_class("history_exclude")
async def action_stop_generating(self) -> None:
self.workers.cancel_all()
async def action_resubmit(self):
await self.delete_or_resubmit(True)
async def on_button_pressed(self, event: Button.Pressed) -> None:
self.workers.cancel_all()
async def action_delete(self):
await self.delete_or_resubmit(False)
async def action_quit(self) -> None:
self.workers.cancel_all()
self.exit()
async def action_resubmit(self) -> None:
await self.redraft_or_resubmit(True)
async def action_redraft(self) -> None:
await self.redraft_or_resubmit(False)
async def redraft_or_resubmit(self, resubmit: bool) -> None:
async def delete_or_resubmit(self, resubmit):
widget = self.focused
if not isinstance(widget, ChapMarkdown):
if not isinstance(widget, Markdown):
return
children = self.container.children
idx = children.index(widget)
if idx < 1:
return
while idx > 1 and not children[idx].has_class("role_user"):
idx -= 1
widget = children[idx]
# Save a copy of the discussion before this deletion
session_to_file(self.session, new_session_path())
with open(new_session_path(), "w", encoding="utf-8") as f:
f.write(self.session.to_json())
query = self.session[idx].content
self.input.load_text(query)
query = self.session.session[idx].content
self.input.value = query
del self.session[idx:]
del self.session.session[idx:]
for child in self.container.children[idx:-1]:
await child.remove()
self.input.focus()
self.on_text_area_changed()
if resubmit:
await self.action_submit()
def on_text_area_changed(self, event: Any = None) -> None:
height = self.input.document.get_size(self.input.indent_width)[1]
max_height = max(3, self.size.height - 6)
if height >= max_height:
self.input.styles.height = max_height
elif height <= 3:
self.input.styles.height = 3
else:
self.input.styles.height = height
await self.input.action_submit()
@command_uses_new_session
@click.option("--replace-system-prompt/--no-replace-system-prompt", default=False)
def main(obj: Obj, replace_system_prompt: bool) -> None:
def main(obj):
"""Start interactive terminal user interface session"""
api = obj.api
assert api is not None
session = obj.session
assert session is not None
session_filename = obj.session_filename
assert session_filename is not None
if replace_system_prompt:
session[0].content = (
api.system_message if obj.system_message is None else obj.system_message
)
tui = Tui(api, session)
tui.run()
@ -310,8 +186,9 @@ def main(obj: Obj, replace_system_prompt: bool) -> None:
print(f"Saving session to {session_filename}", file=sys.stderr)
session_to_file(session, session_filename)
with open(session_filename, "w", encoding="utf-8") as f:
f.write(session.to_json())
if __name__ == "__main__":
main()
main() # pylint: disable=no-value-for-parameter

View file

@ -1,124 +1,42 @@
# SPDX-FileCopyrightText: 2023 Jeff Epler <jepler@gmail.com>
#
# SPDX-License-Identifier: MIT
# pylint: disable=import-outside-toplevel
from collections.abc import Sequence
import asyncio
import datetime
import io
import importlib
import os
import pathlib
import pkgutil
import subprocess
import shlex
import textwrap
from dataclasses import MISSING, Field, dataclass, fields
from typing import (
Any,
AsyncGenerator,
Callable,
Optional,
Union,
IO,
cast,
get_origin,
get_args,
)
import sys
from dataclasses import MISSING, dataclass, fields
import click
import platformdirs
from simple_parsing.docstring import get_attribute_docstring
from typing_extensions import Protocol
from . import backends, commands
from .session import Message, Session, System, session_from_file
UnionType: type
if sys.version_info >= (3, 10):
from types import UnionType
else:
UnionType = type(Union[int, float])
from . import backends, commands # pylint: disable=no-name-in-module
from .session import Session
conversations_path = platformdirs.user_state_path("chap") / "conversations"
conversations_path.mkdir(parents=True, exist_ok=True)
configuration_path = platformdirs.user_config_path("chap")
preset_path = configuration_path / "preset"
class ABackend(Protocol):
def aask(self, session: Session, query: str) -> AsyncGenerator[str, None]:
"""Make a query, updating the session with the query and response, returning the query token by token"""
class Backend(ABackend, Protocol):
parameters: Any
system_message: str
def ask(self, session: Session, query: str) -> str:
"""Make a query, updating the session with the query and response, returning the query"""
class AutoAskMixin:
"""Mixin class for backends implementing aask"""
def ask(self, session: Session, query: str) -> str:
tokens: list[str] = []
async def inner() -> None:
# https://github.com/pylint-dev/pylint/issues/5761
async for token in self.aask(session, query): # type: ignore
tokens.append(token)
asyncio.run(inner())
return "".join(tokens)
def last_session_path() -> Optional[pathlib.Path]:
def last_session_path():
result = max(
conversations_path.glob("*.json"), key=lambda p: p.stat().st_mtime, default=None
)
print(result)
return result
def new_session_path(opt_path: Optional[pathlib.Path] = None) -> pathlib.Path:
    """Return *opt_path* if given, else a fresh timestamped path in conversations_path.

    Colons in the ISO timestamp are replaced so the name is valid on Windows.
    """
    return opt_path or conversations_path / (
        datetime.datetime.now().isoformat().replace(":", "_") + ".json"
    )
def get_field_type(field: Field[Any]) -> Any:
field_type = field.type
if isinstance(field_type, str):
raise RuntimeError(
"parameters dataclass may not use 'from __future__ import annotations"
)
origin = get_origin(field_type)
if origin in (Union, UnionType):
for arg in get_args(field_type):
if arg is not None:
return arg
return field_type
def convert_str_to_field(ctx: click.Context, field: Field[Any], value: str) -> Any:
    """Convert the string *value* to *field*'s (unwrapped) type.

    Raises click.BadParameter when the conversion fails.
    """
    target_type = get_field_type(field)
    try:
        if target_type is bool:
            # Let click interpret the usual spellings (yes/no, 1/0, true/false).
            return click.types.BoolParamType().convert(value, None, ctx)
        return target_type(value)
    except ValueError as exc:
        raise click.BadParameter(
            f"Invalid value for {field.name} with value {value}: {exc}"
        ) from exc
def configure_api_from_environment(
ctx: click.Context, api_name: str, api: Backend
) -> None:
def configure_api_from_environment(api_name, api):
if not hasattr(api, "parameters"):
return
@ -127,81 +45,74 @@ def configure_api_from_environment(
value = os.environ.get(envvar)
if value is None:
continue
tv = convert_str_to_field(ctx, field, value)
try:
tv = field.type(value)
except ValueError as e:
raise click.BadParameter(
f"Invalid value for {field.name} with value {value}: {e}"
) from e
setattr(api.parameters, field.name, tv)
def get_api(ctx: click.Context | None = None, name: str | None = None) -> Backend:
    """Import and configure the backend named *name*.

    Defaults: a throwaway click Context, and the backend named by
    $CHAP_BACKEND (falling back to openai_chatgpt). Hyphens in the name are
    normalized to underscores to match module names.
    """
    if ctx is None:
        ctx = click.Context(click.Command("chap"))
    if name is None:
        name = os.environ.get("CHAP_BACKEND", "openai_chatgpt")
    name = name.replace("-", "_")
    backend = cast(
        Backend, importlib.import_module(f"{__package__}.backends.{name}").factory()
    )
    # Environment variables like CHAP_<BACKEND>_<FIELD> override parameters.
    configure_api_from_environment(ctx, name, backend)
    return backend
def do_session_continue(
    ctx: click.Context, param: click.Parameter, value: Optional[pathlib.Path]
) -> None:
    """click callback: resume the session saved in the JSON file *value*.

    No-op when *value* is None; errors if another session option already ran.
    """
    if value is None:
        return
    if ctx.obj.session is not None:
        # Only one of the session-selection options may be used per invocation.
        raise click.BadParameter(
            "--continue-session, --last and --new-session are mutually exclusive",
            param=param,
        )
    ctx.obj.session = session_from_file(value)
    ctx.obj.session_filename = value
def do_session_last(ctx: click.Context, param: click.Parameter, value: bool) -> None:
    """click callback for --last: resume the most recently saved session."""
    if not value:
        return
    do_session_continue(ctx, param, last_session_path())
def do_session_new(
    ctx: click.Context, param: click.Parameter, value: pathlib.Path
) -> None:
    """click callback for --new-session: start a fresh session stored at *value*.

    A session may already exist (from --continue-session/--last); in that case
    a None *value* is ignored, and an explicit one is an error.
    """
    if ctx.obj.session is not None:
        if value is None:
            return
        raise click.BadParameter(
            "--continue-session, --last and --new-session are mutually exclusive",
            param=param,
        )
    session_filename = new_session_path(value)
    # CLI-provided system message wins over the backend's default.
    system_message = ctx.obj.system_message or ctx.obj.api.system_message
    ctx.obj.session = [System(system_message)]
    ctx.obj.session_filename = session_filename
def colonstr(arg: str) -> tuple[str, str]:
    """Split a 'name:value' argument at the first colon into a (name, value) pair.

    Raises click.BadParameter when no colon is present.
    """
    if ":" not in arg:
        raise click.BadParameter("must be of the form 'name:value'")
    return cast(tuple[str, str], tuple(arg.split(":", 1)))
def set_system_message(ctx: click.Context, param: click.Parameter, value: str) -> None:
    """click callback for -S/--system-message.

    A value starting with '@' is treated as a filename whose contents become
    the system message (trailing whitespace stripped).
    """
    if value is None:
        return
    if value.startswith("@"):
        with open(value[1:], "r", encoding="utf-8") as f:
            value = f.read().rstrip()
    ctx.obj.system_message = value
def set_system_message_from_file(
    ctx: click.Context, param: click.Parameter, value: io.TextIOWrapper
) -> None:
    """click callback for -s: read the system message from an open text file."""
    if value is None:
        return
    ctx.obj.system_message = value.read().strip()
def set_backend(ctx: click.Context, param: click.Parameter, value: str) -> None:
def set_backend(ctx, param, value): # pylint: disable=unused-argument
if value == "list":
formatter = ctx.make_formatter()
format_backend_list(formatter)
@ -209,12 +120,12 @@ def set_backend(ctx: click.Context, param: click.Parameter, value: str) -> None:
ctx.exit()
try:
ctx.obj.api = get_api(ctx, value)
ctx.obj.api = get_api(value)
except ModuleNotFoundError as e:
raise click.BadParameter(str(e))
def format_backend_help(api: Backend, formatter: click.HelpFormatter) -> None:
def format_backend_help(api, formatter):
with formatter.section(f"Backend options for {api.__class__.__name__}"):
rows = []
for f in fields(api.parameters):
@ -223,16 +134,12 @@ def format_backend_help(api: Backend, formatter: click.HelpFormatter) -> None:
doc = get_attribute_docstring(type(api.parameters), f.name).docstring_below
if doc:
doc += " "
doc += f"(Default: {default!r})"
f_type = get_field_type(f)
typename = f_type.__name__
rows.append((f"-B {name}:{typename.upper()}", doc))
doc += f"(Default: {default})"
rows.append((f"-B {name}:{f.type.__name__.upper()}", doc))
formatter.write_dl(rows)
def set_backend_option(
ctx: click.Context, param: click.Parameter, opts: list[tuple[str, str]]
) -> None:
def set_backend_option(ctx, param, opts): # pylint: disable=unused-argument
api = ctx.obj.api
if not hasattr(api, "parameters"):
raise click.BadParameter(
@ -240,19 +147,24 @@ def set_backend_option(
)
all_fields = dict((f.name.replace("_", "-"), f) for f in fields(api.parameters))
def set_one_backend_option(kv: tuple[str, str]) -> None:
def set_one_backend_option(kv):
name, value = kv
field = all_fields.get(name)
if field is None:
raise click.BadParameter(f"Invalid parameter {name}")
tv = convert_str_to_field(ctx, field, value)
setattr(api.parameters, field.name, tv)
try:
tv = field.type(value)
except ValueError as e:
raise click.BadParameter(
f"Invalid value for {name} with value {value}: {e}"
) from e
setattr(api.parameters, name, tv)
for kv in opts:
set_one_backend_option(kv)
def format_backend_list(formatter: click.HelpFormatter) -> None:
def format_backend_list(formatter):
all_backends = []
for pi in pkgutil.walk_packages(backends.__path__):
name = pi.name
@ -274,7 +186,7 @@ def format_backend_list(formatter: click.HelpFormatter) -> None:
formatter.write_dl(rows)
def uses_session(f: click.decorators.FC) -> Callable[[], None]:
def uses_session(f):
f = click.option(
"--continue-session",
"-s",
@ -286,15 +198,18 @@ def uses_session(f: click.decorators.FC) -> Callable[[], None]:
f = click.option(
"--last", is_flag=True, callback=do_session_last, expose_value=False
)(f)
return click.pass_obj(f)
f = click.pass_obj(f)
return f
def command_uses_existing_session(f: click.decorators.FC) -> click.Command:
    """Wrap *f* as a click command that can resume an existing session."""
    return click.command()(uses_session(f))
def command_uses_new_session(f_in: click.decorators.FC) -> click.Command:
f = uses_session(f_in)
def command_uses_new_session(f):
f = uses_session(f)
f = click.option(
"--new-session",
"-n",
@ -303,15 +218,15 @@ def command_uses_new_session(f_in: click.decorators.FC) -> click.Command:
callback=do_session_new,
expose_value=False,
)(f)
return click.command()(f)
f = click.command()(f)
return f
def version_callback(ctx: click.Context, param: click.Parameter, value: None) -> None:
def version_callback(ctx, param, value) -> None: # pylint: disable=unused-argument
if not value or ctx.resilient_parsing:
return
git_dir = pathlib.Path(__file__).parent.parent.parent / ".git"
version: str
if git_dir.exists():
version = subprocess.check_output(
["git", f"--git-dir={git_dir}", "describe", "--tags", "--dirty"],
@ -319,8 +234,7 @@ def version_callback(ctx: click.Context, param: click.Parameter, value: None) ->
)
else:
try:
# __version__ file is not generated yet during CI
from .__version__ import __version__ as version # type: ignore
from .__version__ import __version__ as version
except ImportError:
version = "unknown"
prog_name = ctx.find_root().info_name
@ -333,60 +247,17 @@ def version_callback(ctx: click.Context, param: click.Parameter, value: None) ->
@dataclass
class Obj:
    """Per-invocation CLI state, shared between callbacks via click's ctx.obj."""

    # Active backend (set by --backend / $CHAP_BACKEND).
    api: Optional[Backend] = None
    # Overrides the backend's default system message when set.
    system_message: Optional[str] = None
    # Current conversation; None until a session option callback runs.
    session: Optional[list[Message]] = None
    # Where the session is (or will be) persisted on disk.
    session_filename: Optional[pathlib.Path] = None
def maybe_add_txt_extension(fn: pathlib.Path) -> pathlib.Path:
    """If *fn* does not exist but *fn*.txt does, return the .txt path instead."""
    if fn.exists():
        return fn
    candidate = pathlib.Path(str(fn) + ".txt")
    return candidate if candidate.exists() else fn
def expand_splats(args: list[str]) -> list[str]:
    """Recursively expand @FILE arguments into the words they contain.

    '@@x' escapes a literal '@x'; '@:NAME' resolves relative to preset_path;
    '@.' stops all further expansion (subsequent args pass through verbatim).
    """
    expanded: list[str] = []
    literal_mode = False
    for arg in args:
        if arg == "@.":
            literal_mode = True
        elif literal_mode:
            expanded.append(arg)
        elif arg.startswith("@@"):  # double @ escapes a leading-@ argument
            expanded.append(arg[1:])
        elif not arg.startswith("@"):
            expanded.append(arg)
        else:
            if arg.startswith("@:"):
                fn: pathlib.Path = preset_path / arg[2:]
            else:
                fn = pathlib.Path(arg[1:])
            fn = maybe_add_txt_extension(fn)
            with open(fn, "r", encoding="utf-8") as f:
                words = shlex.split(f.read())
            # Splat files may themselves contain @FILE arguments.
            expanded.extend(expand_splats(words))
    return expanded
class MyCLI(click.Group):
def make_context(
self,
info_name: Optional[str],
args: list[str],
parent: Optional[click.Context] = None,
**extra: Any,
) -> click.Context:
class MyCLI(click.MultiCommand):
def make_context(self, info_name, args, parent=None, **extra):
result = super().make_context(info_name, args, parent, obj=Obj(), **extra)
return result
def list_commands(self, ctx: click.Context) -> list[str]:
def list_commands(self, ctx):
rv = []
for pi in pkgutil.walk_packages(commands.__path__):
name = pi.name
@ -395,126 +266,18 @@ class MyCLI(click.Group):
rv.sort()
return rv
def get_command(self, ctx: click.Context, cmd_name: str) -> click.Command:
def get_command(self, ctx, cmd_name):
try:
return cast(
click.Command,
importlib.import_module("." + cmd_name, commands.__name__).main,
)
return importlib.import_module("." + cmd_name, commands.__name__).main
except ModuleNotFoundError as exc:
raise click.UsageError(f"Invalid subcommand {cmd_name!r}", ctx) from exc
def gather_preset_info(self) -> list[tuple[str, str]]:
result = []
for p in preset_path.glob("*"):
if p.is_file():
with p.open() as f:
first_line = f.readline()
if first_line.startswith("#"):
help_str = first_line[1:].strip()
else:
help_str = "(A comment on the first line would be shown here)"
result.append((f"@:{p.name}", help_str))
return result
def format_splat_options(
self, ctx: click.Context, formatter: click.HelpFormatter
) -> None:
with formatter.section("Splats"):
formatter.write_text(
"Before any other command-line argument parsing is performed, @FILE arguments are expanded:"
)
formatter.write_paragraph()
formatter.indent()
formatter.write_dl(
[
("@FILE", "Argument is searched relative to the current directory"),
(
"@:FILE",
"Argument is searched relative to the configuration directory (e.g., $HOME/.config/chap/preset)",
),
("@@…", "If an argument starts with a literal '@', double it"),
(
"@.",
"Stops processing any further `@FILE` arguments and leaves them unchanged.",
),
]
)
formatter.dedent()
formatter.write_paragraph()
formatter.write_text(
textwrap.dedent(
"""\
The contents of an `@FILE` are parsed by `shlex.split(comments=True)`.
Comments are supported. If the filename ends in .txt,
the extension may be omitted."""
)
)
formatter.write_paragraph()
if preset_info := self.gather_preset_info():
formatter.write_text("Presets found:")
formatter.write_paragraph()
formatter.indent()
formatter.write_dl(preset_info)
formatter.dedent()
def format_options(
self, ctx: click.Context, formatter: click.HelpFormatter
) -> None:
self.format_splat_options(ctx, formatter)
def format_options(self, ctx, formatter):
super().format_options(ctx, formatter)
api = ctx.obj.api or get_api(ctx)
api = ctx.obj.api or get_api()
if hasattr(api, "parameters"):
format_backend_help(api, formatter)
def main(
self,
args: Sequence[str] | None = None,
prog_name: str | None = None,
complete_var: str | None = None,
standalone_mode: bool = True,
windows_expand_args: bool = True,
**extra: Any,
) -> Any:
if args is None:
args = sys.argv[1:]
if os.name == "nt" and windows_expand_args:
args = click.utils._expand_args(args)
else:
args = list(args)
args = expand_splats(args)
return super().main(
args,
prog_name=prog_name,
complete_var=complete_var,
standalone_mode=standalone_mode,
windows_expand_args=windows_expand_args,
**extra,
)
class ConfigRelativeFile(click.File):
    """A click.File whose ':'-prefixed values resolve under the config directory.

    ':NAME' becomes <configuration_path>/<where>/NAME; the .txt extension may
    be omitted (see maybe_add_txt_extension).
    """

    def __init__(self, mode: str, where: str) -> None:
        super().__init__(mode)
        # Sub-directory of configuration_path used for ':'-prefixed names.
        self.where = where

    def convert(
        self,
        value: str | os.PathLike[str] | IO[Any],
        param: click.Parameter | None,
        ctx: click.Context | None,
    ) -> Any:
        if isinstance(value, str):
            value = (
                configuration_path / self.where / value[1:]
                if value.startswith(":")
                else pathlib.Path(value)
            )
        if isinstance(value, pathlib.Path):
            value = maybe_add_txt_extension(value)
        return super().convert(value, param, ctx)
main = MyCLI(
help="Commandline interface to ChatGPT",
@ -526,14 +289,6 @@ main = MyCLI(
help="Show the version and exit",
callback=version_callback,
),
click.Option(
("--system-message-file", "-s"),
type=ConfigRelativeFile("r", where="prompt"),
default=None,
callback=set_system_message_from_file,
expose_value=False,
help=f"Set the system message from a file. If the filename starts with `:` it is relative to the {configuration_path}/prompt. If the filename ends in .txt, the extension may be omitted.",
),
click.Option(
("--system-message", "-S"),
type=str,

View file

@ -2,63 +2,25 @@
#
# SPDX-License-Identifier: MIT
import json
import subprocess
from typing import Protocol
import functools
import platformdirs
class APIKeyProtocol(Protocol):
    """Structural type: an object that names the API key it needs."""

    @property
    def api_key_name(self) -> str:
        """Name of the key entry/file to look up."""
        ...
class HasKeyProtocol(Protocol):
    """Structural type: an object whose parameters carry an API key name."""

    @property
    def parameters(self) -> APIKeyProtocol:
        ...
class UsesKeyMixin:
    """Mixin adding get_key() to classes satisfying HasKeyProtocol."""

    def get_key(self: HasKeyProtocol) -> str:
        """Look up this backend's API key by its configured name."""
        return get_key(self.parameters.api_key_name)
class NoKeyAvailable(Exception):
    """Raised when no API key can be found for a backend."""

    pass
_key_path_base = platformdirs.user_config_path("chap")
# Presence of this marker file switches key lookup to the `pass` password store.
USE_PASSWORD_STORE = _key_path_base / "USE_PASSWORD_STORE"

if USE_PASSWORD_STORE.exists():
    # Defaults apply when the marker file is empty or omits a setting.
    pass_command: list[str] = ["pass", "show"]
    pass_prefix: str = "chap/"
    content = USE_PASSWORD_STORE.read_text(encoding="utf-8")
    if content.strip():
        cfg = json.loads(content)
        pass_command = cfg.get("PASS_COMMAND", pass_command)
        pass_prefix = cfg.get("PASS_PREFIX", pass_prefix)

    @functools.cache
    def get_key(name: str, what: str = "api key") -> str:
        """Fetch *name* from the password store; '-' is passed through literally."""
        if name == "-":
            return "-"
        return subprocess.check_output(
            pass_command + [f"{pass_prefix}{name}"], encoding="utf-8"
        ).split("\n")[0]

else:

    @functools.cache
    def get_key(name: str, what: str = "api key") -> str:
        """Read *name* from a plain file in the chap configuration directory.

        Raises NoKeyAvailable with instructions when the file is missing.
        """
        key_path = _key_path_base / name
        if not key_path.exists():
            raise NoKeyAvailable(
                f"Place your {what} in {key_path} and run the program again"
            )
        with open(key_path, encoding="utf-8") as f:
            return f.read().strip()

View file

View file

@ -2,23 +2,12 @@
#
# SPDX-License-Identifier: MIT
from __future__ import annotations
from dataclasses import dataclass, field
import json
import pathlib
from dataclasses import asdict, dataclass
from typing import Union, cast
from typing_extensions import TypedDict
# not an enum.Enum because these objects are not json-serializable, sigh
class Role:
    """Well-known speaker roles for Message.role, as plain str constants."""

    ASSISTANT = "assistant"
    SYSTEM = "system"
    USER = "user"
from dataclasses_json import dataclass_json
@dataclass_json
@dataclass
class Message:
"""Represents one Message within a chap Session"""
@ -27,49 +16,28 @@ class Message:
content: str
# Plain-dict shape of a serialized Message. Fix: "content" is text, not an
# int — the original annotation was a typo.
MessageDict = TypedDict("MessageDict", {"role": str, "content": str})
# A Session is simply an ordered list of Messages.
Session = list[Message]
SessionDicts = list[MessageDict]
def Assistant(content: str) -> Message:
    """Construct an assistant-role Message."""
    return Message(Role.ASSISTANT, content)
def System(content: str) -> Message:
    """Construct a system-role Message."""
    return Message(Role.SYSTEM, content)
def User(content: str) -> Message:
    """Construct a user-role Message."""
    return Message(Role.USER, content)
@dataclass_json
@dataclass
class Session:
"""Represents a series of Messages"""
session: list[Message] = field(default_factory=list)
def new_session(
    system_message: str = "You're named Chap (they/them). Chap strives to answer truthfully and frankly. You have opinions, preferences, and emotions, you can say when you don't know, and you prefer to speak in plain language.",
) -> Session:
    """Create a fresh Session seeded with a single system message."""
    initial = System(system_message)
    return [initial]
def session_to_json(session: Session) -> str:
    """Serialize *session* as a JSON string."""
    payload = session_to_list(session)
    return json.dumps(payload)
def session_to_list(session: Session) -> SessionDicts:
    """Convert each Message in *session* into a plain JSON-serializable dict."""
    dicts: SessionDicts = []
    for message in session:
        dicts.append(cast(MessageDict, asdict(message)))
    return dicts
def session_from_json(data: str) -> Session:
    """Parse a Session from JSON.

    Accepts either a bare list of messages or the legacy {"session": [...]}
    wrapper produced by the old dataclass format.
    """
    parsed = json.loads(data)
    messages = parsed["session"] if isinstance(parsed, dict) else parsed
    return [Message(**entry) for entry in messages]
def session_from_file(path: Union[pathlib.Path, str]) -> Session:
    """Load a Session from the JSON file at *path*."""
    text = pathlib.Path(path).read_text(encoding="utf-8")
    return session_from_json(text)
def session_to_file(session: Session, path: Union[pathlib.Path, str]) -> None:
    """Write *session* as JSON to *path*."""
    pathlib.Path(path).write_text(session_to_json(session), encoding="utf-8")
@classmethod
def new_session(
cls,
system_message: str = "You're named Chap (they/them). Chap strives to answer truthfully and frankly. You have opinions, preferences, and emotions, you can say when you don't know, and you prefer to speak in plain language.",
):
return Session([System(system_message)])