Source code for hemlock.hemlock_scheduler

#!/usr/bin/env python
#
#   Copyright (c) 2013 In-Q-Tel, Inc/Lab41, All Rights Reserved.
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

"""
This module controls and runs the scheduler that performs actions between
client systems and the Hemlock server.

Created on 30 August 2013
@author: Charlie Lewis
"""

from apscheduler.scheduler import Scheduler
from clients.hemlock_debugger import Hemlock_Debugger
from hemlock import Hemlock

import logging
import MySQLdb as mdb
import os
import signal
import sys

class Hemlock_Scheduler():
    """
    This class is responsible for spawning and controlling the scheduler and
    all operations that are scheduled in a cron-like fashion.

    Command line arguments read by __init__ from sys.argv:
        argv[1] -- path to the server credentials file
                   (default: 'hemlock_creds' in the working directory)
        argv[2] -- schedule server id whose jobs this process runs (required)
        argv[3] -- debug flag handed to Hemlock_Debugger.debug (default: 0)
    """

    # MySQL credential keys, in the order they are looked up in the environment.
    CRED_KEYS = ('HEMLOCK_MYSQL_SERVER', 'HEMLOCK_MYSQL_USERNAME', 'HEMLOCK_MYSQL_PW')

    def __init__(self):
        """
        Start the scheduler and read the command line arguments.

        Exits with status 1 when no schedule server id is given.
        """
        self.log = Hemlock_Debugger()
        self.sched = self.init_schedule()
        args = list(sys.argv)
        try:
            self.path = args[1]
        except IndexError:
            print("No path provided, defaulting to 'hemlock_creds' in the current working directory")
            self.path = "hemlock_creds"
        try:
            self.server = args[2]
        except IndexError:
            print("No schedule server was provided.")
            # A missing required argument is an error, so exit non-zero.
            sys.exit(1)
        try:
            self.debug = args[3]
        except IndexError:
            self.debug = 0

    @staticmethod
    def _parse_creds(lines):
        """
        Parse ``KEY=value`` lines from a credentials file into a dict.

        Blank lines, lines starting with '#', and lines without '=' are
        ignored. Each line is split on the first '=' only, so values may
        contain '='. Keys and values are stripped of surrounding whitespace.

        :param lines: iterable of text lines
        :return: dict mapping key to value
        """
        creds = {}
        for line in lines:
            stripped = line.strip()
            if not stripped or stripped.startswith("#") or "=" not in stripped:
                continue
            key, value = stripped.split("=", 1)
            creds[key.strip()] = value.strip()
        return creds

    @staticmethod
    def _parse_process_count(output):
        """
        Return the integer printed by ``wc -l``.

        The output is parsed as a number, not compared character by
        character. This handles counts of 10 or more and the leading padding
        that some ``wc`` implementations emit.

        :param output: raw stdout of the ``... | wc -l`` pipeline
        :return: the line count as an int
        :raises ValueError: if the output holds no number
        """
        return int(output.strip())

    @staticmethod
    def _append_scheduler_log(message):
        """
        Append ``message`` to 'scheduler.log' in the current working directory.

        :param message: text to append; the caller supplies any trailing newline
        """
        with open('scheduler.log', 'a') as f:
            f.write(message)

    def _read_creds_file(self):
        """
        Read and parse the credentials file at ``self.path``.

        Individual lines are deliberately not logged, because they contain the
        MySQL password.

        :return: dict of credentials parsed from the file
        """
        self.log.debug(self.debug, "Opening server_creds file: "+self.path)
        try:
            with open(self.path, 'r') as f:
                self.log.debug(self.debug, "Server creds file handle: "+str(f))
                lines = f.readlines()
        except (IOError, OSError):
            print("Unable to open "+self.path)
            self.log.debug(self.debug, sys.exc_info()[0])
            sys.exit(1)
        return self._parse_creds(lines)

    def _load_server_creds(self):
        """
        Collect the MySQL credentials.

        Environment variables are checked first (see CRED_KEYS). If any of them
        is missing, the credentials file is read. Values from the file override
        any environment values already collected.

        :return: dict of server credentials
        """
        server_dict = {}
        try:
            for key in self.CRED_KEYS:
                server_dict[key] = os.environ[key]
        except KeyError:
            server_dict.update(self._read_creds_file())
        return server_dict

    def _connect(self, server_dict):
        """
        Open a connection to the 'hemlock' MySQL database.

        On a missing credential or a driver error this prints
        "MySQL server failure" and calls sys.exit(1).
        NOTE(review): inside an APScheduler job, that SystemExit is raised in
        the job's thread. Whether it stops the process depends on APScheduler;
        confirm.

        :param server_dict: dictionary of server credentials
        :return: an open MySQLdb connection
        """
        try:
            m_server = mdb.connect(server_dict['HEMLOCK_MYSQL_SERVER'],
                                   server_dict['HEMLOCK_MYSQL_USERNAME'],
                                   server_dict['HEMLOCK_MYSQL_PW'],
                                   "hemlock")
        except (KeyError, mdb.Error):
            self.log.debug(self.debug, sys.exc_info()[0])
            print("MySQL server failure")
            sys.exit(1)
        self.log.debug(self.debug, "MySQL Handle: "+str(m_server))
        return m_server

    def check_schedules(self):
        """
        Check for existing schedules, clean up ones that no longer need to
        run, and start new ones that need to be scheduled.

        All existing ``job_work`` jobs are removed. One cron job is then added
        for every row of the ``schedules`` table that belongs to this
        schedule server.
        """
        server_dict = self._load_server_creds()
        m_server = self._connect(server_dict)
        try:
            cur = m_server.cursor()
            self.log.debug(self.debug, "MySQL Cursor: "+str(cur))
            # Only the jobs for the server running this scheduler. The query is
            # parameterized so the driver escapes the server id.
            cur.execute("SELECT * FROM schedules WHERE schedule_server_id = %s",
                        (self.server,))
            results = cur.fetchall()
            self.log.debug(self.debug, str(results))
            m_server.commit()
        finally:
            m_server.close()

        # Remove all previously scheduled jobs. APScheduler raises KeyError
        # when the function has no jobs.
        try:
            self.sched.unschedule_func(self.job_work)
        except KeyError:
            print("No jobs scheduled at this time, checking for new jobs to schedule.")

        # Row layout as used here: [1] job name, [3..7] minute, hour,
        # day_of_month, month, day_of_week.
        for schedule in results:
            self.schedule_job_cron(self.job_work, server_dict,
                                   str(schedule[1]), str(schedule[3]),
                                   str(schedule[4]), str(schedule[5]),
                                   str(schedule[6]), str(schedule[7]))

    def job_work(self, server_dict, name):
        """
        Do the actual work that was scheduled at the scheduled time.

        Looks up the first client attached to schedule ``name``. If a process
        for that client already appears in ``ps`` output, the run is skipped
        and noted in scheduler.log. Otherwise ``hemlock client-run`` is started
        for the client.

        :param server_dict: dictionary of server credentials
        :param name: schedule identifier, matched against
                     ``schedules_clients.schedule_id``
        """
        try:
            from shlex import quote
        except ImportError:  # Python 2 keeps quote() in the pipes module
            from pipes import quote

        m_server = self._connect(server_dict)
        try:
            cur = m_server.cursor()
            self.log.debug(self.debug, "MySQL Cursor: "+str(cur))
            cur.execute("SELECT * FROM schedules_clients WHERE schedule_id = %s",
                        (name,))
            results = cur.fetchall()
            self.log.debug(self.debug, str(results))
            if not results:
                self._append_scheduler_log("No client is attached to schedule: "+str(name)+", skipping this run.\n")
                return
            # NOTE(review): only the first attached client is run.
            uuid = str(results[0][1])

            # Export the credentials so the spawned client process inherits them.
            try:
                for cred in server_dict:
                    os.environ[cred] = server_dict[cred]
            except Exception:
                print("Unable to source hemlock server credentials")

            # Count ps lines that mention the uuid. The grep pipeline itself can
            # match, so a count of 1 or less is treated as "not running".
            check_cmd = "ps auxw | grep "+quote(uuid)+" | grep -v color | wc -l"
            pipe = os.popen(check_cmd)
            try:
                output = pipe.read()
            finally:
                pipe.close()
            if self._parse_process_count(output) > 1:
                self._append_scheduler_log("The client: "+uuid+" is already running, skipping this run.\n")
                return

            cur.execute("SELECT * FROM clients WHERE uuid = %s", (uuid,))
            client_results = cur.fetchall()
            self.log.debug(self.debug, str(client_results))
            if not client_results:
                self._append_scheduler_log("The client: "+uuid+" was not found, skipping this run.\n")
                return
            cmd = "hemlock client-run --uuid "+quote(uuid)
            # str() normalizes the column value, so an integer 0 and the string
            # "0" compare the same. NOTE(review): the column type is not visible
            # here. '-z' appears to select the no-couchbase mode; confirm.
            if str(client_results[0][2]) != "0":
                cmd += " -z"
            m_server.commit()
        finally:
            # Release the connection before the (blocking) client run.
            m_server.close()
        os.system(cmd)

    def init_schedule(self):
        """
        Initialize the scheduler.

        Configures logging to 'scheduler.log' at DEBUG level and starts an
        APScheduler Scheduler.

        :return: an instance of the scheduler.
        """
        logging.basicConfig(filename='scheduler.log', level=logging.DEBUG)
        sched = Scheduler()
        sched.start()
        return sched

    def schedule_job(self, function, periodicity, start_time):
        """
        Schedule a new interval job.

        :param function: function to be called that does the work
        :param periodicity: how often to run the scheduled work, in seconds
        :param start_time: when to start the job
        """
        self.sched.add_interval_job(function, seconds=periodicity, start_date=start_time)

    def schedule_job_cron(self, function, server_dict, name, minute, hour, day_of_month, month, day_of_week):
        """
        Schedule a new cron job that calls ``function(server_dict, name)``.

        :param function: function to be called that does the work
        :param server_dict: dictionary of server credentials
        :param name: name of the job (also passed to ``function``)
        :param minute: cron minute to run the job
        :param hour: cron hour to run the job
        :param day_of_month: cron day_of_month to run the job
        :param month: cron month to run the job
        :param day_of_week: cron day_of_week to run the job
        """
        self.sched.add_cron_job(function, args=[server_dict, name], name=name,
                                minute=minute, hour=hour, day=day_of_month,
                                month=month, day_of_week=day_of_week)
if __name__ == "__main__":
    # Read argv and start the background scheduler.
    runner = Hemlock_Scheduler()
    logging.basicConfig(level=logging.DEBUG, filename='scheduler.log')

    # Re-check the schedules table every 60 seconds, anchored at a fixed
    # start date.
    poll_seconds = 60
    runner.schedule_job(runner.check_schedules, poll_seconds, '2013-08-29 12:32:43')

    # Per the original note, the APScheduler Scheduler only keeps working
    # while the main thread is alive, so block here until a signal arrives.
    signal.pause()