Datasources with PythonCollector - Samples¶
Skeleton (no protocol)¶
import time
from Products.ZenEvents import ZenEventClasses
from ZenPacks.zenoss.PythonCollector.datasources.PythonDataSource \
import PythonDataSourcePlugin
class MyPlugin(PythonDataSourcePlugin):
"""Explanation of what MyPlugin does."""
# List of device attributes you'll need to do collection.
proxy_attributes = (
'zCommandUsername',
'zCommandPassword',
)
@classmethod
def config_key(cls, datasource, context):
"""
Return a tuple defining collection uniqueness.
This is a classmethod that is executed in zenhub. The datasource and
context parameters are the full objects.
This example implementation is the default. Split configurations by
device, cycle time, template id, datasource id and the Python data
source's plugin class name.
You can omit this method from your implementation entirely if this
default uniqueness behavior fits your needs. In many cases it will.
"""
return (
context.device().id,
datasource.getCycleTime(context),
datasource.rrdTemplate().id,
datasource.id,
datasource.plugin_classname,
)
@classmethod
def params(cls, datasource, context):
"""
Return params dictionary needed for this plugin.
This is a classmethod that is executed in zenhub. The datasource and
context parameters are the full objects.
This example implementation will provide no extra information for
each data source to the collect method.
You can omit this method from your implementation if you don't require
any additional information on each of the datasources of the config
parameter to the collect method below. If you only need extra
information at the device level it is easier to just use
proxy_attributes as mentioned above.
"""
return {}
def collect(self, config):
"""
No default collect behavior. You must implement this method.
This method must return a Twisted deferred. The deferred results will
be sent to the onResult then either onSuccess or onError callbacks
below.
"""
ds0 = config.datasources[0]
return somethingThatReturnsADeferred(
username=ds0.zCommandUsername,
password=ds0.zCommandPassword)
def onResult(self, result, config):
"""
Called first for success and error.
You can omit this method if you want the result of the collect method
to be used without further processing.
"""
return result
def onSuccess(self, result, config):
"""
Called only on success. After onResult, before onComplete.
You should return a data structure with zero or more events, values
and maps.
"""
collectionTime = time.time()
return {
'events': [{
'summary': 'successful collection',
'eventKey': 'myPlugin_result',
'severity': ZenEventClasses.Clear,
},{
'summary': 'first event summary',
'eventKey': 'myPlugin_result',
'severity': ZenEventClasses.Info,
},{
'summary': 'second event summary',
'eventKey': 'myPlugin_result',
'severity': ZenEventClasses.Warning,
}],
'values': {
None: {
# datapoints for the device (no component)
'datasource1_datapoint1': (123.4, collectionTime),
'datasource1_datapoint2': (5.678, collectionTime),
},
'cpu1': {
# datapoints can be specified per datasource...
'datasource1_user': (12.1, collectionTime),
'datasource2_user': (13.2, collectionTime),
# or just by id
'datasource1_system': (1.21, collectionTime),
'io': (23, collectionTime),
}
},
'maps': [
ObjectMap(...),
RelationshipMap(..),
],
# Optional attribute, in most cases it's used when you want to change
# the execution interval of a task during the data collection.
'interval': 300,
}
def onError(self, result, config):
"""
Called only on error. After onResult, before onComplete.
You can omit this method if you want the error result of the collect
method to be used without further processing. It recommended to
implement this method to capture errors.
"""
return {
'events': [{
'summary': 'error: %s' % result,
'eventKey': 'myPlugin_result',
'severity': 4,
}],
}
def onComplete(self, result, config):
"""
Called last for success and error.
You can omit this method if you want the result of either the
onSuccess or onError method to be used without further processing.
"""
return result
def cleanup(self, config):
"""
Called when collector exits, or task is deleted or changed.
"""
return
Weather Underground - HTTP API¶
Implement events, datapoints and modeling
# Logging
import logging
LOG = logging.getLogger('zen.WeatherUnderground')
# stdlib Imports
import json
import time
# Twisted Imports
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.web.client import getPage
# PythonCollector Imports
from Products.DataCollector.plugins.DataMaps import ObjectMap
from ZenPacks.zenoss.PythonCollector.datasources.PythonDataSource import PythonDataSourcePlugin
class Alerts(PythonDataSourcePlugin):
"""Weather Underground alerts data source plugin."""
@classmethod
def config_key(cls, datasource, context):
return (
context.device().id,
datasource.getCycleTime(context),
context.id,
'wunderground-alerts',
)
@classmethod
def params(cls, datasource, context):
return {
'api_key': context.zWundergroundAPIKey,
'api_link': context.api_link,
'location_name': context.title,
}
@inlineCallbacks
def collect(self, config):
data = self.new_data()
for datasource in config.datasources:
try:
response = yield getPage(
'http://api.wunderground.com/api/{api_key}/alerts{api_link}.json'
.format(
api_key=datasource.params['api_key'],
api_link=datasource.params['api_link']))
response = json.loads(response)
except Exception:
LOG.exception(
"%s: failed to get alerts data for %s",
config.id,
datasource.location_name)
continue
for alert in response['alerts']:
severity = None
if int(alert['expires_epoch']) <= time.time():
severity = 0
elif alert['significance'] in ('W', 'A'):
severity = 3
else:
severity = 2
data['events'].append({
'device': config.id,
'component': datasource.component,
'severity': severity,
'eventKey': 'wu-alert-{}'.format(alert['type']),
'eventClassKey': 'wu-alert',
'summary': alert['description'],
'message': alert['message'],
'wu-description': alert['description'],
'wu-date': alert['date'],
'wu-expires': alert['expires'],
'wu-phenomena': alert['phenomena'],
'wu-significance': alert['significance'],
'wu-type': alert['type'],
})
returnValue(data)
class Conditions(PythonDataSourcePlugin):
"""Weather Underground conditions data source plugin."""
@classmethod
def config_key(cls, datasource, context):
return (
context.device().id,
datasource.getCycleTime(context),
context.id,
'wunderground-conditions',
)
@classmethod
def params(cls, datasource, context):
return {
'api_key': context.zWundergroundAPIKey,
'api_link': context.api_link,
'location_name': context.title,
}
@inlineCallbacks
def collect(self, config):
data = self.new_data()
for datasource in config.datasources:
try:
response = yield getPage(
'http://api.wunderground.com/api/{api_key}/conditions{api_link}.json'
.format(
api_key=datasource.params['api_key'],
api_link=datasource.params['api_link']))
response = json.loads(response)
except Exception:
LOG.exception(
"%s: failed to get conditions data for %s",
config.id,
datasource.location_name)
continue
current_observation = response['current_observation']
for datapoint_id in (x.id for x in datasource.points):
if datapoint_id not in current_observation:
continue
try:
value = current_observation[datapoint_id]
if isinstance(value, basestring):
value = value.strip(' %')
value = float(value)
except (TypeError, ValueError):
# Sometimes values are NA or not available.
continue
dpname = '_'.join((datasource.datasource, datapoint_id))
data['values'][datasource.component][dpname] = (value, 'N')
data['maps'].append(
ObjectMap({
'relname': 'wundergroundLocations',
'modname': 'ZenPacks.training.WeatherUnderground.WundergroundLocation',
'id': datasource.component,
'weather': current_observation['weather'],
}))
returnValue(data)
BMC device - Process¶
See the excellent posts of Andrés Álvarez here : http://aalvarez.me/blog/posts/working-with-zenoss-python-data-sources.html
# Logging
import logging
log = logging.getLogger('zen.MyZenPack')
# Twisted Imports
from twisted.internet.defer import inlineCallbacks, returnValue
# PythonCollector Imports
from Products.DataCollector.plugins.DataMaps import ObjectMap
from ZenPacks.zenoss.PythonCollector.datasources.PythonDataSource import (
PythonDataSourcePlugin,
)
import subprocess
class BmcPowerStatus(PythonDataSourcePlugin):
"""BMC power status data source plugin."""
# List of device attributes needed for collection
proxy_attribures = (
'zBmcAddress',
'zIpmiUsername',
'zIpmiPassword',
)
@classmethod
def config_key(cls, datasource, context):
return (
context.device().id,
datasource.getCycleTime(context),
context.id,
'myzenpack-powerstatus',
)
@classmethod
def params(cls, datasource, context):
return {
'zBmcAddress': context.zBmcAddress,
'zIpmiUsername': context.zIpmiUsername,
'zIpmiPassword': context.zIpmiPassword,
}
@inlineCallbacks
def collect(self, config):
log.debug("Collect for BMC Power Status ({0})".format(config.id))
ds0 = config.datasources[0]
results = {}
# Collect using ipmitool
power_status = False
cmd_result = ''
try:
cmd = 'ipmitool -H {0} -I lanplus -U {1} -P {2} power status'.format(ds0.zBmcAddress, ds0.zIpmiUsername, ds0.zIpmiPassword)
cmd_result = yield subprocess.check_output(cmd, shell=True).rstrip()
log.info('Power Status for Device {0}: {1}'.format(ds0.zBmcAddress, cmd_result))
except:
log.error('Error when running ipmitool when collecting Power Status on BMC Address {0}'.format(ds0.zBmcAddress))
if cmd_result == 'Chassis Power is on':
power_status = True
results['power_status'] = power_status
returnValue(results)
def onSuccess(self, result, config):
data = self.new_data()
power_status = result['power_status']
data['maps'].append(
ObjectMap({
'modname': 'ZenPacks.itri.BmcMonitor.BmcServer',
'power_status': power_status,
}))
if power_status:
data['events'].append({
'device': config.id,
'summary': '{0} BMC power status is now UP'.format(config.id),
'severity': ZenEventClasses.Clear,
'eventClassKey': 'bmcPowerStatus',
})
else:
data['events'].append({
'device': config.id,
'summary': '{0} BMC power status is DOWN!'.format(config.id),
'severity': ZenEventClasses.Critical,
'eventClassKey': 'bmcPowerStatus',
})
data['events'].append({
'device': config.id,
'summary': 'BMC Power Status Collector: successful collection',
'severity': ZenEventClasses.Clear,
'eventKey': 'bmcPowerStatusCollectionError',
'eventClassKey': 'bmcMonitorFailure',
})
return data
def onError(self, result, config):
errmsg = 'BMC Power Status Collector: Error trying to collect.'
log.error('{0}: {1}'.format(config.id, errmsg))
data = self.new_data()
data['events'].append({
'device': config.id,
'summary': errmsg,
'severity': ZenEventClasses.Critical,
'eventKey': 'bmcPowerStatusCollectionError',
'eventClassKey': 'bmcMonitorFailure',
})
return data
ISAM - JSON¶
# stdlib Imports
import json
import logging
import base64
# Twisted Imports
from twisted.internet.defer import returnValue, DeferredSemaphore, DeferredList
from twisted.web.client import getPage
# Zenoss imports
from ZenPacks.zenoss.PythonCollector.datasources.PythonDataSource import PythonDataSourcePlugin
# Setup logging
log = logging.getLogger('zen.PythonISAMDevice')
class ISAMDevice(PythonDataSourcePlugin):
proxy_attributes = (
'zISAMUsername',
'zISAMPassword',
)
urls = {
'cpu' : 'https://{}/statistics/systems/cpu.json?timespan={}s',
'memory': 'https://{}/statistics/systems/memory.json?timespan={}s',
'storage': 'https://{}/statistics/systems/storage.json?timespan={}s',
}
@staticmethod
def add_tag(result, label):
return tuple((label, result))
@classmethod
def config_key(cls, datasource, context):
log.debug(
'In config_key context.device().id is %s datasource.getCycleTime(context) is %s datasource.rrdTemplate().id is %s datasource.id is %s datasource.plugin_classname is %s ' % (
context.device().id, datasource.getCycleTime(context), datasource.rrdTemplate().id, datasource.id,
datasource.plugin_classname))
return (
context.device().id,
datasource.getCycleTime(context),
datasource.rrdTemplate().id,
datasource.id,
datasource.plugin_classname,
)
@classmethod
def params(cls, datasource, context):
log.debug('Starting ISAMDevice params')
params = {}
log.debug(' params is %s \n' % (params))
return params
def collect(self, config):
log.debug('Starting ISAM Device collect')
ip_address = config.manageIp
if not ip_address:
log.error("%s: IP Address cannot be empty", device.id)
returnValue(None)
deferreds = []
sem = DeferredSemaphore(1)
for datasource in config.datasources:
timespan = max(120, 2 * datasource.cycletime)
url = self.urls[datasource.datasource].format(ip_address, timespan)
basicAuth = base64.encodestring('{}:{}'.format(datasource.zISAMUsername, datasource.zISAMPassword))
authHeader = "Basic " + basicAuth.strip()
d = sem.run(getPage, url,
headers={
"Accept": "application/json",
"Authorization": authHeader,
"User-Agent": "Mozilla/3.0Gold",
},
)
d.addCallback(self.add_tag, datasource.datasource)
deferreds.append(d)
return DeferredList(deferreds)
def onSuccess(self, result, config):
log.debug('Success - result is {}'.format(result))
ds_data = {}
for success, ddata in result:
if success:
ds = ddata[0]
metrics = json.loads(ddata[1])
ds_data[ds] = metrics
data = self.new_data()
for datasource in config.datasources:
for point in datasource.points:
# TODO: handle failures, try except and fill in data['events']
# TODO Following isn't that nice...
if datasource.datasource == 'memory' and point.dpName == 'memory_used_perc':
value = float(ds_data['memory']['used'])/float(ds_data['memory']['total'])*100
elif datasource.datasource == 'cpu' and point.dpName == 'cpu_total_cpu':
value = float(ds_data['cpu']['user_cpu']) + float(ds_data['cpu']['system_cpu'])
else:
value = float(ds_data[datasource.datasource][point.id])
if datasource.datasource in ['memory']:
value *= 1024*1024
data['values'][None][point.dpName] = (value, 'N')
return data
def onError(self, result, config):
log.error('Error - result is {}'.format(result))
# TODO: send event of collection failure
return {}