With Flink Forward Berlin 2024 coming up fast, Ververica is spotlighting some of the interesting learnings from prior years and the engineering techniques behind them. This guest blog from Erik de Nooij, current Program Committee Member and Flink Forward speaker, focuses on data enrichment by calling an API endpoint from Flink SQL, made possible by a novel HTTP connector for Flink SQL built at ING bank.
This blog provides additional information to what ING shared during their presentation last year at Flink Forward Seattle 2023. You can watch the recording of that session by signing into Ververica Academy. ING is a long-time user of Apache Flink and Ververica, and in this piece Erik shows how ING enables its business users to make API calls, including model inferencing, using Flink SQL.
Data enrichment is a common task in stream processing, and Flink offers various capabilities to connect to external sources like lookup tables, databases, or web services that can be used to fetch the required data during the processing of events in a stream.
Although calling an API from Flink SQL for data enrichment purposes was our first and main objective, the connector can also be used, without any code changes, for model inferencing when an ML model has been wrapped in an API, as may be the case if you use e.g. MLflow for model inferencing.
ING Bank has been an early adopter of Apache Flink, using it since 2016. Our first implementation was custom Scala code in which we built a DSL (Domain Specific Language) enabling business users to configure their use cases. Note that back then Flink SQL did not exist yet, but we had already adopted the same philosophy: use cases should be describable in a declarative way.
For this initial custom implementation, custom code was written to be able to call APIs. However, when ING adopted Flink SQL and moved to a SQL-first approach, a gap in the out-of-the-box capabilities of Flink SQL became apparent, necessitating the development of an alternative solution for data enrichment: the HTTP connector.
The following section lists the requirements followed by a paragraph per requirement providing additional details.
In Flink SQL, data sources and sinks are abstracted as tables. This allows you to use standard SQL to declaratively define the transformations you want to perform on the data. The logical consequence is that to support data enrichment via API calls as part of these transformations, API endpoints also need to be abstracted as tables.
By abstracting an API endpoint as a reference table, a lookup join can be used to enrich data when processing events.
A lookup join in Flink SQL is implemented using the processing-time temporal join syntax, which is characterised by having a common key (like any other join) but also by time attributes of the participating events. Asynchronicity is achieved by inheriting from the AsyncLookupFunction class.
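To make the role of the time attribute concrete, here is a minimal sketch (not ING's actual table definition, which is in the appendix) of an input table that declares a processing-time attribute; the temporal join later references this attribute via FOR SYSTEM_TIME AS OF. The datagen connector is used purely as a placeholder.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class InputTableSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical input table: only a key column plus a processing-time
        // attribute. The real definitions are in the appendix of this post.
        tEnv.executeSql(
                "CREATE TABLE inputTable ("
                        + "  id STRING,"
                        + "  procTime AS PROCTIME()"   // required by the temporal join
                        + ") WITH ("
                        + "  'connector' = 'datagen'"  // placeholder connector for illustration
                        + ")");
    }
}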
An API endpoint that is called is often developed and operated by another team, or even another organisation, than the team that uses the HTTP connector to call the API, so the specification of the endpoint is a given.
This specification of an API is typically described via a Swagger file or OpenAPI document, which describes the following items:
The HTTP connector should be able to deal with the characteristics of a specific API, namely:
For the HTTP connector to perform an API call, an HTTP client is required. There are many libraries available that implement an HTTP client; some of them are compatible with the libraries used by Flink, but some are not. To avoid dependency issues, an HTTP client of our choosing has been deployed exclusively in a sidecar container that co-exists in the pod of the Flink TaskManager. From the Flink TaskManager, the sidecar is called using an HTTP client that is implemented in a library already available as part of the Flink libraries.
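As an illustration of this setup, the sketch below calls a sidecar listening on localhost using the JDK's built-in HTTP client. The port, path and the choice of the JDK client are assumptions for illustration only; the blog does not disclose which client library is actually used inside the TaskManager.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

public class SidecarClientSketch {
    // The sidecar runs in the same pod, so it is reachable on localhost.
    // Port and path are hypothetical.
    private static final String SIDECAR_URL = "http://localhost:8080/lookup";

    private final HttpClient client = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(5))
            .build();

    // Forward the request body to the sidecar and return the response body
    // asynchronously, which fits the asynchronous lookup model described above.
    public CompletableFuture<String> call(String requestBody) {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(SIDECAR_URL))
                .timeout(Duration.ofSeconds(10))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(requestBody))
                .build();
        return client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
                .thenApply(HttpResponse::body);
    }
}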
Now that the requirements are clear, let’s look at using the HTTP connector from the perspective of a user that wants to do an API call in a use case built using Flink SQL.
To convert an OpenAPI file to a Flink SQL table definition, the following configuration is needed by the conversion script:
Based on this configuration, the following is generated:
The table definition that is generated contains both the key fields that are used in the request and the fields that are used in the response. In other words, the table is being used as an abstraction to pass information back and forth between the Flink job and the API. In the appendix there is a sample OpenAPI file and the corresponding generated Flink SQL table definition.
A lookup join enriches information based on a (compound) key field from an input table and uses the enriched information to populate the fields of an output table.
In the example that we will be using below the input table only contains a customer identifier which is used to do the API call. For the given customer identifier, the API will return firstName, lastName and date_of_birth.
Note: The table definitions of the input and output table can be found in the appendix
Now that we have the definitions of all tables, we can write our query. The query is a standard lookup join that shields all the technical details of the API call being made under the hood. This enables every end user with standard SQL knowledge to do data enrichment without needing technical knowledge of the API. In other words, something inherently complex has been made very easy to use.
INSERT INTO `outputTable` (
    `id`,
    `firstName`,
    `lastName`,
    `date_of_birth`
)
SELECT
    inputTable.`id`,
    apiTable.`firstName`,
    apiTable.`lastName`,
    apiTable.`date_of_birth`
FROM inputTable
INNER JOIN apiTable FOR SYSTEM_TIME AS OF inputTable.procTime
    ON inputTable.id = apiTable.id
WHERE apiTable.httpStatusCode = 200
Once the Flink job has been deployed, an event on the input topic triggers the API call. In this example the customer ID is found, and the fields returned by the API are written to the output topic.
Note: The screenshot above has been taken from the outputTable in the Flink SQL editor. The field values shown are those returned by the API.
Proper error handling is important for any IT system, but maybe even more so when doing an asynchronous API call from a stream to an API that you do not have any control over.
When calling an API, errors can be divided into two categories: the "known", documented errors that are described in the OpenAPI file for the various HTTP return codes, and the errors that you do not have any control over, such as network errors.
If the API returns an HTTP code other than 200, the specific field for that code is populated. For example, calling the API with a non-existing key will populate the httpcode404 field and leave the other fields empty. The same mechanism is in place for all HTTP codes defined in the Swagger file. Additionally, there is a fallback implemented in case an HTTP code is returned that has not been defined in the OpenAPI file, since documentation tends not to be flawless.
Errors in this category are timeouts, network issues and similar issues that prevent the request from reaching the API, or that prevent the API from returning a response. Timeouts can be detected by the HTTP client in the sidecar, which will then return an HTTP 408 return code.
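As a hedged illustration of how these status fields can be acted upon in plain SQL, the sketch below routes non-200 responses to a separate sink. The errorTable name, its print connector and the INT type of the status column are assumptions for illustration; the query otherwise reuses the tables from the example above and assumes the ING connector and those tables are already registered.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ErrorRoutingSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical error sink; the 'print' connector simply logs the rows.
        tEnv.executeSql(
                "CREATE TABLE errorTable ("
                        + "  id STRING,"
                        + "  httpStatusCode INT"
                        + ") WITH ("
                        + "  'connector' = 'print'"
                        + ")");

        // Same lookup join as in the enrichment query, but keeping only the
        // responses whose status code is not 200 (e.g. 404, or the 408 returned
        // by the sidecar on a timeout).
        tEnv.executeSql(
                "INSERT INTO errorTable "
                        + "SELECT inputTable.id, apiTable.httpStatusCode "
                        + "FROM inputTable "
                        + "INNER JOIN apiTable FOR SYSTEM_TIME AS OF inputTable.procTime "
                        + "  ON inputTable.id = apiTable.id "
                        + "WHERE apiTable.httpStatusCode <> 200");
    }
}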
The original purpose behind the HTTP connector was to be able to do data enrichment using plain SQL. Technically this can be done with a GET, in which case the parameters are passed via the URL, or a POST, in which case the parameters are passed via the request body.
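A simplified, hypothetical sketch of that GET-versus-POST distinction is shown below; it is not taken from the actual connector code, and the request-body shape is assumed for illustration.

import java.net.URI;
import java.net.http.HttpRequest;

public class RequestBuilderSketch {

    public static HttpRequest build(String method, String baseUri, String pathTemplate, String key) {
        if ("get".equalsIgnoreCase(method)) {
            // GET: the parameter is passed via the URL.
            return HttpRequest.newBuilder()
                    .uri(URI.create(baseUri + pathTemplate + "/" + key))
                    .GET()
                    .build();
        }
        // POST: the parameter is passed via the request body.
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUri + pathTemplate))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString("{\"id\": \"" + key + "\"}"))
                .build();
    }
}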
Over time additional requirements were met with only slight additions to the HTTP connector.
This section peeks under the hood and explains several implementation details.
The behaviour of an API table is configured via table options, which are part of the table definition and are generated by converting an OpenAPI file. Below is an example of a table definition; the various options are detailed further down.
CREATE TABLE apiTable (
    -- See the appendix for an example of the fields that are generated for a given OpenAPI file
) WITH (
    -- connector options
    'connector' = 'api-endpoint-ing',        -- Mandatory table options
    'host' = 'api.mycompany.com',
    'method' = 'post',
    'path-template' = '/lookup',
    'status-code-column' = 'httpStatusCode',
    'response-column-prefix' = 'httpcode',
    'format' = 'json',                       -- Optional table options
    'base-uri' = '${api_base_uri}',
    'http-connect-timeout' = '5 seconds',
    'http-socket-timeout' = '10 seconds'
    -- etc., see further down
)
The API table is a dynamic table, meaning Flink does not own the data itself. The content of a dynamic table comes from an external system; in this case it comes from an API response. Implementing the connector can be done by hooking into the existing stack. To make it a bit more specific, the custom 'api-endpoint-ing' connector has been implemented by creating a class 'ApiDynamicTableFactory' which implements the Java interface DynamicTableSourceFactory, which has several methods that need to be implemented, as documented below.
Source: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/#overview
The function `factoryIdentifier` registers the connector `api-endpoint-ing` with Flink.
@Override
public String factoryIdentifier() {
    return "api-endpoint-ing";
}
The function `requiredOptions` makes the mandatory table options known to the connector, so that the connector can construct the request and knows in which columns to store the result. The actual values come from the mandatory table options.
@Override
public Set<ConfigOption<?>> requiredOptions() {
    final Set<ConfigOption<?>> options = new HashSet<>();
    options.add(HOST);                   // FQDN of the host that serves the API
    options.add(METHOD);                 // post or get
    options.add(PATH_TEMPLATE);          // Query URL parameter, e.g. /lookup
    options.add(STATUS_CODE_COLUMN);     // Field used to store the httpcode
    options.add(RESPONSE_COLUMN_PREFIX); // Prefix applied to all column names
    return options;
}
The function `optionalOptions` is similar to the function `requiredOptions`, with the difference that these options are optional; they are used for more specific APIs and to optimize the runtime behavior of the connector to fit the runtime characteristics of the API.
@Override
public Set<ConfigOption<?>> optionalOptions() {
    final Set<ConfigOption<?>> options = new HashSet<>();
    options.add(BASE_URI);                       // base-uri used to call the API
    options.add(REQUEST_BODY_COLUMN);            // Column name for the request body
    options.add(QUERY_COLUMNS);                  // Column names mapped to query arguments
    options.add(HEADER_COLUMNS);                 // Column names mapped to header values
    options.add(HTTP_CONNECT_TIMEOUT);           // Timeout setting for the HTTP connection
    options.add(HTTP_SOCKET_TIMEOUT);            // Timeout setting for the socket connection
    options.add(HTTP_VALIDATE_AFTER_INACTIVITY); // Re-validate connections setting
    options.add(HTTP_TIME_TO_LIVE);              // Total span of time a connection is kept alive
    return options;
}
Finally, the function `createDynamicTableSource` creates the actual table source, passing in the endpoint definition, the formats, the produced data type and the cache.
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
    …
    // Helper functions implementing the functionality of the DynamicTableSource
    …
    return new ApiDynamicTableSource(
            apiEndpointDefinition,
            decodingFormat,
            encodingFormat,
            producedDataType,
            cache);
}
Finally, the core of the runtime behavior is in the `ApiRowDataAsyncLookupFunction` class, which has all these parameters at its disposal and implements the construction of the HTTP request and the handling of the response.
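To give an impression of what such a class involves, below is a strongly simplified, hypothetical sketch of an asynchronous lookup function built on Flink's AsyncLookupFunction; it is not ING's actual implementation. It reuses the SidecarClientSketch helper from earlier, hard-codes a single key field, and returns the raw response body instead of mapping it onto the generated columns.

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.AsyncLookupFunction;
import org.apache.flink.table.functions.FunctionContext;

// Simplified, hypothetical sketch; not the actual ApiRowDataAsyncLookupFunction.
public class ApiAsyncLookupFunctionSketch extends AsyncLookupFunction {

    private transient SidecarClientSketch client;

    @Override
    public void open(FunctionContext context) {
        // Create the HTTP client that talks to the sidecar (see earlier sketch).
        client = new SidecarClientSketch();
    }

    @Override
    public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
        // Build the request body from the key field of the lookup join.
        String requestBody = "{\"id\": \"" + keyRow.getString(0) + "\"}";

        // Call the sidecar asynchronously and map the response onto a row.
        return client.call(requestBody)
                .thenApply(responseBody ->
                        Collections.singletonList(toRowData(responseBody)));
    }

    private RowData toRowData(String responseBody) {
        // In the real connector the JSON response is parsed into the individual
        // columns of the API table; here the raw body is returned in a single field.
        return GenericRowData.of(StringData.fromString(responseBody));
    }
}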
No, it has not been open sourced at this moment. The reason is that there are several implementation details that are ING specific. Then again, certain parts can be beneficial to other users of Flink SQL and can be open sourced, e.g. the tool to generate a table definition based on an OpenAPI file, which is very similar to the tool we have built to convert an Avro schema to a Flink SQL table definition.
Both the HTTP connector and the ING-specific Kafka connector are based on standard connectors with additional ING-specific security functionality, which is why the source code of these connectors cannot be open sourced in its entirety.
The HTTP connector can also be used for model inferencing, on the condition that the ML model is wrapped in an API. If that condition is met, an end user can do model inferencing using plain SQL. The approach outlined in FLIP-437 is a programmatic approach that requires implementing a Java interface.
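As a hedged illustration, the sketch below reuses the lookup-join pattern from the enrichment example against a hypothetical model-scoring endpoint abstracted as a table; the table and column names (scoringApiTable, scoredTable, score) are assumptions for illustration, not part of ING's setup, and the tables are assumed to be registered already.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ModelInferenceSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Assumes inputTable, scoredTable and a generated scoringApiTable
        // (wrapping e.g. an MLflow-served model) have already been registered.
        tEnv.executeSql(
                "INSERT INTO scoredTable "
                        + "SELECT inputTable.id, scoringApiTable.score "
                        + "FROM inputTable "
                        + "INNER JOIN scoringApiTable FOR SYSTEM_TIME AS OF inputTable.procTime "
                        + "  ON inputTable.id = scoringApiTable.id "
                        + "WHERE scoringApiTable.httpStatusCode = 200");
    }
}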
ING has adopted a SQL-first approach for its Streaming Data Analytics (SDA) platform and, where possible, a SQL-only approach. Paramount to its success is being able to abstract all sources and sinks as tables and, where needed, to develop the relevant connectors.
Moving forward, SDA is expected to play a bigger role in the orchestration of model inferencing via integration with a feature store. Last but not least, we are investigating if and how SDA can play a role in the RAG architecture of generative AI and LLMs. Developments in these areas will be shared via future blogs and possibly at Flink Forward (subject to abstract approval).
If you'd like to learn more about Apache Flink and network with the Flink Community, including Erik, tickets for Flink Forward 2024 in Berlin are available here.
Ready to try Apache Flink for yourself? Spin up Ververica's cloud offering with $400 free credits to get started.
This appendix contains the files that were used in the example described earlier in this blog.