Architecture Pattern: Machine Learning Model as a Service Backend

Over the years, many design patterns have been invented and documented. Currently, a new pattern is emerging that relies on using a generalized machine learning model as the core of a service backend.

Just to make my point clear, the claim is not about developing a very specialized, single-purpose model that fulfills one, often very narrow, product feature. Solutions like that have existed for a long time and are already well researched, documented, and tested. The best examples are the recommendation systems that companies like Google, Facebook, Netflix, or Amazon use and many of us are familiar with. The true novelty is an architecture design pattern in which a generalized machine learning model is used at the core of the service architecture: a model that supports virtually all of the product’s features. The term support was used here on purpose, as opposed to implements, because in this case the model itself will have little to no code tailored to a particular feature and instead will be generalized enough to handle all of the product requirements.

This opens up completely new possibilities, even for highly dynamic product requirements that can change without major re-architecting and re-implementation; the core of the model would remain virtually unchanged. Not to sound naive or over-optimistic: today, models considered state of the art require training and then very meticulous fine-tuning to achieve an acceptable level of accuracy, and such a process will remain mandatory for the foreseeable future. Yet we are increasingly moving towards a world where the models themselves become more and more capable and generalized, to the point where they can easily be applied to a wide set of use cases.

Over the past two decades, multiple Software as a Service products have been developed, and many of them required solving complex architecture or even distributed systems problems to enable a specific set of features. They require a significant amount of time for design, development, and operation. Many such services were continuously worked on, gradually improving over time as new features were added and bugs were fixed. All the effort put into that development process can be measured in the money, time, and resources required to build those products. The ones that become most successful have a never-ending stream of new requirements, improvements, and changes.

What if all of that well-established software development process were to become a thing of the past? The emerging generalized Large Language Models open the possibility not only of using them in various interactive applications but, more fundamentally, of building a generalized computation platform that can become a core component in architecting and developing all kinds of commercial products. Since the release of GPT-3 back in 2020, multiple startups have emerged over the past three years offering products that were enabled only thanks to the invention of Large Language Models.

Yet it appears there is still a design paradigm from which the industry hasn’t shifted just yet. The current mindset is to develop a product, usually in the form of SaaS, to which the ML model is considered an add-on. In this post, I am going to claim that it is already possible today to build a product that has a machine learning model at its core, with that model supporting every single product feature. Such a backend model would still be hidden behind an API and, as of today, would still require integration code in front of it. Yet it is the model that performs all of the business-critical logic without a dedicated line of code. That said, the specific feature set supported today is going to be narrow and very specialized, but it is only a matter of time until the models improve and go beyond current limitations.

Arguably, while still limited to text processing, those models can be applied very successfully to tasks that rely on classification, summarization, or text generation. A very real example of such an application could be a generalized template processing system. Such a system could then be easily adjusted to dynamically changing requirements. It would also be possible to take this idea to an extreme: such an architecture opens up the possibility of making the end-user experience fully customizable, going far beyond any of the initial design assumptions.
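
To make this more concrete, below is a minimal sketch of what such a backend could look like: a single generic call to a hosted language model (here the GPT-3 completions API, used purely as an illustration) renders an arbitrary template against user-supplied data, with the "feature" described entirely by the prompt. The endpoint, model name, and request shape are assumptions based on the publicly documented API, and the JSON handling is deliberately naive.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TemplateService {

    private static final String COMPLETIONS_URL = "https://api.openai.com/v1/completions";

    private final HttpClient http = HttpClient.newHttpClient();
    private final String apiKey;

    public TemplateService(String apiKey) {
        this.apiKey = apiKey;
    }

    // Renders any template against the supplied data; the "feature" lives entirely in the prompt,
    // there is no template-specific code in the service itself.
    public String render(String template, String data) throws Exception {
        String prompt = "Fill in the following template using the provided data.\n"
                + "Template:\n" + template + "\nData:\n" + data + "\nResult:\n";
        String body = "{\"model\":\"text-davinci-003\",\"prompt\":" + quote(prompt) + ",\"max_tokens\":512}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(COMPLETIONS_URL))
                .header("Authorization", "Bearer " + apiKey)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = http.send(request, HttpResponse.BodyHandlers.ofString());
        return response.body(); // the completion text would be extracted from the returned JSON
    }

    // naive JSON string escaping, sufficient for a sketch
    private static String quote(String value) {
        return '"' + value.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n") + '"';
    }
}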

What would be the benefit of designing a system in such a way?

  • Agility — development speed and time to market matter. Software development today is a time- and resource-intensive process, usually taking effort measured in engineering years to complete. An ML-powered architecture instead allows product teams to iterate quickly, possibly testing multiple product variants to find the optimal solution, something that today would be expensive.
  • Flexibility — the system requirements may not be fully known or finalized by the time the product has been completed and deployed. This is true even today, as software evolves over time, yet that evolution is a time- and cost-intensive process. For well-established products it also becomes harder and harder to introduce significant changes as their complexity constantly grows. A product built on top of a model, in contrast, could be adjusted almost instantly.
  • Unparalleled customization — it will be possible not only for the service vendor to provide a set of features but also for the end user to customize their own experience. Did the service vendor introduce a breaking change? Not a problem: you as the customer could restore the system behavior to the previous state without impacting any other user. Not every product is going to be built this way, but for many there might be little reason not to do so.

Without a doubt, startups will emerge that embrace such architecture choices at the core of their product. This will potentially allow them to benefit quickly and gain the upper hand over their competitors, particularly those too slow to adapt to the new reality.

Yet we don’t see such an architecture being fully adopted today, and there are good reasons for it. The key points can be summarized as:

  • Correctness — it is no secret that no real-life model achieves 100% accuracy, which might not be acceptable for many applications. In particular, applications that are considered critical will definitely not adopt such a solution.
  • Cost — today, one million GPT-3 API calls with an output of 4,000 characters each cost roughly 100,000 (one hundred thousand) times more than one million invocations of AWS Lambda or Google Cloud Functions. The models are cheaper when compared to the cost of human labor, but there is still tremendous room for improvement in cost-effectiveness; today they simply cannot compete with existing SaaS products. Though if history teaches us anything, it is to not bet against progress.
  • Performance — as the size of the input grows, a model can take a significant amount of time, measured in seconds, to produce a result. This still outperforms manual human work but will be slower overall than specialized implementations.

A significant amount of research and investment will be needed to improve the capability, performance, and cost-effectiveness of existing models, or to invent completely new ones, before they can fully compete with existing SaaS offerings on all fronts. But even before that happens, they will be able to compete in narrow fields that are not fully automated today, while still being more cost-effective and efficient than the alternative.

We are still years away from being able to use ML models as fully generalized computation platforms. Despite that, it is already possible to use a generalized ML model to fulfill all of the features of a very specialized product. This should worry software engineers more than ChatGPT generating code or instantly solving coding interview questions.

Using Amazon EventBridge for Automated Event Reconciliation

Amazon EventBridge announced support last week for event archive and replay, whose primary goal is to help users with disaster recovery and to guarantee that producers and consumers can reconcile events and, through that, their state.

EventBridge itself, by design, should not drop event delivery: in case of failure, delivery is by default retried for up to 24 hours, and only after that is the event finally dropped. This behaviour is now also configurable through the RetryPolicy and can be adjusted per configured target.
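
As an illustration, a retry policy can be declared per target in a CloudFormation or SAM template; a minimal sketch, where the rule name, event source, target, and specific limits are placeholders:

OrderEventsRule:
  Type: AWS::Events::Rule
  Properties:
    EventBusName: default
    EventPattern:
      source:
        - 'custom.orders'
    Targets:
      - Id: consumer-function
        Arn: !GetAtt ConsumerFunction.Arn
        RetryPolicy:
          # stop retrying after 4 hours or 20 attempts, whichever comes first
          MaximumEventAgeInSeconds: 14400
          MaximumRetryAttempts: 20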

Besides that, there are still cases in which an event delivered by the service might end up dropped on the floor. Some typical cases:

  • A bug in the destination Lambda function that does not have a DLQ configured.
  • Deleting or disabling the rule.
  • A misconfigured rule input transformer.

The above are only scenarios specific to the use of AWS. In general, there might be use cases in which reconciliation of events is actually part of the business requirements, like guaranteeing that the warehouse inventory is up to date or that all invoices at a given point in time have been processed.

Arguably, one of the problems whenever an incident happens is to detect that scenario and then recover every event that hasn’t been processed. For the former, CloudWatch alarms on failed invocations, Lambda function errors, or missing data points can typically help; for the latter, the archive created on the EventBridge event bus can be used to replay all of the events from the start of the incident in order to restore the consumer state.

This can be done effectively provided that the consumer can process the events idempotently, as replaying the events might also mean delivering duplicates of previously delivered events.

Interestingly enough, the process of replaying events doesn’t necessarily need to happen only in cases of failure; rather, the service itself can be designed so that event reconciliation happens on a regular basis, for example every day, week, or month, without any need for manual work. In such cases it is possible to build a system that is able to self-reconcile. This can be done today with actually very little effort.

EventBridge allows configuring scheduled events that run based on a cron expression. The scheduled event can then be wired to a Lambda function responsible for starting the replay. An example application that does exactly that can be found in the GitHub repo.

You can simply clone it and deploy it using SAM.

$ git clone https://github.com/jmnarloch/aws-eventbridge-replay-scheduler
$ cd aws-eventbridge-replay-scheduler
$ sam build
$ sam deploy --guided

The CloudFormation template defines a cron expression that configures how often the automated replay should be triggered, along with the Lambda function. The function’s logic uses the same cron expression to compute the tumbling window for which the replay of events is triggered.

The implementation is quite straightforward:

// assumes the aws-sdk (v2), cron-parser and uuid npm packages
const AWS = require('aws-sdk');
const parser = require('cron-parser');
const uuid = require('uuid');

const events = new AWS.EventBridge();

exports.lambdaHandler = async (event, context) => {
    const archive = process.env.AWS_EVENTBRIDGE_ARCHIVE_ARN;
    const eventBus = process.env.AWS_EVENTBRIDGE_EVENT_BUS_ARN;
    const schedule = process.env.AWS_EVENTBRIDGE_REPLAY_SCHEDULE;

    // walk the cron schedule backwards to get the bounds of the previous tumbling window
    let cron = parser.parseExpression(schedule);
    let replayEndTime = cron.prev();
    let replayStartTime = cron.prev();

    console.log("Replaying events from %s to %s with event time between [%s, %s]",
        archive, eventBus, replayStartTime, replayEndTime);

    await events.startReplay({
        ReplayName: uuid.v4(),
        EventSourceArn: archive,
        Destination: {
            Arn: eventBus
        },
        EventStartTime: replayStartTime.toDate(),
        EventEndTime: replayEndTime.toDate()
    }).promise();

    return {};
};

Some scheduling examples:

Running the reconciliation every day at 1 AM.

0 1 ? * * *

Running the reconciliation every week on Sunday at 1 AM.

0 1 ? * 0 *

Running the reconciliation on the last day of the month at 1 AM.

0 1 L * ? *

Interestingly enough, this also allows implementing a delayed event delivery use case, provided that the events are not processed on first publishing.

What are the tradeoffs of executing the reconciliation on a schedule? The biggest gain is that you get zero-touch operations by design and don’t require manual intervention during disaster recovery to restore the state, though there will be cases in which the implied additional time needed for event recovery is unacceptable and an immediate means of replaying the events is necessary. The downside of continuously replaying events is additional cost, though that can be kept in check since the replay uses a tumbling window and never replays the same events more than once.

One idea for future improvement is to have the Lambda function that triggers the replay maintain state, so that it is guaranteed a replay will never be repeated for the same time window and, in case of failure, all of the missed time windows will still be covered.

On the EventBridge side, an interesting idea would be to allow configuring a replay as a rule target, so that it could be triggered by a scheduled event without the need to write any code.

AWS EventBridge Pattern DSL

EventBridge has lately been updated with a set of new pattern matching capabilities, nicely captured in a blog post by James Beswick. The existing functionality of matching events based on exact equality of certain fields has now been expanded to support operations like prefix matches, “anything-but” matches, or matching based on the presence of certain attributes. Altogether this helps to pre-filter events based on specific criteria, something that was previously only achievable by adding boilerplate logic to your event consumer and dropping all events that did not meet the criteria.

Even before the pattern matching syntax was expanded with additional keywords and operators, it was not uncommon to misconfigure your event pattern by specifying the wrong attribute name or pattern syntax, regardless of whether it was used with the AWS CLI, CloudFormation, or the AWS Console.

To avoid this scenario, and above all to make creating a rule pattern a more bulletproof experience that is fully reproducible and testable, I have created a very small utility Java library that introduces a DSL for the EventBridge pattern language.

You can add the library to your existing Maven (or Gradle) project by simply declaring the dependency.

<dependency>
  <groupId>io.jmnarloch</groupId>
  <artifactId>aws-eventbridge-pattern-builder</artifactId>
  <version>1.0.0</version>
</dependency>

The utility is really simple to use. It defines an `EventsPattern` class with a fluent API for defining the constraints of the matched events.

As an example, consider a pattern that matches events published by aws.ec2, which would be expressed in JSON like:

{
  "source": [ "aws.ec2" ]
}

This can be turned into a small Java snippet:

EventsPattern.builder()
        .equal("source", "aws.ec2")
        .build();

Once a pattern has been created, it can be turned into JSON simply by calling the `toJson` method.

The newly added operators are supported as well.

Prefix matching
For instance, if we wish to match events coming from specific AWS regions, we can create a pattern:

EventsPattern.builder()
        .prefix("region", "eu-")
        .build();

This corresponds to the JSON syntax:

{
  "region": [{"prefix": "eu-"}]
}

Anything but
This can be used to match everything except a certain value:

EventsPattern.builder()
        .path("detail")
                .anythingBut("state", "initializing")
                .parent()
        .build();

This will result in:

{
  "detail": {
    "state": [ {"anything-but":"initializing"} ]
  }
}

AWS SDK

The above examples can be used as a drop-in replacement whenever using the vanilla AWS SDK:

EventsPattern pattern = EventsPattern.builder()
        .equal("source", "aws.ec2")
        .build();

AmazonCloudWatchEvents eventsClient = AmazonCloudWatchEventsClient.builder().build();
eventsClient.putRule(new PutRuleRequest()
        .withName("EC2-Rule")
        .withRoleArn("arn:aws:iam::123456789012:role/ec2-events-delivery-role")
        .withEventPattern(pattern.toJson()));

Testability

The biggest benefit of moving the rule into code is arguably the possibility to test the rule even before it’s deployed. For that purpose, a dedicated test can be added as part of the integration test suite.

    @Test
    public void patternShouldMatchEvent() {

        String event = "{\"id\":\"7bf73129-1428-4cd3-a780-95db273d1602\",\"detail-type\":\"EC2 Instance State-change Notification\",\"source\":\"aws.ec2\",\"account\":\"123456789012\",\"time\":\"2015-11-11T21:29:54Z\",\"region\":\"us-east-1\",\"resources\":[  \"arn:aws:ec2:us-east-1:123456789012:instance/i-abcd1111\"  ],\"detail\":{  \"instance-id\":\"i-abcd1111\",  \"state\":\"pending\"  }}";

        EventsPattern pattern = EventsPattern.builder()
                .equal("source", "aws.ec2")
                .build();

        AmazonCloudWatchEvents eventsClient = AmazonCloudWatchEventsClient.builder().build();
        TestEventPatternResult matchResult = eventsClient.testEventPattern(new TestEventPatternRequest()
                .withEvent(event)
                .withEventPattern(pattern.toJson()));
        assertTrue(matchResult.getResult());
    }

Now, each time the event structure itself changes, there is a regression test in place that can guarantee the created pattern will still match the event. This is useful during early development stages or for prototyping purposes.

AWS CDK

AWS CDK is a development framework that allows you to define your AWS infrastructure as code. At the moment the CDK does not yet support the full pattern matching syntax, but the ultimate goal would be to introduce such a capability in the CDK directly.

The project is open source and available in the GitHub repo.

For additional information on the event matching capabilities please refer to the EventBridge documentation.

Automatically discovering SNS message structure

Last month during re:Invent, a preview of the EventBridge Schema Registry was announced. One of the unique features the service brings is automatic discovery of any custom event published to EventBridge. As soon as discovery is enabled on an event bus, the service will aggregate the events over a period of time and register them as OpenAPI documents in the registry. You might ask yourself: what exactly is the use case for a feature like that? Typically, the discovery of events is particularly useful whenever you don’t own the source that publishes the events, for instance when it’s owned by another team in your organization or even a third party. Even for consuming AWS EventBridge events, prior to the release of this service the typical onboarding process required following the contract of the events defined in the AWS documentation and mapping that into your language of choice. That could be really tedious.

As of today the service integrates with EventBridge, though it is definitely not limited to it. As a matter of fact, it can easily integrate with any JSON-based event source. To demonstrate these capabilities, with a small amount of code we can show how to automate the discovery of schemas from any SNS topic.

To achieve that, a little bit of wiring needs to be done to pipe the events from SNS to EventBridge. Unfortunately, EventBridge is not a natively supported target for SNS at the moment. Instead, we can use AWS Lambda to consume the events from SNS and forward them to EventBridge.

This leads us to a very straightforward integration.

To wire everything together, we can create a SAM template:

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  aws-schema-discovery-sns

  Sample SAM Template for aws-schema-discovery-sns

Globals:
  Function:
    Timeout: 60

Resources:
  SnsDiscoveryTopic:
    Type: AWS::SNS::Topic
    Properties:
      TopicName: 'sns-discovery'

  SnsSchemaDiscoveryFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: app/
      Handler: app.lambdaHandler
      Runtime: nodejs12.x
      Environment:
        Variables:
          event_source: sns.topic
          event_detail_type: !GetAtt SnsDiscoveryTopic.TopicName
      Events:
        SnsEvents:
          Type: SNS
          Properties:
            Topic: !Ref SnsDiscoveryTopic
      Policies:
        - Statement:
            - Sid: EventBridgePutEventsPolicy
              Effect: Allow
              Action:
                - events:PutEvents
              Resource: '*'
Outputs:
  SnsSchemaDiscoveryFunction:
    Description: 'SNS Schema Discovery function Lambda Function ARN'
    Value: !GetAtt SnsSchemaDiscoveryFunction.Arn
  SnsSchemaDiscoveryFunctionIamRole:
    Description: 'Implicit IAM Role created for SNS Schema Discovery function'
    Value: !GetAtt SnsSchemaDiscoveryFunctionRole.Arn

This will deploy, through CloudFormation, a Lambda function subscribed to a dedicated SNS topic. Publishing any message to the topic will forward it to EventBridge and, provided that schema discovery has been enabled on the default event bus, an auto-discovered schema will be registered.

The next thing is to develop the Lambda code. To keep things simple, let’s use JavaScript for that.

const AWS = require('aws-sdk');
exports.lambdaHandler = async (event, context) => {
    try {
        var eventbridge = new AWS.EventBridge();
        var entries = [];
        event.Records.forEach(record => entries.push({
            Source: process.env.event_source,
            DetailType: process.env.event_detail_type,
            Detail: record.Sns.Message,
            Time: new Date()
        }));
        // await here so that failures are caught and logged by the catch block
        return await eventbridge.putEvents({
            Entries: entries
        }).promise();
    } catch (err) {
        console.log(err, err.stack);
        throw err;
    }
};

The code is very simple: it forwards any message published to SNS on to EventBridge, using the configured source and detail-type attributes to identify this particular event type.

The last step is to enable discovery on the event bus. This can be done from the Console, through CloudFormation, or with a CLI command:

aws schemas create-discoverer --source-arn arn:aws:events:us-east-1:${ACCOUNT_ID}:event-bus/default
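
The equivalent CloudFormation resource is a sketch along these lines (the resource type is AWS::EventSchemas::Discoverer; the logical name and description are placeholders):

SchemaDiscoverer:
  Type: AWS::EventSchemas::Discoverer
  Properties:
    SourceArn: !Sub 'arn:aws:events:${AWS::Region}:${AWS::AccountId}:event-bus/default'
    Description: 'Schema discovery for the default event bus'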

Now, by publishing an event to the SNS topic, we will get an automatically registered schema.

To demonstrate that everything is working, let’s publish a sample JSON document to the SNS topic.

{
  "Records": [{
    "AwsRegion": "us-east-1",
    "AwsAccountId": "12345678910",
    "MessageId": "4e4fac8e-cf3a-4de3-b33e-e614fd25c66f",
    "Message": {
      "instance-id":"i-abcd1111",
      "state":"pending"
    }
  }]
}

The above event results in the following OpenAPI document being created in the registry:

{
  "openapi": "3.0.0",
  "info": {
    "version": "1.0.0",
    "title": "SnsDiscovery"
  },
  "paths": {},
  "components": {
    "schemas": {
      "AWSEvent": {
        "type": "object",
        "required": ["detail-type", "resources", "detail", "id", "source", "time", "region", "version", "account"],
        "x-amazon-events-detail-type": "sns-discovery",
        "x-amazon-events-source": "sns.topic",
        "properties": {
          "detail": {
            "$ref": "#/components/schemas/SnsDiscovery"
          },
          "account": {
            "type": "string"
          },
          "detail-type": {
            "type": "string"
          },
          "id": {
            "type": "string"
          },
          "region": {
            "type": "string"
          },
          "resources": {
            "type": "array",
            "items": {
              "type": "object"
            }
          },
          "source": {
            "type": "string"
          },
          "time": {
            "type": "string",
            "format": "date-time"
          },
          "version": {
            "type": "string"
          }
        }
      },
      "SnsDiscovery": {
        "type": "object",
        "required": ["Records"],
        "properties": {
          "Records": {
            "type": "array",
            "items": {
              "$ref": "#/components/schemas/SnsDiscoveryItem"
            }
          }
        }
      },
      "SnsDiscoveryItem": {
        "type": "object",
        "required": ["AwsRegion", "Message", "AwsAccountId", "MessageId"],
        "properties": {
          "Message": {
            "$ref": "#/components/schemas/Message"
          },
          "AwsAccountId": {
            "type": "string"
          },
          "AwsRegion": {
            "type": "string"
          },
          "MessageId": {
            "type": "string"
          }
        }
      },
      "Message": {
        "type": "object",
        "required": ["instance-id", "state"],
        "properties": {
          "instance-id": {
            "type": "string"
          },
          "state": {
            "type": "string"
          }
        }
      }
    }
  }
}

The source code of the SAM application is available in the GitHub repo. Feel free to try it out.

Spring Boot RxJava 2

Last month the RxJava 2 GA version was released: https://github.com/ReactiveX/RxJava/releases/tag/v2.0.0

The project has been reworked to support the emerging JVM standard: Reactive Streams

Thanks to a contribution from Brian Chung, the small side project I initially authored, https://github.com/jmnarloch/rxjava-spring-boot-starter, which adds support for returning the reactive types Observable and Single from Spring MVC controllers, now supports RxJava 2.

While Spring itself will support Reactive Streams mostly through its own Project Reactor, RxJava will still be supported across different projects. For instance, the latest Spring Data releases allow designing repositories with built-in support for RxJava types.

At the API level, the most significant change in the RxJava Spring Boot starter is the package change: it now supports types from io.reactivex.* instead of rx.*. Besides that, the usage is fairly similar.

Simply add the library to your project:

<dependency>
  <groupId>io.jmnarloch</groupId>
  <artifactId>rxjava-spring-boot-starter</artifactId>
  <version>2.0.0</version>
</dependency>

You can use the RxJava types as return types in your controllers:

@RestController
public static class InvoiceResource {

    @RequestMapping(method = RequestMethod.GET, value = "/invoices", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public Observable<Invoice> getInvoices() {

        return Observable.just(
                new Invoice("Acme", new Date()),
                new Invoice("Oceanic", new Date())
        );
    }
}

If you are looking for a more detailed description of migrating to RxJava 2, here is a comprehensive guide.

Spring Cloud Stream: Hermes Binder

Introduction

Spring Cloud Stream is an interesting initiative for building message-driven applications in the wider Spring ecosystem. I think the main idea is to ease usage and reduce configuration to the bare minimum, compared to a more complex solution such as Spring Integration.

Altogether, Spring Cloud Stream introduces the idea of binders, which are responsible for handling the integration with different message-oriented middleware, at the moment having out-of-the-box support for:

  • RabbitMQ
  • Kafka
  • Redis
  • GemFire

For additional information I highly recommend going through the Spring Cloud Stream reference guide.

Allegro Hermes is a message broker built on top of Kafka, with a REST API that allows it to be easily integrated with HTTP-based clients. It also has a rich set of features, allowing it to pass JSON and binary Avro messages as well as broadcast messages or send them in batches.

In order to be able to use it through Spring Cloud Stream, we need to provide a dedicated binder that is able to deliver the messages to Hermes.

Fortunately there is one here:

https://github.com/jmnarloch/hermes-spring-cloud-starter-stream

Example:

Let’s try to use it in practice, starting from a sample project. You may want to first go through the Hermes quickstart guide to set up your environment.

Next we will download Spring Initializr template using httpie.


$ http -f POST https://start.spring.io/starter.zip type=gradle-project style=cloud-stream-binder-kafka > demo.zip

$ unzip demo.zip

Afterwards you can import the project using your favorite IDE.

The first thing to do is to replace spring-cloud-starter-stream-kafka with the Hermes binder:


compile('io.jmnarloch:hermes-spring-cloud-starter-stream:0.2.0')

Let’s start by configuring the Hermes URI for the binder.

spring:
  cloud:
    stream:
      hermes:
        binder:
          uri: 'http://frontend.hermes.local:8080'

Now we can design our binding and the POJO used for the message.

package io.jmnarloch.stream.hermes;

import java.math.BigDecimal;
import java.util.UUID;

public class PriceChangeEvent {

    private final UUID productId;

    private final BigDecimal oldPrice;

    private final BigDecimal newPrice;

    public PriceChangeEvent(UUID productId, BigDecimal oldPrice, BigDecimal newPrice) {
        this.productId = productId;
        this.oldPrice = oldPrice;
        this.newPrice = newPrice;
    }

    public UUID getProductId() {
        return productId;
    }

    public BigDecimal getOldPrice() {
        return oldPrice;
    }

    public BigDecimal getNewPrice() {
        return newPrice;
    }
}

And the binding for the message channel:

package io.jmnarloch.stream.hermes;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface Events {

    @Output
    MessageChannel priceChanges();
}

Through configuration we can specify the destination topic name and the default content type of the topic.

spring:
  cloud:
    stream:
      bindings:
        priceChanges:
          destination: 'io.jmnarloch.price.change'
          contentType: 'application/json'

In order to enable Spring Cloud Stream binding we need to annotate our configuration class.

@Configuration
@EnableBinding(Events.class)
public class EventsConfiguration {
}

Using the binding is straightforward: a proper proxy is going to be created and can afterwards be injected.

@Component
public class EventsProducer {

    private final Events events;

    @Autowired
    public EventsProducer(Events events) {
        this.events = events;
    }

    public void publishPriceChange(PriceChangeEvent event) {

        events.priceChanges().send(new GenericMessage<>(event));
    }
}

Finally, we can publish our message:

eventsProducer.publishPriceChange(new PriceChangeEvent(uuid, oldPrice, newPrice));

At the moment the binder itself is still under development, but this already presents a workable example.

Publishing binary Avro messages is almost as simple as JSON ones, and I’m going to cover that in a following blog post.

Spring Boot: Hystrix and ThreadLocals

Foreword

Last time I described a, at least from my perspective, quite useful extension for RxJava, though overall it defined only syntactic sugar so that you could easily specify your custom RxJava Scheduler. One of the mentioned applications is very relevant to this blog post: being able to pass around ThreadLocal variables. In the case of RxJava, whenever we spawn a new thread by subscribing on a Scheduler, it loses any context that was stored within the ThreadLocal variables of the “outer” thread that initiated the task.

The same applies to Hystrix commands.

Initially, the credit for this idea should go to the development team back at my previous company – Allegro Tech – but it is such a recurring problem that others have solved it in the past as well. Yet again, I had the need to solve it once more.

Let’s say that I would like to execute the following command; let’s put aside for a moment the sense of doing so, as it only illustrates the problem:

new HystrixCommand<Object>(commandKey()) {
    @Override
    protected Object run() throws Exception {
        return RequestContextHolder.currentRequestAttributes().getAttribute("RequestId", SCOPE_REQUEST);
    }
}.execute();

Poof – the data is gone.

Even when run in a server container, in the “context” of a request-bound thread, the above code will end with an exception. This happens because Hystrix by default spawns a new thread for executing the code (aside from the semaphore isolation mode that can also be used). Hystrix manages its own thread pools for the commands, which have no relation to any context stored in the ThreadLocals of the triggering thread.

Overall, ThreadLocal variables might be considered an anti-pattern by some, but they are so useful in many practical scenarios that it’s not uncommon for quite a few libraries to depend on them.

Typically this is your logging MDC context or, in the case of the Spring Framework, the security Authentication/Principal or the request/session scoped beans, etc. So for some use cases it is quite important to be able to correctly pass such information along. Imagine a typical scenario in which you are trying to use OAuth2RestTemplate in a @HystrixCommand annotated method. Sadly, this isn’t going to work.

The solution

Fortunately, the designers of the Hystrix library have anticipated such a use case and designed proper extension points. Basically, the idea is to allow decorating the executed task with your own logic that runs once it is invoked by the thread.

On top of that I’ve prepared a small Spring Boot integration module:

https://github.com/jmnarloch/hystrix-context-spring-boot-starter

At this point the implementation is fairly simple and a bit limited. In order to pass a specific thread-bound value, you need to provide your own custom implementation of HystrixCallableWrapper.

For instance, to “fix” the above snippet we can register the following class as a bean:

@Component
public class RequestAttributeAwareCallableWrapper implements HystrixCallableWrapper {

    @Override
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        return new RequestAttributeAwareCallable<>(callable, RequestContextHolder.currentRequestAttributes());
    }

    private static class RequestAttributeAwareCallable<T> implements Callable<T> {

        private final Callable<T> callable;
        private final RequestAttributes requestAttributes;

        public RequestAttributeAwareCallable(Callable<T> callable, RequestAttributes requestAttributes) {
            this.callable = callable;
            this.requestAttributes = requestAttributes;
        }

        @Override
        public T call() throws Exception {

            try {
                RequestContextHolder.setRequestAttributes(requestAttributes);
                return callable.call();
            } finally {
                RequestContextHolder.resetRequestAttributes();
            }
        }
    }
}

Adding Java 8 syntactic sugar

It quite soon struck me that this is a rather boilerplate implementation, because for every variable that we would like to share we would have to implement a pretty similar class.

So why not try a slightly different approach: simply create a “template” implementation that can be conveniently filled with a specific implementation at the defined “extension” points. Java 8 method references could be quite useful here, mostly because in a typical scenario the operations performed are limited to retrieving a value, setting it, and finally clearing any trace of it.

public class HystrixCallableWrapperBuilder<T> {

    private final Supplier<T> supplier;

    private Consumer<T> before;

    private Consumer<T> after;

    public HystrixCallableWrapperBuilder(Supplier<T> supplier) {
        this.supplier = supplier;
    }

    public static <T> HystrixCallableWrapperBuilder<T> usingContext(Supplier<T> supplier) {
        return new HystrixCallableWrapperBuilder<>(supplier);
    }

    public HystrixCallableWrapperBuilder<T> beforeCall(Consumer<T> before) {
        this.before = before;
        return this;
    }

    public HystrixCallableWrapperBuilder<T> beforeCallExecute(Runnable before) {
        this.before = ctx -> before.run();
        return this;
    }

    public HystrixCallableWrapperBuilder<T> afterCall(Consumer<T> after) {
        this.after = after;
        return this;
    }

    public HystrixCallableWrapperBuilder<T> afterCallExecute(Runnable after) {
        this.after = ctx -> after.run();
        return this;
    }

    public HystrixCallableWrapper build() {
        return new HystrixCallableWrapper() {
            @Override
            public <V> Callable<V> wrapCallable(Callable<V> callable) {
                return new AroundHystrixCallableWrapper<V>(callable, supplier.get(), before, after);
            }
        };
    }

    private class AroundHystrixCallableWrapper<V> implements Callable<V> {

        private final Callable<V> callable;

        private final T context;

        private final Consumer<T> before;

        private final Consumer<T> after;

        public AroundHystrixCallableWrapper(Callable<V> callable, T context, Consumer<T> before, Consumer<T> after) {
            this.callable = callable;
            this.context = context;
            this.before = before;
            this.after = after;
        }

        @Override
        public V call() throws Exception {
            try {
                before();
                return callable.call();
            } finally {
                after();
            }
        }

        private void before() {
            if (before != null) {
                before.accept(context);
            }
        }

        private void after() {
            if (after != null) {
                after.accept(context);
            }
        }
    }
}

The above code is not part of the described extension, but you may use it freely as you wish.

Afterwards we may instantiate as many wrappers as we would like to:

HystrixCallableWrapperBuilder
                .usingContext(RequestContextHolder::currentRequestAttributes)
                .beforeCall(RequestContextHolder::setRequestAttributes)
                .afterCallExecute(RequestContextHolder::resetRequestAttributes)
                .build();

As a result, we would be able to pass, for instance, the MDC context or the Spring Security Authentication, or any other data that might be needed.
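
For example, a Spring Security aware wrapper could be assembled the same way and registered as a bean, just like the RequestAttributeAwareCallableWrapper shown earlier; a sketch, assuming SecurityContextHolder from spring-security-core is on the classpath:

@Bean
public HystrixCallableWrapper securityContextAwareCallableWrapper() {
    // capture the caller's SecurityContext and restore it on the Hystrix thread
    return HystrixCallableWrapperBuilder
            .usingContext(SecurityContextHolder::getContext)
            .beforeCall(SecurityContextHolder::setContext)
            .afterCallExecute(SecurityContextHolder::clearContext)
            .build();
}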

Separation of concerns

This is clearly a cleaner solution, but it still has one fundamental drawback: it requires specifying the logic for every single ThreadLocal variable separately. It would be way more convenient to define only once the logic of passing the variables across thread boundaries, whether for Hystrix or any other library such as RxJava. The only catch is that in order to do so, those variables would first have to be identified and encapsulated in a proper abstraction.

I think such an idea would be worth implementing, though I don’t yet have a complete solution for it.

Plans

Nevertheless, I would be interested in developing a PoC of such a generic solution and, as done in the past, once again preparing a pull request, for instance to Spring Cloud, to provide such end-to-end functionality.

Spring Boot: RxJava Declarative Schedulers

As a follow-up to last week’s article, Spring Boot: RxJava, there is one additional project:

https://github.com/jmnarloch/rxjava-scheduler-spring-boot-starter

Setup, as with most Spring Boot starters, is fairly simple: you just drop the dependency onto your project classpath and you are all set:


<dependency>
  <groupId>io.jmnarloch</groupId>
  <artifactId>rxjava-scheduler-spring-boot-starter</artifactId>
  <version>1.0.0</version>
</dependency>

The library brings one piece of functionality: it allows specifying the Scheduler for the RxJava reactive types rx.Observable and rx.Single in Spring’s declarative manner – through annotations.

The basic use case is to annotate your bean methods with either @SubscribeOnBean or @SubscribeOn annotations.

Example:


    @Service
    public class InvoiceService {

        @SubscribeOnBean("executorScheduler")
        public Observable<Invoice> getUnprocessedInvoices() {
            return Observable.just(
                ...
            );
        }
    }

The motivation here is to ease integration with the Spring Framework and to be able to define application-level schedulers within the DI container. Why would you want to do that? There are a couple of use cases.

For example, you might need to provide a custom scheduler that is aware of ThreadLocal variables; a typical use case is passing the logging MDC context, so that the thread running within the RxJava Scheduler can access the same context as the thread that triggered the task, though the applications go beyond that.

Another typical example is customizing your scheduler rather than relying on the built-in ones, for instance in order to limit the thread pool size, considering that built-in schedulers like the IO scheduler are unbounded.
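
A minimal sketch of registering such a scheduler as a bean, assuming RxJava 1.x and the bean name used in the @SubscribeOnBean example above; the pool size is an arbitrary placeholder:

import java.util.concurrent.Executors;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import rx.Scheduler;
import rx.schedulers.Schedulers;

@Configuration
public class SchedulerConfiguration {

    @Bean(name = "executorScheduler")
    public Scheduler executorScheduler() {
        // a bounded scheduler backed by a fixed-size thread pool
        return Schedulers.from(Executors.newFixedThreadPool(32));
    }
}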

In case you simply want to rely on the RxJava predefined schedulers, you can still use them with the @SubscribeOn annotation.

    @Service
    public class InvoiceService {

        @SubscribeOn(Scheduler.IO)
        public Observable<Invoice> getInvoices() {
            return Observable.just(
                ...
            );
        }
    }

Spring Boot: RxJava

Back to posting; this will be a bit dated, since I was working on this integration back in February.

Interestingly enough, I had already prepared a blog post related to this feature within Spring Cloud, which added tight RxJava integration with Spring MVC controllers. Remarkably, it turned out that the implementation in one of the previous milestones had a flaw in it.

I was interested in trying to find a solution to the problem, though I was keen to support mostly the widely used REST-like approach in which (usually) the entire payload is returned once computed, in contrast to streaming the response over HTTP. This approach is reflected in this small project:

https://github.com/jmnarloch/rxjava-spring-boot-starter

It worked out very well as a reference project, in which I had the opportunity to try out different API implementations. You can use it on its own in your project, since the proposed implementation depends only on Spring Boot and the Spring Framework’s MethodReturnValueHandler, so if you are simply using Spring Boot without the additional features provided through Spring Cloud, feel free to test it out.

Later, the code of the project became the baseline for the implementation proposed to Spring Cloud.

Spring Cloud approach

The final approach that was implemented in Spring Cloud is a bit different: first of all, the support for rx.Observable has been removed; instead, you can use rx.Single in a similar manner to DeferredResult, to which the underlying implementation in fact maps the RxJava type. The reference describes this in a bit more detail: http://cloud.spring.io/spring-cloud-static/spring-cloud.html#netflix-rxjava-springmvc

Funnily enough, at my company one of my colleagues later had to migrate the code of one of the projects from rx.Observable to rx.Single, which he wasn’t really happy about 😉

Spring Boot: Tuning your Undertow application for throughput

It’s been some time since the previous blog post, but I finally thought it was a good time to write about a very useful and practical aspect: how to prepare your Spring Boot application for production and how to guarantee that it will be able to handle a couple of million views each day.

If you think that you have already taken all the needed steps by making your application stateless, scaling it out, or running it on high-end machines, think twice, because it’s quite likely that there are bottlenecks inside your application that, if not treated with proper attention, will degrade the performance and the application’s overall throughput.

Tuning for latency vs tuning for throughput

Interestingly enough, in the past, being aware of Little’s Law, I thought that tuning your application’s throughput requires nothing more than reducing your application latency as much as possible. It was only after reading the book Java Performance that I realized this might not be true in all cases.

Generally, you can improve latency first by improving your application’s algorithmic performance. After that, you should take a look at the access patterns in your application: introducing a caching layer or redesigning the way your application accesses data can have a huge impact on overall performance. If your application is heavily I/O bound, performing operations in parallel can be a way to improve things a bit.

Another good idea for improving your application latency is to configure asynchronous logging; whether you use Logback or Log4j 2, both of them provide the proper functionality.
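
As an illustration, a minimal Logback sketch wrapping an existing appender in an AsyncAppender; the appender names, pattern, and queue size are placeholders:

<configuration>
  <appender name="FILE" class="ch.qos.logback.core.FileAppender">
    <file>application.log</file>
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>

  <!-- logging calls return immediately, the FILE appender is invoked from a background thread -->
  <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
    <queueSize>512</queueSize>
    <appender-ref ref="FILE"/>
  </appender>

  <root level="INFO">
    <appender-ref ref="ASYNC"/>
  </root>
</configuration>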

Thread pools

Undertow

Undertow uses XNIO as the default connector. XNIO has some interesting default characteristics: the number of I/O threads is initialized to the number of your logical cores, and the worker thread count to eight times that. So on a typical 4-core Intel CPU with hyper-threading you will end up with 8 I/O threads and 64 worker threads. Is this enough? Well, it depends. For comparison, the Tomcat and Jetty defaults are 100 and 1000 threads respectively. If you need to be able to handle more requests per second, this is the first thing to consider increasing.
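
In a Spring Boot application these can be raised through configuration properties; a sketch with placeholder values (property names as available in the Spring Boot versions current at the time of writing):

server:
  undertow:
    # defaults are derived from the number of logical cores
    io-threads: 16
    worker-threads: 256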

Hystrix

The Hystrix documentation states that:

Most of the time the default value of 10 threads will be fine (often it could be made smaller).

After working on a couple of projects, I find it hard to believe that this could be a true statement. The default for Hystrix is 10 threads per pool, which might quickly turn out to become a bottleneck. In fact, the same documentation also states that in order to establish the correct size of a Hystrix thread pool you should use the following formula:

requests per second at peak when healthy × 99th percentile latency in seconds + some breathing room

So let’s assume that you have a system that has to handle, say, 24,000 rps; divided by the number of instances, for instance 8, that gives 3,000 rps per instance, from which you can establish the appropriate pool size for a single instance. The result will depend greatly on the latency of your system: at a 99th percentile latency of, say, 50 ms, that works out to roughly 3,000 × 0.05 = 150 threads, plus some breathing room.
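
If you configure Hystrix through Spring Cloud Netflix (which bridges application properties to Archaius), the pool size can then be set for the default pool, or per command group, in application.yml; a sketch with a placeholder value derived from the formula above:

hystrix:
  threadpool:
    default:
      # peak rps per instance x p99 latency in seconds + breathing room
      coreSize: 150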

RxJava

Memory usage

All of this does not come without a price: each newly allocated thread consumes memory. In Java you can configure the stack size through the -Xss option, with the default for a 64-bit VM being 1 MB. So if you, let’s say, configure your Undertow thread pool with 512 worker threads, be ready for your memory consumption (just for the thread stacks) to increase by roughly 512 MB.

Connection pools

HTTP

Do you use, for instance, RestTemplate, or maybe the RESTEasy JAX-RS client? In fact, there is a well-known issue reported in RESTEasy where by default exactly ONE connection is used for all of your calls. The good advice is to align the connection pool size with the number of worker threads of your application server; otherwise, when performing HTTP calls, the threads will wait to acquire an underlying HTTP connection from the pool, which will cause unnecessary and probably unintended delay.
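
For RestTemplate, a pooled Apache HttpClient can be plugged in along these lines; a sketch, with the pool sizes as placeholders to be aligned with your worker thread count:

import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;

@Configuration
public class RestClientConfiguration {

    @Bean
    public RestTemplate restTemplate() {
        PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
        // align the pool with the application server worker threads so calls do not queue on connections
        connectionManager.setMaxTotal(512);
        connectionManager.setDefaultMaxPerRoute(512);

        CloseableHttpClient httpClient = HttpClients.custom()
                .setConnectionManager(connectionManager)
                .build();

        return new RestTemplate(new HttpComponentsClientHttpRequestFactory(httpClient));
    }
}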

Cache

The same basic principle applies to any other kind of service communicated with over a TCP connection. For instance, Memcached clients like XMemcached have the nice capability of using a multiplexed TCP connection with a binary protocol on top of it, giving a throughput of roughly 50 requests per connection; still, if you need to handle greater throughput, you need to configure your client to maintain an entire pool of connections.
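
A sketch of an XMemcached client configured with a connection pool; the server address and pool size are placeholders:

import net.rubyeye.xmemcached.MemcachedClient;
import net.rubyeye.xmemcached.MemcachedClientBuilder;
import net.rubyeye.xmemcached.XMemcachedClientBuilder;
import net.rubyeye.xmemcached.command.BinaryCommandFactory;
import net.rubyeye.xmemcached.utils.AddrUtil;

public class MemcachedClientFactory {

    public MemcachedClient create() throws Exception {
        MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses("cache.local:11211"));
        builder.setCommandFactory(new BinaryCommandFactory());
        // maintain several multiplexed connections instead of a single one
        builder.setConnectionPoolSize(8);
        return builder.build();
    }
}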

Garbage collection

If you opt for low latency, you should probably consider optimizing the garbage collector as a last resort. As much as garbage collection can be optimized through different settings, it does not address the underlying problem; if you can address those issues first, you should be just fine, and you can tune the garbage collector afterwards for the best overall performance.

Final thoughts

How will you be able to tell whether your application faces any of these problems? First of all, by being equipped with the proper tools. Stress tests are one of them: you can either decide to treat the application as a black box and use, for instance, Gatling to measure its throughput, or, if you need more fine-grained tools, the jmh project can be used to run benchmarks of individual Java methods. Finally, use a profiler to understand where your application is spending the most time: is it, for instance, a RestTemplate call, or maybe your cache access times skyrocket? Good advice on how to measure the characteristics of the application is to use a doubling approach: run your benchmark with, for instance, 64 RPS, monitor the results, and repeat the experiment with double the request rate. Continue until you have reached the desired throughput level.

With all of this being said, the truth is that this in fact describes the hard way; there is also a simple and fast path to solving your heavy-load problems, especially for HTTP:

Use a caching reverse proxy.

Whether it’s nginx or Varnish, both of them should take the load off your backing services, and if you can decrease the load, you do not need to spend so much time on optimizations.
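
As an illustration, a minimal nginx sketch that caches successful responses in front of a backend; the upstream address, cache sizes, and durations are placeholders:

proxy_cache_path /var/cache/nginx levels=1:2 keys_zone=app_cache:10m max_size=1g inactive=10m;

server {
    listen 80;

    location / {
        proxy_pass http://backend:8080;
        proxy_cache app_cache;
        # serve cached responses for a short time to shield the backend from repeated hits
        proxy_cache_valid 200 301 302 1m;
        proxy_cache_use_stale error timeout updating;
    }
}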