Writing App Pipeline Endpoints

The NiFi pipeline is an orchestrator for apps to perform parallel or sequential computations on Octostar entities. Apps expose their functionality as one or more REST API endpoints, and NiFi should also have a dedicated API endpoint to mediate the data exchange.

For Python back-ends there is an integration module under streamlit-octostar-utils, which can be imported as follows:

from streamlit_octostar_utils.api_crafter import nifi

The following guide uses FastAPI as our back-end of choice to showcase how an API can be built to work with the NiFi Octostar pipeline.

A Minimal Example

A minimal but complete usage of the nifi + fastapi modules (plus the celery module for asynchronous task handling) looks something like this:

from fastapi import FastAPI, APIRouter, UploadFile, File, Query, Form
import uvicorn
from contextlib import asynccontextmanager
import os
import logging
from typing import List, Union, Optional
logger = logging.getLogger(__name__)

from streamlit_octostar_utils.api_crafter.fastapi import DefaultErrorRoute, RequestCancelledMiddleware, Route, CommonModels
from streamlit_octostar_utils.api_crafter.celery import FastAPICeleryTaskRoute, CeleryExecutor, CeleryQueueConfig, CeleryRoute
from streamlit_octostar_utils.api_crafter.nifi import NifiRoute

class ExtractHeaderRoute(CeleryRoute):
    success_responses = {
        200: {
            "description": "Successful Response",
            "content": {
                "application/json": {
                    "example": {"data": ["Hello world! This is extracted from a file.", None], "status": "success"}
                }
            }
        }
    }

    class ResponseModel(CommonModels.DataResponseModel):
        data: Union[Optional[str], List[Optional[str]]]
        status: str = "success"

    endpoint_name = "extract-header"
    endpoint_path = "/" + endpoint_name.replace("_", "-") + "/"
    queue = 'heavy'
    time_limit = 300

    def __init__(self, app, celery_executor, header_size=256, router=None, async_router=None):
        super().__init__(app, celery_executor, router, async_router)
        self.header_size = header_size

    def define_routes(self):
        @Route.include(self)
        @Route.route(self, router=self.async_router, path=ExtractHeaderRoute.endpoint_path, methods=["POST"])
        async def post_async(
            files: List[UploadFile] = File(),
            decode: Union[bool, List[bool]] = Form(False),
            header_size: int = Query(None)
        ) -> CommonModels.DataResponseModel:
            for i in range(len(files)):
                files[i] = await files[i].read()
            header_size = header_size or self.header_size
            task_id = await self.celery_executor.send_task(self.base_task, args=[files, decode, header_size])
            return CommonModels.DataResponseModel(data={"task_id": task_id})

        @Route.include(self)
        @Route.route(self, router=self.router, path=ExtractHeaderRoute.endpoint_path, methods=["POST"])
        async def post(
            files: List[UploadFile] = File(),
            decode: Union[bool, List[bool]] = Form(False),
            header_size: int = Query(None)
        ) -> ExtractHeaderRoute.ResponseModel:
            for i in range(len(files)):
                files[i] = await files[i].read()
            header_size = header_size or self.header_size
            result = await self.celery_executor.send_and_wait_task(self.base_task, args=[files, decode, header_size])
            return ExtractHeaderRoute.ResponseModel(data=result)

    @staticmethod
    def extract_header(file, decode, header_size):
        if len(file) > header_size:
            file = file[:header_size]
        if decode:
            file = file.decode('utf-8')
        return file

    def define_tasks(self):
        @Route.include(self)
        @CeleryExecutor.base_task(self.celery_executor, bind=True, queue=ExtractHeaderRoute.queue, time_limit=ExtractHeaderRoute.time_limit, name=f'fastapi_app.{ExtractHeaderRoute.endpoint_name}.base_task')
        def base_task(task, files, decode, header_size, *args, **kwargs):
            postprocess_model = task.request.resources['postprocess_model']
            results = []
            for file in files:
                try:
                    header = ExtractHeaderRoute.extract_header(file, decode, header_size)
                    header = postprocess_model(header)
                    results.append(header)
                except Exception as e:
                    print(f"Error: {e}")
                    results.append(None)
            return results

        @Route.include(self)
        @NifiRoute.nifi_task(self.celery_executor, bind=True, queue=ExtractHeaderRoute.queue, time_limit=ExtractHeaderRoute.time_limit, name=f'fastapi_app.{ExtractHeaderRoute.endpoint_name}.nifi_task')
        def nifi_task(task, nifi_context, nifi_batches, processor_name, *args, **kwargs):
            postprocess_model = task.request.resources['postprocess_model']
            for batch in nifi_batches:
                config = batch.config
                for entity in batch.entities:
                    try:
                        out = postprocess_model(ExtractHeaderRoute.extract_header(
                            entity.contents,
                            config.get('decode', False),
                            config.get('header_size', 256),
                        ))
                    except BaseException as e:
                        nifi_context.raise_exception(entity, e)
                        out = None
                    if out:
                        entity.add_metadata({'header': out})
                        entity.update_last_timestamp()
            return nifi_batches

    def define_preloads(self):
        @CeleryExecutor.preload(self.celery_executor, queue=ExtractHeaderRoute.queue)
        def preload():
            dummy_model = lambda x: x
            return {
                'postprocess_model': dummy_model
            }

PROCESSOR_NAME = "my_app_name"
CUSTOM_N_BYTES = 512

celery_executor = CeleryExecutor(
    'celery_app', os.path.splitext(os.path.basename(__file__))[0], preload_time_limit=30,
    queue_config={
        'default': CeleryQueueConfig(n_workers=1, max_tasks_per_child=1000),
        'heavy': CeleryQueueConfig(n_workers=1, max_tasks_per_child=1000)
    })
celery_app = celery_executor.app

@asynccontextmanager
async def lifespan(app: FastAPI):
    try:
        celery_executor.start()
        yield
    finally:
        celery_executor.close()

app = FastAPI(title="My App API", version="0.5.0",
              description='This is an app with many endpoints.',
              lifespan=lifespan)
app.add_middleware(RequestCancelledMiddleware)
DefaultErrorRoute.add_default_exceptions_handler(app)

router = APIRouter(prefix="/api")
async_router = APIRouter(prefix="/async")
nifi_router = APIRouter(prefix="/nifi")

tasks_routes = FastAPICeleryTaskRoute(app, celery_executor, async_router).include_routes()
nifi_routes = NifiRoute(app, tasks_routes, PROCESSOR_NAME, celery_executor, nifi_router)
extract_header_routes = ExtractHeaderRoute(app, celery_executor, CUSTOM_N_BYTES, router, async_router).include_routes()

for routes in [extract_header_routes]:
    nifi_routes.register_route(routes.endpoint_path, routes.nifi_task)
nifi_routes.include_routes()

router.include_router(nifi_router, tags=["Nifi-Reserved API"])
router.include_router(async_router, tags=["Async API"])
app.include_router(router, tags=["API"])

celery_executor.finalize()

if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8502)

Let's analyze the elements in this code step-by-step in order to understand the full potential of the library.

Base Structure

The core of the server definition relies on FastAPI conventions and methods. This makes it easy to transition between a generic FastAPI server and one interacting with the pipeline. As you can see, we use the Route class to define endpoints (with the @Route.route decorator) and dynamically link them to FastAPI routers by calling include_routes().

For more information on this, see the chapter on the Route class in Utilities for FastAPI.

Celery is likewise integrated into the FastAPI lifecycle via the CeleryExecutor class, where we also define a set of queues and their configuration. This allows different tasks to be executed on different pools of processes, each with its own preloaded resources, ensuring both priority and stability in resource usage. More information on the Celery integration can be found in Additional Utilities.

ROUTES AND CELERY ARE INDEPENDENT FROM NIFI PIPELINE

The extensions discussed here are fully independent from the pipeline, and can be used as-is to develop more modular FastAPI servers.

Two special routes are used in the example code above: NifiRoute and FastAPICeleryTaskRoute.

FastAPICeleryTaskRoute & NifiRoute

There are two special routes related to processing with Celery and with the pipeline, respectively:

  • FastAPICeleryTaskRoute exposes task status polling/cancellation from Celery as FastAPI endpoints. For more information, see Additional Utilities
  • A NifiRoute wraps a task with a NiFi context manager, so it handles conversion from/to the entity format defined for FlowFiles. Internally, it does this like so:

async with NifiContextManager(json_body) as nifi_context:
    entity_batches = await nifi_context.receive_input(json_body, processor_name)
    match op:
        case "my-function":
            entity_batches = await my_function(task, nifi_context, entity_batches, processor_name)
        case _:
            raise StarletteHTTPException(401, f"Route {op} is forbidden for NiFi.")
    return await nifi_context.send_output(entity_batches, processor_name)

The entities received from receive_input() are guaranteed to be of type NiFiEntity, while their related entities (children) may be of any of the proxy types (see section Entity Structure). send_output() returns a flat list of jsonified entities, ready to be returned, and performs clean-up and syncing as needed.

Note that the receive_input() and send_output() do not actually respond to a request, rather they prepare entities for input and output, as that is the role of this context manager. The context manager also provides a few utility functions which can be called within your processing function, such as:

  • raise_exception(), to raise an Exception for a specific entity, instead of for the whole batch. This will result in a successful 200 response from the endpoint to NiFi, but the pipeline will recognize the failing entities and re-route only those to failure
  • request_entity_sync(), to add flags that specify how an entity should eventually be consolidated (synced) against Octostar. More in section Entity Scope & Synchronization
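
To make the idea behind raise_exception() more concrete, here is a minimal standalone sketch of per-entity error isolation in plain Python. ToyEntity and process_batch are hypothetical stand-ins written for this illustration; they are not the library's classes, and the real context manager may record failures differently.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class ToyEntity:
    """Hypothetical stand-in for a pipeline entity (not the library class)."""
    uid: str
    contents: bytes
    metadata: dict = field(default_factory=dict)
    exception: Optional[dict] = None  # filled only for failing entities


def process_batch(entities: List[ToyEntity], fn: Callable[[bytes], str]) -> List[ToyEntity]:
    """Apply fn to every entity; record failures per entity instead of aborting."""
    for entity in entities:
        try:
            entity.metadata["header"] = fn(entity.contents)
        except Exception as exc:
            # The batch still succeeds as a whole; only this entity is flagged.
            entity.exception = {"code": 500, "message": str(exc)}
    return entities


batch = [ToyEntity("a", b"hello"), ToyEntity("b", b"\xff\xfe")]
processed = process_batch(batch, lambda raw: raw.decode("utf-8"))
failed = [e.uid for e in processed if e.exception is not None]
```

In this sketch the second entity cannot be decoded as UTF-8, so it alone ends up flagged while the first one is processed normally.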

The context manager is instantiated in the code example for the NifiRoute endpoint. Note that the endpoint is a standard FastAPI endpoint (wrapped in a class for convenience), and that it uses an op parameter to wrap another endpoint after receiving the entities, so that e.g. a request to /api/nifi/extract-header invokes the same functionality as /api/extract-header, but on NiFi entities rather than on arbitrary input.

NifiRoute takes care of this logic for you: it wraps a NifiRoute.nifi_task() so that the task only needs to be an implementation of my_function(task, nifi_context, entity_batches, processor_name). Besides defining a task of type nifi_task(), you also need to register the tasks of your route with the NifiRoute using the register_route() method.

Entity Structure

The main components of a NiFi entity (NiFiEntity class) are:

  • context: This is the parent NiFiContextManager entity
  • record: This dictionary contains the same fields as the corresponding entity in Octostar, such as os_entity_uid, entity_type and any other fields specific to the ontological entity type. Please do note that:
    • To access the label of the entity specifically, it is recommended to use the label property of the entity instead
    • To access the metadata of the entity specifically (os_item_content field), it is recommended to use the metadata property of the entity. In case new pieces of metadata need to be written, the methods add_metadata() and propagate_metadata() are provided to facilitate the process: the former can recursively update the metadata of the entity, while the latter can be used to merge the metadata of the entity onto another entity (e.g. the metadata of a child image onto the metadata of the parent PDF document)
    • To access the os_workspace for write operations, it is recommended to use the write_os_workspace property, as it returns a fallback workspace in case the entity is global or in case the JWT does not have authorization to write to the os_workspace workspace. Similarly, for update operations it is recommended to pass request.entity_timestamp as the os_last_updated_at field since it contains the last concrete value for that field, without any modifications caused by lazy operations

ENTITY UPDATES WILL FAIL ON AN INVALID TIMESTAMP

Remember that Octostar will reject an update if the os_last_updated_at provided in the request does not match the one in the database!

    • Whenever making changes to the fields in this dictionary, it is recommended to call the update_last_timestamp() method to keep the os_last_updated_at field in sync with the lazy operations
  • contents: This field is a proxy for the entity's attachment, if any. It can be freely written to and read from. Do note that:
    • If this field is currently None and it is read, it will be fetched using the Octostar API and the resulting bytes will be cached in this field for all successive reads. This field, although large, may be sent via HTTP from/to the pipeline due to the lazy sync mechanism: the contents may not be saved in Octostar, therefore they are stored within the FlowFile unless explicitly nulled. If file size could be an issue over the HTTP connection, it is recommended to sync the contents (that is, save them somewhere in the Octostar environment) and then set the contents of the NiFi entity to None before returning
  • request: This dictionary contains necessary metadata about the processing. This includes:
    • jwt: The JWT of the requesting user, to be used for all requests to the Octostar API. Do note this field is not refreshed by the pipeline, as the assumption is that processing will take less than the time to expire a JWT
    • sync_params: A dictionary of (flags, parameters) pairs to indicate which operations should be done to synchronize this entity against Octostar. For example, flag UPSERT_ENTITY_SPECIFIC_FIELDS will request an upsert of the entity to Octostar for the list of fields specified as a parameter. The possible flags and their parameters are in enum NiFiContextManager.SyncFlag
    • nifi_attributes: This dictionary can be filled by an endpoint with attributes that NiFi should extract from the FlowFile into its own attributes. This is done via a custom processor within the app processor group, shortly after the InvokeHTTP processor
    • config: This dictionary, indexed by processor name, is used to specify parameters for each processor on a per-entity basis. This also includes information about whether this entity is a fragment of another entity and, if so, how that entity can be reconstructed. Thanks to this config, each entity can have a different processing, even if coming from the same request. Endpoints are free to read and write to this dictionary, so long as they follow its convention. The full configs for an entity can be accessed from the request.config field or, for the configs related to the current processor, via the NifiEntityBatch instance (preferred)
    • is_temporary: Whether the entity exists in Octostar or only within the pipeline
    • exception: A dictionary containing a status code and message as written by context.raise_exception()
  • children: This is a list of related entities and/or relationships. Each element in children can be:
    • NiFiEntityProxy: A proxy for a NiFiEntity. Fields of NiFiEntity can be accessed from this proxy, in which case the proxy will fetch the real entity instance from the input list of entities. Do note that if the entity is not locally available within the input list of entities, attempting to read such fields will raise an exception.
    • NiFiOTMRelationshipProxy: A virtual entity representing a One-to-Many (OTM) relationship.

MTM RELATIONSHIPS ARE ENTITIES

Remember that Many-to-Many relationships are effectively entities of type os_relationship unlike One-to-Many relationships, which are instead implicitly defined by a pair of foreign keys. Therefore, in the context of pipeline endpoints, MTM relationships are represented as instances of either NiFiEntity or NiFiEntityProxy just like any other entity would.

  • For convenience, the properties children_entities and relationships are available to fetch only a specific subset of children: respectively, entities only (including MTM relationships but not OTM ones) and relationships only (OTM + MTM relationships).

It is also worthwhile mentioning the following methods:

  • is_child_concept() checks whether the entity is ontologically a child of another entity
  • add_child_entity(), add_child_file(), add_mtm_relationship(), add_otm_relationship() and add_tag() are functions which allow the developer to add child entities in accordance with the lazy sync mechanism

REMEMBER ABOUT THE LAZY SYNC

Reading and writing to these fields, as well as using the methods described above, cause lazy operations against Octostar, meaning that only the NiFi instance of the entity will have those fields updated. This will be the case until a sync is requested to the NiFiContextManager via context.request_entity_sync(now=True) or upon context.send_output() with a context.lazy_sync flag set to False.

On the other hand, invoking directly the Octostar API within the endpoint will cause these changes to be immediate, and it is the developer's responsibility to update the NiFiEntity instance with the changes before sending it in output.
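
The lazy sync idea can be sketched in a few lines of plain Python. Everything below (ToyRemoteStore, ToyLazyEntity, set_field, sync) is a hypothetical illustration under simplified assumptions, not the actual NiFiEntity or context manager implementation: writes only touch the local copy and are queued until a sync is explicitly requested.

```python
from typing import Dict, List, Tuple


class ToyRemoteStore:
    """Hypothetical stand-in for the remote platform (a plain in-memory dict)."""

    def __init__(self) -> None:
        self.records: Dict[str, dict] = {}


class ToyLazyEntity:
    """Hypothetical entity that queues writes locally until sync() is called."""

    def __init__(self, uid: str, store: ToyRemoteStore) -> None:
        self.uid = uid
        self.record: dict = {}
        self._store = store
        self._pending: List[Tuple[str, object]] = []

    def set_field(self, key: str, value: object) -> None:
        # Lazy write: only the local copy changes for now.
        self.record[key] = value
        self._pending.append((key, value))

    def sync(self) -> int:
        """Flush queued writes to the store and return how many were applied."""
        remote = self._store.records.setdefault(self.uid, {})
        for key, value in self._pending:
            remote[key] = value
        applied = len(self._pending)
        self._pending.clear()
        return applied


store = ToyRemoteStore()
entity = ToyLazyEntity("doc-1", store)
entity.set_field("label", "Quarterly report")
visible_before_sync = "doc-1" in store.records
applied = entity.sync()
```

Until sync() runs, the store in this sketch knows nothing about the entity, which mirrors why direct API calls and lazy operations should not be mixed carelessly.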

Entity Scope & Synchronization

The NifiContextManager class allows for delayed entity synchronization with Octostar. Let us consider this scenario:

I have a PDF document and I want to extract all textual content from it, including text contained in images or embedded PDFs.

The flow of data using the pipeline will be something like:

  1. Use an image extractor to extract all images from the PDF, all as separate entities
  2. Use a text extractor/OCR reader on the PDF
  3. Use an OCR/neural network to extract text from the images
  4. Merge all FlowFiles together based on a common attribute (see the Entity Fragmentation section)
  5. Merge all texts together
  6. Discard the image entities
  7. Save the merged text into the PDF entity

Notice the images are used only as intermediate steps and should not be saved into Octostar to avoid cluttering and slowness of the system's responses. The nifi module fully supports temporary entities, that is, entities that only live within the FlowFiles of the pipeline and are not matched by a record in the Octostar platform.
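
As a rough illustration of steps 5 to 7, the sketch below merges the text of temporary image entities into the parent document and then drops them. ToyDoc and merge_and_discard are hypothetical names invented for this example; only the is_temporary idea is taken from the description above.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ToyDoc:
    """Hypothetical stand-in for an entity travelling through the pipeline."""
    uid: str
    text: str
    is_temporary: bool  # True: lives only inside the pipeline


def merge_and_discard(parent: ToyDoc, fragments: List[ToyDoc]) -> List[ToyDoc]:
    """Merge every fragment's text into the parent, then drop temporary fragments."""
    parts = [parent.text] + [f.text for f in fragments]
    parent.text = "\n".join(p for p in parts if p)
    # Only entities that should persist are kept for the final save step.
    return [parent] + [f for f in fragments if not f.is_temporary]


pdf = ToyDoc("pdf-1", "Body text", is_temporary=False)
images = [ToyDoc("img-1", "Caption A", True), ToyDoc("img-2", "", True)]
to_save = merge_and_discard(pdf, images)
```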

This system is enabled by a delayed sync, which is eventually executed on the send_output() method when the sync flag is set to True. This can also be enabled on a per-entity basis when calling request_entity_sync() by setting now=True.

To read and write entities in the context of the pipeline, it is recommended to use the methods of the NifiEntity class rather than those in octostar-python-client. This makes full use of the lazy sync mechanism and also writes the new entities as children of the source entity (which can be useful for some operations).

CRUD OPERATIONS AGAINST OCTOSTAR

Any back-end function can be executed as normal within an endpoint's code, if there is no need to track that operation within the pipeline. Please make sure a client is properly instantiated if that is the case. The NiFiContextManager comes with its JWT and parameters to instantiate a client.

Entity Fragmentation

Fragmentation is a parallelization/chunking tool native to NiFi which has been deeply integrated into app processors. When a FlowFile needs to be split into chunks and then re-joined, NiFi will typically write the following attributes to the split FlowFiles: fragment.index, fragment.count, fragment.identifier. Together, these three attributes represent a fragmentation, and processors such as MergeContent are able to read these fields to reconstruct the original FlowFile.
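
The role of the three attributes can be shown with a small standalone sketch. The functions split_into_fragments and merge_fragments are hypothetical helpers written for this illustration (they are not NiFi or library code), and the zero-based index is an assumption of this example.

```python
import uuid
from typing import Dict, List


def split_into_fragments(payload: bytes, chunk_size: int) -> List[Dict[str, object]]:
    """Split payload into chunks tagged with the three fragment attributes."""
    chunks = [payload[i:i + chunk_size] for i in range(0, len(payload), chunk_size)] or [b""]
    identifier = str(uuid.uuid4())  # shared by all fragments of one split
    return [
        {
            "fragment.identifier": identifier,
            "fragment.index": index,
            "fragment.count": len(chunks),
            "content": chunk,
        }
        for index, chunk in enumerate(chunks)
    ]


def merge_fragments(fragments: List[Dict[str, object]]) -> bytes:
    """Rebuild the payload, checking the fragments form one complete set."""
    identifiers = {f["fragment.identifier"] for f in fragments}
    if len(identifiers) != 1:
        raise ValueError("fragments belong to different splits")
    if len(fragments) != fragments[0]["fragment.count"]:
        raise ValueError("some fragments are missing")
    ordered = sorted(fragments, key=lambda f: f["fragment.index"])
    return b"".join(f["content"] for f in ordered)


fragments = split_into_fragments(b"abcdefghij", chunk_size=4)
# Arrival order does not matter: the index restores the original order.
rebuilt = merge_fragments(list(reversed(fragments)))
```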

This mechanism is very useful for parallelization and child concept processing. Again, using the example of a PDF containing many images, each child image can be a fragment of the parent, such that when all fragments reach a join processor they are batched together and can be processed as one.

The process of fragmentation and re-joining is typically composed of the following steps, where the pipeline and processors must co-operate:

  • A processor defines that it has produced fragments out of an input entity, writing the appropriate fragment attributes to each. The NiFiFragmenter class takes care of this with method as_nifi_fragments()
  • The pipeline or other processors define how the fragments should merge together their annotations by attaching a merge strategy. A processor can use method push_defragment_strategy() from NifiFragmenter, but the pipeline can write this field manually, too, using a JoltTransformJSON processor
  • Fragments are processed independently, potentially going through different sub-pipelines and then are routed back together (e.g. via a RouteOnAttribute processor looking at the first value in the fragments stack)
  • Fragments reach a Notify and then are pooled in a Wait processor. The former increases a counter (where the key is the unique fragment.identifier field) in NiFi's cache map. Once the cache reaches the fragment.count value or once a (long) expiration time is reached, all fragments are released down-stream
    • If some fragments fail, the Failure processor group will also use a Notify processor to release the other fragments, though this is not guaranteed to work (best-effort)
  • A MergeContent processor will batch the fragments together as a list of NiFiEntity elements
  • A Defragment processor group from the NiFiDashboard app will merge annotations together based on the selected defragment strategy, and output only the root fragment (the one with fragment.index == 0).
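
The Notify/Wait step above can be approximated by a counter keyed on fragment.identifier. The class below is a hypothetical in-memory analogue written for illustration; it does not model the expiration time or the failure path, and it is not how NiFi implements its cache map.

```python
from collections import defaultdict
from typing import Dict, List


class ToyFragmentGate:
    """Hypothetical in-memory analogue of a Notify counter plus a Wait pool."""

    def __init__(self) -> None:
        self._counters: Dict[str, int] = defaultdict(int)
        self._waiting: Dict[str, List[dict]] = defaultdict(list)

    def arrive(self, fragment: dict) -> List[dict]:
        """Register a fragment; return the full group once all have arrived."""
        key = fragment["fragment.identifier"]
        self._counters[key] += 1             # the "Notify" step
        self._waiting[key].append(fragment)  # the "Wait" step
        if self._counters[key] < fragment["fragment.count"]:
            return []
        del self._counters[key]
        return self._waiting.pop(key)


gate = ToyFragmentGate()
parts = [
    {"fragment.identifier": "x", "fragment.index": i, "fragment.count": 3}
    for i in range(3)
]
released = [gate.arrive(p) for p in parts]
```

Here nothing is released for the first two arrivals, and the third arrival releases the whole group at once.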

It is possible to fragment multiple times, so long as they are joined back in LIFO order. This is because a NiFiEntity keeps not just a single fragment's info but a whole stack, and merge strategies are kept separate for each.
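
The LIFO constraint follows naturally from a stack. The sketch below uses a hypothetical ToyFragmentStack with made-up strategy names, purely to show that the most recent fragmentation must be joined first; the actual field layout inside a NiFiEntity is not shown here.

```python
from typing import Dict, List


class ToyFragmentStack:
    """Hypothetical per-entity stack of fragment descriptors."""

    def __init__(self) -> None:
        self._stack: List[Dict[str, object]] = []

    def push(self, identifier: str, index: int, count: int, strategy: str) -> None:
        # Each fragmentation level carries its own merge strategy.
        self._stack.append(
            {"identifier": identifier, "index": index, "count": count, "strategy": strategy}
        )

    def pop(self) -> Dict[str, object]:
        """Return the most recent fragmentation, which must be joined first."""
        if not self._stack:
            raise IndexError("entity is not a fragment")
        return self._stack.pop()


stack = ToyFragmentStack()
stack.push("pages", index=2, count=10, strategy="concatenate")     # outer split
stack.push("images", index=0, count=4, strategy="merge-metadata")  # inner split
first_join = stack.pop()["identifier"]
second_join = stack.pop()["identifier"]
```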

Multiple Tasks & Endpoints Execution

Computational tasks are often synchronous, thus they occupy their thread until completion. At the same time, the pipeline will time out if its InvokeHTTP processor fails to receive responses for a long enough time and any connection issues while waiting will similarly cause failures. To further complicate things, since we support multiple endpoints for the pipeline from the same FastAPI instance, a set of requests to one endpoint can potentially block all other requests to all other endpoints for the same app.

There are several possible (non-exclusive) mitigation strategies to this, in order of complexity:

  • Spawn multiple FastAPI workers, to have redundancy even if one is stuck
  • Use synchronous functions with FastAPI (which will be executed in the default threadpool) and consider expanding the threadpool or manually instancing and running multiple threadpools, if it's not enough
  • Process asynchronously, which is natively supported by FastAPI
  • Use a separate queue and messaging system like celery, which allows initial requests to return immediately

The last option is encouraged and facilitated by the CeleryExecutor class in the streamlit_octostar_utils.api_crafter.celery module, as showcased throughout this section. The other options remain viable, of course, so long as the endpoint uses the NifiContextManager correctly.
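
The "return immediately, fetch the result later" pattern behind the last option can be sketched with the standard library alone. ToyTaskQueue is a hypothetical stand-in that uses a thread pool for brevity; it is not Celery and not the CeleryExecutor API, and a real deployment would run tasks in separate worker processes.

```python
import time
import uuid
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Dict


class ToyTaskQueue:
    """Hypothetical stand-in for a task queue: submit returns at once with an id."""

    def __init__(self, max_workers: int = 2) -> None:
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._futures: Dict[str, Future] = {}

    def submit(self, fn: Callable[..., object], *args: object) -> str:
        task_id = str(uuid.uuid4())
        self._futures[task_id] = self._pool.submit(fn, *args)
        return task_id  # the caller is not blocked by the computation

    def status(self, task_id: str) -> str:
        return "done" if self._futures[task_id].done() else "pending"

    def result(self, task_id: str, timeout: float = 5.0) -> object:
        return self._futures[task_id].result(timeout=timeout)

    def close(self) -> None:
        self._pool.shutdown(wait=True)


def slow_header(data: bytes, size: int) -> bytes:
    time.sleep(0.05)  # simulate a heavy computation
    return data[:size]


task_queue = ToyTaskQueue()
task_id = task_queue.submit(slow_header, b"hello world", 5)
header = task_queue.result(task_id)
task_queue.close()
```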

KNOW THE DANGERS OF THREADS

Given that FastAPI already sends synchronous functions to the default threadpool, one might think using processes is overkill. However, do consider that threads cannot be forcibly killed and can therefore lead to starvation of the threadpool, under which all further requests would time out without any processing being done.

Local Testing

Testing NiFi endpoints locally can be tricky, due to the complex interactions between the Octostar platform and the pipeline. The suggested workflow to test an entire pipeline is to test it one endpoint at a time. This workflow assumes that the pipeline is up and running in your Octostar environment of choice.

To fetch a valid input JSON (list of entities) for an endpoint, it is recommended to fetch it directly from the queue prior to the processor invoking that endpoint. This can be done either:

  • Manually, via the NiFi GUI interface. Find the involved processor and select Stop. Then, send an entity which this processor would take in input. If the processor had been recently in use, it is also possible to use the Replay Last Event option to queue again the last processed file, which is very useful for debugging
  • Programmatically, via the NiFi REST API

Once a valid input has been obtained, the endpoint can be tested by running the server locally while also running a test script, like this:

import json
import requests

docs = "your JSON here"

response = requests.post(
    "http://127.0.0.1:8502/api/nifi/your-endpoint?processor_suffix=0",
    json=docs
)

data = json.loads(response.content.decode())
print(data)

Do note your back-end server should be able to interact with the Octostar API without any modifications (except for specifying the Octostar base url), as the input JSON already contains the necessary authorizations to do so.

If you are making asynchronous requests (e.g. by using Celery), you can easily extend this code by adding e.g. a polling mechanism in your test code.
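
A polling mechanism of this kind could look like the sketch below. poll_until_done is a hypothetical helper, and the "success"/"failure" state names are assumptions; in a real test, fetch_status would wrap an HTTP call to whichever task-status endpoint your server exposes.

```python
import time
from typing import Callable, Dict


def poll_until_done(
    fetch_status: Callable[[], Dict[str, object]],
    interval: float = 0.5,
    timeout: float = 60.0,
) -> Dict[str, object]:
    """Call fetch_status repeatedly until it reports a terminal state."""
    deadline = time.monotonic() + timeout
    while True:
        payload = fetch_status()
        # "success" / "failure" are assumed state names; adapt to your API.
        if payload.get("status") in ("success", "failure"):
            return payload
        if time.monotonic() >= deadline:
            raise TimeoutError("task did not finish in time")
        time.sleep(interval)


# Stand-in for an HTTP call that returns the task status as JSON.
_responses = iter([
    {"status": "pending"},
    {"status": "pending"},
    {"status": "success", "data": ["header"]},
])
final = poll_until_done(lambda: next(_responses), interval=0.01, timeout=1.0)
```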

If this output JSON needs to be tested against the pipeline, a GenerateFlowFile processor can be temporarily inserted between the InvokeHTTP processor and the following one, with the output JSON as its contents. Do note the generated FlowFile may lack attributes which would have been written in previous steps by the pipeline; an UpdateAttribute processor can be used to inject those as well.