
Additional Utilities

The following are some useful modules that Octostar provides out-of-the-box via its libraries, with no dependency on any particular back-end framework.

NLP Functions

The NLP module is available in streamlit-octostar-utils by including it in the pip install:

pip install streamlit-octostar-utils[nlp]

This module lives under streamlit_octostar_utils.nlp and is packaged as an optional extra because it pulls in a number of additional dependencies. Currently the NLP module contains:

  • A NER module for Named Entity Recognition. Entities can be extracted via either spaCy or Flair neural networks. The module also includes a language detector and an extractive-summary tool.
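To illustrate the extractive-summary technique (this is a plain-Python sketch of the general approach, not the module's actual API), a frequency-based summarizer scores each sentence by the frequency of its words and keeps the top-scoring sentences in document order:

```python
import re
from collections import Counter

def extractive_summary(text: str, n_sentences: int = 2) -> str:
    """Naive frequency-based extractive summary: score each sentence by
    the summed corpus frequency of its words, keep the top-N sentences
    in their original order."""
    sentences = re.split(r"(?<=[.!?])\s+", text.strip())
    freq = Counter(re.findall(r"\w+", text.lower()))
    ranked = sorted(
        range(len(sentences)),
        key=lambda i: sum(freq[w] for w in re.findall(r"\w+", sentences[i].lower())),
        reverse=True,
    )
    keep = sorted(ranked[:n_sentences])  # restore document order
    return " ".join(sentences[i] for i in keep)
```

Real extractive summarizers (as used by NLP toolkits) refine this with sentence embeddings or TF-IDF weighting, but the select-and-reorder structure is the same.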

Celery Functions

Celery is a task queue implementation for Python web applications, used to execute work asynchronously, outside the HTTP request-response cycle. We provide a module streamlit_octostar_utils.api_crafter.celery for tight integration between FastAPI and Celery (and the processing pipeline). The module includes three main classes:

  • CeleryExecutor is a wrapper over a Celery app. Based on a simplified configuration (a set of CeleryQueueConfig objects), it spins up workers for different task queues and handles data serialization/deserialization, even for large files. Each Celery worker is assigned a queue name and a number of processes to spawn, and is given tasks defined with the @CeleryExecutor.task decorator. Finally, it ensures that sending a task, polling for a task, cancelling a task, and all such operations are executed asynchronously (via threads), which Celery does not offer out-of-the-box.

Do note that the current implementation is meant to be fully self-contained: it assumes all workers run on a single machine, spawns its own Redis server, and reads/writes to the filesystem to support large files.
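The "asynchronously via threads" behavior can be sketched independently of Celery: any blocking operation (such as Celery's blocking result fetch) is pushed onto a worker thread so the event loop is never stalled. The names below are illustrative, not the CeleryExecutor API:

```python
import asyncio

async def poll_result(blocking_get, timeout: float = 5.0):
    """Run a blocking poll (e.g. a Celery result fetch) in a worker
    thread so the asyncio event loop stays free for other requests."""
    return await asyncio.wait_for(asyncio.to_thread(blocking_get), timeout)

def _fake_blocking_get():
    """Stand-in for a blocking broker round-trip."""
    import time
    time.sleep(0.01)
    return {"status": "SUCCESS", "result": 42}

async def main():
    return await poll_result(_fake_blocking_get)
```

In a FastAPI handler, wrapping the blocking call this way keeps a single worker responsive while many tasks are polled concurrently.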

  • CeleryRoute is an extension of the Route class from the streamlit_octostar_utils.api_crafter.fastapi module, with additional support for defining and sending tasks via Celery. In particular, it offers define_tasks() and define_preloads() methods, which are fully integrated with CeleryExecutor:
    • define_tasks() is where we can define a Celery task, which is simply a synchronous function whose first argument is the special task parameter. While Celery provides its own @celery_app.task decorator, we define our own @CeleryExecutor.task decorator (in the same way that @Route.route does).
    • define_preloads() is where we can define functions that should be executed ONCE when a Celery process is spawned. This lets the developer preload resources, such as neural models, onto the specific Celery process that will run the task(s) defined in the same CeleryRoute. Each preload is expected to return a dictionary, so that tasks in the same queue can fetch this data via the custom task.request.resources dictionary, which contains the merged dictionaries of all preloads for that queue.

PRELOADS CAN BE TRICKY

Due to the way Python works, every process spawned by Celery will run the main code of your application. To minimize resource consumption, make sure that imports and functions which load resources into memory are never called at the top level of your Python files, but only inside the preload method. This is especially crucial when working with GPU-enabled applications, to prevent the following error:

RuntimeError: Cannot re-initialize CUDA in forked subprocess. To use CUDA with multiprocessing, you must use the 'spawn' start method

CUDA must never be initialized in both a parent process and its forked children (CUDA contexts cannot be shared across a fork), yet forking is exactly what Celery does when spawning workers. Therefore, you must ensure CUDA is never initialized in the parent process.
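A common way to satisfy this constraint is lazy importing: keep GPU libraries out of module top-level and import them only inside the preload function, so that only the child process which actually runs the task ever touches CUDA. A stdlib-only sketch of the pattern, using the wave module as a stand-in for a heavy GPU library:

```python
import sys

def preload_resources():
    """Import the heavy library only here, inside the worker process,
    never at the top level of the FastAPI application module."""
    import wave  # stand-in for e.g. `import torch`
    return {"lib": wave}
```

Because the import statement sits inside the function body, the parent process never loads the library (and never initializes CUDA); each forked worker imports it on first use.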

SELECT THE CORRECT QUEUE

Each Celery worker can listen to one (or multiple) queues, and the processes of that worker will only execute tasks associated with those queues. Therefore, the queue used for a preload must match the queue used to define the task in a given CeleryRoute, unless you are absolutely sure of what you are doing.

  • FastAPICeleryTaskRoute is a Route which creates two endpoints for generic task handling: GET and DELETE at task/<task_id>. The former can be polled for a task's result once the task has been sent to Celery, while the latter cancels a task, whether queued or running. These two endpoints work for ALL Celery tasks defined with @CeleryExecutor.task, regardless of the original endpoint that created them.
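From the client side, the GET endpoint is typically used in a poll-until-done loop. The status payload shape below is an assumption for illustration; only the task/<task_id> paths come from the description above:

```python
import time

def wait_for_task(fetch_status, interval: float = 0.0, max_polls: int = 50):
    """Poll GET task/<task_id> (abstracted here as `fetch_status`, e.g. an
    HTTP call) until the task leaves the pending/running states, then
    return the final payload."""
    for _ in range(max_polls):
        payload = fetch_status()
        if payload.get("status") not in ("PENDING", "STARTED"):
            return payload
        time.sleep(interval)
    raise TimeoutError("task did not finish in time")
```

A client that gives up waiting can then issue DELETE at task/<task_id> to cancel the task instead of letting it run to completion.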

LLM Functions

TODO