API

This lists all classes and associated methods implemented in yacs.

Master

class yacs.component.master.Master(logger: logging.RootLogger)

Logic that implements a master node in the cluster. It handles the following:

  • Listen for job requests (on port 5000)

  • Listen for updates from workers (on port 5001)

  • Perform scheduling of tasks based on the specified policy

Parameters

logger (logging.RootLogger) – instance of logger with custom formatting

config(path_to_config: str) → object

Read the config file for worker information, set up the data structures related to workers, and initialise the number of free slots per worker. For the format of the config file, please refer to the Dev Docs.

Parameters

path_to_config (str) – path to config file on local system

get_sched_polices() → List[str]

Return a list of available scheduling policies

The following are available:

  • LL: Least Loaded

  • RR: Round Robin

  • R: Random

set_master_ip(ip: str = '127.0.0.1') → object

Set ip address of master

Parameters

ip (str) – ip address of master node

set_sched_policy(sched_policy: str = 'LL') → object

Sets the scheduling policy for the Master

Parameters

sched_policy (str) – scheduling policy to be configured for the Master

start() → None

Responsible for spawning threads and catching signals for graceful termination. Threads spawned:

  • Listen to incoming job requests

  • Listen to updates from workers

  • Poll job queue

  • Poll update queue

  • Send tasks to workers to be executed

For more details, refer to Master.

Schedulers

class yacs.component.scheduler.LeastLoaded(master: object)

Class implementing logic for the Least Loaded (LL) scheduling policy

The state of all workers is checked and the worker with the most free slots is selected; a task is then scheduled and launched there. If no worker has a free slot, the scheduler waits for 1 second and repeats the process.

Parameters

master (Master) – reference to the Master object to access and modify relevant data structures

is_buffer_empty() → bool

Check if task pool is empty

schedule_tasks() → None

Schedule tasks from the task pool according to the Least Loaded (LL) policy. This will run till all the tasks in the buffer have been scheduled.
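The selection rule described for the Least Loaded policy can be illustrated with a small, self-contained sketch. This is not the yacs implementation: the function names, the free_slots dictionary (worker ID mapped to free slot count), and the injectable sleep argument are assumptions made for illustration only.

```python
import time
from typing import Callable, Dict, List, Optional, Tuple


def pick_least_loaded(free_slots: Dict[str, int]) -> Optional[str]:
    """Return the worker ID with the most free slots, or None if none is free."""
    if not free_slots:
        return None
    worker_id = max(free_slots, key=free_slots.get)
    return worker_id if free_slots[worker_id] > 0 else None


def schedule_least_loaded(
    tasks: List[str],
    free_slots: Dict[str, int],
    retry_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> List[Tuple[str, str]]:
    """Assign every task to a worker, waiting whenever no slot is free.

    Assumption: some other actor (for example an update-handling thread)
    increments free_slots while this loop is sleeping; otherwise the loop
    would never make progress once all slots are taken.
    """
    assignments = []
    pending = list(tasks)
    while pending:
        worker_id = pick_least_loaded(free_slots)
        if worker_id is None:
            sleep(retry_delay)  # no capacity anywhere: wait, then re-check
            continue
        free_slots[worker_id] -= 1  # reserve the slot before launching
        assignments.append((pending.pop(0), worker_id))
    return assignments
```

Passing sleep as an argument keeps the 1 second wait out of tests; in a threaded setting, access to free_slots would also need a lock.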

class yacs.component.scheduler.Random(master: object)

Class implementing logic for the Random (R) scheduling policy.

A worker machine is picked at random and checked for free slots. If slots are available, a task is scheduled and subsequently launched on this machine. If no free slots are found, another machine is chosen at random, and this process continues until a free slot is found.

Parameters

master (Master) – reference to the Master object to access and modify relevant data structures

is_buffer_empty() → bool

Check if task pool is empty

schedule_tasks() → None

Schedule tasks from the task pool according to the Random (R) policy. This will run till all the tasks in the buffer have been scheduled.
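A minimal sketch of the random selection loop described for this policy follows. The function name, the free_slots dictionary, and the explicit rng argument are hypothetical and not part of yacs. The early return when every worker is full is an added safeguard, not behaviour stated in the documentation.

```python
import random
from typing import Dict, Optional


def pick_random_worker(
    free_slots: Dict[str, int], rng: random.Random
) -> Optional[str]:
    """Draw workers at random until one with a free slot turns up."""
    if not any(slots > 0 for slots in free_slots.values()):
        # Safeguard (assumption): avoid spinning forever when nothing is free.
        return None
    worker_ids = list(free_slots)
    while True:
        candidate = rng.choice(worker_ids)
        if free_slots[candidate] > 0:
            return candidate
```

Taking a random.Random instance as a parameter lets callers seed the generator for reproducible runs.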

class yacs.component.scheduler.RoundRobin(master: object)

Class implementing logic for the Round Robin (RR) scheduling policy

The worker machines are ordered based on the Worker IDs. A worker is picked in a round robin fashion and if the worker does not have free slots then the next worker is picked. This continues till a worker with free slots is found.

Parameters

master (Master) – reference to the Master object to access and modify relevant data structures

is_buffer_empty() → bool

Check if task pool is empty

schedule_tasks() → None

Schedule tasks from the task pool according to the Round Robin (RR) policy. This will run till all the tasks in the buffer have been scheduled.
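The round robin scan over workers ordered by ID can be sketched as below. The function name, the free_slots dictionary, and the start cursor are assumptions for illustration; how yacs tracks its position between calls is not documented here.

```python
from typing import Dict, Optional, Tuple


def pick_round_robin(
    free_slots: Dict[str, int], start: int
) -> Tuple[Optional[str], int]:
    """Scan workers in ID order beginning at `start`.

    Returns (worker_id, next_start). If no worker has a free slot,
    returns (None, start) so the caller can retry later.
    """
    worker_ids = sorted(free_slots)  # order workers by their IDs
    count = len(worker_ids)
    for offset in range(count):
        index = (start + offset) % count
        worker_id = worker_ids[index]
        if free_slots[worker_id] > 0:
            # The next scan resumes just after the worker picked now.
            return worker_id, (index + 1) % count
    return None, start
```

Returning the next cursor instead of storing it keeps the function free of hidden state, which makes the rotation easy to test.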

class yacs.component.scheduler.Scheduler(master: object, policy: str = 'LL')

A single point of access for scheduling needs, acts as a wrapper around the three scheduling policies.

Parameters
  • master (Master) – reference to the Master object to access and modify relevant data structures

  • policy (str) – scheduling policy to use

schedule_tasks() → None

Interface for triggering scheduling functionality of the respective scheduler based on the scheduling policy used
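One common way to build such a wrapper is a lookup table from policy code to implementation class. The sketch below assumes that approach; the stub classes and SchedulerSketch are hypothetical stand-ins, they take no master argument, and they return a label only so the dispatch is observable (the documented schedule_tasks returns None).

```python
class LeastLoadedStub:
    def schedule_tasks(self) -> str:
        return "least loaded"


class RoundRobinStub:
    def schedule_tasks(self) -> str:
        return "round robin"


class RandomStub:
    def schedule_tasks(self) -> str:
        return "random"


class SchedulerSketch:
    """Single entry point that forwards to the policy chosen at construction."""

    # Policy codes as listed by get_sched_polices(): LL, RR, R.
    _POLICIES = {"LL": LeastLoadedStub, "RR": RoundRobinStub, "R": RandomStub}

    def __init__(self, policy: str = "LL") -> None:
        if policy not in self._POLICIES:
            raise ValueError(f"unknown scheduling policy: {policy!r}")
        self._impl = self._POLICIES[policy]()

    def schedule_tasks(self) -> str:
        # Delegate to whichever policy object was selected.
        return self._impl.schedule_tasks()
```

With this layout, adding a policy means adding one class and one dictionary entry; callers keep using the same schedule_tasks() call.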

Listener

class yacs.component.listener.Listener(sock: socket.socket, client_addr: tuple, queue: queue.Queue, name: str = '', buff_size: int = 2048)

Implements an abstraction for a connection to either a client or a worker sending updates. A Listener inherits from the threading.Thread class and maintains each connection in a separate thread.

Parameters
  • sock (socket.socket) – socket of the incoming connection

  • client_addr (tuple) – (ip, port) of the incoming connection

  • queue (queue.Queue) – shared queue to put received jobs/updates. For more information on this, refer to the Dev Docs.

  • name (str) – name of the type of listener (JOB_LISTENER, UPDATE_LISTENER)

  • buff_size (int) – maximum chunk of data that can be received at once.

run() → None

Receives data and enqueues it in its appropriate queue.
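The thread-per-connection pattern described for Listener can be sketched with the standard library alone. ListenerSketch is a hypothetical stand-in, not the yacs class. It assumes a message is complete once the peer closes the connection, which may differ from the actual framing used by yacs.

```python
import queue
import socket
import threading


class ListenerSketch(threading.Thread):
    """Reads one connection in its own thread and enqueues what it receives."""

    def __init__(
        self, sock: socket.socket, out_queue: queue.Queue, buff_size: int = 2048
    ) -> None:
        super().__init__(daemon=True)
        self.sock = sock
        self.out_queue = out_queue
        self.buff_size = buff_size

    def run(self) -> None:
        chunks = []
        while True:
            # recv returns at most buff_size bytes per call.
            data = self.sock.recv(self.buff_size)
            if not data:  # empty bytes: the peer closed the connection
                break
            chunks.append(data)
        self.sock.close()
        if chunks:
            # Hand the full payload to whichever thread polls the queue.
            self.out_queue.put(b"".join(chunks))
```

A connected pair from socket.socketpair() is enough to try this without opening a port: start the listener on one end, send bytes on the other, close it, and read the payload from the queue.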

Worker

class yacs.component.worker.Worker(port: int, worker_id: str, logger: logging.RootLogger, host: str = '127.0.0.1', master_port: int = 5001)

Logic that implements a worker node in the cluster.

It handles the following:

  • Listen for task submissions from the Master on the port given by the port parameter

  • Simulate execution of tasks and send updates to Master on port 5001

Parameters
  • port (int) – port on which the Worker listens for incoming tasks.

  • worker_id (str) – unique identifier for the worker.

  • logger (logging.RootLogger) – instance of logger with custom formatting

  • host (str) – ip address of the Master node.

  • master_port (int) – port of the Master to which the Worker should send updates.

initialize_connection() → None

Create a socket to listen for task submissions from the Master

start_worker() → None

Start the worker by spawning threads for the following:

  • Listen for incoming tasks from the Master

  • Send updates of completed tasks to Master

  • Simulate execution of tasks

For more information on this, please refer to Worker.

Utils

exception yacs.component.utils.errors.ThreadTerminate

Wrapper exception for when a thread terminates

yacs.component.utils.errors.handle_thread_terminate(signum: int, frame: object) → None

Handler function for graceful termination
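A handler of this shape is typically registered with signal.signal so that a signal is converted into an exception the main thread can catch. The sketch below re-declares local stand-ins for ThreadTerminate and handle_thread_terminate rather than importing yacs. The choice of SIGINT and the run_until_terminated helper are assumptions, since the signals yacs actually handles are not listed here.

```python
import signal
from typing import Callable


class ThreadTerminate(Exception):
    """Local stand-in for yacs.component.utils.errors.ThreadTerminate."""


def handle_thread_terminate(signum: int, frame: object) -> None:
    # Turn the signal into an exception raised in the main thread.
    raise ThreadTerminate


def run_until_terminated(work: Callable[[], None]) -> str:
    """Run `work`, reporting whether it finished or was interrupted.

    Must be called from the main thread, because signal.signal only
    works there.
    """
    previous = signal.signal(signal.SIGINT, handle_thread_terminate)
    try:
        work()
        return "finished"
    except ThreadTerminate:
        # Cleanup such as closing sockets or joining threads would go here.
        return "terminated"
    finally:
        signal.signal(signal.SIGINT, previous)  # restore the old handler
```

Restoring the previous handler in the finally block keeps the sketch from changing how the rest of the process reacts to Ctrl+C.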