Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature(embeddings): vectorize corpora corpus #15

Merged
merged 13 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .corpora.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# .corpora.yaml

name: "corpora"
name: "corpora2"
url: "https://github.com/skyl/corpora"

server:
Expand Down
2 changes: 1 addition & 1 deletion .devcontainer/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
# echo "starting zsh..."
echo 'autoload -Uz add-zsh-hook; append_history() { fc -W }; add-zsh-hook precmd append_history; export HISTFILE=/home/vscode/.corpora.zsh_history/.zsh_history' >> ~/.zshrc
echo alias tree="tree -I '.venv|node_modules|__pycache__|.git|.pytest_cache' -a" >> ~/.zshrc
zsh
# zsh
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/.env

# macOS system files
.DS_Store

Expand Down
11 changes: 10 additions & 1 deletion TODO.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# TODO

- compare vector of summary versus vector of splits, create search endpoint(s) test adhoc queries

- test full init with chained tasks

- Python split with AST - langchain one is kinda' lame.

- for small corpora, we could get away with a mega-task but let's break into many single responsibilities
- finish ingest for real
- Start building the real records for the `corpora` corpus
Expand All @@ -20,7 +26,7 @@
- agents ... function calling ... maybe we have a menu of function `mkdir`, `create file`, `rewrite file`
* Take input. Analyze problem. Choose sequence of agents. Let agents execute in order. Report back to user.

- rm unused imports, for instance (on save, in CI..)
- more CI checks, local tools - rm unused imports, for instance (on save, in CI..)

# Next

Expand Down Expand Up @@ -60,4 +66,7 @@
- collect corpus, upload tarball to API
- introduce celery (?) or similar for async tasks
- handle the file (sync/async) upload in the API
- vector length in models versus oai implementation
- langchain-text-splitters - split based on type, cohesive approach, python and md


4 changes: 4 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ services:
environment:
PYTHONPATH: "/workspace/py/packages"
REDIS_URL: "redis://redis:6379/0"
OPENAI_API_KEY: "${OPENAI_API_KEY}"
command: python manage.py runserver 0.0.0.0:8000
working_dir: /workspace/py/packages/corpora_proj
depends_on:
- db
- redis
Expand Down Expand Up @@ -49,6 +52,7 @@ services:
environment:
PYTHONPATH: "/workspace/py/packages"
REDIS_URL: "redis://redis:6379/0"
OPENAI_API_KEY: "${OPENAI_API_KEY}"
depends_on:
redis:
condition: service_healthy
Expand Down
217 changes: 217 additions & 0 deletions md/notes/celery-tasks.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
> Understanding Celery's task methods, such as `s`, `si`, `delay`, and `chain`, is essential for managing asynchronous workflows effectively. Each method has its own purpose, especially when working with task dependencies, immutability, and argument handling.
>
> Here’s a breakdown of the key Celery task methods and techniques, along with practical examples to illustrate their use cases.

---

## 1. `delay`: The Simplest Way to Trigger a Task

`delay` is a shortcut for calling a Celery task asynchronously with positional arguments. It’s easy to use and is equivalent to `.apply_async()` but with simplified syntax.

### Example

```python
@shared_task
def add(x, y):
return x + y

# Calling the task asynchronously with delay
result = add.delay(4, 5) # Runs add(4, 5) asynchronously
```

### Usage Notes

- **Simplifies Syntax**: You don’t need to use `.apply_async((args, kwargs), ...)`.
- **Fire and Forget**: `delay` is perfect for simple, one-off tasks.

---

## 2. `.apply_async()`: Full Control over Task Execution

While `delay` is convenient, `.apply_async()` provides more control. You can pass additional options such as `countdown`, `eta`, and `retry` policies.

### Example

```python
@shared_task
def send_email(to_address, subject, body):
# Send email logic here
return "Email sent"

# Schedule task with additional options
result = send_email.apply_async(
args=("[email protected]", "Subject", "Body text"),
countdown=60 # Delay execution by 60 seconds
)
```

### Usage Notes

- **Control Timing**: You can schedule tasks to run at specific times or after delays.
- **Pass Custom Options**: Supports retry policies, timeouts, and more.

---

## 3. `s`: Signature for Building Task Chains and Groups

The `s` method, short for “signature,” is used to define a task and its arguments without executing it. You can think of it as preparing a “task blueprint” for use in chains, groups, or chords.

### Example

```python
@shared_task
def process_data(data):
# Process data logic
return "Processed"

# Define a task signature
task_signature = process_data.s("sample data")

# Use the signature to run the task
task_signature.delay()
```

### Usage Notes

- **Reusable**: Allows you to define tasks with arguments that can be reused in other contexts.
- **Works in Chains**: Essential for creating task chains, groups, and chords.

---

## 4. `si`: Immutable Signature

The `si` method, short for “signature immutable,” is similar to `s` but creates an **immutable** signature. This means that any result from a previous task in a chain will not be passed to this task, even if it’s part of a chain.

### Example

```python
@shared_task
def step_one():
return "Result from step one"

@shared_task
def step_two(data):
return f"Received: {data}"

# Immutable signature - it will not receive output from step_one
chain(step_one.s(), step_two.si("Custom data")).apply_async()
```

In this example, `step_two` will receive `"Custom data"` as its input, ignoring the output of `step_one`.

### Usage Notes

- **Useful in Chains**: If a task should always receive specific arguments, use `si` to prevent it from receiving output from prior tasks.
- **Avoids Argument Mismatch Errors**: Prevents issues when chaining tasks that don’t accept the output of previous tasks.

---

## 5. `chain`: Creating Sequential Task Pipelines

`chain` is used to create a sequence of tasks where each task runs after the previous one completes. The result of each task is passed as the input to the next task in the chain (unless `si` is used).

### Example

```python
@shared_task
def add(x, y):
return x + y

@shared_task
def multiply(result):
return result * 10

# Chain tasks together
result = chain(add.s(4, 5), multiply.s()).apply_async()
```

In this example:
- `add` runs first with arguments `(4, 5)` and returns `9`.
- `multiply` then receives `9` as its input and returns `90`.

### Usage Notes

- **Sequential Dependency**: Use `chain` when each task depends on the output of the previous task.
- **Error Handling**: If any task in the chain fails, the subsequent tasks will not run.

---

## 6. `group`: Running Tasks in Parallel

`group` lets you run multiple tasks in parallel and collect their results. It’s useful when tasks can run independently, and you want to aggregate their outputs.

### Example

```python
@shared_task
def add(x, y):
return x + y

# Run tasks in parallel
result = group(add.s(2, 2), add.s(4, 4), add.s(6, 6)).apply_async()
```

In this example:
- Each `add` task runs independently.
- The results are collected as a list once all tasks complete (e.g., `[4, 8, 12]`).

### Usage Notes

- **Parallel Execution**: Ideal for tasks that don’t depend on each other.
- **Collect Results**: Results are returned as a list after all tasks complete.

---

## 7. `chord`: Combining Groups with a Final Callback

A `chord` is a combination of a `group` and a callback. All tasks in the group run in parallel, and once they complete, their results are passed to the callback task.

### Example

```python
@shared_task
def add(x, y):
return x + y

@shared_task
def summarize(results):
return sum(results)

# Define a chord
result = chord([add.s(2, 2), add.s(4, 4), add.s(6, 6)], summarize.s()).apply_async()
```

In this example:
- Each `add` task runs in parallel.
- The results `[4, 8, 12]` are passed to `summarize`, which returns `24`.

### Usage Notes

- **Useful for Aggregating Results**: The callback task can aggregate results from multiple tasks.
- **Sequential Logic**: Ensures that the callback runs only after all tasks in the group complete.

---

## Summary Table

| Method | Purpose | Usage Example |
|------------------|------------------------------------------------------------------------------------------|-----------------------------------|
| `delay` | Shortcut for asynchronously calling a task | `task.delay(args)` |
| `apply_async` | Full control over task execution with additional options | `task.apply_async(args, options)` |
| `s` | Signature for creating task definitions without execution | `task.s(args)` |
| `si` | Immutable signature to prevent prior task results from being passed | `task.si(args)` |
| `chain` | Sequential pipeline where each task depends on the output of the previous task | `chain(task1.s(), task2.s())` |
| `group` | Parallel execution of multiple tasks, with results collected as a list | `group([task1.s(), task2.s()])` |
| `chord` | Combines a group of parallel tasks with a callback that receives the group’s results | `chord(group, callback.s())` |

---

### Choosing the Right Method

- **For simple, independent tasks**: Use `delay` or `apply_async`.
- **For sequential tasks with dependencies**: Use `chain`.
- **For parallel tasks with independent execution**: Use `group`.
- **For combining parallel tasks with a final aggregation**: Use `chord`.
- **For tasks in a chain where you need a fixed input**: Use `si` to make the signature immutable.

This understanding gives you the flexibility to create robust, efficient Celery workflows that handle task dependencies, sequencing, and parallelism as needed.
Loading
Loading