In the previous post, we saw what mesos is, how is it useful and getting started with it. In this post, we shall see how to write your own framework on mesos.
(In mesos, a framework is any application running on it.) 
This post explains about a framework called "mesos-pinspider" which fetches the user profile information and user board information of a pinterest page of a user.


Mesos Framework

In general, a Mesos framework has three basic components. 
- Driver which submits the tasks to the framework
- Scheduler which registers with the master to be offered resources, takes the tasks and runs them on executor
- Executor process that is launched on slave nodes to run the framework’s tasks


Pinspider Framework Example


You may check the code here on github. Let's break it down to PinDriver, PinScheduler and PinUserProfileExecutor.


Driver

The driver component of the framework is PinDriver. 
    • Create Executor Info
Describe the information about the executor using the Builder pattern and mesos use Google Protocol Buffers for the data interchange. Here, we need to set the executorID, command which is basically a shell command, executed via: '/bin/sh -c value'. Any URIs specified are fetched before executing the command. The name is set by setName(). The source is set by setSource(), an identifier style string used by frameworks to track the source of an executor. This is useful when it's possible for different executor ids to be related semantically.
Protos.ExecutorInfo userProfileExecutorInfo = Protos.ExecutorInfo.newBuilder().setExecutorId(Protos.ExecutorID.newBuilder().setValue("PinUserProfileExecutor")).setCommand(commandInfoUserProfile).setName("PinUserProfileExecutor Java").setSource("java").build();
    • Create Framework Info
Describe the framework information. The user field is used to determine the Unix user that an executor/task should be launched as. If the user field is set to an empty string Mesos will auto-magically set it to the current user. The amount of time that the master will wait for the scheduler to fail-over before removing the framework is specified by setFailoverTimeout(). The name of the framework is set by setName()
Protos.FrameworkInfo.Builder frameworkBuilder = Protos.FrameworkInfo.newBuilder().setFailoverTimeout(120000).setUser("").setName("Pinspider Framework"); 
    • Instantiate Scheduler
You need to instantiate the Scheduler with the number of tasks that needs to be submitted for the executor to run.
Scheduler scheduler = args.length == 1 ?new PinScheduler(userProfileExecutorInfo,userBoardExecutorInfo) : new PinScheduler(userProfileExecutorInfo, userBoardExecutorInfo, Integer.parseInt(args[1]), args[2]);

Note: Please note that two ExecutorInfo are used ie. one for fetching user profile information and the other one for user board information for demonstration. This explanation involves only one executorinfo - userProfileExecutorInfo
    • Starting the mesos scheduler driver.
MesosSchedulerDriver is an implementation of SchedulerDriver which is an abstract interface to connect scheduler to mesos. This is done by managing the life-cycle of the scheduler ( start, stop and wait for tasks to finish) and also to interact with Mesos (launch tasks, kill tasks etc). 
MesosSchedulerDriver schedulerDriver = new MesosSchedulerDriver(scheduler,frameworkBuilder.build(), args[0]);
int status = schedulerDriver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1;schedulerDriver.stop();
System.exit(status);

Executor Implementation

The Executor component of the framework is PinUserProfileExecutor.
Executor is a callback interface which is implemented by frameworks' executors. In our implementation, let us concentrate on launchTask()
@Override public void launchTask(final ExecutorDriver executorDriver, final Protos.TaskInfo taskInfo) { 
}
    •  Set the task status by setting the ID and the state with a builder pattern.
Protos.TaskStatus taskStatus = Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId()).setState(Protos.TaskState.TASK_RUNNING).build();
    •  Send the status update to the framework scheduler retrying as necessary until an acknowledgement has been received or the executor is terminated, in which case, a TASK_LOST status update will be sent.
executorDriver.sendStatusUpdate(taskStatus);
    • Get the data from the tasks and run your logic.
try { message = ("userprofile :" + getUserProfileInfo(url)).getBytes(); } catch (IOException e) { LOGGER.error("Error parsing the Pinterest URL :" + e.getMessage()); }
    •  Send the framework the message.
executorDriver.sendFrameworkMessage(message);
    •  Mark the state of the task as finished and send the status update to the framework scheduler.
taskStatus = Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId()) .setState(Protos.TaskState.TASK_FINISHED).build();executorDriver.sendStatusUpdate(taskStatus);
    •  main() method to create an instance of MesosExecutorDriver and run 
mesosExecutorDriver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1

Scheduler Implementation

The Scheduler component of the framework is PinScheduler.
Scheduler is a callback interface to be implemented by frameworks' schedulers. In our implemenation, let us concentrate on resourceOffers(), statusUpdate() and frameworkMessage()
  • Constructor : construct with the executor information and the number of launch tasks.

public PinScheduler(Protos.ExecutorInfo pinUserProfileExecutor , Protos.ExecutorInfo pinUserBoardExecutor ) {
this(pinUserProfileExecutor,pinUserBoardExecutor, 5, "http://www.pinterest.com/techcrunch");
}
public PinScheduler(Protos.ExecutorInfo pinUserProfileExecutor,Protos.ExecutorInfo pinUserBoardExecutor,  int totalTasks, String url) { this.pinUserProfileExecutor = pinUserProfileExecutor; this.pinUserBoardExecutor = pinUserBoardExecutor; this.totalTasks = totalTasks; this.crawlQueue = Collections.synchronizedList(new ArrayList<String>()); this.crawlQueue.add(url); }
  •  Resource Offers
    •  A resource offer can be resources like CPU, memory etc. From the offers list, get the scalar value of the resources. We need to give our requirements of resources for the tasks while setting the task info.
for (Protos.Offer offer : list) { List<Protos.TaskInfo> taskInfoList = new ArrayList<Protos.TaskInfo>(); double offerCpus = 0; double offerMem = 0; for (Protos.Resource resource : offer.getResourcesList()) { if (resource.getName().equals("cpus")) { offerCpus += resource.getScalar().getValue(); } else if (resource.getName().equals("mem")) { offerMem += resource.getScalar().getValue(); } } LOGGER.info("Received Offer : " + offer.getId().getValue() + " with cpus = " + offerCpus + " and mem =" + offerMem);
    • Create task ID.
Protos.TaskID taskID = Protos.TaskID.newBuilder().setValue(Integer.toString(launchedTasks++)).build();
    • Create task info by setting task ID, adding resources, setting data and setting executor.
Protos.TaskInfo pinUserProfileTaskInfo = Protos.TaskInfo.newBuilder().setName("task " + taskID.getValue()).setTaskId(taskID).setSlaveId(offer.getSlaveId()).addResources(Protos.Resource.newBuilder().setName("cpus").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar.newBuilder().setValue(CPUS_PER_TASK))).addResources(Protos.Resource.newBuilder().setName("mem").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar.newBuilder().setValue(MEM_PER_TASK))).setData(ByteString.copyFromUtf8(crawlQueue.get(0))).setExecutor(Protos.ExecutorInfo.newBuilder(pinUserProfileExecutor)).build();
    •  Launch the tasks through the SchedulerDriver.
...  taskInfoList.add(pinUserProfileTaskInfo);taskInfoList.add(pinUserBoardTaskInfo);}schedulerDriver.launchTasks(offer.getId(), taskInfoList);
  •  Status update
This is invoked when the status of a task has changed ie., a slave is lost and so the task is lost, a task finishes and an executor sends a status update saying so.
@Override public void statusUpdate(SchedulerDriver schedulerDriver, Protos.TaskStatus taskStatus) {... }
    • Stop the SchedulerDriver if tasks are finished
if (taskStatus.getState() == Protos.TaskState.TASK_FINISHED) { finishedTasks++; LOGGER.info("Finished tasks : " + finishedTasks); if (finishedTasks == totalTasks) { schedulerDriver.stop(); } }
    •  Abort the SchedulerDriver if the tasks are killed, lost or failed
if (taskStatus.getState() == Protos.TaskState.TASK_FAILED
|| taskStatus.getState() == Protos.TaskState.TASK_KILLED
|| taskStatus.getState() == Protos.TaskState.TASK_LOST) {
LOGGER.error("Aborting because the task " + taskStatus.getTaskId().getValue() + " is in unexpected state : "
+ taskStatus.getState().getValueDescriptor().getName() + "with reason : " + taskStatus.getReason()
 .getValueDescriptor()
 .getName()
+ " from source : " + taskStatus.getSource().getValueDescriptor().getName() + " with message : "
+ taskStatus.getMessage());
schedulerDriver.abort();
}
  •  Framework Message
This is invoked when an executor sends a message.
    • Handle your message
@Override public void frameworkMessage(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID, Protos.SlaveID slaveID, byte[] bytes) { String data = new String(bytes); System.out.println(data); LOGGER.info("User Profile Information : " + data); }
Complete code is available here with the instructions to run and sample output.
Happy Learning! :)