Apache Mesos: Writing Your Own Distributed Framework
(In Mesos, a framework is any application that runs on top of it.)
This post walks through a framework called "mesos-pinspider", which fetches the profile information and the board information from a user's Pinterest page.
Mesos Framework
In general, a Mesos framework has three basic components:
- Driver, which submits the tasks to the framework
- Scheduler, which registers with the master to be offered resources, takes the tasks, and runs them on executors
- Executor, a process launched on slave nodes to run the framework's tasks
Pinspider Framework Example
You may check the code here on GitHub. Let's break it down into PinDriver, PinScheduler and PinUserProfileExecutor.
Driver
The driver component of the framework is PinDriver.
- Create Executor Info
Protos.ExecutorInfo userProfileExecutorInfo = Protos.ExecutorInfo.newBuilder()
    .setExecutorId(Protos.ExecutorID.newBuilder().setValue("PinUserProfileExecutor"))
    .setCommand(commandInfoUserProfile)
    .setName("PinUserProfileExecutor Java")
    .setSource("java")
    .build();
- Create Framework Info
Protos.FrameworkInfo.Builder frameworkBuilder = Protos.FrameworkInfo.newBuilder()
    .setFailoverTimeout(120000)   // failover timeout is given in seconds
    .setUser("")                  // empty string: let Mesos fill in the current user
    .setName("Pinspider Framework");
- Instantiate Scheduler
Scheduler scheduler = args.length == 1
    ? new PinScheduler(userProfileExecutorInfo, userBoardExecutorInfo)
    : new PinScheduler(userProfileExecutorInfo, userBoardExecutorInfo, Integer.parseInt(args[1]), args[2]);
Note: Two ExecutorInfo objects are used for demonstration, one for fetching user profile information and the other for user board information. This explanation covers only one of them, userProfileExecutorInfo.
- Start the Mesos scheduler driver
MesosSchedulerDriver schedulerDriver = new MesosSchedulerDriver(scheduler, frameworkBuilder.build(), args[0]);
int status = schedulerDriver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1;
schedulerDriver.stop();
System.exit(status);
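The scheduler instantiation above reads args[1] and args[2] whenever args.length is not 1, so passing exactly two arguments (or none) would end in an ArrayIndexOutOfBoundsException. As a rough sketch of stricter argument handling, the class below accepts either one or three arguments and rejects everything else. PinDriverArgs and its fields are hypothetical names, not part of mesos-pinspider; the defaults are the ones used by the two-argument PinScheduler constructor, and the master address in main() is only an example value.

```java
/** Hypothetical holder for the driver's command-line options; not part of mesos-pinspider. */
final class PinDriverArgs {
    // Defaults mirror the values the two-argument PinScheduler constructor passes on.
    static final int DEFAULT_TOTAL_TASKS = 5;
    static final String DEFAULT_URL = "http://www.pinterest.com/techcrunch";

    final String master;
    final int totalTasks;
    final String url;

    private PinDriverArgs(String master, int totalTasks, String url) {
        this.master = master;
        this.totalTasks = totalTasks;
        this.url = url;
    }

    /** Accepts "<master>" or "<master> <totalTasks> <url>"; anything else is rejected. */
    static PinDriverArgs parse(String[] args) {
        if (args.length == 1) {
            return new PinDriverArgs(args[0], DEFAULT_TOTAL_TASKS, DEFAULT_URL);
        }
        if (args.length == 3) {
            // Integer.parseInt throws NumberFormatException (an IllegalArgumentException) on bad input.
            int totalTasks = Integer.parseInt(args[1]);
            if (totalTasks <= 0) {
                throw new IllegalArgumentException("totalTasks must be positive");
            }
            return new PinDriverArgs(args[0], totalTasks, args[2]);
        }
        throw new IllegalArgumentException("Usage: PinDriver <master> [<totalTasks> <url>]");
    }

    public static void main(String[] args) {
        // Example master address, chosen only for illustration.
        PinDriverArgs parsed = parse(new String[] {"127.0.0.1:5050"});
        System.out.println(parsed.master + " " + parsed.totalTasks + " " + parsed.url);
    }
}
```

With something like this in place, the driver could build the scheduler from parsed.totalTasks and parsed.url and pass parsed.master to MesosSchedulerDriver, failing early with a usage message instead of an index error.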
Executor Implementation
The Executor component of the framework is PinUserProfileExecutor.
Executor is a callback interface implemented by a framework's executors. In our implementation, let us concentrate on launchTask().
@Override
public void launchTask(final ExecutorDriver executorDriver, final Protos.TaskInfo taskInfo) {}
- Set the task status by setting the ID and the state with a builder pattern.
Protos.TaskStatus taskStatus = Protos.TaskStatus.newBuilder()
    .setTaskId(taskInfo.getTaskId())
    .setState(Protos.TaskState.TASK_RUNNING)
    .build();
- Send the status update to the framework scheduler. The update is retried as necessary until an acknowledgement has been received or the executor is terminated, in which case a TASK_LOST status update will be sent.
executorDriver.sendStatusUpdate(taskStatus);
- Get the data from the tasks and run your logic.
try {
    message = ("userprofile :" + getUserProfileInfo(url)).getBytes();
} catch (IOException e) {
    LOGGER.error("Error parsing the Pinterest URL :" + e.getMessage());
}
- Send the framework the message.
executorDriver.sendFrameworkMessage(message);
- Mark the state of the task as finished and send the status update to the framework scheduler.
taskStatus = Protos.TaskStatus.newBuilder()
    .setTaskId(taskInfo.getTaskId())
    .setState(Protos.TaskState.TASK_FINISHED)
    .build();
executorDriver.sendStatusUpdate(taskStatus);
- In main(), create an instance of MesosExecutorDriver and run it; the resulting driver status decides the exit code.
mesosExecutorDriver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1
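To see the order of the calls in launchTask() at a glance, here is a small sketch that replays the same sequence (TASK_RUNNING, framework message, TASK_FINISHED) against a recording stand-in instead of a real ExecutorDriver. UpdateSink and TaskLifecycle are hypothetical names, and the task states are plain strings rather than Protos.TaskState values. Reporting TASK_FAILED when the work throws is an assumption of this sketch, not something the post's executor does.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

/** Stand-in for the two ExecutorDriver calls used above; hypothetical, not the Mesos interface. */
interface UpdateSink {
    void sendStatusUpdate(String taskId, String state);
    void sendFrameworkMessage(byte[] data);
}

final class TaskLifecycle {
    /** Runs one task: RUNNING first, then either (message + FINISHED) or FAILED. */
    static void run(String taskId, Callable<String> work, UpdateSink sink) {
        sink.sendStatusUpdate(taskId, "TASK_RUNNING");
        try {
            String result = work.call();
            // Explicit charset so both ends agree on how the bytes are decoded.
            sink.sendFrameworkMessage(("userprofile :" + result).getBytes(StandardCharsets.UTF_8));
            sink.sendStatusUpdate(taskId, "TASK_FINISHED");
        } catch (Exception e) {
            // Assumption of this sketch: with no payload to send, report a failure
            // instead of marking the task as finished.
            sink.sendStatusUpdate(taskId, "TASK_FAILED");
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        UpdateSink recorder = new UpdateSink() {
            @Override public void sendStatusUpdate(String taskId, String state) {
                log.add(taskId + ":" + state);
            }
            @Override public void sendFrameworkMessage(byte[] data) {
                log.add(new String(data, StandardCharsets.UTF_8));
            }
        };
        run("0", () -> "demo", recorder);
        run("1", () -> { throw new java.io.IOException("unreachable page"); }, recorder);
        System.out.println(log);
    }
}
```

Keeping the sequence in one method makes it easier to check that every task ends in a terminal state, which is what the scheduler counts on when deciding whether to stop.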
Scheduler Implementation
The Scheduler component of the framework is PinScheduler.
Scheduler is a callback interface to be implemented by a framework's schedulers. In our implementation, let us concentrate on resourceOffers(), statusUpdate() and frameworkMessage().
- Constructor: construct with the executor information and the number of tasks to launch.
public PinScheduler(Protos.ExecutorInfo pinUserProfileExecutor, Protos.ExecutorInfo pinUserBoardExecutor) {
    this(pinUserProfileExecutor, pinUserBoardExecutor, 5, "http://www.pinterest.com/techcrunch");
}
public PinScheduler(Protos.ExecutorInfo pinUserProfileExecutor, Protos.ExecutorInfo pinUserBoardExecutor, int totalTasks, String url) {
    this.pinUserProfileExecutor = pinUserProfileExecutor;
    this.pinUserBoardExecutor = pinUserBoardExecutor;
    this.totalTasks = totalTasks;
    this.crawlQueue = Collections.synchronizedList(new ArrayList<String>());
    this.crawlQueue.add(url);
}
- Resource Offers
- A resource offer carries resources such as CPU and memory. For each offer in the list, read the scalar value of every resource. The resources each task needs are declared later, when building its task info.
for (Protos.Offer offer : list) {
    List<Protos.TaskInfo> taskInfoList = new ArrayList<Protos.TaskInfo>();
    double offerCpus = 0;
    double offerMem = 0;
    for (Protos.Resource resource : offer.getResourcesList()) {
        if (resource.getName().equals("cpus")) {
            offerCpus += resource.getScalar().getValue();
        } else if (resource.getName().equals("mem")) {
            offerMem += resource.getScalar().getValue();
        }
    }
    LOGGER.info("Received Offer : " + offer.getId().getValue() + " with cpus = " + offerCpus + " and mem =" + offerMem);
- Create task ID.
Protos.TaskID taskID = Protos.TaskID.newBuilder().setValue(Integer.toString(launchedTasks++)).build();
- Create task info by setting task ID, adding resources, setting data and setting executor.
Protos.TaskInfo pinUserProfileTaskInfo = Protos.TaskInfo.newBuilder()
    .setName("task " + taskID.getValue())
    .setTaskId(taskID)
    .setSlaveId(offer.getSlaveId())
    .addResources(Protos.Resource.newBuilder()
        .setName("cpus")
        .setType(Protos.Value.Type.SCALAR)
        .setScalar(Protos.Value.Scalar.newBuilder().setValue(CPUS_PER_TASK)))
    .addResources(Protos.Resource.newBuilder()
        .setName("mem")
        .setType(Protos.Value.Type.SCALAR)
        .setScalar(Protos.Value.Scalar.newBuilder().setValue(MEM_PER_TASK)))
    .setData(ByteString.copyFromUtf8(crawlQueue.get(0)))
    .setExecutor(Protos.ExecutorInfo.newBuilder(pinUserProfileExecutor))
    .build();
- Launch the tasks through the SchedulerDriver.
...
taskInfoList.add(pinUserProfileTaskInfo);
taskInfoList.add(pinUserBoardTaskInfo);
}
schedulerDriver.launchTasks(offer.getId(), taskInfoList);
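The offer loop above adds up the "cpus" and "mem" scalars, and each task then claims CPUS_PER_TASK and MEM_PER_TASK. A natural follow-up question is how many tasks a single offer can hold. The sketch below works that out with plain Java types. ScalarResource and OfferCapacity are hypothetical stand-ins for illustration (not Protos.Resource), and the per-task values in main() are example numbers, since the post does not show the actual constants.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified stand-in for one scalar resource in an offer; hypothetical, not Protos.Resource. */
final class ScalarResource {
    final String name;
    final double value;

    ScalarResource(String name, double value) {
        this.name = name;
        this.value = value;
    }
}

final class OfferCapacity {
    /** Sums every scalar resource with the given name, like the loop over getResourcesList(). */
    static double total(List<ScalarResource> resources, String name) {
        double sum = 0;
        for (ScalarResource r : resources) {
            if (r.name.equals(name)) {
                sum += r.value;
            }
        }
        return sum;
    }

    /** Number of whole tasks that fit, limited by whichever resource runs out first. */
    static int tasksThatFit(List<ScalarResource> resources, double cpusPerTask, double memPerTask) {
        if (cpusPerTask <= 0 || memPerTask <= 0) {
            throw new IllegalArgumentException("per-task requirements must be positive");
        }
        int byCpu = (int) Math.floor(total(resources, "cpus") / cpusPerTask);
        int byMem = (int) Math.floor(total(resources, "mem") / memPerTask);
        return Math.min(byCpu, byMem);
    }

    public static void main(String[] args) {
        // Example offer: 4 CPUs, 1024 MB of memory, plus a resource the tasks do not use.
        List<ScalarResource> offer = Arrays.asList(
            new ScalarResource("cpus", 4.0),
            new ScalarResource("mem", 1024.0),
            new ScalarResource("disk", 5000.0));
        // Example per-task requirements (assumed values): 1 CPU and 128 MB.
        System.out.println(tasksThatFit(offer, 1.0, 128.0));
    }
}
```

A scheduler could use such a count to cap how many TaskInfo objects it adds to taskInfoList for one offer, so that it never asks for more than the offer contains.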
- Status update
@Override
public void statusUpdate(SchedulerDriver schedulerDriver, Protos.TaskStatus taskStatus) {
    ...
}
- Stop the SchedulerDriver if tasks are finished
if (taskStatus.getState() == Protos.TaskState.TASK_FINISHED) {
    finishedTasks++;
    LOGGER.info("Finished tasks : " + finishedTasks);
    if (finishedTasks == totalTasks) {
        schedulerDriver.stop();
    }
}
- Abort the SchedulerDriver if the tasks are killed, lost or failed
if (taskStatus.getState() == Protos.TaskState.TASK_FAILED
        || taskStatus.getState() == Protos.TaskState.TASK_KILLED
        || taskStatus.getState() == Protos.TaskState.TASK_LOST) {
    LOGGER.error("Aborting because the task " + taskStatus.getTaskId().getValue()
        + " is in unexpected state : " + taskStatus.getState().getValueDescriptor().getName()
        + " with reason : " + taskStatus.getReason().getValueDescriptor().getName()
        + " from source : " + taskStatus.getSource().getValueDescriptor().getName()
        + " with message : " + taskStatus.getMessage());
    schedulerDriver.abort();
}
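The two branches of statusUpdate() boil down to a small decision: keep going, stop the driver once every task has finished, or abort on a failed, killed or lost task. Here is a minimal sketch of that decision on its own, without any Mesos types, so it can be tried out in isolation. TaskTracker and its Action enum are hypothetical names, and task states are passed as plain strings instead of Protos.TaskState values.

```java
/** Hypothetical tracker mirroring the statusUpdate() logic above, without Mesos types. */
final class TaskTracker {
    /** What the scheduler should do with its driver after a status update. */
    enum Action { CONTINUE, STOP, ABORT }

    private final int totalTasks;
    private int finishedTasks;

    TaskTracker(int totalTasks) {
        if (totalTasks <= 0) {
            throw new IllegalArgumentException("totalTasks must be positive");
        }
        this.totalTasks = totalTasks;
    }

    /** Maps a task state name to the action to take on the scheduler driver. */
    Action onStatus(String state) {
        switch (state) {
            case "TASK_FINISHED":
                finishedTasks++;
                // Same comparison as in the post: stop once the expected count is reached.
                return finishedTasks == totalTasks ? Action.STOP : Action.CONTINUE;
            case "TASK_FAILED":
            case "TASK_KILLED":
            case "TASK_LOST":
                return Action.ABORT;
            default:
                // Non-terminal states such as TASK_STAGING or TASK_RUNNING.
                return Action.CONTINUE;
        }
    }

    int finishedTasks() {
        return finishedTasks;
    }

    public static void main(String[] args) {
        TaskTracker tracker = new TaskTracker(2);
        String[] updates = {"TASK_RUNNING", "TASK_FINISHED", "TASK_RUNNING", "TASK_FINISHED"};
        for (String update : updates) {
            System.out.println(update + " -> " + tracker.onStatus(update));
        }
    }
}
```

In the real scheduler, STOP would correspond to schedulerDriver.stop() and ABORT to schedulerDriver.abort().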
- Framework Message
- Handle your message
@Override
public void frameworkMessage(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID, Protos.SlaveID slaveID, byte[] bytes) {
    String data = new String(bytes);
    System.out.println(data);
    LOGGER.info("User Profile Information : " + data);
}
Complete code is available here with the instructions to run and sample output.
Happy Learning! :)