接上一篇。
客户端代码如下:
#-*- coding: UTF-8 -*-
__author__ = 'tiger'
#!/usr/bin/env python
import zmq, time, sys, os, atexit
from signal import SIGTERM
import random
import logging
#import ConfigParser
import subprocess
from logging.handlers import RotatingFileHandler
import socket
import fcntl
import struct
import signal
# Logging setup: attach a size-based rotating file handler to the root logger.
def mylog(logfile):
    """Configure the root logger to write to *logfile* and return it.

    The file is rotated at 50 MiB with 3 backups kept, and the root logger
    level is set to INFO. Each call adds another handler to the root logger,
    so it should be called once per process.
    """
    record_format = ('%(levelname)s %(thread)d %(threadName)s %(process)d %(funcName)s '
                     '%(asctime)s %(filename)s[line:%(lineno)d] %(message)s')
    handler = RotatingFileHandler(logfile, 'a', maxBytes=50 * 1024 * 1024, backupCount=3)
    handler.setFormatter(logging.Formatter(record_format, datefmt='%a, %d %b %Y %H:%M:%S'))
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    root.addHandler(handler)
    return root
# ioctl request number that asks the kernel for an interface's address (SIOCGIFADDR).
_SIOCGIFADDR = 0x8915


def get_ip_address(ifname):
    """Return the IPv4 address of network interface *ifname* as a dotted string.

    Linux-only: queries the kernel with the SIOCGIFADDR ioctl. *ifname* may be
    given as text or bytes; names longer than 15 bytes are truncated, as
    before.

    :raises IOError/OSError: if the interface does not exist or has no
        IPv4 address.
    """
    # struct.pack('256s', ...) needs bytes (on Python 2, str already is bytes).
    if not isinstance(ifname, bytes):
        ifname = ifname.encode('ascii')
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # The request buffer starts with the interface name (at most 15 bytes + NUL).
        ifreq = fcntl.ioctl(s.fileno(), _SIOCGIFADDR, struct.pack('256s', ifname[:15]))
    finally:
        # The socket only serves as an ioctl handle; always release it.
        s.close()
    # Bytes 20..24 of the reply hold the 4-byte IPv4 address.
    return socket.inet_ntoa(ifreq[20:24])
# Error raised when a command or script runs past its timeout.
class ProcessTimeout(Exception):
    """Raised by timeout_handler when a command exceeds its time limit."""
    pass


def timeout_handler(signum, frame):
    """SIGALRM handler used by exec_shell: abort the pending wait.

    :raises ProcessTimeout: always.
    """
    raise ProcessTimeout


def _kill_process_group(p):
    """Kill every process in *p*'s process group, close its pipes and reap it.

    exec_shell starts each command as a new session leader, so the group id
    equals ``p.pid`` and this also kills anything the shell spawned.
    """
    try:
        os.killpg(p.pid, signal.SIGKILL)
    except OSError:
        pass  # the group has already exited
    p.stdout.close()
    p.stderr.close()
    p.wait()  # reap the child so it does not linger as a zombie


def exec_shell(task, rundir=None, timeout=None):
    """Run *task* through the shell and return ``[returncode, output]``.

    :param task: shell command string (executed with ``shell=True``).
    :param rundir: working directory for the command, or None to inherit ours.
    :param timeout: whole seconds before the command is killed; a falsy value
        disables the limit. Implemented with SIGALRM, so it only works in the
        main thread.
    :returns: ``[0, stdout]`` on success, ``[code, stderr]`` on a non-zero exit
        status, or ``['timeout', message]`` when the time limit was hit.
    :raises OSError: if the command cannot be started (e.g. *rundir* missing);
        no alarm is left pending in that case.
    """
    old_handler = None
    if timeout:
        old_handler = signal.signal(signal.SIGALRM, timeout_handler)
    try:
        # preexec_fn=os.setsid puts the command in its own session/process
        # group so that a timeout can kill the shell *and* its children.
        p = subprocess.Popen(task, bufsize=0, shell=True, cwd=rundir,
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                             preexec_fn=os.setsid)
        try:
            # Arm the alarm only once the process exists, so a failing Popen
            # can never leave it pending.
            if timeout:
                signal.alarm(timeout)
            stdout, stderr = p.communicate()
            if timeout:
                signal.alarm(0)
            returncode = p.poll()
        except ProcessTimeout:
            _kill_process_group(p)
            stderr = 'Calculation was taking too long, so I killed it dead.\n'
            returncode = 'timeout'
    finally:
        # Always cancel the alarm and put back whatever handler was there.
        if timeout:
            signal.alarm(0)
            if old_handler is not None:
                signal.signal(signal.SIGALRM, old_handler)
    # On success return standard output, otherwise standard error.
    if returncode == 0:
        return [returncode, stdout]
    return [returncode, stderr]
# Used for script-type tasks: download the script, run it, then delete it.
def exec_script(fileserver, script, dirpath, rundir=None, timeout=None):
    """Fetch a script over HTTP with wget, run it, then remove the local copy.

    :param fileserver: base URL the script name is appended to.
    :param script: dict with 'script_name' (file name) and 'script_env'
        (interpreter command placed in front of the downloaded path).
    :param dirpath: local directory the script is downloaded into.
    :param rundir: working directory for both the download and the run.
    :param timeout: per-command timeout, passed to exec_shell.
    :returns: the exec_shell result of the failed download, or of the script
        run; a failure to delete the script is appended to the output text.
    """
    name = script['script_name']
    url = fileserver + name
    download_cmd = ' '.join(('wget', '-N', '-P', dirpath, url))
    local_path = dirpath + name
    run_cmd = ' '.join((script['script_env'], local_path))

    fetched = exec_shell(download_cmd, rundir=rundir, timeout=timeout)
    if fetched[0] != 0:
        return fetched

    outcome = exec_shell(run_cmd, rundir=rundir, timeout=timeout)
    try:
        os.remove(local_path)
    except Exception as err:
        outcome[1] = outcome[1] + str(err)
    return outcome
# How long to wait for any reply from the master before reconnecting (ms).
_MASTER_REPLY_TIMEOUT_MS = 20 * 1000


def _zmq_connect(sock_file):
    """Open a fresh context and REQ socket to *sock_file*.

    Returns ``(context, socket, poller)`` with the poller watching the socket
    for incoming replies.
    """
    context = zmq.Context()
    sock = context.socket(zmq.REQ)
    # Discard unsent messages on close so that context.term() does not block.
    sock.setsockopt(zmq.LINGER, 0)
    sock.connect(sock_file)
    poller = zmq.Poller()
    poller.register(sock, zmq.POLLIN)
    return context, sock, poller


def _zmq_close(context, sock):
    """Close *sock* and terminate *context*, skipping whatever is already closed."""
    if not getattr(sock, 'closed', False):
        sock.close()
    if not getattr(context, 'closed', False):
        context.term()


def _zmq_reconnect(context, sock, sock_file):
    """Drop the current connection, back off 1-5 s and reconnect.

    A REQ socket that missed a reply cannot send again, so recovery means a
    brand-new socket. Returns the new ``(context, socket, poller)``.
    """
    _zmq_close(context, sock)
    time.sleep(random.randint(1, 5))
    return _zmq_connect(sock_file)


def _zmq_request(sock, poller, message):
    """Send *message* on the REQ socket and consume the master's reply.

    Returns True when a reply arrived in time; False when sending failed or
    no reply came within _MASTER_REPLY_TIMEOUT_MS (the socket must then be
    recreated).
    """
    try:
        sock.send_pyobj(message)
    except Exception:
        return False
    if not poller.poll(_MASTER_REPLY_TIMEOUT_MS):
        return False
    sock.recv_pyobj()
    return True


def ioloop(sock_file, homedir, mip, stdout):
    """Poll the master for tasks forever, run each task and report the result.

    :param sock_file: ZeroMQ endpoint of the master, e.g. ``tcp://host:port``.
    :param homedir: client home; scripts are downloaded into ``homedir/run/``,
        which is also the default working directory for tasks.
    :param mip: this client's IP, sent with every request.
    :param stdout: path of the log file passed to mylog().

    Never returns normally; sockets are closed if an exception escapes.
    """
    # Default run directory: scripts and commands run here unless the task says otherwise.
    dirpath = homedir + '/run/'
    log = mylog(stdout)
    context, sock, poller = _zmq_connect(sock_file)
    try:
        while True:
            # Ask the master for work.
            try:
                sock.send_pyobj({'req_type': 'task', 'ip': mip})
            except Exception as err:
                log.warning(str(err))
                context, sock, poller = _zmq_reconnect(context, sock, sock_file)
                continue
            if not poller.poll(_MASTER_REPLY_TIMEOUT_MS):
                # No answer in time: reconnect to the master.
                log.warning("master server connect time out,will colse current socket,try again.")
                context, sock, poller = _zmq_reconnect(context, sock, sock_file)
                continue
            # NOTE(review): recv_pyobj() unpickles whatever arrives on the
            # socket, which lets anyone who can reach it run arbitrary code.
            # Only deploy on a trusted network.
            rep = sock.recv_pyobj()
            try:
                rep_type = rep['rep_type']
            except Exception as err:
                log.info(str(err))
                time.sleep(random.uniform(0.8, 1.2))
                continue
            if rep_type != 'newtask':
                time.sleep(random.uniform(0.8, 1.2))
                continue

            # Read the id on its own first, so a malformed task is reported
            # under its own id -- never under the previous task's id.
            try:
                job_id = rep['id']
            except Exception:
                job_id = None
            try:
                job_task = rep['task']
                job_type = rep['type']
                cmd_timeout = rep['cmdtimeout']
                rundir = rep['rundir']
                log.warning('start new job ' + str(rep))
            except Exception as err:
                if job_id:
                    error_report = {'id': job_id, 'code': '99', 'info': str(err), 'ip': mip,
                                    'req_type': 'report'}
                    if not _zmq_request(sock, poller, error_report):
                        log.warning('report for job %s was not acknowledged, reconnecting' % job_id)
                        context, sock, poller = _zmq_reconnect(context, sock, sock_file)
                time.sleep(random.uniform(0.8, 1.2))
                log.warning(str(err) + str(rep))
                continue

            # The string 'None' means the task did not set a run directory.
            run_dir = dirpath if rundir == 'None' else rundir
            try:
                if job_type == 's':
                    # Script task: needs the interpreter and the file server URL too.
                    script = {'script_name': job_task, 'script_env': rep['env']}
                    res = exec_script(rep['fileserver'], script, dirpath,
                                      rundir=run_dir, timeout=cmd_timeout)
                else:
                    # Any other task type is run as a shell command.
                    res = exec_shell(job_task, rundir=run_dir, timeout=cmd_timeout)
            except Exception as err:
                # Report the failure with code '99' instead of crashing the
                # daemon or leaving the master waiting for the job timeout.
                log.warning(str(err))
                res = ['99', str(err)]

            # Send the result back, tagged req_type 'report' so the master
            # knows this request is a task report.
            result = {'id': job_id, 'code': res[0], 'info': res[1], 'ip': mip}
            report = dict(result, req_type='report')
            if not _zmq_request(sock, poller, report):
                log.warning('report for job %s was not acknowledged, reconnecting' % job_id)
                context, sock, poller = _zmq_reconnect(context, sock, sock_file)
            log.info(str(result))
            time.sleep(random.uniform(0.8, 1.2))
    finally:
        _zmq_close(context, sock)
def ip_check(ip):
    """Return True if *ip* is a dotted-quad IPv4 address (four decimal octets 0-255).

    Works on Python 2 and 3 (``len(filter(...))`` fails on Python 3, where
    filter() returns an iterator). Parts that ``isdigit()`` accepts but int()
    cannot parse (e.g. superscript digits) give False instead of raising
    ValueError.
    """
    parts = ip.split('.')
    if len(parts) != 4:
        return False
    for part in parts:
        if not part.isdigit():
            return False
        try:
            octet = int(part)
        except ValueError:
            return False
        if not 0 <= octet <= 255:
            return False
    return True
class Daemon:
    """Generic Unix daemon built on the double-fork recipe and a pidfile.

    Subclasses override _run() with the work to perform once detached.
    sock_file, homedir and mip are stored for _run() to use.
    """

    def __init__(self, pidfile, sock_file, homedir, mip, stdin='/dev/null', stdout='/dev/null',
                 stderr='/dev/null'):
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr
        self.pidfile = pidfile
        self.homedir = homedir
        self.sock_file = sock_file
        self.mip = mip

    def _read_pid(self):
        """Return the pid stored in the pidfile, or None.

        None covers a missing or unreadable pidfile and contents that are not
        a positive integer. A pid of 0 or below would make os.kill() signal a
        whole process group, or every process.
        """
        try:
            with open(self.pidfile, 'r') as pf:
                pid = int(pf.read().strip())
        except (IOError, OSError, ValueError):
            return None
        return pid if pid > 0 else None

    def _daemonize(self):
        """Detach from the terminal and turn this process into a daemon.

        Forks twice (both parents exit), starts a new session, changes to /,
        redirects stdin/stdout/stderr to the configured paths, writes the
        pidfile and registers delpid() to remove it at interpreter exit.
        """
        # First fork: detach from the parent process.
        try:
            pid = os.fork()
            if pid > 0:
                sys.exit(0)
        except OSError as e:
            sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.exit(1)
        # Start a new session, detaching from the controlling terminal.
        os.setsid()
        # Change the working directory to /.
        os.chdir("/")
        # umask 022 rather than 0: the daemon and every command it runs must
        # not create world-writable files (e.g. downloaded scripts that are
        # executed afterwards).
        os.umask(0o022)
        # Second fork: stop the process from reacquiring a controlling terminal.
        try:
            pid = os.fork()
            if pid > 0:
                sys.exit(0)
        except OSError as e:
            sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.exit(1)
        sys.stdout.flush()
        sys.stderr.flush()
        # Redirect stdin/stdout/stderr. The dup2'd descriptors stay open after
        # the temporary file objects are closed.
        with open(self.stdin, 'r') as si:
            os.dup2(si.fileno(), sys.stdin.fileno())
        with open(self.stdout, 'a+') as so:
            os.dup2(so.fileno(), sys.stdout.fileno())
        with open(self.stderr, 'a+') as se:
            os.dup2(se.fileno(), sys.stderr.fileno())
        # Remove the pidfile when the program exits.
        atexit.register(self.delpid)
        with open(self.pidfile, 'w+') as pf:
            pf.write("%s\n" % os.getpid())

    def delpid(self):
        """Remove the pidfile if it still exists (stop() may already have removed it)."""
        if os.path.exists(self.pidfile):
            os.remove(self.pidfile)

    def start(self):
        """Daemonize and run, unless the pidfile already names a pid."""
        # Check for a pidfile to see if the daemon already runs
        pid = self._read_pid()
        if pid:
            message = "pidfile %s already exist. Daemon already running?\n"
            sys.stderr.write(message % self.pidfile)
            sys.exit(1)
        # Start the daemon
        self._daemonize()
        self._run()

    def stop(self):
        """Send SIGTERM to the daemon until it is gone, then remove the pidfile.

        Returns quietly when there is no usable pidfile, so restart() works
        when the daemon is not running. Exits with status 1 on any other
        kill() failure (e.g. permission denied).
        """
        import errno

        pid = self._read_pid()
        if not pid:
            message = "pidfile %s does not exist. Daemon not running?\n"
            sys.stderr.write(message % self.pidfile)
            return  # not an error in a restart
        # Try killing the daemon process
        try:
            while True:
                os.kill(pid, SIGTERM)
                time.sleep(0.1)
        except OSError as err:
            # ESRCH means the process no longer exists, i.e. it has exited.
            if err.errno == errno.ESRCH:
                if os.path.exists(self.pidfile):
                    os.remove(self.pidfile)
            else:
                print(str(err))
                sys.exit(1)

    def restart(self):
        """Stop the daemon (if running) and start it again."""
        self.stop()
        self.start()

    def _run(self):
        """Work done once daemonized; subclasses override this."""
        pass
# Concrete daemon: overrides Daemon._run to run the client's master polling loop.
class MyDaemon(Daemon):
    """Client daemon whose detached work is the ioloop() polling loop."""

    def _run(self):
        # The daemon's stdout path doubles as ioloop's log file.
        ioloop(sock_file=self.sock_file, homedir=self.homedir, mip=self.mip,
               stdout=self.stdout)
def main():
    """Client entry point: prepare directories and dispatch start|stop|restart.

    Creates ``log/`` and ``run/`` under the current directory, which also
    hold the log, error and pid files. The master endpoint and the network
    interface that identifies this client default to the values below and
    can be overridden with the OAOS_MASTER and OAOS_IFNAME environment
    variables.

    Exit codes: 0 on success, 2 for bad usage, 3 if the interface's IP
    cannot be read.
    """
    homedir = os.getcwd()
    for sub in ('log', 'run'):
        path = os.path.join(homedir, sub)
        if not os.path.exists(path):
            os.makedirs(path, 0o755)
    stdout = os.path.join(homedir, 'log', 'oaos_client.log')
    stderr = os.path.join(homedir, 'log', 'oaos_client.err')
    pidfile = os.path.join(homedir, 'run', 'oaos_client.pid')
    # The master's tcp endpoint.
    sock_file = os.environ.get('OAOS_MASTER', "tcp://192.168.4.194:7777")
    # Interface whose IP this client uses to identify itself to the master.
    ifname = os.environ.get('OAOS_IFNAME', 'eth0')
    try:
        mip = get_ip_address(ifname)
    except Exception as err:
        print(err)
        sys.exit(3)
    daemon = MyDaemon(pidfile, sock_file, homedir, mip, stdout=stdout, stderr=stderr)
    if len(sys.argv) != 2:
        print("usage: %s start|stop|restart" % sys.argv[0])
        sys.exit(2)
    actions = {'start': daemon.start, 'stop': daemon.stop, 'restart': daemon.restart}
    action = actions.get(sys.argv[1])
    if action is None:
        print("Unknown command")
        sys.exit(2)
    action()
    sys.exit(0)
if __name__ == "__main__":
    main()

增加任务的接口代码如下:
#-*- coding: UTF-8 -*-
__author__ = 'tiger'
#!/usr/bin/env python
import zmq, time, os
#import random
import subprocess
import logging, ConfigParser
from logging.handlers import RotatingFileHandler
# Logging setup: attach a size-based rotating file handler to the root logger.
def mylog(logfile):
    """Configure the root logger to write to *logfile* and return it.

    The file is rotated at 50 MiB with 3 backups kept, and the root logger
    level is set to INFO. Each call adds another handler to the root logger,
    so it should be called once per process.
    """
    record_format = ('%(levelname)s %(thread)d %(threadName)s %(process)d %(funcName)s '
                     '%(asctime)s %(filename)s[line:%(lineno)d] %(message)s')
    handler = RotatingFileHandler(logfile, 'a', maxBytes=50 * 1024 * 1024, backupCount=3)
    handler.setFormatter(logging.Formatter(record_format, datefmt='%a, %d %b %Y %H:%M:%S'))
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    root.addHandler(handler)
    return root
def ip_check(ip):
    """Return True if *ip* is a dotted-quad IPv4 address (four decimal octets 0-255).

    Works on Python 2 and 3 (``len(filter(...))`` fails on Python 3, where
    filter() returns an iterator). Parts that ``isdigit()`` accepts but int()
    cannot parse (e.g. superscript digits) give False instead of raising
    ValueError.
    """
    parts = ip.split('.')
    if len(parts) != 4:
        return False
    for part in parts:
        if not part.isdigit():
            return False
        try:
            octet = int(part)
        except ValueError:
            return False
        if not 0 <= octet <= 255:
            return False
    return True


def read_task(config_file):
    """Parse the job description in the ini file *config_file*.

    Required options: [master] sock, [ip_list] ip (comma separated; blanks
    around entries and a trailing comma are tolerated), [job_info] task.
    Optional [job_info] options and their defaults: type='c', rundir='None',
    cmdtimeout=10, jobtimeout=20; for type 's' also env='sh' and
    fileserver='http://192.168.0.227/ser/'. A value containing a bare '%'
    (e.g. ``date +%F``) is used verbatim; '%%' still yields '%'.

    :returns: ``[ip_dic, job_info, master_sock]`` where ip_dic maps each
        target IP to 'N' and job_info is the job message for the master, or
        0 (after printing the reason) if the file is unreadable or malformed,
        a required option is missing, or the IP list is invalid or empty.
    """
    try:
        import ConfigParser as configparser  # Python 2
        config = configparser.SafeConfigParser()
    except ImportError:
        import configparser  # Python 3
        config = configparser.ConfigParser()

    def _get(section, option):
        """Read an option, falling back to its raw text if interpolation fails."""
        try:
            return config.get(section, option)
        except configparser.InterpolationError:
            # Shell commands often contain a bare '%' (e.g. ``date +%F``),
            # which interpolation rejects; use the text verbatim instead.
            return config.get(section, option, raw=True)

    def _optional(option, default, convert=None):
        """Read an optional [job_info] option, or *default* if missing/invalid."""
        try:
            value = _get('job_info', option)
            return convert(value) if convert else value
        except (configparser.Error, ValueError):
            return default

    try:
        config.read(config_file)
    except configparser.Error as err:
        # e.g. MissingSectionHeaderError / ParsingError for a malformed file
        print(err)
        return 0
    # Job id, derived from the current time.
    job_id = int(time.time()) + 1
    try:
        master_sock = _get('master', 'sock')
        ip_list = _get('ip_list', 'ip').split(',')
        job_task = _get('job_info', 'task')
    except configparser.Error as err:
        print(err)
        return 0

    # Every target IP starts in state 'N'.
    ip_dic = {}
    for entry in ip_list:
        ip = entry.strip()
        if not ip:
            continue  # tolerate "a, b," style lists
        if not ip_check(ip):
            print("ip error :%s" % ip)
            return 0
        ip_dic[ip] = 'N'
    if not ip_dic:
        print("ip error :no ip address configured")
        return 0

    job_type = _optional('type', 'c')
    job_info = {'id': job_id, 'type': job_type, 'task': job_task,
                'jobtimeout': _optional('jobtimeout', 20, int),
                'cmdtimeout': _optional('cmdtimeout', 10, int),
                'rep_type': 'newtask', 'rundir': _optional('rundir', 'None')}
    if job_type == 's':
        # Script jobs also need the interpreter and the download location.
        job_info['env'] = _optional('env', 'sh')
        job_info['fileserver'] = _optional('fileserver', 'http://192.168.0.227/ser/')
    return [ip_dic, job_info, master_sock]
#变量说明:id代表任务的id号,是根据当前时间生成的递增数;job_type代表任务的类别,c代表命令,s代表脚本;job_task代表具体的任务内容,如果任务
#类别是命令则直接执行,如果是脚本,客户端将根据脚本名去http服务器下载相关脚本;jobtimeout代表整个任务的超时时间,如果客户端没有在该时间内
#报告任务状态则视为超时;cmdtimeout代表客户端执行具体任务的超时时间,例如top命令如果不带参数将永远执行,cmdtimeout就是为了避免类似情况;
#job_env代表任务的执行环境,通常只适用于执行脚本;job_fileserver是任务类别为脚本时下载脚本的http服务器地址和路径;rep_type是响应给客户端的状态,
#如果响应的信息为newtask,客户端将知道有新任务了,会尝试获取任务所需的其他信息;rundir是任务运行的具体路径。
def add_job(config_file, logfile, reply_timeout=20):
    """Submit the job described by *config_file* to the master and follow its report.

    :param config_file: path of the task ini file parsed by read_task().
    :param logfile: path of this tool's own log file.
    :param reply_timeout: seconds to wait for the master to answer the
        'addjob' request before giving up.
    :returns: None. Configuration, connection and timeout errors are printed
        and end the call early.
    """
    log = mylog(logfile)
    cfg_info = read_task(config_file)
    # read_task() returns 0 when the config is unusable (it already printed why).
    if cfg_info == 0:
        return
    ip_dic, job_info, sock_file = cfg_info
    job_id = job_info['id']
    # The request asking the master to add the new job.
    task = {'req_type': 'addjob', 'job_info': job_info, 'iplist': ip_dic}

    context = zmq.Context()
    sock = context.socket(zmq.REQ)
    sock.setsockopt(zmq.LINGER, 0)
    try:
        try:
            sock.connect(sock_file)
            sock.send_pyobj(task)
        except Exception as err:
            log.info("connect to master error " + str(err))
            print("connect to master error %s " % str(err))
            return
        poller = zmq.Poller()
        poller.register(sock, zmq.POLLIN)
        if not poller.poll(int(reply_timeout * 1000)):
            log.info("no reply from master within %s seconds" % reply_timeout)
            print("no reply from master within %s seconds" % reply_timeout)
            return
        # NOTE(review): recv_pyobj() unpickles network data; only talk to a trusted master.
        report = sock.recv_pyobj()
    finally:
        sock.close()
        context.term()

    print(report)
    # NOTE(review): assumes the master's reply is a string whose last word is
    # the path of the job report log -- confirm against the master code.
    log_file = report.split()[-1]
    log.info(report)

    # Follow the job report with tail -f until the end marker for this job.
    end_flag = 'job ' + str(job_id) + ' end'
    # Argument list without a shell, so the path received from the master
    # cannot inject shell syntax; stderr is merged so tail errors are shown.
    p = subprocess.Popen(['tail', '-f', log_file], stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT, universal_newlines=True)
    print('----------------------job report start--------------------------')
    try:
        for line in iter(p.stdout.readline, ''):
            line = line.strip()
            print(line)
            if line == end_flag:
                break
    finally:
        # tail -f never exits by itself; stop it rather than leave it running.
        if p.poll() is None:
            p.terminate()
        p.wait()
        p.stdout.close()
    print('----------------------job report end---------------------------')
def main():
    """Entry point of the add-job tool.

    Ensures ``log/`` and ``run/`` exist under the current directory, then
    submits the job. The job file defaults to ``task.ini`` in the current
    directory; another path may be passed as the first command-line argument.
    """
    import sys

    config_file = sys.argv[1] if len(sys.argv) > 1 else 'task.ini'
    logfile = 'log/oaos_add_job.log'
    homedir = os.getcwd()
    for sub in ('log', 'run'):
        path = os.path.join(homedir, sub)
        if not os.path.exists(path):
            os.makedirs(path, 0o755)
    add_job(config_file, logfile)
if __name__ == "__main__":
    main()

任务配置文件示例如下:
[master]
sock=tcp://192.168.4.194:7777
[ip_list]
ip=192.168.4.195,192.168.4.196,192.168.4.197,192.168.4.198
[job_info]
type=c
task=ls -rlt
cmdtimeout=10
jobtimeout=10
env=sh
fileserver=http://192.168.0.227/ser/
rundir=/root/
实例演示见《简易linux集中管理工具三》。
One Response