Additional Utilities
The following are some useful modules that Octostar provides out of the box via its libraries, with no dependency on any particular back-end framework.
NLP Functions
The NLP module is available in streamlit-octostar-utils as an optional extra; include it in the pip install:

```
pip install streamlit-octostar-utils[nlp]
```
This module is found under `streamlit_octostar_utils.nlp` and is packaged separately because it pulls in a variety of additional dependencies. Currently the NLP module contains:
- A NER module for Named Entity Recognition. Entities can be extracted via either spaCy or Flair neural networks. The module also contains a language detector and an extractive summarization tool.
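For context, the snippet below shows the kind of extraction this module builds on, using plain spaCy directly; the Octostar wrapper's own entry points are not shown here, and the sample text is made up:

```python
# Plain spaCy NER, illustrating the kind of extraction the NER module wraps.
# Requires the small English model: python -m spacy download en_core_web_sm
import spacy

nlp = spacy.load("en_core_web_sm")
doc = nlp("Alice presented the Octostar demo in Rome on 12 May 2024.")
for ent in doc.ents:
    print(ent.text, ent.label_)  # e.g. "Alice" PERSON, "Rome" GPE, "12 May 2024" DATE
```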
Celery Functions
Celery is a task-queue implementation for Python web applications, used to execute work asynchronously outside the HTTP request/response cycle. We provide a module, `streamlit_octostar_utils.api_crafter.celery`, for tight integration between FastAPI and Celery (and the processing pipeline). The module includes three main classes:
`CeleryExecutor` is a wrapper over a Celery app. Based on a simplified configuration (a set of `CeleryQueueConfig` objects), it takes care of spinning up workers for different task queues and of data serialization/deserialization, even for large files. In particular, each Celery worker is assigned a queue name and a number of processes to spawn, and is given the tasks defined with the `@CeleryExecutor.task` decorator. Finally, it ensures that sending a task, polling for a task, cancelling a task, and all such operations are executed asynchronously (via threads), which is something Celery does not offer out-of-the-box.
Do note that the current implementation is meant to be fully self-contained: it assumes all workers run on a single machine, it spawns its own Redis server, and it reads/writes to the filesystem to support large files.
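A minimal sketch of setting up an executor follows; the constructor arguments, `CeleryQueueConfig` fields, and the `send_task` method are assumptions for illustration, not the library's documented signatures:

```python
# Hypothetical sketch -- argument names and methods below are assumed,
# not taken from the library's documentation.
from streamlit_octostar_utils.api_crafter.celery import (
    CeleryExecutor,
    CeleryQueueConfig,
)

executor = CeleryExecutor(
    queues=[
        # One queue named "nlp", served by a worker with two processes.
        CeleryQueueConfig(name="nlp", processes=2),
    ]
)

# Sending (like polling and cancelling) is wrapped to run asynchronously
# via threads rather than blocking the caller.
handle = executor.send_task("extract_entities", queue="nlp", args=("some text",))
```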
`CeleryRoute` is an extension of the `Route` class from the `streamlit_octostar_utils.api_crafter.fastapi` module, with additional support for defining and sending tasks via Celery. In particular, it offers the `define_tasks()` and `define_preloads()` methods, which are fully integrated with `CeleryExecutor` (a sketch follows this list):
- `define_tasks()` is where we can define a Celery task, which is simply a synchronous function whose first argument is the special task parameter. While Celery defines its own `@celery_app.task` decorator, we define our own `@CeleryExecutor.task` decorator (in the same way `@Route.route` does).
- `define_preloads()` is where we can define functions that should be executed ONCE when a Celery process is spawned. This allows the developer to preload resources, like neural models, onto the specific Celery process that will run the task(s) defined in the same `CeleryRoute`. Each preload is expected to return a dictionary, so that tasks in the same queue can fetch this data via the custom `task.request.resources` dictionary, which contains the merged dictionaries of all preloads for that queue.
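The sketch below shows how the two methods might fit together; the class layout, the return-value convention, and the `queue` argument are assumptions for illustration, not the documented API:

```python
# Hypothetical sketch -- only @CeleryExecutor.task and task.request.resources
# are named in the text above; everything else is assumed for illustration.
from streamlit_octostar_utils.api_crafter.celery import CeleryExecutor, CeleryRoute

class EntitiesRoute(CeleryRoute):
    def define_preloads(self):
        # Runs ONCE per spawned Celery process; the returned dictionary is
        # merged into task.request.resources for tasks on the same queue.
        def load_model():
            import spacy  # imported here, never at the top of the file
            return {"nlp_model": spacy.load("en_core_web_sm")}

        return [load_model]  # assumed registration mechanism

    def define_tasks(self):
        # The queue must match the one the preload was registered for.
        @CeleryExecutor.task(queue="nlp")
        def extract_entities(task, text: str):
            nlp = task.request.resources["nlp_model"]
            return [(ent.text, ent.label_) for ent in nlp(text).ents]

        return [extract_entities]  # assumed registration mechanism
```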
PRELOADS CAN BE TRICKY
Due to the way Python works, every process spawned by Celery will run the main code of your application. To minimize resource consumption, you should make sure that imports and functions which load resources into memory are never called at the top level of your Python files, but only inside the preload method. This is especially crucial when working with GPU-enabled applications, to prevent the following error:
```
RuntimeError: Cannot re-initialize CUDA in forked subprocess. To use CUDA with multiprocessing, you must use the 'spawn' start method
```
CUDA must never be initialized in both a parent process and its forked subprocesses (it does not support sharing its state across a fork), and forking worker processes is exactly what Celery does. Therefore, you must ensure CUDA is never initialized in the parent process.
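A minimal sketch of the safe pattern, assuming a PyTorch model (the file name `model.pt` is a placeholder):

```python
# DON'T: top-level code like this runs in the parent process too, so CUDA
# gets initialized before Celery forks and every worker then fails:
#   import torch
#   model = torch.load("model.pt").cuda()

def load_model():
    # DO: import torch and touch the GPU only inside the preload function,
    # so CUDA is first initialized inside the forked worker process.
    import torch
    model = torch.load("model.pt")  # "model.pt" is a placeholder path
    return {"model": model.cuda()}
```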
SELECT THE CORRECT QUEUE
Each Celery worker can listen on one or more queues, and that worker's processes will only execute tasks associated with those queues. Therefore, the queue used for a preload must match the queue used to define the task in a given `CeleryRoute`, unless you are absolutely sure of what you are doing.
`FastAPICeleryTaskRoute` is a `Route` which creates two endpoints for generic task handling: GET and DELETE at `task/<task_id>`. The former can be polled for a task's result from Celery once a task has been sent, while the latter can be used to cancel a task, whether queued or running. These two endpoints can be used for ALL Celery tasks defined with `@CeleryExecutor.task`, regardless of the original endpoint which created them.
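For illustration, a client might drive these two endpoints as below; the base URL, the response shape, and the `status` field are assumptions, and `task_id` stands for whatever the originating endpoint returned:

```python
# Hypothetical client sketch -- base URL and response fields are assumed.
import time
import requests

BASE = "http://localhost:8000"
task_id = "..."  # returned by whichever endpoint originally sent the task

# Poll the generic GET endpoint until the task leaves the queued/running states.
while True:
    result = requests.get(f"{BASE}/task/{task_id}").json()
    if result.get("status") not in ("PENDING", "STARTED"):
        break
    time.sleep(1.0)

# Alternatively, cancel the task, whether it is still queued or already running.
requests.delete(f"{BASE}/task/{task_id}")
```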
LLM Functions
TODO