#!/usr/bin/env python
#
# Copyright (c) 2013 In-Q-Tel, Inc/Lab41, All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module is the main controller code for running clients that sit in this
directory.
Created on 19 August 2013
@author: Charlie Lewis
"""
from multiprocessing import Pool
from socket import *
from hemlock_debugger import Hemlock_Debugger
import ast
import datetime
import hashlib
import MySQLdb as mdb
import sys
import time
class Hemlock_Base():
    """
    Validates clients and controls the orchestration between a client
    system and the Hemlock metadata/data store.

    NOTE: every failure path in this class prints a message and calls
    ``sys.exit(0)``.  The zero exit status is kept for backward
    compatibility with existing callers, even though it does not signal an
    error to the shell.
    """

    def __init__(self):
        self.log = Hemlock_Debugger()
        # path is relative to the current working directory
        self.SERVER_CREDS_FILE = '../hemlock_creds'

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _fail(self, debug, message):
        """
        Prints ``message``, logs the type of the exception currently being
        handled (if any) to the debugger, and exits the process.
        """
        print(message)
        self.log.debug(debug, str(sys.exc_info()[0]))
        sys.exit(0)

    def _read_creds_file(self, debug, path, label):
        """
        Parses a ``KEY=VALUE`` credentials file into a dictionary.

        Lines whose first non-blank character is ``#`` and lines without an
        ``=`` are skipped.  Each remaining line is split on the first ``=``
        only, so values may themselves contain ``=``; surrounding
        whitespace is stripped from both key and value.

        :param debug: debugger flag/instance passed through to the logger
        :param path: path of the credentials file to read
        :param label: ``"Client"`` or ``"Server"``, used in log messages
        :return: dictionary of the credentials found in the file
        """
        creds = {}
        try:
            self.log.debug(debug,
                           "Opening "+label.lower()+"_creds file: "+path)
            # the context manager closes the handle even if reading fails
            with open(path, 'r') as f:
                self.log.debug(debug, label+" creds file handle: "+str(f))
                for line in f:
                    # NOTE(review): this writes raw credential lines
                    # (including passwords) to the debug log.
                    self.log.debug(debug, line)
                    if line.lstrip().startswith("#") or "=" not in line:
                        continue
                    key, value = line.split("=", 1)
                    creds[key.strip()] = value.strip()
        except (IOError, OSError):
            self._fail(debug, "Unable to open "+path)
        return creds

    def _connect_mysql(self, debug, server_dict):
        """
        Opens a MySQL connection to the ``hemlock`` database.

        Required keys in ``server_dict``: ``HEMLOCK_MYSQL_SERVER``,
        ``HEMLOCK_MYSQL_USERNAME`` and ``HEMLOCK_MYSQL_PW``.  A missing key
        or a failed connection prints the failure message and exits.

        :return: the open connection
        """
        try:
            return mdb.connect(server_dict['HEMLOCK_MYSQL_SERVER'],
                               server_dict['HEMLOCK_MYSQL_USERNAME'],
                               server_dict['HEMLOCK_MYSQL_PW'],
                               "hemlock")
        except Exception:
            self._fail(debug, "Failure connecting to the Hemlock server")

    # ------------------------------------------------------------------
    # public API
    # ------------------------------------------------------------------
    def client_import(self, debug, client):
        """
        Imports the client specific code as a python module and
        instantiates its client class.

        For a client named ``mysql`` this imports the module ``hmysql`` and
        instantiates ``hmysql.HMysql``.

        :param debug: instance of
            :class:`~hemlock.clients.hemlock_debugger.Hemlock_Debugger`
        :param client: string containing the name of the technology of the
            client i.e. mysql
        :return: returns string name of client credential file and instance of
            the client class
        """
        module_name = "h"+client
        class_name = "H"+client.title()
        self.log.debug(debug, "Importing: "+module_name)
        # __import__ resolves the name exactly like an ``import`` statement
        # would, but without exec/eval on a string that is built from a
        # command-line argument.
        module = __import__(module_name, globals(), locals())
        self.log.debug(debug,
                       "Initializing: "+module_name+"."+class_name+"()")
        c_inst = getattr(module, class_name)()
        self.log.debug(debug, "Client handle: "+str(c_inst))
        return client+'_creds', c_inst

    def get_creds(self, debug, CLIENT_CREDS_FILE):
        """
        Gets the credentials for connecting the client and the credentials for
        connecting to the Hemlock server.

        The server credentials are read from ``self.SERVER_CREDS_FILE``.
        If either file cannot be opened a message is printed and the
        process exits.

        :param debug: instance of
            :class:`~hemlock.clients.hemlock_debugger.Hemlock_Debugger`
        :param CLIENT_CREDS_FILE: path to file containing the client
            credentials
        :return: returns two dictionaries of the client and server credentials
        """
        client_dict = self._read_creds_file(debug, CLIENT_CREDS_FILE,
                                            "Client")
        server_dict = self._read_creds_file(debug, self.SERVER_CREDS_FILE,
                                            "Server")
        return client_dict, server_dict

    def verify_system(self, debug, client_uuid, server_dict):
        """
        Verifies that the system supplied exists in the Hemlock system.

        Exits the process if the uuid has no row in the ``systems`` table or
        if the MySQL server cannot be reached/queried.

        :param debug: instance of
            :class:`~hemlock.clients.hemlock_debugger.Hemlock_Debugger`
        :param client_uuid: uuid of the client system that is being verified
        :param server_dict: credentials for connecting to the Hemlock server to
            be able to verify the client system
        """
        h_server = self._connect_mysql(debug, server_dict)
        self.log.debug(debug, "MySQL connection handle: "+str(h_server))
        try:
            cur = h_server.cursor()
            self.log.debug(debug, "MySQL cursor handle: "+str(cur))
            # the driver substitutes %s itself, so the uuid can never be
            # interpreted as SQL
            query = "SELECT * from systems WHERE uuid=%s"
            self.log.debug(debug, "Executing mysql query: "+query+
                           " with uuid "+str(client_uuid))
            matches = cur.execute(query, (client_uuid,))
            h_server.commit()
        except Exception:
            self._fail(debug, "Failure connecting to the Hemlock server")
        finally:
            h_server.close()
        self.log.debug(debug, "Successfully closed the mysql connection.")
        # checked outside the try block so an unknown uuid is not reported
        # as a connection failure as well
        if matches == 0:
            print(str(client_uuid)+" is not a valid system.")
            sys.exit(0)
        return

    def connect_server(self, debug, server_dict, no_couchbase):
        """
        Connects to the Hemlock data store (couchbase or elasticsearch).

        Required keys in ``server_dict`` for couchbase are
        ``HEMLOCK_COUCHBASE_SERVER``, ``HEMLOCK_COUCHBASE_BUCKET``,
        ``HEMLOCK_COUCHBASE_USERNAME`` and ``HEMLOCK_COUCHBASE_PW``; for
        elasticsearch ``HEMLOCK_ELASTICSEARCH_ENDPOINT``.

        :param debug: instance of
            :class:`~hemlock.clients.hemlock_debugger.Hemlock_Debugger`
        :param server_dict: credentials for connecting to the Hemlock server to
            be able to verify the client system
        :param no_couchbase: flag of whether or not to use a couchbase
            connection or an elasticsearch connection
        :return: returns an instance of the couchbase (or elasticsearch)
            connection
        """
        h_server = ""
        # NOTE(review): elasticsearch is selected only when the flag equals
        # 1, whereas send_data() tests plain truthiness -- confirm callers
        # only ever pass 0/1 (or a bool).
        if no_couchbase == 1:
            import pyes
            try:
                h_server = pyes.ES(
                    ("http",
                     server_dict['HEMLOCK_ELASTICSEARCH_ENDPOINT'],
                     "9200"))
                self.log.debug(debug,
                               "ElasticSearch connection handle: "+
                               str(h_server))
            except Exception:
                self._fail(debug, "Failure connecting to the Hemlock server")
        else:
            import couchbase
            try:
                h_server = couchbase.Couchbase.connect(
                    host=server_dict['HEMLOCK_COUCHBASE_SERVER'],
                    bucket=server_dict['HEMLOCK_COUCHBASE_BUCKET'],
                    username=server_dict['HEMLOCK_COUCHBASE_USERNAME'],
                    password=server_dict['HEMLOCK_COUCHBASE_PW'])
                self.log.debug(debug,
                               "Couchbase connection handle: "+str(h_server))
            except Exception:
                self._fail(debug, "Failure connecting to the Hemlock server")
        return h_server

    def send_data(self, debug, data_list, desc_list, h_server, client_uuid,
                  no_couchbase, batch_size=250000):
        """
        Sends data to the Hemlock data store that is received from the
        client system.

        Each record becomes one document keyed by the SHA-1 of its sorted
        field/value pairs; ``hemlock-system`` and ``hemlock-date`` are added
        after the key is computed, so the key depends on the record content
        only.  The number of records and failed bulk writes is printed at
        the end.

        :param debug: instance of
            :class:`~hemlock.clients.hemlock_debugger.Hemlock_Debugger`
        :param data_list: array of arrays containing data from the client
            (one inner array of records per table)
        :param desc_list: list containing a corresponding schema to the data;
            ``desc_list[t][c][0]`` is read as the name of column ``c`` of
            table ``t``
        :param h_server: instance of the couchbase or elasticsearch
            connection
        :param client_uuid: uuid of the client system
        :param no_couchbase: flag of whether or not to use a couchbase
            connection or an elasticsearch connection
        :param batch_size: positive number of records between elasticsearch
            refreshes / maximum size of a couchbase bulk write before it
            is flushed
        """
        if not no_couchbase:
            # only needed for the FMT_JSON constant
            import couchbase

        record_count = 0
        error_count = 0
        for table_index, table_data in enumerate(data_list):
            columns = desc_list[table_index]
            pending = {}
            for record in table_data:
                document = {}
                for col_index, value in enumerate(record):
                    if isinstance(value, datetime.datetime):
                        # datetimes are stored as their string form
                        value = str(value)
                    document[columns[col_index][0]] = value

                digest_src = repr(sorted(document.items()))
                if not isinstance(digest_src, bytes):
                    # hashlib needs bytes; on Python 2 repr() already is
                    digest_src = digest_src.encode('utf-8')
                uid = hashlib.sha1(digest_src).hexdigest()

                document['hemlock-system'] = client_uuid
                document['hemlock-date'] = time.strftime('%Y-%m-%d %H:%M:%S')

                if no_couchbase:
                    h_server.index(document, 'hemlock', 'hemlockDocument',
                                   uid, bulk=True)
                    if record_count % batch_size == 0:
                        h_server.refresh()
                else:
                    pending[uid] = document
                    # requires couchbase 1.0 client
                    if len(pending) > batch_size:
                        try:
                            h_server.set_multi(pending,
                                               format=couchbase.FMT_JSON)
                        except Exception:
                            error_count += 1
                            print("Failure.")
                        pending = {}
                record_count += 1

            # flush whatever is left over for this table
            if no_couchbase:
                h_server.refresh()
            elif pending:
                # requires couchbase 1.0 client
                try:
                    h_server.set_multi(pending, format=couchbase.FMT_JSON)
                except Exception:
                    error_count += 1
                    print("Failure.")

        print(str(record_count)+" records")
        print(str(error_count)+" errors")
        return

    def update_hemlock(self, debug, client_uuid, server_dict):
        """
        Records in the Hemlock MySQL server when data was last updated for
        the client system (sets ``systems.updated_data`` to the current
        local time).

        :param debug: instance of
            :class:`~hemlock.clients.hemlock_debugger.Hemlock_Debugger`
        :param client_uuid: uuid of the client system
        :param server_dict: credentials for connecting to the Hemlock server to
            be able to verify the client system
        """
        h_server = self._connect_mysql(debug, server_dict)
        try:
            cur = h_server.cursor()
            # parameterized so neither value can be interpreted as SQL
            cur.execute("UPDATE systems SET updated_data=%s WHERE uuid=%s",
                        (time.strftime('%Y-%m-%d %H:%M:%S'), client_uuid))
            h_server.commit()
        except Exception:
            self._fail(debug, "Failure connecting to the Hemlock server")
        finally:
            h_server.close()
        return

    def stream_callback(self, data):
        """
        Callback for hstream_odd, should only happen if something failed.

        :param data: data that failed
        """
        print(data)

    def stream_workers(self, debug):
        """
        Spawns asynchronous workers when calling an hstream_odd client.

        Ten tasks are submitted to a pool of four processes and the call
        blocks until all of them have finished.

        :param debug: instance of
            :class:`~hemlock.clients.hemlock_debugger.Hemlock_Debugger`
        """
        # NOTE(review): ``call_worker`` is not defined in the visible part
        # of this module -- presumably provided elsewhere; verify.
        # TODO: send_data()/update_hemlock() are not yet called from here.
        pool = Pool(processes=4)
        for _ in range(10):
            pool.apply_async(call_worker, callback=self.stream_callback)
        pool.close()
        pool.join()

    def print_help(self):
        """
        Prints out help for the hemlock_base class and exits the process.
        """
        print("--uuid \t<uuid of system> (use 'system-list' on the Hemlock server)")
        print("--client \t <name of client> (client file must exist in the clients folder)")
        print("-h \thelp\n")
        sys.exit(0)

    def process_args(self, debug, args):
        """
        Processes the arguments passed in to ensure that the right ones are
        supplied before trying to execute against them.

        Recognised options are ``--uuid <value>``, ``--client <value>`` and
        ``--splits <value>``.  Any other argument, a missing value for
        ``--uuid``/``--client``, or the absence of either of those two
        options prints the help text and exits.

        :param debug: instance of
            :class:`~hemlock.clients.hemlock_debugger.Hemlock_Debugger`
        :param args: list of arguments that are passed in
        :return: returns client system uuid, the client technology to use, and
            the number of splits (the integer -1 if not supplied, otherwise
            the raw argument string)
        """
        splits = -1
        client = None
        client_uuid = None

        if not args:
            self.print_help()

        i = 0
        total = len(args)
        while i < total:
            option = args[i]
            has_value = i + 1 < total
            if option == "--uuid":
                if has_value:
                    client_uuid = args[i+1]
                    i += 1
                else:
                    self.print_help()
            elif option == "--client":
                if has_value:
                    client = args[i+1]
                    i += 1
                else:
                    self.print_help()
            elif option == "--splits":
                if has_value:
                    splits = args[i+1]
                    i += 1
                else:
                    # a trailing --splits silently falls back to the default
                    splits = -1
            else:
                self.print_help()
            i += 1

        if not client or not client_uuid:
            self.print_help()
        return client_uuid, client, splits

    def get_args(self, debug):
        """
        Gets the arguments from the command line.

        :param debug: instance of
            :class:`~hemlock.clients.hemlock_debugger.Hemlock_Debugger`
        :return: returns list of arguments (``sys.argv`` without the program
            name)
        """
        return sys.argv[1:]