Cron — фоновые задачи¶
Кастомный планировщик FARA. Хранит задачи в таблице cron_job, выполняет их в отдельном процессе. Атомарный захват через PostgreSQL — безопасно работает в multi-worker окружении.
Архитектура¶
graph TB
subgraph "FastAPI worker"
WW["Web traffic"]
end
subgraph "Cron process"
CL[Cron Loop<br/>каждые 60s]
EX[Executor]
end
subgraph PostgreSQL
T[(cron_job)]
end
CL -->|UPDATE ... RETURNING<br/>атомарный захват| T
T -->|claimed jobs| EX
EX -->|exec code или<br/>model.method| EX
EX -->|UPDATE last_status,<br/>nextcall, run_count| T
style T fill:#336791,color:white
Главные особенности:
- Отдельный процесс (
backend/main_cron.py) — не делит лимит коннектов с web-воркерами и не падает при их рестарте. - Атомарный захват через
UPDATE ... WHERE ... AND nextcall <= NOW() RETURNING *— если два процесса cron поднялись параллельно, одну задачу возьмёт только один. - Stale-locks: если процесс упал во время выполнения, другой подхватит задачу через
lastcall + timeout < NOW(). - Таймаут на каждый job через
asyncio.wait_for— зависший job не блокирует другие.
Модель CronJob¶
name Char(255) required
Название задачи. Используется в логах: [FARA CRON] Executing: {name} (id={id}).
active bool
Активна ли. Неактивные не запускаются — так можно временно отключить задачу без удаления.
code Text
Произвольный Python-код. Выполняется в окружении с доступом к env. Например:
model_name + method_name Char
Альтернатива code: указать модель и метод. Метод должен быть @hybridmethod или @classmethod. args/kwargs (JSON-строки) передаются как аргументы.
interval_number + interval_type Integer + Selection
Интервал между запусками. Типы: minutes, hours, days, weeks, months. Например, interval_number=5, interval_type=minutes — каждые 5 минут.
numbercall Integer
Сколько раз ещё должен выполниться. -1 = бесконечно. После каждого запуска уменьшается. Когда становится 0 — задача автоматически становится active=false.
nextcall / lastcall Datetime
Когда должен быть следующий запуск / когда был последний. После запуска nextcall = now() + interval.
last_status Selection
pending (ждёт), running (выполняется), success, error. last_error хранит traceback для отладки.
timeout Integer
Максимальная длительность в секундах. По истечении — задача снимается, статус error. Default 300s (5 мин).
priority Integer
При множестве готовых задач захватываются по возрастанию priority — сначала меньшие (более приоритетные).
Создание cron-задачи¶
Через классовый метод¶
from backend.base.system.cron.models.cron_job import CronJob
await CronJob.create_job(
name="Activity: check deadlines",
model_name="activity",
method_name="check_deadlines",
interval_number=1,
interval_type="minutes",
)
Через произвольный код¶
await CronJob.create_job(
name="Cleanup old sessions",
code="""
async def __cron_task__():
await env.models.session.cleanup_expired()
""",
interval_number=1,
interval_type="hours",
)
В коде доступны:
env— глобальное окружение (env.models,env.apps,env.settings).- Любые встроенные Python-модули (через
import).
Безопасность кода
Поле code — это exec() под админскими правами. Доступ к таблице cron_job через UI должен быть только у is_admin=true. Для обычных пользователей запрети edit на уровне ACL.
Атомарный захват — как это работает¶
Проблема: два cron-процесса (например, после рестарта) одновременно увидели задачу с nextcall <= now(). Без синхронизации оба её выполнят.
Решение FARA — захват через UPDATE:
UPDATE cron_job
SET last_status = 'running',
lastcall = NOW()
WHERE id = ANY($1) -- кандидаты
AND active = true
AND (
last_status != 'running' -- не выполняется сейчас
OR (lastcall + make_interval(secs => timeout) < NOW()) -- или выполняется давно (stale lock)
)
AND nextcall <= NOW()
RETURNING *;
UPDATE ... RETURNING атомарен на уровне строки. Только один процесс получит запись, второй увидит пустой результат.
Stale-lock detection: условие lastcall + timeout < NOW() в OR снимает блокировку, если предыдущий процесс упал и не успел поставить success/error. Таймаут — это время, которое мы готовы ждать, прежде чем считать «процесс умер».
sequenceDiagram
participant P1 as Cron #1
participant DB as PostgreSQL
participant P2 as Cron #2
P1->>DB: SELECT id WHERE nextcall <= NOW()
DB-->>P1: [job 1, job 2]
par Процесс 1
P1->>DB: UPDATE ... WHERE id=1 AND status!='running' RETURNING *
DB-->>P1: 1 row (захватил)
and Процесс 2
P2->>DB: SELECT id WHERE nextcall <= NOW()
DB-->>P2: [job 1, job 2]
P2->>DB: UPDATE ... WHERE id=1 AND status!='running' RETURNING *
DB-->>P2: 0 rows (P1 уже взял)
P2->>DB: UPDATE ... WHERE id=2 AND status!='running' RETURNING *
DB-->>P2: 1 row (захватил)
end
Запуск cron¶
В Docker-композе FARA это отдельный сервис рядом с backend:
cron:
build: .
command: python backend/main_cron.py
depends_on:
postgres:
condition: service_healthy
environment:
# те же DB-переменные что у backend
...
Settings¶
backend/base/system/cron/settings.py:
| Переменная | По умолчанию | Описание |
|---|---|---|
CRON__ENABLED |
true |
Включён ли cron вообще |
CRON__CHECK_INTERVAL |
60 |
Как часто (сек) опрашивать БД |
CRON__MAX_THREADS |
2 |
Сколько одновременных задач выполнять |
Системный пользователь¶
Cron-задачи выполняются под SystemSession(user_id=SYSTEM_USER_ID) — это даёт полный доступ ко всем моделям, минуя ACL и Rules. Это намеренно: cron — служебный код, не пользовательский.
# Внутри _execute_job()
set_access_session(SystemSession(user_id=SYSTEM_USER_ID))
try:
await self._run_job_code(job_data)
finally:
set_access_session(None)
Типичные cron-задачи в FARA¶
| Имя | Интервал | Что делает |
|---|---|---|
Activity: check deadlines |
1 минута | Находит просроченные активности → шлёт уведомления в чат |
Auth: deactivate expired sessions |
5 минут | Помечает истёкшие сессии active=false |
Attachments: cleanup orphan files |
1 день | Удаляет вложения без res_model/res_id старше N дней |
Chat: archive inactive chats |
1 неделя | Архивирует чаты без активности 90+ дней |
См. также¶
- Activity — задачи и дедлайны — главный потребитель cron в FARA.