Upload Sensor Data to Xively

Xively is a service that provides the possibility to analyze and visualize the "Internet of Things". It can be used to interconnect different devices over the Internet and can store a history of measured values and can display it with pretty graphs.

This example is mainly based on the Using Python to upload weather data to Xively example.

For this project we are assuming, that you have a Python development environment set up and that you have a rudimentary understanding of the Python language.

If you are totally new to Python itself you should start here. If you are new to the Tinkerforge API, you should start here.

Goals

In this project the temperature and illuminance should be measured and uploaded to Xively in 1 minute intervals. In the following we will show step-by-step how this can be achieved.

Step 1: Create and configure Xively account

To use Xively, we first have to create a Xively account. Go to xively.com and sign up.

Click on "+ Device" and add a description of your device. Next add a new channel ("+ Add Channel"). For every sensor value we have to add a new channel:

Xively datastream configuration

The channels got the IDs AmbientLight and Temperature. Later we will need these IDs to upload measurements.

Step 2: Understanding Xively protocol

A quick search in the Xively API documentation reveals that we can update a datastream by just sending a small JSON package.

The given example is:

{
 "current_value":"294",
 "max_value":"697.0",
 "min_value":"0.0",
 "id":"1"
}

which can be send via HTTP PUT to http://api.xively.com/v2/feeds/<ID>. The API key is put in the header of the HTTP request and the return consists of an HTTP header only.

This seems quite straight forward. To not spam Xively, we should limit the uploading interval to once every minute. This means, that we need to store the measurements and determine the corresponding min and max values for each interval.

Step 3: Storing measurements

To start off, we create a simple value storage. To identify the values, we use the ID that was configured in the Xively account as the Datastream ID:

class Xively:
    def __init__(self):
        self.items = {}

    def put(self, identifier, value):
        try:
            _, min_value, max_value = self.items[identifier]
            if value < min_value:
                min_value = value
            if value > max_value:
                max_value = value
            self.items[identifier] = (value, min_value, max_value)
        except:
            self.items[identifier] = (value, value, value)

We retrieve the old value, min_value and max_value, calculate the new min/max and update it. If there aren't any values stored for a given identifier, we catch the occurring exception and add the identifier as a new key to the dict.

We just have to use the put function whenever a new measurement arrives:

class ServerRoomMonitoring:
    # [...]
    def __init__(self):
        # [...]
        self.xively = Xively()
        # [...]

    # [...]
    def cb_illuminance(self, illuminance):
        # Here we add illuminance to Xively with ID "AmbientLight"
        self.xively.put('AmbientLight', illuminance/10.0)
        log.info('Ambient Light ' + str(illuminance/10.0))
    # [...]

This has to be added for the other measurements accordingly.

Step 4: Uploading measurements

To upload our measurements we first have to define all of the names, URLs, keys and so on:

class Xively:
    HOST = 'api.xively.com'
    AGENT = "Tinkerforge xively 1.0"
    FEED = '196340443.json'
    API_KEY = 'SGpMEW3ZZ6yJVd9jZlaPgex06v1W00lA2UZkv5rgskwlVkr6'

    def __init__(self):
        self.items = {}
        self.headers = {
            "Content-Type"  : "application/x-www-form-urlencoded",
            "X-ApiKey"      : Xively.API_KEY,
            "User-Agent"    : Xively.AGENT,
        }
        self.params = "/v2/feeds/" + str(Xively.FEED)
        threading.Thread(target=self.upload).start()

You have to exchange the FEED and API_KEY values by your own Xively feed and key. Everything else is practically copied from the Xively documentation.

To make uploads in equidistant 1 minute intervals, we are starting the upload function in a thread:

def upload(self):
    while True:
        time.sleep(60) # Upload data every minute
        if len(self.items) == 0:
            continue

        stream_items = []
        for identifier, value in self.items.items():
            stream_items.append({'id': identifier,
                                 'current_value': value[0],
                                 'min_value': value[1],
                                 'max_value': value[2]})

        data = {'version' : '1.0.0',
                'datastreams': stream_items}
        self.items = {}
        body = json.dumps(data)

        try:
            http = httplib.HTTPSConnection(Xively.HOST)
            http.request('PUT', self.params, body, self.headers)
            response = http.getresponse()
            http.close()

            if response.status != 200:
                log.error('Could not upload to xively -> ' +
                          str(response.status) + ': ' + response.reason)
        except Exception as e:
            log.error('HTTP error: ' + str(e))

Here we take the data that was gathered, package it in the JSON format and send a HTTP PUT request with the data and the header that was defined in __init__. We also parse the response code and log it if something went wrong.

Step 5: Everything put together

We are done! There is of course still room for improvement. For example the uploading and adding of data could be protected by a mutex to ensure that we don't add data while it is uploaded and thus remove the newly added data after the uploading finished.

But if you put everything of the above together (download), you have a working Server Room Monitoring Kit that uploads the measurements to Xively:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import socket
import sys
import time
import math
import logging as log
import httplib
import json
import threading
log.basicConfig(level=log.INFO)

from tinkerforge.ip_connection import IPConnection
from tinkerforge.ip_connection import Error
from tinkerforge.brick_master import Master
from tinkerforge.bricklet_ambient_light import AmbientLight
from tinkerforge.bricklet_temperature import Temperature

class Xively:
    HOST = 'api.xively.com'
    AGENT = "Tinkerforge xively 1.0"
    FEED = '196340443.json'
    API_KEY = 'SGpMEW3ZZ6yJVd9jZlaPgex06v1W00lA2UZkv5rgskwlVkr6'

    def __init__(self):
        self.items = {}
        self.headers = {
            "Content-Type"  : "application/x-www-form-urlencoded",
            "X-ApiKey"      : Xively.API_KEY,
            "User-Agent"    : Xively.AGENT,
        }
        self.params = "/v2/feeds/" + str(Xively.FEED)
        self.upload_thread = threading.Thread(target=self.upload)
        self.upload_thread.daemon = True
        self.upload_thread.start()

    def put(self, identifier, value):
        try:
            _, min_value, max_value = self.items[identifier]
            if value < min_value:
                min_value = value
            if value > max_value:
                max_value = value
            self.items[identifier] = (value, min_value, max_value)
        except:
            self.items[identifier] = (value, value, value)

    def upload(self):
        while True:
            time.sleep(60) # Upload data every minute
            if len(self.items) == 0:
                continue

            stream_items = []
            for identifier, value in self.items.items():
                stream_items.append({'id': identifier,
                                     'current_value': value[0],
                                     'min_value': value[1],
                                     'max_value': value[2]})

            data = {'version': '1.0.0',
                    'datastreams': stream_items}
            self.items = {}
            body = json.dumps(data)

            try:
                http = httplib.HTTPSConnection(Xively.HOST)
                http.request('PUT', self.params, body, self.headers)
                response = http.getresponse()
                http.close()
                log.info('Start upload')

                if response.status != 200:
                    log.error('Could not upload to xively -> ' +
                              str(response.status) + ': ' + response.reason)
            except Exception as e:
                log.error('HTTP error: ' + str(e))

class ServerRoomMonitoring:
    HOST = "ServerMonitoring"
    PORT = 4223

    ipcon = None
    temp = None
    al = None

    def __init__(self):
        self.xively = Xively()
        self.ipcon = IPConnection()
        while True:
            try:
                self.ipcon.connect(ServerRoomMonitoring.HOST, ServerRoomMonitoring.PORT)
                break
            except Error as e:
                log.error('Connection Error: ' + str(e.description))
                time.sleep(1)
            except socket.error as e:
                log.error('Socket error: ' + str(e))
                time.sleep(1)

        self.ipcon.register_callback(IPConnection.CALLBACK_ENUMERATE,
                                     self.cb_enumerate)
        self.ipcon.register_callback(IPConnection.CALLBACK_CONNECTED,
                                     self.cb_connected)

        while True:
            try:
                self.ipcon.enumerate()
                break
            except Error as e:
                log.error('Enumerate Error: ' + str(e.description))
                time.sleep(1)

    def cb_illuminance(self, illuminance):
        self.xively.put('AmbientLight', illuminance/10.0)
        log.info('Ambient Light ' + str(illuminance/10.0))

    def cb_temperature(self, temperature):
        self.xively.put('Temperature', temperature/100.0)
        log.info('Temperature ' + str(temperature/100.0))

    def cb_enumerate(self, uid, connected_uid, position, hardware_version,
                     firmware_version, device_identifier, enumeration_type):
        if enumeration_type == IPConnection.ENUMERATION_TYPE_CONNECTED or \
           enumeration_type == IPConnection.ENUMERATION_TYPE_AVAILABLE:
            if device_identifier == AmbientLight.DEVICE_IDENTIFIER:
                try:
                    self.al = AmbientLight(uid, self.ipcon)
                    self.al.set_illuminance_callback_period(1000)
                    self.al.register_callback(self.al.CALLBACK_ILLUMINANCE,
                                              self.cb_illuminance)
                    log.info('AmbientLight initialized')
                except Error as e:
                    log.error('AmbientLight init failed: ' + str(e.description))
                    self.al = None
            elif device_identifier == Temperature.DEVICE_IDENTIFIER:
                try:
                    self.temp = Temperature(uid, self.ipcon)
                    self.temp.set_temperature_callback_period(1000)
                    self.temp.register_callback(self.temp.CALLBACK_TEMPERATURE,
                                               self.cb_temperature)
                    log.info('Temperature initialized')
                except Error as e:
                    log.error('Temperature init failed: ' + str(e.description))
                    self.temp = None

    def cb_connected(self, connected_reason):
        if connected_reason == IPConnection.CONNECT_REASON_AUTO_RECONNECT:
            log.info('Auto Reconnect')

            while True:
                try:
                    self.ipcon.enumerate()
                    break
                except Error as e:
                    log.error('Enumerate Error: ' + str(e.description))
                    time.sleep(1)

if __name__ == "__main__":
    log.info('Server Room Monitoring: Start')

    server_room_monitoring = ServerRoomMonitoring()

    if sys.version_info < (3, 0):
        input = raw_input # Compatibility for Python 2.x
    input('Press key to exit\n')

    if server_room_monitoring.ipcon != None:
        server_room_monitoring.ipcon.disconnect()

    log.info('Server Room Monitoring: End')
Creative Commons Licence The content of this page is licensed under Creative Commons Attribution 3.0 Unported License.