Computing Platform (2): Slow Lane Job Keeper

BACKGROUND


A reminder: Lambda/Kappa Architecture


The Computing Platform extends the lambda architecture layout as it is a data-processing architecture designed to handle massive quantities of data by taking advantage of both batch and stream-processing methods. This approach to architecture attempts to balance latency, throughput, and fault-tolerance by using batch processing to provide comprehensive and accurate views of batch data, while simultaneously using real-time stream processing to provide views of online data. The two view outputs may be joined before presentation. The rise of lambda architecture is correlated with the growth of big data, real-time analytics, and the drive to mitigate the drawbacks of relational database-centric architectures and not efficient software components, producing an excessive latency, footprint and cost.


The "job"


The "job" concept is absolutely essential here. It is a unit of computing. It counts with a specific configuration including the cluster to be executed against, the resources to be assigned, name, schedule, frequency and others. Since the Computing Platform uses Spark and Databricks to provide parallel computing features by default the Spark job includes the driver application that contains the logic for a given operation managed by the job.

The architecture needs a component that can manage the job requests as they arrive and handle the list of the job requests  sent before and stored in a waiting queue , monitoring their execution and managing the generation and devolution of the outputs.

DESCRIPTION


To manage the batch processes the Computing Platform follows a typical Big Data approach based on the submission of job requests to a component that acts as a traffic manager, administering the jobs, querying the status and level of occupation of the clusters and constantly providing information about what is being managed (the requests for execution of jobs).

From a layout point of view the Job Keeper is receiving the job requests once they have been validated and pre-processed by the endpoints and mediators. The endpoint and the mediator are generic roles that is in charge of applying needed information like the priority of the job request, correlation ID, processing times, etc.

This is an example of a job request as it is received by an endpoint:









{  
   "correlation_id":"<TO BE INSERTED LATER>",
   "notification_queue":"<TO BE INSERTED LATER>",
   "created_time":<TO BE INSERTED LATER>,
   "user_id":"geoff@fexco.com",
   "priority":"<TO BE INSERTED LATER>",
   "job": 
      "job_id":"175",
      "job_context": 
         "tenant_id":"333",
         "location_id":"64",
         "pos_id":"64_2",
         "client_id":"",
         "job_operation": 
            "operation_id":"",
            "operation_name":"",
            "cluster_type":"business",
            "email_notifications": 
               "jamessullivan@fexco.com",
               "donalconnor@fexco.com"
            ],
            "python_params": 
               "a",
               "b"
            ],
            "jar_params": 
               "a",
               "b"
            ],
            "notebook_params": 
               "a":"1",
               "b":"2"
            },
            "max_concurrent_runs":1
         }
      }
   }
}

 Note: All options for the different types of driver application (python_params, jar_params and notebook_params) have been included but obviously only one of them is required in real cases.









The Job Request contains the description and parameterization of the driver application (notebook, python script, JAR file), maximum number of concurrent running jobs (depending of the criticality and expected load of the operation), description of the submitter (user, tenant, etc.).



Source: Microsoft

In below diagram you can find the position of the Job Keeper, listening the queues (following the Competing Consumers pattern) managed by the mediators. In fact, only the mediators are authorized to send messages through these queues.



Since the mediators are extremely tiny components, with a very low consumption of resources and footprint, they are able to be in charge of the pre-processing of the job requests, freeing the endpoints from specific operations and keeping them as agnostic and generic as possible.

Internally the Job Keeper flow is based on several key factors:

  1. State of the Spark clusters

  2. Requested job features

  3. Priority

  4. Number of maximum concurrent running jobs

  5. Settings about the level of occupation of the clusters


These factors will be consider along the flow. These are the main phases and steps:



As it was said above, the mediators (1) send the job requests with all needed information (pre-processing) via Service Bus queues or HTTP. HTTP should be used by mediators only when the service bus is not available.

Once the job request arrives (2) the Job Keeper does several things:

it stores the job request in persistence (3) as a new entity (job request process)

it checks out (4) the state of the target cluster (active for business time and high priority jobs, passive for out of business time and low priority jobs). To do this the Job Keeper acts as a client for the Azure Databricks API to retrieve information about the available clusters, defined jobs and state of runs (executions of jobs).

From the information from the Databricks API the Job Keeper decides what to do with the job request (5) and accordingly to the made decision it will apply the subsequent policy (6).

A policy is just a set of actions to perform for a specific case/decision. they include several types of actions like:

  • Running the job request immediately (8) or (if the level of activity is too high)..

  • ..sending the job request to the waiting queue (9).

  • Creating a new cluster if the current ones are too busy or for some reason there is not any cluster available.

  • Notifying the SRE team about a lack of a declared job in the Databricks API.


..and some more actions. Besides, the settings in the configuration allow the Computing Platform managers to change the behavior of the Job Keeper policies.

The job requests in the queue are handled in a scheduled tasks (10). Every few minutes the Job Keeper checks the level of activity of the available clusters and if they are OK it gets the list of waiting job requests to repeat the handling process one more time. This kind of unattended queue manager makes the Job Keeper a central piece of software in the Computing Platform.

Finally and if the job requires a feedback (e.g. generation of reports) once the job requests is completed and succeeded, the Job Keeper gets the output reference (path in the data sink), composes a new entity (Job Response) that contains the needed information foor the client. The job response is sent through the return Service Bus queue included in the job request data structure and eventually it reaches the client that can download the required output by using the reference (URI) in the job response.

KPIs



  • Capacity to manage a number of jobs sent to the Computing Platform, able to manage how the computing clusters are being used.

  • Control and monitoring of sent jobs.

  • Management of feedback and status of executions.

  • Flexibility to manage different situations according to available resources dynamically.

  • Auto-scalability of resources according to priority and criticality of jobs.

  • Full integration with the driver applications containing the operations logic code and being managed by the job abstraction (Spark Databricks).


CONCLUSIONS


The Job Keeper is a key component with a clear function: to prevent an uncontrolled submission of long running  and high consuming resources jobs against the available computing resources.

It does not require a big amount of resources, is accessible through different ways (Service Bus queues, HTTP) and manages the submission of jobs and the generated outputs.

The software application is written in Scala, using Akka, Akka http and Azure Common libraries. The build tools is SBT including containerization.

NEXT STEPS


The Job Keeper will be the center of the strategy of management of job requests. We are learning to apply the best settings to get the best results as possible.

The decision/policy flow is the target for Machine Learning techniques in order to (for instance):

  • Preemptively create Spark clusters and manage the life-cycle in order to speed up the execution of jobs.

  • Use predictions and a more intelligent behavior for job requests handling, allowing the Job Keeper to make less requests to the Databricks API in search of information.

  • Manage priorities and types of jobs in order to assign the resources before to the most critical jobs.

Comments