Viewing file: client_example.py (13.12 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
#! /usr/bin/python # """ @package client_example
This package provides an example of a Python script that queries a Cloudmark Authority Server using the pycurl module (which is available from http://pycurl.sourceforge.net/ )
""" import sys import getopt import json import pycurl from StringIO import StringIO import mbox
# When True, server responses are printed verbatim instead of being decoded
# and summarised (main() sets this for the -r command line flag).
g_raw_output = False

# Usage summary passed to sys.exit() whenever the command line is rejected.
g_usage_string = (
    "usage: client_example.py [-r] [-I ip] [-a file] "
    "[-m file] [-C type] host"
)
def do_GET_request(c, url, req, arg):
    """
    Send a GET request to the server.

    Format and send a GET request to the server. On success, return the
    response body as a string. On failure, print the error reported by
    curl and return an empty string.

    Keyword Arguments:
    c   -- the curl handle
    url -- the url of the host (including port if necessary)
    req -- the actual request (the path appended to url)
    arg -- a single string appended to the request after a '/', or None
    """
    buf = StringIO()  # receives the response body

    # Build up the request.
    target = url + req
    if arg is not None:
        target += '/' + arg

    try:
        # The same handle is also used for form POSTs; explicitly switch it
        # back to GET so an earlier POST configuration cannot leak into
        # this request.
        c.setopt(pycurl.HTTPGET, 1)
        c.setopt(c.URL, target)     # set the host url
        c.setopt(c.WRITEDATA, buf)  # set the receive buffer
        c.perform()                 # and let curl do its magic.
    except pycurl.error as reason:
        # pycurl.error carries (code, message); report the message.
        # Spacing kept identical to the script's historical output.
        print("Curl error:  %s" % (reason.args[1],))
        return ''

    return buf.getvalue()
def do_POST_request(c, url, req, parts):
    """
    Send a POST request to the server.

    Format and send a multipart form POST request to the server.

    Keyword Arguments:
    c     -- the curl handle
    url   -- the url of the host (including port if necessary)
    req   -- the actual request (the path appended to url)
    parts -- the parts of the POST as a dictionary keyed by part name

    Returns a tuple (http_response_code, response_body). On a curl failure
    the error is printed and (-1, '') is returned.
    """
    buf = StringIO()  # receives the response body

    # Turn the dictionary into the list of (name, (FORM_CONTENTS, value))
    # tuples that pycurl expects, then attach the parts to the form.
    formpart = []
    if parts:
        formpart = [(name, (pycurl.FORM_CONTENTS, value))
                    for name, value in parts.items()]
    if formpart:
        c.setopt(pycurl.HTTPPOST, formpart)

    try:
        c.setopt(c.URL, url + req)                # set the host url
        c.setopt(c.WRITEDATA, buf)                # set the receive buffer
        c.perform()                               # and let curl do its magic.
        response = c.getinfo(c.RESPONSE_CODE)     # fetch our http response
    except pycurl.error as reason:
        # pycurl.error carries (code, message); report the message.
        # Spacing kept identical to the script's historical output.
        print("Curl error:  %s" % (reason.args[1],))
        return (-1, '')

    return (response, buf.getvalue())
def fetch_element(key, dict):
    """
    Look up a key in a dictionary, falling back to an empty string.

    Keyword Arguments.
    key  -- the key to search for
    dict -- the dictionary

    Returns the value stored under key when it is present, otherwise ''.
    """
    if key in dict:
        return dict[key]
    return ''
def fetch_element_or_none(key, dict):
    """
    Look up a key in a dictionary, falling back to None.

    Keyword Arguments.
    key  -- the key to search for
    dict -- the dictionary

    Returns the value stored under key when it is present, otherwise None.
    """
    if key in dict:
        return dict[key]
    return None
def check_JSON_response(j):
    """
    Check for an error in a decoded JSON response.

    Look for an "http_error" entry in the decoded response. If one is
    present and non-empty, print its "error" and "details" fields and
    return False. Otherwise return True.

    Keyword Arguments
    j -- the decoded JSON response (the object produced by the JSON
         decoder, not the raw JSON text)
    """
    h = j["http_error"] if "http_error" in j else None
    if not h:
        return True

    # Missing fields are reported as empty strings instead of raising
    # KeyError, so a sparse error object is still reported to the user.
    e = h["error"] if "error" in h else ''
    d = h["details"] if "details" in h else ''

    # Spacing kept identical to the script's historical output.
    print("Authority Server returned HTTP error:  %s  details:  %s" % (e, d))
    return False
def do_ping(c, host):
    """
    Send a ping to the server and report the outcome.

    Prints the raw body when g_raw_output is set. Otherwise the body is
    decoded as JSON and 'Ping OK' is printed if check_JSON_response finds
    no error in it. Prints "Empty Response" when no body was returned.

    Keyword Arguments:
    c    -- the curl handle
    host -- the server URL
    """
    resp = do_GET_request(c, host, "/score/v2/ping", None)
    if not resp:
        print("Empty Response")
        return
    if g_raw_output:
        print(resp)
        return

    try:
        x = json.loads(resp)
    except ValueError as err:
        # The body was not JSON; report it instead of dying with a traceback.
        print("Invalid JSON response: %s" % (err,))
        return

    if check_JSON_response(x):
        print('Ping OK')
def do_max_message_size(c, host):
    """
    Request the maximum message size information from the server.

    Prints the raw body when g_raw_output is set. Otherwise the body is
    decoded as JSON and its "maxMessageSize" entry is printed if
    check_JSON_response finds no error. Prints "Empty Response" when no
    body was returned.

    Keyword Arguments:
    c    -- the curl handle
    host -- the server URL
    """
    resp = do_GET_request(c, host, "/score/v2/max-message-size", None)
    if not resp:
        print("Empty Response")
        return
    if g_raw_output:
        print(resp)
        return

    try:
        x = json.loads(resp)
    except ValueError as err:
        # The body was not JSON; report it instead of dying with a traceback.
        print("Invalid JSON response: %s" % (err,))
        return

    if check_JSON_response(x):
        # Spacing kept identical to the script's historical output.
        print("max message size:  %s" % (x["maxMessageSize"],))
def do_licensing(c, host):
    """
    Request the licensing information from the server.

    Prints the raw body when g_raw_output is set. Otherwise the body is
    decoded as JSON and, if check_JSON_response finds no error, every
    key/value pair except "query_status" is printed. Prints
    "Empty Response" when no body was returned.

    Keyword Arguments:
    c    -- the curl handle
    host -- the server URL
    """
    resp = do_GET_request(c, host, "/score/v2/license", None)
    if not resp:
        print("Empty Response")
        return
    if g_raw_output:
        print(resp)
        return

    try:
        x = json.loads(resp)
    except ValueError as err:
        # The body was not JSON; report it instead of dying with a traceback.
        print("Invalid JSON response: %s" % (err,))
        return

    if check_JSON_response(x):
        print("licensing info:")
        for k, v in x.items():
            if k != "query_status":
                # Spacing kept identical to the script's historical output.
                print("  %s :  %s" % (k, v))
def do_microupdate_versions(c, host):
    """
    Request the current microupdate versions from the server.

    Prints the raw body when g_raw_output is set. Otherwise the body is
    decoded as JSON and, if check_JSON_response finds no error, every
    key/value pair except "query_status" is printed. Prints
    "Empty Response" when no body was returned.

    Keyword Arguments:
    c    -- the curl handle
    host -- the server URL
    """
    resp = do_GET_request(c, host, "/score/v2/microupdate-versions", None)
    if not resp:
        print("Empty Response")
        return
    if g_raw_output:
        print(resp)
        return

    try:
        x = json.loads(resp)
    except ValueError as err:
        # The body was not JSON; report it instead of dying with a traceback.
        print("Invalid JSON response: %s" % (err,))
        return

    if check_JSON_response(x):
        print("microupdate versions:")
        for k, v in x.items():
            if k != "query_status":
                # Spacing kept identical to the script's historical output.
                print("  %s :  %s" % (k, v))
def do_supported_attributes(c, host):
    """
    Request the supported attribute information from the server.

    Prints the raw body when g_raw_output is set. Otherwise the body is
    decoded as JSON and, if check_JSON_response finds no error, the
    "attribute" field of each entry in "resultAttributes" is printed.
    Prints "Empty Response" when no body was returned.

    Keyword Arguments:
    c    -- the curl handle
    host -- the server URL
    """
    resp = do_GET_request(c, host, "/score/v2/supported-attributes", None)
    if not resp:
        print("Empty Response")
        return
    if g_raw_output:
        print(resp)
        return

    try:
        x = json.loads(resp)
    except ValueError as err:
        # The body was not JSON; report it instead of dying with a traceback.
        print("Invalid JSON response: %s" % (err,))
        return

    if check_JSON_response(x):
        print("supported attributes:")
        att = fetch_element_or_none("resultAttributes", x)
        if att:
            for a in att:
                # Spacing kept identical to the script's historical output.
                print("  %s" % (a["attribute"],))
def do_runtime(c, host):
    """
    Request the runtime information from the server.

    Prints the raw body when g_raw_output is set. Otherwise the body is
    decoded as JSON and, if check_JSON_response finds no error, every
    key/value pair is printed; list values are shown as a comma separated
    string. Prints "Empty Response" when no body was returned.

    Keyword Arguments:
    c    -- the curl handle
    host -- the server URL
    """
    resp = do_GET_request(c, host, "/score/v2/runtime", None)
    if not resp:
        print("Empty Response")
        return
    if g_raw_output:
        print(resp)
        return

    try:
        x = json.loads(resp)
    except ValueError as err:
        # The body was not JSON; report it instead of dying with a traceback.
        print("Invalid JSON response: %s" % (err,))
        return

    if check_JSON_response(x):
        print("runtime information")
        for k, v in x.items():
            if isinstance(v, list):
                # Format each element with %s so non-string members (for
                # example numbers) are joined rather than raising TypeError.
                v = ", ".join("%s" % (item,) for item in v)
            # Spacing kept identical to the script's historical output.
            print("  %s :  %s" % (k, v))
def do_score_ip(c, host, ip):
    """
    Score an IP address.

    Prints the raw body when g_raw_output is set. Otherwise the body is
    decoded as JSON and, if check_JSON_response finds no error, the
    "score" entry is printed (empty if absent). Prints "Empty Response"
    when no body was returned.

    Keyword Arguments:
    c    -- the curl handle
    host -- the server URL
    ip   -- the ip addr to score, as a string
    """
    resp = do_GET_request(c, host, "/score/v2/ip", ip)
    if not resp:
        print("Empty Response")
        return
    if g_raw_output:
        print(resp)
        return

    try:
        x = json.loads(resp)
    except ValueError as err:
        # The body was not JSON; report it instead of dying with a traceback.
        print("Invalid JSON response: %s" % (err,))
        return

    if check_JSON_response(x):
        # Spacing kept identical to the script's historical output.
        print("/score/v2/ip response - score:  %s"
              % (fetch_element("score", x),))
def do_score_analysis(c, host, analysis):
    """
    Score an analysis string.

    POSTs the analysis string to /score/v2/analysis. Prints the raw body
    when g_raw_output is set. Otherwise the body is decoded as JSON; for an
    HTTP status of 400 or above the "code" and "message" entries are
    printed, else the "score" entry is printed if check_JSON_response finds
    no error. Prints "Empty Response" when no body was returned.

    Keyword Arguments:
    c        -- the curl handle
    host     -- the server URL
    analysis -- the analysis string to score
    """
    pst = {"analysis": analysis}  # the only form part is the analysis string

    (code, resp) = do_POST_request(c, host, "/score/v2/analysis", pst)
    if not resp:
        print("Empty Response")
        return
    if g_raw_output:
        print(resp)
        return

    try:
        x = json.loads(resp)
    except ValueError as err:
        # Error responses in particular may not carry a JSON body; report
        # the status instead of dying with a traceback.
        print("Invalid JSON response (HTTP status %s): %s" % (code, err))
        return

    # Spacing below is kept identical to the script's historical output.
    if code >= 400:
        print("server returned  %s . code:  %s  message:  %s"
              % (code, fetch_element("code", x), fetch_element("message", x)))
    elif check_JSON_response(x):
        print("/score/v2/analysis response - score:  %s"
              % (fetch_element("score", x),))
def do_score_general(c, host, func, rfc822, helo_domain, mail_from, receipt_to, conn_ip):
    """
    Score a message using either the score message, score content or
    score sender calls.

    POSTs the message (plus any envelope fields supplied) to the request
    path given by func. Prints the raw body when g_raw_output is set.
    Otherwise the body is decoded as JSON; for an HTTP status of 400 or
    above the "code" and "message" entries are printed, else the "score"
    and "analysis" entries are printed if check_JSON_response finds no
    error. Prints "Empty Response" when no body was returned.

    Keyword Arguments:
    c           -- the curl handle
    host        -- the server URL
    func        -- the request path to POST to; it is appended to host
                   unchanged (main passes "/score/v2/" followed by
                   "message", "content" or "sender")
    rfc822      -- the entire mail, as a string
    helo_domain -- the HELO domain from the envelope, or None.
    mail_from   -- the MAIL FROM field from the envelope, or None.
    receipt_to  -- the RCPT TO value from the envelope, or None.
                   NOTE(review): it is sent as a single "rcpt_to" form
                   part; a list is not expanded here -- confirm what the
                   server expects for multiple recipients.
    conn_ip     -- the connecting IP address, or None
    """
    pst = {"rfc822": rfc822}

    # Envelope fields are optional; only send the ones that were supplied.
    optional_parts = (
        ("helo_domain", helo_domain),
        ("mail_from", mail_from),
        ("rcpt_to", receipt_to),
        ("conn_ip", conn_ip),
    )
    for part_name, part_value in optional_parts:
        if part_value:
            pst[part_name] = part_value

    (code, resp) = do_POST_request(c, host, func, pst)
    if not resp:
        print("Empty Response")
        return
    if g_raw_output:
        print(resp)
        return

    try:
        x = json.loads(resp)
    except ValueError as err:
        # Error responses in particular may not carry a JSON body; report
        # the status instead of dying with a traceback.
        print("Invalid JSON response (HTTP status %s): %s" % (code, err))
        return

    if code >= 400:
        # Spacing kept identical to the script's historical output.
        print("server returned  %s . code:  %s  message:  %s"
              % (code, fetch_element("code", x), fetch_element("message", x)))
    elif check_JSON_response(x):
        # %s formatting tolerates a non-string "analysis" value, which plain
        # string concatenation would reject with a TypeError.
        print("%s response - score %s analysis: %s"
              % (func, fetch_element("score", x), fetch_element("analysis", x)))
def main(argv):
    """
    The mainline.

    Parse the command line, then run the requested queries against the
    host:

    -r       print raw responses instead of decoded summaries
    -I ip    score an IP address
    -m file  score each message of an mbox (or single mail) file
    -C type  scoring call used with -m: message (default), content, sender
    -a file  score each line of an analysis file

    With none of -I, -m or -a given, the server's configuration and status
    information is fetched instead.

    Keyword Arguments:
    argv -- the command line arguments, without the program name
    """
    global g_raw_output
    ipaddr = None
    score_method = "message"  # default value for the score method
    mbox_file = None
    analysis_file = None

    # Fetch the command line args.
    try:
        opts, args = getopt.gnu_getopt(argv, "rI:a:C:m:", "")
    except getopt.GetoptError as reason:
        print(reason)
        sys.exit(g_usage_string)

    # Must specify a host.
    if not args:
        print("No host specified")
        sys.exit(g_usage_string)
    host = args[0]

    # Run through our command line args and set the appropriate flags.
    for o, a in opts:
        if o == '-I':      # Score an IP address
            ipaddr = a
        elif o == '-r':    # Print the raw JSON response
            g_raw_output = True
        elif o == '-C':    # Set the score method to use with -m
            score_method = a
        elif o == '-m':    # An mbox or single mail to score
            mbox_file = a
        elif o == '-a':    # An analysis file to score.
            analysis_file = a

    if score_method not in ("message", "content", "sender"):
        print("Arg to -C must be message, content or sender")
        sys.exit(g_usage_string)

    c = pycurl.Curl()  # initialise the curl library
    try:
        # If no actions specified on the command line, just fetch config
        # and status information, separated by blank lines.
        if not (ipaddr or mbox_file or analysis_file):
            status_calls = (
                do_ping,
                do_max_message_size,
                do_licensing,
                do_microupdate_versions,
                do_supported_attributes,
                do_runtime,
            )
            for index, status_call in enumerate(status_calls):
                if index:
                    print("")
                status_call(c, host)

        # Score an ip.
        if ipaddr:
            do_score_ip(c, host, ipaddr)

        if mbox_file:
            try:
                box = mbox.mbox(mbox_file)
            except IOError as reason:
                print(reason)
                sys.exit(g_usage_string)

            for m in box:
                subj = m.fetch_header("Subject: ")
                if subj:
                    print(subj)
                do_score_general(c, host, "/score/v2/" + score_method,
                                 m.text, None, None, None, None)
                print("")

        if analysis_file:
            try:
                f = open(analysis_file, "r")
            except IOError as reason:
                print(reason)
                sys.exit(g_usage_string)

            # Close the file even if scoring raises part-way through.
            with f:
                for line in f:
                    # NOTE(review): each line is sent with its trailing
                    # newline intact -- confirm the server tolerates that.
                    do_score_analysis(c, host, line)
    finally:
        # Runs on sys.exit() too, so the curl handle is always released.
        c.close()


if __name__ == "__main__":
    main(sys.argv[1:])
|