#!/usr/bin/python3 #-*- coding: utf-8 -*- # coding: utf-8 # pylint: disable=C0103,C0111,W0621 from __future__ import print_function from __future__ import unicode_literals import requests import os import json import hmac import time import argparse import sys import time from time import strftime, gmtime from datetime import datetime from hashlib import sha1 # To install the latest version of Unidecode from the Python package index, use these commands: # $ pip install unidecode from unidecode import unidecode # if sys.version_info >= (3, 0): import configparser as configp else: import ConfigParser as configp # # Freebox API SDK / Docs: http://dev.freebox.fr/sdk/os/login/ # version 8 # VERSION = "0.6.1 2021/04/27" # version 059 # prise en compte api v8 (option -H en particulier) # meilleure prise en compte des autres plateforme que Fbox Rev. # avec en particulier meilleur traitement des listes de paramètres. # version 060 # le Rasp Pi vu sans nom depuis le DHCP freebox n'apparait pas dans la liste des hotes ! 
#     check what happens when we have the MAC address but no name
#     => fetch the static and dynamic DHCP data
# version 061
#     support for xDSL / LTE aggregation
#     support for the xDSL fixes when the connection state is down
#     support for the disk presence test

# Seconds to wait for the Freebox to answer. Without a timeout, requests
# blocks forever when the box stops responding.
_REQUEST_TIMEOUT = 10


def get_creation_date(file):
    """Return the last-modification timestamp of *file*.

    Despite the name, this is ``st_mtime`` (seconds since the epoch of the
    last content modification), not a creation time.
    """
    return os.stat(file).st_mtime


def _json_or_report(r):
    """Return the decoded JSON body of response *r*, or None on failure.

    Any status other than HTTP 200 is reported by printing the response
    text, and None is returned so callers can detect the failure.
    """
    if r.status_code == 200:
        return r.json()
    print("Failed request: %s\n" % r.text)
    return None


def _api_get(path, headers=None):
    """GET ``ENDPOINT + path`` and return the decoded JSON answer.

    :param path: API path appended to the module-level ``ENDPOINT``;
        it must start with ``/``.
    :param headers: optional dict of HTTP headers (the session header for
        authenticated calls).
    :return: the parsed JSON body on HTTP 200, None otherwise.
    """
    api_url = '%s%s' % (ENDPOINT, path)
    r = requests.get(api_url, headers=headers, timeout=_REQUEST_TIMEOUT)
    return _json_or_report(r)


def get_challenge(freebox_app_id):
    """Query ``/login/authorize/<id>`` (no session header is sent).

    The value is inserted verbatim in the URL; the caller in this file
    passes the stored track id.  Returns the JSON answer or None.
    """
    return _api_get('/login/authorize/%s' % freebox_app_id)


def open_session(password, freebox_app_id):
    """POST the app id and session password to ``/login/session/``.

    Returns the JSON answer on HTTP 200, None otherwise.
    """
    api_url = '%s/login/session/' % ENDPOINT
    app_info = {
        'app_id': freebox_app_id,
        'password': password
    }
    json_payload = json.dumps(app_info)
    r = requests.post(api_url, data=json_payload, timeout=_REQUEST_TIMEOUT)
    return _json_or_report(r)


def get_internal_disk_stats(headers):
    """Return the JSON answer of ``/storage/disk/``, or None on failure."""
    return _api_get('/storage/disk/', headers)


def get_connection_stats(headers):
    """Return the JSON answer of ``/connection/``, or None on failure."""
    return _api_get('/connection/', headers)


def get_ftth_status(headers):
    """Return the JSON answer of ``/connection/ftth/``, or None on failure."""
    return _api_get('/connection/ftth/', headers)


def get_xdsl_status(headers):
    """Return the JSON answer of ``/connection/xdsl/``, or None on failure."""
    return _api_get('/connection/xdsl/', headers)


def get_lteconfig_status(headers):
    """Return the JSON answer of ``/connection/lte/config``, or None on failure."""
    return _api_get('/connection/lte/config', headers)


def get_cnx_status(headers):
    """Return the JSON answer of ``/connection/``, or None on failure.

    Queries the same URL as ``get_connection_stats``; kept as a separate
    name for existing callers.
    """
    return _api_get('/connection/', headers)


def get_system_config(headers):
    """Return the JSON answer of ``/system/``, or None on failure."""
    return _api_get('/system/', headers)


def get_switch_status(headers):
    """Return the JSON answer of ``/switch/status/``, or None on failure."""
    return _api_get('/switch/status/', headers)


def get_switch_port_stats(headers, port):
    """Return the JSON answer of ``/switch/port/<port>/stats/``, or None."""
    # -P => updated for the Pop
    return _api_get('/switch/port/%s/stats/' % port, headers)


def get_lan_config(headers):
    """Return the JSON answer of ``/lan/config/``, or None on failure."""
    return _api_get('/lan/config/', headers)


def get_wifi_stats(headers, num):
    """Return the JSON answer of ``/wifi/ap/<num>/stations``, or None."""
    return _api_get('/wifi/ap/%s/stations' % num, headers)


def get_wifi_statsx(headers):
    """Return the JSON answer of ``/wifi/ap/``, or None on failure."""
    return _api_get('/wifi/ap/', headers)


def get_lan_interfaces(headers):
    """Return the JSON answer of ``/lan/browser/interfaces/``, or None."""
    return _api_get('/lan/browser/interfaces/', headers)


def get_interfaces_hosts(headers, interf):
    """Return the JSON answer of ``/lan/browser/<interf>/``, or None."""
    return _api_get('/lan/browser/%s/' % interf, headers)


def get_static_dhcp(headers):
    """Return the JSON answer of ``/dhcp/static_lease/``, or None on failure."""
    return _api_get('/dhcp/static_lease/', headers)


def get_dynamic_dhcp(headers):
    """Return the JSON answer of ``/dhcp/dynamic_lease/``, or None on failure."""
    return _api_get('/dhcp/dynamic_lease/', headers)


def get_and_print_metrics(creds, s_switch, s_ports, s_sys, s_disk, s_lan, s_wifi, s_lan_interfaces, s_interfaces_hosts, s_static_dhcp, s_dynamic_dhcp, s_xdsl_tunnel):

    #freebox_app_id = "fr.freebox.seximonitor"
    #freebox_app_id = "fr.freebox.grafanamonitor"
    freebox_app_id = creds['app_id']

    #
    # setup output dataformat, default Graphite
    # tag for influxdb
    # naming rule: put the information (rx, tx, port 1, ...) in the tags.
    # only use generic value names: bytes, bits, rate, bandwidth, firmware ...
    # the tags are there to give the context
    # this allows richer sorting and grouping in Grafana
    # 3 tags separated by "."
    # each value therefore has a name of the form: tag1.tag2.tag3.value
    #
    # NOTE(review): confirm the next two assignments are live code and not
    # part of the comment block above.
    tag1=tag2=tag3=""
    dataformat='influxdb'

    # Fetch challenge
    resp = get_challenge(creds['track_id'])
    challenge = resp['result']['challenge']

    # Generate session password
    if sys.version_info >= (3, 0):
        h = hmac.new(bytearray(creds['app_token'], 'ASCII'), bytearray(challenge, 'ASCII'), sha1)
    else:
        h = hmac.new(creds['app_token'], challenge, sha1)
    password = h.hexdigest()

    # Fetch session_token
    resp = open_session(password, freebox_app_id)
    session_token = resp['result']['session_token']

    # Setup headers with the generated session_token
    headers = {
        'X-Fbx-App-Auth': session_token
    }

    # Setup hashtable for results
    my_data = {}

    # Fetch connection stats
    json_raw = get_connection_stats(headers)

    #Additionnal informations when state is down
    connection_media = ""
    if 'result' in json_raw:
        if 'state' in json_raw['result']:
            if json_raw['result']['state'] == 'down':
                json_raw['result']['ipv4'] = 'None'
                json_raw['result']['ipv6'] = 'None'
                json_raw['result']['ipv4_port_range'] = [0,0]
                connection_media = 'None'

    # fbx telegraf docker info
tag1="python" tag2="version" tag3="NULL" my_data[tag1+"."+tag2+"."+tag3+"."+'version_script'] = VERSION tag2="fichier" my_data[tag1+"."+tag2+"."+tag3+"."+'nom_fichier'] = __file__ tag2="derniere modification" # Convertir Timestamp en datetime update_date = datetime.fromtimestamp(get_creation_date(__file__)) update_str = datetime.ctime(update_date) my_data[tag1+"."+tag2+"."+tag3+"."+'last_updated'] = update_str # Generic datas, same for FFTH or xDSL # ffth for FFTH (default) # xdsl for xDSL, with or without 4G if connection_media != 'None' : connection_media = json_raw['result']['media'] tag1="box" if 'result' in json_raw: tag3 = "NULL" tag2 = "down" my_data[tag1+"."+tag2+"."+tag3+"."+'bytes'] = json_raw['result']['bytes_down'] # total in bytes since last connection my_data[tag1+"."+tag2+"."+tag3+"."+'rate'] = json_raw['result']['rate_down'] # current rate in byte/s my_data[tag1+"."+tag2+"."+tag3+"."+'bandwidth'] = json_raw['result']['bandwidth_down'] # available bw in bit/s my_data[tag1+"."+tag2+"."+tag3+"."+'bytes'] = json_raw['result']['bytes_down'] tag2 = "up" my_data[tag1+"."+tag2+"."+tag3+"."+'bytes'] = json_raw['result']['bytes_up'] my_data[tag1+"."+tag2+"."+tag3+"."+'rate'] = json_raw['result']['rate_up'] my_data[tag1+"."+tag2+"."+tag3+"."+'bandwidth'] = json_raw['result']['bandwidth_up'] my_data[tag1+"."+tag2+"."+tag3+"."+'bytes'] = json_raw['result']['bytes_up'] tag2 = "NULL" my_data[tag1+"."+tag2+"."+tag3+"."+'media'] = connection_media my_data[tag1+"."+tag2+"."+tag3+"."+'ipv4'] = json_raw['result']['ipv4'] my_data[tag1+"."+tag2+"."+tag3+"."+'ipv6'] = json_raw['result']['ipv6'] tag2 = "ip_port_range" my_data[tag1+"."+tag2+"."+tag3+"."+'ipv4_port_range_low'] = json_raw['result']['ipv4_port_range'][0] my_data[tag1+"."+tag2+"."+tag3+"."+'ipv4_port_range_up'] = json_raw['result']['ipv4_port_range'][1] tag2 = "state" my_data[tag1+"."+tag2+"."+tag3+"."+'cnx_state'] = json_raw['result']['state'] cnx_status = get_cnx_status(headers) # FTTH specific if 
connection_media == "ftth": json_raw = get_ftth_status(headers) if 'result' in json_raw: tag1="box" tag2="signal" tag3 = "NULL" my_data[tag1+"."+tag2+"."+tag3+"."+'sfp_has_signal'] = json_raw['result']['sfp_has_signal'] # BrW : cet attribu est bien présent: boolean # xDSL specific (galm : ajout condition state=up) if connection_media == "xdsl" and json_raw['result']['state'] == 'up': json_raw = get_xdsl_status(headers) tag1="box" tag2="xDSL" tag3 = "NULL" if 'result' in json_raw: my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_modulation'] = json_raw['result']['status']['modulation'] + " ("+json_raw['result']['status']['protocol']+")" # in seconds my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_uptime'] = json_raw['result']['status']['uptime'] # in seconds my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_status_string'] = json_raw['result']['status']['status'] if json_raw['result']['status']['status'] == "down": # unsynchronized my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_status'] = 0 elif json_raw['result']['status']['status'] == "training": # synchronizing step 1/4 my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_status'] = 1 elif json_raw['result']['status']['status'] == "started": # synchronizing step 2/4 my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_status'] = 2 elif json_raw['result']['status']['status'] == "chan_analysis": # synchronizing step 3/4 my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_status'] = 3 elif json_raw['result']['status']['status'] == "msg_exchange": # synchronizing step 4/4 my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_status'] = 4 elif json_raw['result']['status']['status'] == "showtime": # ready my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_status'] = 5 elif json_raw['result']['status']['status'] == "disabled": # disabled my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_status'] = 6 else: # unknown my_data['xdsl_status'] = 999 if 'es' in json_raw['result']['down']: my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_down_es'] = json_raw['result']['down']['es'] # increment if 'attn' in 
json_raw['result']['down']: my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_down_attn'] = json_raw['result']['down']['attn'] # in dB if 'snr' in json_raw['result']['down']: my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_down_snr'] = json_raw['result']['down']['snr'] # in dB if 'rate' in json_raw['result']['down']: my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_down_rate'] = json_raw['result']['down']['rate'] # ATM rate in kbit/s if 'hec' in json_raw['result']['down']: my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_down_hec'] = json_raw['result']['down']['hec'] # increment if 'crc' in json_raw['result']['down']: my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_down_crc'] = json_raw['result']['down']['crc'] # increment if 'ses' in json_raw['result']['down']: my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_down_ses'] = json_raw['result']['down']['ses'] # increment if 'fec' in json_raw['result']['down']: my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_down_fec'] = json_raw['result']['down']['fec'] # increment if 'maxrate' in json_raw['result']['down']: my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_down_maxrate'] = json_raw['result']['down']['maxrate'] # ATM max rate in kbit/s if 'rtx_tx' in json_raw['result']['down']: my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_down_rtx_tx'] = json_raw['result']['down']['rtx_tx'] # G.INP on/off if 'rtx_c' in json_raw['result']['down']: my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_down_rtx_c'] = json_raw['result']['down']['rtx_c'] # G.INP corrected if 'rtx_uc' in json_raw['result']['down']: my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_down_rtx_uc'] = json_raw['result']['down']['rtx_uc'] # G.INP uncorrected if 'es' in json_raw['result']['up']: my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_up_es'] = json_raw['result']['up']['es'] if 'attn' in json_raw['result']['up']: my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_up_attn'] = json_raw['result']['up']['attn'] if 'snr' in json_raw['result']['up']: my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_up_snr'] = json_raw['result']['up']['snr'] if 'rate' in 
json_raw['result']['up']: my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_up_rate'] = json_raw['result']['up']['rate'] if 'hec' in json_raw['result']['up']: my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_up_hec'] = json_raw['result']['up']['hec'] if 'crc' in json_raw['result']['up']: my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_up_crc'] = json_raw['result']['up']['crc'] if 'ses' in json_raw['result']['up']: my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_up_ses'] = json_raw['result']['up']['ses'] if 'fec' in json_raw['result']['up']: my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_up_fec'] = json_raw['result']['up']['fec'] if 'maxrate' in json_raw['result']['up']: my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_up_maxrate'] = json_raw['result']['up']['maxrate'] if 'rtx_tx' in json_raw['result']['up']: my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_up_rtx_tx'] = json_raw['result']['up']['rtx_tx'] if 'rtx_c' in json_raw['result']['up']: my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_up_rtx_c'] = json_raw['result']['up']['rtx_c'] # G.INP corrected if 'rtx_uc' in json_raw['result']['up']: my_data[tag1+"."+tag2+"."+tag3+"."+'xdsl_up_rtx_uc'] = json_raw['result']['up']['rtx_uc'] # G.INP uncorrected # -4 4G lte xdsl tunnel if s_xdsl_tunnel and connection_media == "xdsl" : json_raw2 = get_lteconfig_status(headers) tag1="4G_lte" tag2="NULL" tag3="NULL" if 'result' in json_raw2: if 'antenna' in json_raw2['result']: my_data[tag1+"."+tag2+"."+tag3+"."+'antenna_']=json_raw2['result']['antenna'] if 'enabled' in json_raw2['result']: my_data[tag1+"."+tag2+"."+tag3+"."+'enabled_']=json_raw2['result']['enabled'] if 'fsm_state' in json_raw2['result']: my_data[tag1+"."+tag2+"."+tag3+"."+'fsm_state_']=json_raw2['result']['fsm_state'] if 'has_external_antennas' in json_raw2['result']: my_data[tag1+"."+tag2+"."+tag3+"."+'has_external_antennas_']=json_raw2['result']['has_external_antennas'] if 'state' in json_raw2['result']: my_data[tag1+"."+tag2+"."+tag3+"."+'state']=json_raw2['result']['state'] if 'network' in json_raw2['result']: 
tag2="network" if 'has_ipv4' in json_raw2['result']['network']: my_data[tag1+"."+tag2+"."+tag3+"."+'network_has_ipv4']=json_raw2['result']['network']['has_ipv4'] if 'has_ipv6' in json_raw2['result']['network']: my_data[tag1+"."+tag2+"."+tag3+"."+'network_has_ipv6']=json_raw2['result']['network']['has_ipv6'] if 'ipv4' in json_raw2['result']['network']: my_data[tag1+"."+tag2+"."+tag3+"."+'network_ipv4']=json_raw2['result']['network']['ipv4'] if 'ipv4_dns' in json_raw2['result']['network']: my_data[tag1+"."+tag2+"."+tag3+"."+'network_ipv4_dns']=json_raw2['result']['network']['ipv4_dns'] if 'ipv4_netmask' in json_raw2['result']['network']: my_data[tag1+"."+tag2+"."+tag3+"."+'network_ipv4_netmask']=json_raw2['result']['network']['ipv4_netmask'] if 'ipv6' in json_raw2['result']['network']: my_data[tag1+"."+tag2+"."+tag3+"."+'network_ipv6']=json_raw2['result']['network']['ipv6'] if 'ipv6_dns' in json_raw2['result']['network']: my_data[tag1+"."+tag2+"."+tag3+"."+'network_ipv6_dns']=json_raw2['result']['network']['ipv6_dns'] if 'ipv6_netmask' in json_raw2['result']['network']: my_data[tag1+"."+tag2+"."+tag3+"."+'network_ipv6_netmask']=json_raw2['result']['network']['ipv6_netmask'] if 'pdn_up' in json_raw2['result']['network']: my_data[tag1+"."+tag2+"."+tag3+"."+'network_pdn_up']=json_raw2['result']['network']['pdn_up'] if 'radio' in json_raw2['result']: tag2="radio" if 'associated' in json_raw2['result']['radio']: my_data[tag1+"."+tag2+"."+tag3+"."+'radio_associated']=json_raw2['result']['radio']['associated'] if 'bands' in json_raw2['result']['radio']: # liste l=len(json_raw2['result']['radio']['bands']) i=0 while i