#!/usr/bin/env python3 """ GRGS Office Cloud Print Agent ============================= This agent runs on your local network and securely sends printer data to the GRGS Office cloud backend. Installation: 1. Install Python 3.7+ on a machine in your network 2. Install required packages: pip install requests pysnmp 3. Configure the AGENT_TOKEN and CLOUD_URL below 4. Run: python3 grgs_office_agent.py Features: - Auto-discovers printers on your network via SNMP - Sends printer metrics (page counts, toner levels) to cloud - Reports print jobs when detected - Secure token-based authentication - Multi-tenant support Author: GRGS Office Version: 1.0.4 """ import os import sys import time import json import socket import logging import argparse import threading import re import asyncio import subprocess import shutil from datetime import datetime, timezone from typing import Dict, List, Optional, Set try: import requests except ImportError: print("Please install requests: pip install requests") sys.exit(1) SNMP_BACKEND = None SNMP_BACKEND_ERROR = None LEGACY_CMDGEN = None SNMPGET_PATH = None SNMPWALK_PATH = None getCmd = None nextCmd = None SnmpEngine = None CommunityData = None UdpTransportTarget = None ContextData = None ObjectType = None ObjectIdentity = None def _normalize_networks(value) -> List[str]: if value is None: return [] if isinstance(value, (list, tuple)): items = [str(v).strip() for v in value if str(v).strip()] return items raw = str(value) parts = [p.strip() for p in raw.replace(';', ',').split(',')] return [p for p in parts if p] def init_snmp_backend() -> None: global SNMP_BACKEND, SNMP_BACKEND_ERROR, LEGACY_CMDGEN global SNMPGET_PATH, SNMPWALK_PATH global getCmd, nextCmd, SnmpEngine, CommunityData, UdpTransportTarget global ContextData, ObjectType, ObjectIdentity if SNMP_BACKEND is not None: return pysnmp_path = None try: import pysnmp pysnmp_path = getattr(pysnmp, '__file__', 'unknown') except Exception as e: SNMP_BACKEND = None SNMP_BACKEND_ERROR = f"pysnmp import failed: {e} (python: {sys.executable})" return hlapi_error = asyncore_error = legacy_error = None try: from pysnmp.hlapi import ( getCmd as _getCmd, nextCmd as _nextCmd, SnmpEngine as _SnmpEngine, CommunityData as _CommunityData, UdpTransportTarget as _UdpTransportTarget, ContextData as _ContextData, ObjectType as _ObjectType, ObjectIdentity as _ObjectIdentity ) getCmd = _getCmd; nextCmd = _nextCmd; SnmpEngine = _SnmpEngine CommunityData = _CommunityData; UdpTransportTarget = _UdpTransportTarget ContextData = _ContextData; ObjectType = _ObjectType; ObjectIdentity = _ObjectIdentity SNMP_BACKEND = 'sync'; SNMP_BACKEND_ERROR = None return except Exception as e: hlapi_error = e try: from pysnmp.hlapi.asyncore import ( getCmd as _getCmd, nextCmd as _nextCmd, SnmpEngine as _SnmpEngine, CommunityData as _CommunityData, UdpTransportTarget as _UdpTransportTarget, ContextData as _ContextData, ObjectType as _ObjectType, ObjectIdentity as _ObjectIdentity ) getCmd = _getCmd; nextCmd = _nextCmd; SnmpEngine = _SnmpEngine CommunityData = _CommunityData; UdpTransportTarget = _UdpTransportTarget ContextData = _ContextData; ObjectType = _ObjectType; ObjectIdentity = _ObjectIdentity SNMP_BACKEND = 'sync'; SNMP_BACKEND_ERROR = None return except Exception as e: asyncore_error = e try: from pysnmp.entity.rfc3413.oneliner import cmdgen LEGACY_CMDGEN = cmdgen SNMP_BACKEND = 'legacy'; SNMP_BACKEND_ERROR = None return except Exception as e: legacy_error = e SNMPGET_PATH = shutil.which('snmpget') SNMPWALK_PATH = shutil.which('snmpwalk') if SNMPGET_PATH and SNMPWALK_PATH: SNMP_BACKEND = 'cli'; SNMP_BACKEND_ERROR = None return SNMP_BACKEND = None SNMP_BACKEND_ERROR = ( f"hlapi import failed: {hlapi_error}; " f"asyncore hlapi import failed: {asyncore_error}; " f"legacy cmdgen import failed: {legacy_error}; " f"pysnmp: {pysnmp_path}; python: {sys.executable}; " f"snmpget: {SNMPGET_PATH}; snmpwalk: {SNMPWALK_PATH}" ) def _parse_snmp_value(raw: str) -> Optional[str]: if not raw: return None if ' = ' in raw: raw = raw.split(' = ', 1)[1] if ':' in raw: raw = raw.split(':', 1)[1] raw = raw.strip() if raw.startswith('"') and raw.endswith('"'): raw = raw[1:-1] return raw if raw else None def _snmp_get_cli(ip: str, oid: str, timeout: int = 2, retries: int = 1) -> Optional[str]: if not SNMPGET_PATH: return None version_flag = '2c' if str(SNMP_VERSION).lower() == '2c' else '1' cmd = [SNMPGET_PATH, '-v', version_flag, '-c', SNMP_COMMUNITY, '-t', str(timeout), '-r', str(retries), ip, oid] try: result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout + 1) if result.returncode != 0: return None return _parse_snmp_value(result.stdout.strip()) except Exception: return None def _snmp_walk_cli(ip: str, oid: str, timeout: int = 2, retries: int = 0, max_rows: int = 64) -> List[str]: if not SNMPWALK_PATH: return [] version_flag = '2c' if str(SNMP_VERSION).lower() == '2c' else '1' cmd = [SNMPWALK_PATH, '-v', version_flag, '-c', SNMP_COMMUNITY, '-t', str(timeout), '-r', str(retries), ip, oid] try: result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout + 2) if result.returncode != 0: return [] values = [] for line in result.stdout.splitlines(): value = _parse_snmp_value(line) if value is not None: values.append(value) if len(values) >= max_rows: break return values except Exception: return [] # ============================================================================= # CONFIGURATION - Edit these values # ============================================================================= AGENT_TOKEN = "YOUR_AGENT_TOKEN_HERE" CLOUD_URL = "https://your-ngrok-url.ngrok-free.app" NETWORK_RANGE = "192.168.1.0/24" SNMP_COMMUNITY = "public" SNMP_VERSION = "2c" PRINTER_SYNC_INTERVAL = 300 METRICS_SYNC_INTERVAL = 30 HEARTBEAT_INTERVAL = 30 LOG_LEVEL = logging.INFO LOG_FILE = os.path.join(os.path.dirname(__file__), "grgs_office_agent.log") # ============================================================================= # SNMP OIDs (all use leading-dot notation) # ============================================================================= SNMP_OIDS = { 'sysDescr': '.1.3.6.1.2.1.1.1.0', 'sysObjectID': '.1.3.6.1.2.1.1.2.0', 'sysName': '.1.3.6.1.2.1.1.5.0', 'hrDeviceDescr': '.1.3.6.1.2.1.25.3.2.1.3.1', 'hrPrinterDetectedErrorState': '.1.3.6.1.2.1.25.3.5.1.2.1', 'prtGeneralPrinterName': '.1.3.6.1.2.1.43.5.1.1.16.1', 'prtGeneralSerialNumber': '.1.3.6.1.2.1.43.5.1.1.17.1', # Canon specific (older iR and newer imageRUNNER) 'canonModelName': '.1.3.6.1.4.1.1602.1.1.1.1.0', 'canonSerialNumber': '.1.3.6.1.4.1.1602.1.3.1.1.1.1', 'canonSerialNumberAlt': '.1.3.6.1.4.1.1602.1.2.2.1.0', 'canonSerialNumberAlt2': '.1.3.6.1.4.1.1602.1.1.1.2.0', # Standard page counters 'prtMarkerLifeCount': '.1.3.6.1.2.1.43.10.2.1.4.1.1', # marker 1 (total on most; B&W on OKI) 'prtMarkerLifeCountIdx2': '.1.3.6.1.2.1.43.10.2.1.4.1.2', # marker 2 (color on OKI & many MFPs) # Supplies walk bases (no trailing instance) 'prtMarkerSuppliesDesc': '.1.3.6.1.2.1.43.11.1.1.6', 'prtMarkerSuppliesType': '.1.3.6.1.2.1.43.11.1.1.5', 'prtMarkerSuppliesMaxCapacity': '.1.3.6.1.2.1.43.11.1.1.8', 'prtMarkerSuppliesLevel': '.1.3.6.1.2.1.43.11.1.1.9', # Paper Trays (prtInputTable) 'prtInputDescription': '.1.3.6.1.2.1.43.8.2.1.13', 'prtInputName': '.1.3.6.1.2.1.43.8.2.1.18', 'prtInputMaxCapacity': '.1.3.6.1.2.1.43.8.2.1.9', 'prtInputCurrentLevel': '.1.3.6.1.2.1.43.8.2.1.10', } # ============================================================================= # Vendor profiles # sys_object_ids – enterprise OID prefix (no leading dot, dot-boundary matched) # color_oids – ordered list of OIDs tried for color page count # bw/color_*_oids – per-function counters (Develop/KM only) # page_total_oids – vendor grand-total OID (Develop/KM only) # toner_black_oids – vendor fallback black toner level # toner_black_max_oids – vendor fallback black toner max # ============================================================================= VENDOR_PROFILES = { 'canon': { 'sys_object_ids': ['1.3.6.1.4.1.1602'], 'color_oids': ['.1.3.6.1.2.1.43.10.2.1.4.1.2'], 'bw_print_oids': [], 'bw_copy_oids': [], 'color_print_oids': [], 'color_copy_oids': [], 'toner_black_oids': ['.1.3.6.1.2.1.43.11.1.1.9.1.1'], 'toner_black_max_oids': ['.1.3.6.1.2.1.43.11.1.1.8.1.1'], }, 'hp': { 'sys_object_ids': ['1.3.6.1.4.1.11'], 'color_oids': ['.1.3.6.1.2.1.43.10.2.1.4.1.2'], 'bw_print_oids': [], 'bw_copy_oids': [], 'color_print_oids': [], 'color_copy_oids': [], }, 'ricoh': { 'sys_object_ids': ['1.3.6.1.4.1.367'], # IM C series: color at standard .4.1.2 — trust it directly (no guard) 'color_oids': ['.1.3.6.1.2.1.43.10.2.1.4.1.2'], 'bw_print_oids': [], 'bw_copy_oids': [], 'color_print_oids': [], 'color_copy_oids': [], }, 'olivetti': { 'sys_object_ids': ['1.3.6.1.4.1.268'], 'color_oids': ['.1.3.6.1.2.1.43.10.2.1.4.1.2'], 'bw_print_oids': [], 'bw_copy_oids': [], 'color_print_oids': [], 'color_copy_oids': [], }, 'kyocera': { 'sys_object_ids': ['1.3.6.1.4.1.1347'], # 2506ci: .4.1.2 may return small garbage; private OID not reliable either. # Use only .4.1.2 with sanity check. 'color_oids': ['.1.3.6.1.2.1.43.10.2.1.4.1.2'], 'bw_print_oids': [], 'bw_copy_oids': [], 'color_print_oids': [], 'color_copy_oids': [], }, 'xerox': { 'sys_object_ids': ['1.3.6.1.4.1.253'], 'color_oids': ['.1.3.6.1.2.1.43.10.2.1.4.1.2'], 'bw_print_oids': [], 'bw_copy_oids': [], 'color_print_oids': [], 'color_copy_oids': [], }, 'develop': { 'sys_object_ids': ['1.3.6.1.4.1.18334'], 'color_oids': [], 'bw_print_oids': ['.1.3.6.1.4.1.18334.1.1.1.5.7.2.2.1.5.1.1'], 'bw_copy_oids': ['.1.3.6.1.4.1.18334.1.1.1.5.7.2.2.1.5.1.2'], 'color_print_oids': ['.1.3.6.1.4.1.18334.1.1.1.5.7.2.2.1.5.2.1'], 'color_copy_oids': ['.1.3.6.1.4.1.18334.1.1.1.5.7.2.2.1.5.2.2'], 'page_total_oids': ['.1.3.6.1.4.1.18334.1.1.1.5.7.2.1.1.0'], }, 'samsung': { 'sys_object_ids': ['1.3.6.1.4.1.236'], 'color_oids': ['.1.3.6.1.2.1.43.10.2.1.4.1.2'], 'bw_print_oids': [], 'bw_copy_oids': [], 'color_print_oids': [], 'color_copy_oids': [], }, 'brother': { 'sys_object_ids': ['1.3.6.1.4.1.2435'], 'color_oids': ['.1.3.6.1.2.1.43.10.2.1.4.1.2'], 'bw_print_oids': [], 'bw_copy_oids': [], 'color_print_oids': [], 'color_copy_oids': [], }, 'lexmark': { 'sys_object_ids': ['1.3.6.1.4.1.641'], 'color_oids': ['.1.3.6.1.2.1.43.10.2.1.4.1.2'], 'bw_print_oids': [], 'bw_copy_oids': [], 'color_print_oids': [], 'color_copy_oids': [], }, 'sharp': { 'sys_object_ids': ['1.3.6.1.4.1.683'], 'color_oids': ['.1.3.6.1.2.1.43.10.2.1.4.1.2'], 'bw_print_oids': [], 'bw_copy_oids': [], 'color_print_oids': [], 'color_copy_oids': [], }, 'oki': { 'sys_object_ids': ['1.3.6.1.4.1.2001'], 'color_oids': ['.1.3.6.1.2.1.43.10.2.1.4.1.2'], 'bw_print_oids': [], 'bw_copy_oids': [], 'color_print_oids': [], 'color_copy_oids': [], }, } # ============================================================================= # Toner colour classification # # Handles: # • English / Italian / French / German / Spanish / Portuguese / Dutch words # • Short codes: bk, cy, mg, yl # • Model-suffix patterns: CK-8511C → cyan, CK-8511K → black # • Standalone word tokens: TONER C MF2555 → cyan, TONER K MF2555 → black # ============================================================================= _COLOR_KEYWORDS = { # BLACK 'black': 'black', 'nero': 'black', 'noir': 'black', 'schwarz': 'black', 'negro': 'black', 'preto': 'black', 'zwart': 'black', 'negru': 'black', 'bk': 'black', 'blk': 'black', # CYAN 'cyan': 'cyan', 'ciano': 'cyan', 'cyaan': 'cyan', 'cian': 'cyan', 'zyan': 'cyan', 'cy': 'cyan', # MAGENTA 'magenta': 'magenta', 'mag': 'magenta', 'mg': 'magenta', # YELLOW 'yellow': 'yellow', 'giallo': 'yellow', 'jaune': 'yellow', 'gelb': 'yellow', 'amarillo': 'yellow', 'amarelo': 'yellow', 'geel': 'yellow', 'ye': 'yellow', 'yl': 'yellow', } _SINGLE_LETTER_MAP = {'k': 'black', 'c': 'cyan', 'm': 'magenta', 'y': 'yellow'} def _classify_toner_color(description: str) -> Optional[str]: """ Return 'black', 'cyan', 'magenta', 'yellow', or None. Steps: 1. Tokenise on whitespace/hyphen/underscore/slash. 2. Match multi-char keywords first (longest wins over single-letter). 3. Match digit+letter suffix (e.g. '8511c' → cyan). 4. Match standalone single letter token (e.g. 'TONER C MF2555' → cyan). """ d = description.lower() tokens = re.split(r'[\s\-_/()\[\]{}]+', d) # Step 2: multi-char keyword match for tok in tokens: if tok in _COLOR_KEYWORDS and len(tok) > 1: return _COLOR_KEYWORDS[tok] # Step 3: digit+letter suffix e.g. "ck-8511c" → token "8511c" ends with 'c' for tok in tokens: m = re.match(r'^\d+([cmyk])$', tok) if m: return _SINGLE_LETTER_MAP[m.group(1)] # Step 4: standalone single letter e.g. token "c" in ["toner", "c", "mf2555"] for tok in tokens: if tok in _SINGLE_LETTER_MAP: return _SINGLE_LETTER_MAP[tok] return None # ============================================================================= # Agent Class # ============================================================================= class GRGSOfficeAgent: def add_printer_direct(self, printer: dict) -> bool: """Add a printer directly (from server push), autodetect model/info, sync immediately.""" ip = printer.get('ip') if not ip: self.logger.error("No IP provided for direct printer add.") return False autodetected = self.discover_printers(ip) if autodetected: info = autodetected[0] for k in ['name', 'model', 'serial', 'location', 'status']: if printer.get(k): info[k] = printer[k] self.printers[ip] = info self.logger.info(f"Directly added printer (autodetected): {ip} ({info.get('name', '')})") else: self.printers[ip] = printer self.logger.info(f"Directly added printer (manual): {ip} ({printer.get('name', '')})") metrics = self.get_printer_metrics(ip) if metrics: self._api_call('sync_metrics', {'metrics': [metrics]}) return True def __init__(self, token: str, cloud_url: str): self.token = token self.cloud_url = cloud_url.rstrip('/') self.session = requests.Session() self.session.headers.update({'X-Agent-Token': token, 'Content-Type': 'application/json'}) self.printers: Dict[str, dict] = {} self.assigned_ips: Set[str] = set() self.state_file = os.path.join(os.path.dirname(__file__), 'grgs_office_agent_state.json') self.job_state: Dict[str, dict] = self._load_state() self.running = False self.logger = logging.getLogger('grgs_office_agent') def _api_call(self, action: str, data: dict = None, method: str = 'POST') -> Optional[dict]: url = f"{self.cloud_url}/api/agent.php?action={action}" headers = {'Content-Type': 'application/json', 'X-Agent-Token': self.token} payload = data.copy() if data else {} if method != 'GET': payload['token'] = self.token try: self.logger.debug(f"API Request URL: {url}") if method == 'GET': response = self.session.get(url, headers=headers, timeout=30) else: self.logger.debug(f"API Request Payload: {json.dumps(payload)}") response = self.session.post(url, headers=headers, json=payload, timeout=30) if response.status_code != 200: self.logger.error(f"API call failed ({action}): HTTP {response.status_code} - {response.text}") return None try: return response.json() except Exception as e: self.logger.error(f"JSON decode error for {action}: {e}\nRaw: {response.text}") return None except requests.exceptions.RequestException as e: self.logger.error(f"API call failed ({action}): {e}") return None def _load_state(self) -> Dict[str, dict]: if not os.path.exists(self.state_file): return {} try: with open(self.state_file, 'r', encoding='utf-8') as f: data = json.load(f) if isinstance(data, dict): return data except Exception as e: self.logger.debug(f"Failed to load state file: {e}") return {} def fetch_assigned_printers(self) -> None: """Fetch all printers currently assigned to this agent in the cloud.""" try: resp = self._api_call('assigned_printers', method='GET') if resp and resp.get('success'): printers = resp.get('printers', []) new_ips = {p['ip'] for p in printers if p.get('ip')} old_count = len(self.assigned_ips) self.assigned_ips = new_ips if len(new_ips) != old_count: self.logger.info(f"Updated assigned printers from cloud: {len(new_ips)} IPs monitored.") except Exception as e: self.logger.error(f"Failed to fetch assigned printers: {e}") def _save_state(self) -> None: try: with open(self.state_file, 'w', encoding='utf-8') as f: json.dump(self.job_state, f) except Exception as e: self.logger.debug(f"Failed to save state file: {e}") def _snmp_get(self, ip: str, oid: str, timeout: int = 2, retries: int = 1): if SNMP_BACKEND is None: init_snmp_backend() if SNMP_BACKEND is None: return None try: snmp_model = 1 if str(SNMP_VERSION).lower() == '2c' else 0 if SNMP_BACKEND == 'sync': iterator = getCmd( SnmpEngine(), CommunityData(SNMP_COMMUNITY, mpModel=snmp_model), UdpTransportTarget((ip, 161), timeout=timeout, retries=retries), ContextData(), ObjectType(ObjectIdentity(oid)) ) errorIndication, errorStatus, errorIndex, varBinds = next(iterator) if errorIndication or errorStatus: return None return varBinds[0][1] if SNMP_BACKEND == 'legacy' and LEGACY_CMDGEN is not None: errorIndication, errorStatus, errorIndex, varBinds = LEGACY_CMDGEN.CommandGenerator().getCmd( LEGACY_CMDGEN.CommunityData(SNMP_COMMUNITY, mpModel=snmp_model), LEGACY_CMDGEN.UdpTransportTarget((ip, 161), timeout=timeout, retries=retries), LEGACY_CMDGEN.ObjectType(LEGACY_CMDGEN.ObjectIdentity(oid)) ) if errorIndication or errorStatus: return None return varBinds[0][1] if SNMP_BACKEND == 'cli': return _snmp_get_cli(ip, oid, timeout=timeout, retries=retries) async def _get_async(): return await getCmd( SnmpEngine(), CommunityData(SNMP_COMMUNITY, mpModel=snmp_model), UdpTransportTarget((ip, 161), timeout=timeout, retries=retries), ContextData(), ObjectType(ObjectIdentity(oid)) ) errorIndication, errorStatus, errorIndex, varBinds = asyncio.run(_get_async()) if errorIndication or errorStatus: return None return varBinds[0][1] except Exception: return None def _snmp_walk(self, ip: str, oid: str, timeout: int = 2, retries: int = 0, max_rows: int = 64): if SNMP_BACKEND is None: init_snmp_backend() if SNMP_BACKEND is None: return [] try: snmp_model = 1 if str(SNMP_VERSION).lower() == '2c' else 0 results = [] if SNMP_BACKEND == 'sync': for (errorIndication, errorStatus, errorIndex, varBinds) in nextCmd( SnmpEngine(), CommunityData(SNMP_COMMUNITY, mpModel=snmp_model), UdpTransportTarget((ip, 161), timeout=timeout, retries=retries), ContextData(), ObjectType(ObjectIdentity(oid)), lexicographicMode=False ): if errorIndication or errorStatus: break for varBind in varBinds: results.append(varBind[1]) if len(results) >= max_rows: return results return results if SNMP_BACKEND == 'legacy' and LEGACY_CMDGEN is not None: for (errorIndication, errorStatus, errorIndex, varBinds) in LEGACY_CMDGEN.CommandGenerator().nextCmd( LEGACY_CMDGEN.CommunityData(SNMP_COMMUNITY, mpModel=snmp_model), LEGACY_CMDGEN.UdpTransportTarget((ip, 161), timeout=timeout, retries=retries), LEGACY_CMDGEN.ObjectType(LEGACY_CMDGEN.ObjectIdentity(oid)), lexicographicMode=False ): if errorIndication or errorStatus: break for varBind in varBinds: results.append(varBind[1]) if len(results) >= max_rows: return results return results if SNMP_BACKEND == 'cli': return _snmp_walk_cli(ip, oid, timeout=timeout, retries=retries, max_rows=max_rows) async def _walk_async(): async for (errorIndication, errorStatus, errorIndex, varBinds) in nextCmd( SnmpEngine(), CommunityData(SNMP_COMMUNITY, mpModel=snmp_model), UdpTransportTarget((ip, 161), timeout=timeout, retries=retries), ContextData(), ObjectType(ObjectIdentity(oid)), lexicographicMode=False ): if errorIndication or errorStatus: break for varBind in varBinds: results.append(varBind[1]) if len(results) >= max_rows: return asyncio.run(_walk_async()) return results except Exception: return [] def heartbeat(self) -> bool: result = self._api_call('heartbeat', method='GET') if result and result.get('success'): self.logger.debug(f"Heartbeat OK - Company: {result.get('company')}") return True self.logger.error(f"Heartbeat failed: {result}") return False def get_config(self) -> Optional[dict]: result = self._api_call('get_config', method='GET') if result and result.get('success'): return result.get('config', {}) return None # ------------------------------------------------------------------------- # _detect_vendor — dot-boundary safe prefix matching # ------------------------------------------------------------------------- def _detect_vendor(self, ip: str): """ Returns (vendor_key_UPPER, vendor_profile_dict, oid_str). Dot-boundary matching prevents HP '1.3.6.1.4.1.11' matching Kyocera '1.3.6.1.4.1.1347'. """ raw = self._snmp_get(ip, SNMP_OIDS['sysObjectID'], timeout=2, retries=1) if raw is None: return 'GENERIC', None, None oid_str = str(raw).lstrip('.') # Some SNMP backends return "iso.3.6.1..." instead of "1.3.6.1..." if oid_str.startswith('iso.'): oid_str = '1.' + oid_str[4:] best_key, best_len, best_profile = 'GENERIC', 0, None for v_name, v_data in VENDOR_PROFILES.items(): for prefix in v_data.get('sys_object_ids', []): p = prefix.lstrip('.') if (oid_str == p or oid_str.startswith(p + '.')) and len(p) > best_len: best_len = len(p) best_key = v_name.upper() best_profile = v_data self.logger.info(f"Vendor detect {ip}: sysObjectID={oid_str} → {best_key}") return best_key, best_profile, oid_str # ------------------------------------------------------------------------- # _auto_detect_model — returns (name, model, serial) # ------------------------------------------------------------------------- def _auto_detect_model(self, ip: str): """ Priority for model: prtGeneralPrinterName > hrDeviceDescr > sysDescr > sysName """ def clean(v): if v is None: return None s = str(v).strip() lower_s = s.lower() if not s: return None # Filter out "No Such Instance" variations if "no such instance" in lower_s or "no such object" in lower_s: return None return s sys_name = self._snmp_get(ip, SNMP_OIDS['sysName'], timeout=2, retries=1) sys_descr = self._snmp_get(ip, SNMP_OIDS['sysDescr'], timeout=2, retries=1) prt_name = self._snmp_get(ip, SNMP_OIDS['prtGeneralPrinterName'], timeout=2, retries=1) prt_serial = self._snmp_get(ip, SNMP_OIDS['prtGeneralSerialNumber'], timeout=2, retries=1) # Canon specific candidate check canon_name = self._snmp_get(ip, SNMP_OIDS['canonModelName'], timeout=2, retries=1) canon_ser = self._snmp_get(ip, SNMP_OIDS['canonSerialNumber'], timeout=2, retries=1) if not clean(canon_ser): canon_ser = self._snmp_get(ip, SNMP_OIDS['canonSerialNumberAlt'], timeout=1, retries=0) if not clean(canon_ser): canon_ser = self._snmp_get(ip, SNMP_OIDS['canonSerialNumberAlt2'], timeout=1, retries=0) # hrDeviceDescr: try device indexes 1–5 hr_descr = None for _idx in range(1, 6): _v = self._snmp_get(ip, f'.1.3.6.1.2.1.25.3.2.1.3.{_idx}', timeout=2, retries=0) _s = clean(_v) if _s: hr_descr = _s break serial = clean(prt_serial) or clean(canon_ser) if not serial and sys_descr: m = re.search(r'(?:SN|Serial)[:\s]*([A-Za-z0-9\-]+)', str(sys_descr), re.IGNORECASE) if m: serial = m.group(1)[:128] model = (clean(prt_name) or clean(canon_name) or hr_descr or clean(sys_descr) or clean(sys_name)) if model: model = model[:128] # Smart Branding: if the name is generic, try to use the brand clean_name = clean(sys_name) vendor_key, _, _ = self._detect_vendor(ip) # If name is "Printer at ..." or just the same as IP, use brand is_generic = not clean_name or "printer at" in clean_name.lower() or clean_name == ip if is_generic: if vendor_key and vendor_key != 'GENERIC': name = vendor_key.title() elif model: # Fallback: extract first word of model if it looks like a brand brands = ['Develop', 'HP', 'Canon', 'Epson', 'Brother', 'Lexmark', 'Xerox', 'Samsung', 'Kyocera', 'Ricoh', 'Konica', 'Minolta', 'Sharp', 'Zebra', 'OKI', 'Toshiba', 'Olivetti'] first = model.split()[0] if first in brands: name = first else: name = model.split()[0] if model else f"Printer-{ip}" else: name = f"Printer-{ip}" else: name = clean_name return name, model, serial # ------------------------------------------------------------------------- # get_printer_metrics # ------------------------------------------------------------------------- def get_printer_metrics(self, ip: str) -> Optional[dict]: """ Collect page counters and toner levels from a single printer via SNMP. Page counter rules: DEVELOP/KM → private per-function BW+color counters ALL OTHERS → .4.1.1 = B&W marker, .4.1.2 = color marker; grand total = sum IMPORTANT: The vendor block sets color_total ONCE. Step 7 does NOT re-query color OIDs. This prevents garbage values from Kyocera private OIDs or .6.1.1 returning small non-zero numbers from corrupting the result. Toner classification via _classify_toner_color() handles EN/IT/FR/DE/ES/PT/NL color words and model-code suffix patterns (CK-8511C, TONER C MF2555, etc.) """ if SNMP_BACKEND is None: init_snmp_backend() if SNMP_BACKEND is None: err = SNMP_BACKEND_ERROR or "pysnmp not installed" self.logger.warning(f"pysnmp unavailable ({err}). Metrics collection disabled.") return None def to_int(value) -> Optional[int]: if value is None: return None try: return int(value) except Exception: m = re.search(r'-?\d+', str(value)) return int(m.group(0)) if m else None def safe_add(*values) -> Optional[int]: nums = [v for v in values if v is not None] return sum(nums) if nums else None def calc_percentage(level, max_cap) -> Optional[int]: if level is None: return None if level == -3: return 100 if level == -2: return 0 if level == -1: return None if max_cap and max_cap > 0 and level >= 0: return int(round((level * 100) / max_cap, 0)) if (max_cap is None or max_cap == 0) and 0 <= level <= 100: return level return None metrics = { 'ip': ip, 'name': None, 'model': None, 'serial': None, 'pages_total': None, 'bw_total': None, 'bw_print': None, 'bw_copy': None, 'color_total': None, 'color_print': None, 'color_copy': None, 'fax': None, 'duplex': None, 'toner_black': None, 'toner_cyan': None, 'toner_magenta': None, 'toner_yellow': None, 'error_state': None, 'supplies': [], 'paper_trays': [], 'status': 'online', } try: # ------ 1. Identity ------ name, model, serial = self._auto_detect_model(ip) metrics['name'] = name metrics['model'] = model metrics['serial'] = serial # ------ 2. Error state ------ error_state = self._snmp_get(ip, SNMP_OIDS['hrPrinterDetectedErrorState'], timeout=2, retries=1) if error_state is not None: metrics['error_state'] = str(error_state)[:128] # ------ 3. Vendor detection & Connectivity Check ------ # Perform a quick reachability test via sysObjectID raw_obj_id = self._snmp_get(ip, SNMP_OIDS['sysObjectID'], timeout=1, retries=0) if raw_obj_id is None: self.logger.info(f"Printer {ip} appears offline (SNMP timeout). Reporting status: offline.") metrics['status'] = 'offline' return metrics vendor, vendor_profile, _ = self._detect_vendor(ip) self.logger.info(f"Vendor detect for {ip}: {vendor}") # ------ 4. Page counters — set ONCE, never re-queried in step 7 ------ # For all vendors: .4.1.1 = B&W marker, .4.1.2 = color marker # Grand total = B&W + color if vendor == 'DEVELOP': # Konica Minolta / Develop bizhub — exact per-function summation BASE = '.1.3.6.1.4.1.18334.1.1.1.5.7.2' def get_pos(oid_suffix): v = to_int(self._snmp_get(ip, f'{BASE}{oid_suffix}')) return v if v is not None and v >= 0 else 0 bw_print = get_pos('.2.1.5.1.2') bw_copy = get_pos('.2.1.5.1.1') bw_total = bw_print + bw_copy col_print_full = get_pos('.2.1.5.2.2') col_copy_full = get_pos('.2.1.5.2.1') col_print_sgl = get_pos('.2.1.5.3.2') col_copy_sgl = get_pos('.2.1.5.3.1') col_print_two = get_pos('.2.1.5.4.2') col_copy_two = get_pos('.2.1.5.4.1') color_print = col_print_full + col_print_sgl + col_print_two color_copy = col_copy_full + col_copy_sgl + col_copy_two color_total = color_print + color_copy total = bw_total + color_total # Vendor grand-total vnd_total = to_int(self._snmp_get(ip, f'{BASE}.1.1.0')) if vnd_total is not None and vnd_total > total: total = vnd_total if bw_total == 0 and color_total == 0: std_total = to_int(self._snmp_get(ip, SNMP_OIDS['prtMarkerLifeCount'])) if std_total: total = std_total; bw_total = std_total; color_total = 0 self.logger.info(f"Develop exact summary {ip}: bw={bw_total}, color={color_total}, total={total}") metrics.update({ 'bw_print': bw_print, 'bw_copy': bw_copy, 'color_print': color_print, 'color_copy': color_copy, 'bw_total': bw_total, 'color_total': color_total, 'pages_total': total, }) else: # Standard marker life count: .4.1.1 = TOTAL pages (all 3 printers # in this fleet only have ONE marker entry — no .4.1.2). total = to_int(self._snmp_get(ip, SNMP_OIDS['prtMarkerLifeCount'])) bw = None color = None if vendor == 'KYOCERA': # Kyocera strictly divides counts by Copier vs Printer components kyo_bw_print = to_int(self._snmp_get(ip, '.1.3.6.1.4.1.1347.42.3.1.2.1.1.1.1')) kyo_col_print = to_int(self._snmp_get(ip, '.1.3.6.1.4.1.1347.42.3.1.2.1.1.1.3')) kyo_bw_copy = to_int(self._snmp_get(ip, '.1.3.6.1.4.1.1347.42.3.1.2.1.1.2.1')) kyo_col_copy = to_int(self._snmp_get(ip, '.1.3.6.1.4.1.1347.42.3.1.2.1.1.2.3')) self.logger.info(f"Kyocera exact counters {ip}: bw_prn={kyo_bw_print}, col_prn={kyo_col_print}, bw_cop={kyo_bw_copy}, col_cop={kyo_col_copy}") kyo_bw = safe_add(kyo_bw_print, kyo_bw_copy) kyo_col = safe_add(kyo_col_print, kyo_col_copy) kyo_tot = safe_add(kyo_bw, kyo_col) if kyo_tot is not None: total = kyo_tot bw = kyo_bw color = kyo_col else: # Fallback to the generic System Total if specific components aren't readable fallback_tot = to_int(self._snmp_get(ip, '.1.3.6.1.4.1.1347.43.10.1.1.12.1.1')) if fallback_tot is not None: total = fallback_tot bw = fallback_tot elif vendor == 'RICOH': # Ricoh: sum color and B&W from Copier and Printer (Full Color + Two-color, B&W) copier_color = to_int(self._snmp_get(ip, '.1.3.6.1.4.1.367.3.2.1.2.19.5.1.9.5')) printer_color = to_int(self._snmp_get(ip, '.1.3.6.1.4.1.367.3.2.1.2.19.5.1.9.11')) printer_two_color = to_int(self._snmp_get(ip, '.1.3.6.1.4.1.367.3.2.1.2.19.5.1.9.10')) copier_bw = to_int(self._snmp_get(ip, '.1.3.6.1.4.1.367.3.2.1.2.19.5.1.9.3')) printer_bw = to_int(self._snmp_get(ip, '.1.3.6.1.4.1.367.3.2.1.2.19.5.1.9.9')) color = 0 bw = 0 if copier_color is not None: color += copier_color if printer_color is not None: color += printer_color if printer_two_color is not None: color += printer_two_color if copier_bw is not None: bw += copier_bw if printer_bw is not None: bw += printer_bw total = color + bw self.logger.info(f"Ricoh panel-matched counters {ip}: copier_color={copier_color}, printer_color={printer_color}, printer_two_color={printer_two_color}, copier_bw={copier_bw}, printer_bw={printer_bw}, color_total={color}, bw_total={bw}, total={total}") # Fallback: if no private counters, total is from .4.1.1, bw=total if bw is None and color is None: bw = total self.logger.info(f"Page counters for {ip}: total={total}, B&W={bw}, color={color}") metrics.update({'bw_total': bw, 'color_total': color, 'pages_total': total}) # ------ 5. Supplies / toner walk ------ type_map = { 1: 'other', 2: 'unknown', 3: 'toner', 4: 'wasteToner', 5: 'ink', 6: 'inkCartridge', 7: 'inkRibbon', 8: 'wasteInk', 9: 'opc', 10: 'developer', 11: 'fuserOil', 12: 'solidWax', 13: 'ribbonWax', 14: 'wasteWax', 15: 'fuser', 16: 'coronaWire', 17: 'fuserOilWick', 18: 'cleanerUnit', 19: 'fuserCleaningPad', 20: 'transferUnit', 21: 'tonerCartridge', 22: 'fuserOiler', 23: 'water', 24: 'wasteWater', 25: 'glueWaterAdditive', 26: 'wastePaper', 27: 'bindingSupply', 28: 'bandingSupply', 29: 'stitchingWire', 30: 'shrinkWrap', 31: 'paperWrap', 32: 'staples', 33: 'inserts', 34: 'covers', } desc_list = self._snmp_walk(ip, SNMP_OIDS['prtMarkerSuppliesDesc'], timeout=2, retries=0, max_rows=32) max_list = self._snmp_walk(ip, SNMP_OIDS['prtMarkerSuppliesMaxCapacity'], timeout=2, retries=0, max_rows=32) level_list = self._snmp_walk(ip, SNMP_OIDS['prtMarkerSuppliesLevel'], timeout=2, retries=0, max_rows=32) type_list = self._snmp_walk(ip, SNMP_OIDS['prtMarkerSuppliesType'], timeout=2, retries=0, max_rows=32) # Use desc+max+level alignment; type is optional (may have fewer rows) count = min(len(desc_list), len(max_list), len(level_list)) for i in range(count): raw_desc = desc_list[i] desc_lower = str(raw_desc).lower() max_cap = to_int(max_list[i]) level = to_int(level_list[i]) # type_list may be shorter — use None if index out of range sup_type = to_int(type_list[i]) if i < len(type_list) else None if level is None: continue pct = calc_percentage(level, max_cap) sup_type_str = type_map.get(sup_type, 'unknown') if sup_type is not None else 'unknown' sup_status = ('unknown' if pct is None else 'low' if pct <= 10 else 'warning' if pct <= 25 else 'ok') metrics['supplies'].append({ 'description': str(raw_desc), 'type': sup_type_str, 'max_capacity': max_cap, 'current_level': level, 'percentage': pct, 'status': sup_status, }) # Is this a toner/ink cartridge? # Accept type 3 (toner), 5 (ink), 21 (tonerCartridge) # OR description contains 'toner'/'ink' and is not waste/developer is_toner = (sup_type in (3, 5, 21) or (('toner' in desc_lower or 'ink' in desc_lower) and 'waste' not in desc_lower and 'developer' not in desc_lower)) if not is_toner: continue slot = _classify_toner_color(str(raw_desc)) if slot == 'black' and metrics['toner_black'] is None: metrics['toner_black'] = pct elif slot == 'cyan' and metrics['toner_cyan'] is None: metrics['toner_cyan'] = pct elif slot == 'magenta' and metrics['toner_magenta'] is None: metrics['toner_magenta'] = pct elif slot == 'yellow' and metrics['toner_yellow'] is None: metrics['toner_yellow'] = pct elif slot is None and metrics['toner_black'] is None and pct is not None: # Unclassified single toner → mono printer black metrics['toner_black'] = pct # ------ 5.5. Paper Trays ------ tray_name_list = self._snmp_walk(ip, SNMP_OIDS['prtInputName'], timeout=2, retries=0, max_rows=16) if not tray_name_list: tray_name_list = self._snmp_walk(ip, SNMP_OIDS['prtInputDescription'], timeout=2, retries=0, max_rows=16) tray_max_list = self._snmp_walk(ip, SNMP_OIDS['prtInputMaxCapacity'], timeout=2, retries=0, max_rows=16) tray_level_list = self._snmp_walk(ip, SNMP_OIDS['prtInputCurrentLevel'], timeout=2, retries=0, max_rows=16) tray_count = min(len(tray_name_list), len(tray_max_list), len(tray_level_list)) for i in range(tray_count): raw_name = tray_name_list[i] max_cap = to_int(tray_max_list[i]) level = to_int(tray_level_list[i]) if level is None or max_cap is None: continue pct = calc_percentage(level, max_cap) status = 'unknown' if pct is not None: status = 'empty' if pct <= 0 else 'low' if pct <= 20 else 'ok' elif level == -3: pct = 100 status = 'ok' elif level == -2: status = 'unknown' if str(raw_name).strip(): metrics['paper_trays'].append({ 'name': str(raw_name).strip(), 'max_capacity': max_cap, 'current_level': level, 'percentage': pct, 'status': status, }) # ------ 6. Vendor toner fallback (black only) ------ if metrics['toner_black'] is None and vendor_profile: l_oid = (vendor_profile.get('toner_black_oids') or [None])[0] m_oid = (vendor_profile.get('toner_black_max_oids') or [None])[0] if l_oid: lv = to_int(self._snmp_get(ip, l_oid, timeout=2, retries=0)) mv = to_int(self._snmp_get(ip, m_oid, timeout=2, retries=0)) if m_oid else None if lv is not None: if mv and mv > 0: metrics['toner_black'] = int(round(lv * 100 / mv, 0)) elif 0 <= lv <= 100: metrics['toner_black'] = lv # ------ 7. color_total post-processing ------ # Determine printer type from toner information is_mono = (metrics['toner_cyan'] is None and metrics['toner_magenta'] is None and metrics['toner_yellow'] is None) # For confirmed mono printers: force color_total = 0 if is_mono and metrics['color_total'] is None: metrics['color_total'] = 0 # Sanity clamp only if (metrics['color_total'] is not None and metrics['pages_total'] is not None and metrics['color_total'] > metrics['pages_total']): metrics['color_total'] = metrics['pages_total'] # Derive bw_total if missing if (metrics['bw_total'] is None and metrics['pages_total'] is not None and metrics['color_total'] is not None): metrics['bw_total'] = max(0, metrics['pages_total'] - metrics['color_total']) if is_mono and metrics['bw_total'] is None and metrics['pages_total'] is not None: metrics['bw_total'] = metrics['pages_total'] # Derive pages_total from parts if primary read failed if (metrics['pages_total'] is None and metrics['bw_total'] is not None and metrics['color_total'] is not None): metrics['pages_total'] = metrics['bw_total'] + metrics['color_total'] except Exception as e: self.logger.error(f"Failed to get metrics for {ip}: {e}", exc_info=True) metrics['status'] = 'error' return metrics def report_jobs_from_metrics(self, metrics_list: List[dict]) -> None: for metric in metrics_list: ip = metric.get('ip') if not ip or metric.get('status') == 'error': continue current_total = int(metric.get('pages_total') or 0) prev = self.job_state.get(ip, {}) prev_total = int(prev.get('pages_total') or 0) if current_total <= 0: continue if prev_total <= 0: self.job_state[ip] = {'pages_total': current_total, 'timestamp': datetime.now(timezone.utc).isoformat()} continue if current_total < prev_total: self.job_state[ip] = {'pages_total': current_total, 'timestamp': datetime.now(timezone.utc).isoformat()} continue delta = current_total - prev_total if delta <= 0: continue job = { 'printer_ip': ip, 'document_name': f'Auto SNMP job ({delta} pages)', 'total_pages': delta, 'bw_pages': delta, 'color_pages': 0, 'duplex_pages': 0, 'timestamp': datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S') } result = self._api_call('report_job', {'job': job}) if result and result.get('success'): self.logger.info(f"Reported job for {ip}: {delta} pages") else: self.logger.debug(f"Failed to report job for {ip}: {result}") self.job_state[ip] = {'pages_total': current_total, 'timestamp': datetime.now(timezone.utc).isoformat()} self._save_state() def sync_printers(self) -> bool: printers = self.discover_printers(NETWORK_RANGE) if not printers: self.logger.info("No printers found on network") return False for p in printers: self.printers[p['ip']] = p result = self._api_call('sync_printers', {'printers': list(self.printers.values())}) self.logger.info(f"sync_printers API response: {result}") if result and result.get('success'): self.logger.info(f"Synced {result.get('synced',0)} printers " f"(created: {result.get('created',0)}, updated: {result.get('updated',0)})") return True self.logger.error(f"Failed to sync printers: {result}") return False def sync_metrics(self) -> bool: """ Sync printer metrics to cloud. Scans both the configured NETWORK_RANGE and any cloud-assigned IPs. """ # 1. Discover printers on configured network discovered = self.discover_printers(NETWORK_RANGE) for p in discovered: self.printers[p['ip']] = p # 2. Collect metrics for BOTH discovered and explicitly assigned IPs target_ips = set(self.printers.keys()) | self.assigned_ips metrics = [] for ip in target_ips: m = self.get_printer_metrics(ip) if m: metrics.append(m) if not metrics: return False result = self._api_call('sync_metrics', {'metrics': metrics}) if result and result.get('success'): self.logger.info(f"Synced metrics for {result.get('recorded',0)} printers") self.report_jobs_from_metrics(metrics) return True self.logger.error(f"Failed to sync metrics: {result}") return False def discover_printers(self, network: str = NETWORK_RANGE) -> List[dict]: """Discover printers on local network using port scan + SNMP identification.""" discovered = [] if SNMP_BACKEND is None: init_snmp_backend() has_snmp = SNMP_BACKEND is not None if not has_snmp: err = SNMP_BACKEND_ERROR or "pysnmp not installed" self.logger.warning(f"pysnmp unavailable ({err}). Using basic port scan instead.") try: import ipaddress networks = _normalize_networks(network) if not networks: self.logger.error("No network ranges provided for scanning.") return [] except Exception as e: self.logger.error(f"Invalid network range: {e}") return [] hosts = [] for net in networks: try: hosts.extend(list(ipaddress.ip_network(net, strict=False).hosts())[:254]) except Exception as e: self.logger.error(f"Invalid network range '{net}': {e}") if not hosts: return [] self.logger.info(f"Scanning {len(hosts)} hosts in {', '.join(networks)}...") for ip in hosts: ip_str = str(ip) is_printer = False for port in [9100, 515, 631, 80, 161]: try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(0.5) if sock.connect_ex((ip_str, port)) == 0: is_printer = True sock.close() if is_printer: break except Exception: pass if not is_printer: continue printer_info = { 'ip': ip_str, 'name': f'Printer at {ip_str}', 'model': 'Unknown', 'serial': None, 'status': 'online', 'location': 'Auto-discovered' } if has_snmp: try: name, model, serial = self._auto_detect_model(ip_str) if name: printer_info['name'] = name if model: printer_info['model'] = model if serial: printer_info['serial'] = serial except Exception as e: self.logger.debug(f"SNMP failed for {ip_str}: {e}") discovered.append(printer_info) self.logger.info(f"Found printer: {printer_info['name']} ({ip_str}) model={printer_info['model']}") return discovered def run(self, direct_add_poll_interval: int = 10): self.logger.info("=" * 50) self.logger.info("GRGS Office Cloud Print Agent Starting") self.logger.info("=" * 50) if not self.heartbeat(): self.logger.error("Failed to connect to cloud. Check your token and URL.") return self.logger.info("Connected to cloud successfully!") config = self.get_config() if config: self.logger.info(f"Company: {config.get('company_name')}") self.logger.info(f"Agent: {config.get('agent_name')}") self.sync_printers() self.running = True last_printer_sync = last_metrics_sync = last_heartbeat = time.time() def listen_for_direct_add(): while self.running: try: resp = self._api_call('get_direct_add', method='GET') if resp and resp.get('success') and resp.get('printer'): self.add_printer_direct(resp['printer']) except Exception as e: self.logger.debug(f"Direct add poll failed: {e}") time.sleep(direct_add_poll_interval) threading.Thread(target=listen_for_direct_add, daemon=True).start() while self.running: try: now = time.time() if now - last_heartbeat >= HEARTBEAT_INTERVAL: self.heartbeat() self.fetch_assigned_printers() # Refresh assigned list with heartbeat last_heartbeat = now if now - last_printer_sync >= PRINTER_SYNC_INTERVAL: self.sync_printers() last_printer_sync = now if now - last_metrics_sync >= METRICS_SYNC_INTERVAL: self.sync_metrics() last_metrics_sync = now time.sleep(5) except KeyboardInterrupt: self.logger.info("Shutting down..."); self.running = False except Exception as e: self.logger.error(f"Error in main loop: {e}"); time.sleep(10) self.logger.info("Agent stopped") # ============================================================================= # Main Entry Point # ============================================================================= def load_config_file(): config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'agent_config.json') if os.path.exists(config_path): try: with open(config_path, 'r') as f: return json.load(f) except Exception as e: print(f"Warning: Could not load config file: {e}") return {} def main(): global NETWORK_RANGE file_config = load_config_file() parser = argparse.ArgumentParser( description='GRGS Office Cloud Print Agent', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python3 grgs_office_agent.py --token YOUR_TOKEN --url https://your-url.ngrok-free.app python3 grgs_office_agent.py --network 10.0.0.0/24 python3 grgs_office_agent.py --network 192.168.180.141/32 # Scan only one printer IP python3 grgs_office_agent.py --scan-only # Just scan, don't sync Config file: Create agent_config.json with token, cloud_url, network keys """ ) parser.add_argument('--token', '-t', default=file_config.get('token', AGENT_TOKEN)) parser.add_argument('--url', '-u', default=file_config.get('cloud_url', CLOUD_URL)) parser.add_argument('--network', '-n', default=file_config.get('network')) parser.add_argument('--scan-only', action='store_true') parser.add_argument('--debug', '-d', action='store_true') parser.add_argument('--log-file', '-l', default=LOG_FILE) args = parser.parse_args() log_level = logging.DEBUG if args.debug else LOG_LEVEL handlers = [logging.StreamHandler()] try: handlers.append(logging.FileHandler(args.log_file)) except PermissionError: fb = os.path.join('/tmp', os.path.basename(args.log_file) or 'grgs_office_agent.log') handlers.append(logging.FileHandler(fb)) print(f"Warning: log not writable ({args.log_file}), using {fb}.") logging.basicConfig(level=log_level, format='%(asctime)s [%(levelname)s] %(message)s', handlers=handlers) if args.token == "YOUR_AGENT_TOKEN_HERE" or not args.token: print("\nERROR: Please configure your agent token!") print("Get your token from the GRGS Office dashboard -> Cloud Agents -> Add Agent") print("\nUsage: python3 grgs_office_agent.py --token YOUR_TOKEN --url https://your-url.ngrok-free.app") sys.exit(1) if args.url == "https://your-ngrok-url.ngrok-free.app" or not args.url: print("\nERROR: Please configure your cloud URL!") sys.exit(1) printer_ip = file_config.get('printer_ip') agent = GRGSOfficeAgent(args.token, args.url) poll_interval = int(os.environ.get('DIRECT_ADD_POLL_INTERVAL', '10')) if printer_ip: result = agent._api_call('sync_printers', {'printers': [{ 'ip': printer_ip, 'name': f'Printer at {printer_ip}', 'model': None, 'location': 'Manual', 'status': 'online' }]}) print(f"\nSynced manual printer {printer_ip}: {result}") else: if args.network: NETWORK_RANGE = args.network else: print("\nERROR: No network range provided! Please specify --network or set it in agent_config.json.") sys.exit(1) if args.scan_only: print(f"\nScanning network {NETWORK_RANGE}...") printers = agent.discover_printers(NETWORK_RANGE) print(f"\nFound {len(printers)} printer(s):\n") for p in printers: print(f" {p['ip']:15} {p['name'][:40]:40} {p['model'][:30]}") else: agent.run(direct_add_poll_interval=poll_interval) if __name__ == '__main__': main()