Servidor » History » Version 1
Version 1/3
-
Next » -
Current version
cristobal hernandez, 12/18/2025 01:30 AM
Servidor¶
import time
import glob
import json
import requests
import os
import random
from datetime import datetime
import grovepi
from grovepi import *
from grove_rgb_lcd import *
from gpiozero import AngularServo
import firebase_admin
from firebase_admin import credentials, firestore, messaging
# Archivos Locales
FILE_ID = "savenemo_id.txt"
FILE_CONFIG_LOCAL = "config.json"
FILE_CREDENTIALS = "serviceAccountKey.json"
# Configuracion Sensor Temperatura
SENSOR_TEMP_ID = "28-00000053d2ea"
SENSOR_TEMP_PATH = f"/sys/bus/w1/devices/{SENSOR_TEMP_ID}/w1_slave"
# Puertos GrovePi
PORT_ULTRASONIC = 8 # D8
PORT_RELAY_CAL = 3 # D3 (Calefactor)
PORT_PH = 0 # A0
PORT_SENSOR_LUZ = 1 # A1
SERVO_PIN = 18 # Pin GPIO (Servomotor)
# Variables Globales de Hardware
alimentador_servo = None
# Configuracion Inicial por Defecto
DEFAULT_CONFIG = {
"nombre_pez": "Esperando App...",
"temp_min": 24.0, "temp_max": 28.0,
"ph_min": 6.5, "ph_max": 7.5,
"nivel_luz": 50, "nivel_agua": 80,
"horarios_comida": [],
"sistema_alimentacion_on": True
}
# Estado del Sistema
config_actual = DEFAULT_CONFIG.copy()
nemo_id = ""
db = None
firebase_activo = False
ultimo_minuto_alimentacion = ""
ya_se_envio_notificacion = False
# Funciones BD
def obtener_id_unico():
"""Lee o genera un ID unico para identificar esta pecera."""
if os.path.exists(FILE_ID):
with open(FILE_ID, 'r') as f:
return f.read().strip()
else:
nuevo_id = f"NEMO-{random.randint(1000, 9999)}"
with open(FILE_ID, 'w') as f:
f.write(nuevo_id)
return nuevo_id
def verificar_internet():
try:
requests.get('https://www.google.com', timeout=3)
return True
except: return False
def esperar_internet():
print("Buscando conexion a internet...")
while not verificar_internet():
time.sleep(3)
print("Conexion establecida.")
def iniciar_firebase():
global db, firebase_activo
try:
if os.path.exists(FILE_CREDENTIALS):
cred = credentials.Certificate(FILE_CREDENTIALS)
if not firebase_admin._apps:
firebase_admin.initialize_app(cred)
db = firestore.client()
firebase_activo = True
print("Firebase conectado exitosamente.")
else:
print("Falta archivo serviceAccountKey.json")
except Exception as e:
print(f"Error Firebase: {e}")
firebase_activo = False
def sincronizar_configuracion():
global config_actual
if not firebase_activo: return
try:
doc = db.collection('acuarios').document(nemo_id).collection('data').document('config').get()
if doc.exists:
datos = doc.to_dict()
if datos != config_actual: # Solo actualizar si hay cambios
print("Configuracion actualizada desde la nube.")
config_actual.update(datos)
with open(FILE_CONFIG_LOCAL, 'w') as f: json.dump(config_actual, f)
except: pass
def subir_estado(temp, ph, luz, agua, alertas):
"""Sube los datos de sensores a Firestore."""
if not firebase_activo: return
try:
db.collection('acuarios').document(nemo_id).collection('data').document('estado').set({
'temp_actual': temp, 'ph_actual': ph, 'luz_actual': luz, 'agua_nivel': agua,
'alertas': alertas, 'ultima_actualizacion': datetime.now().isoformat()
})
except: pass
def enviar_notificacion_push(titulo, mensaje):
if not firebase_activo: return
try:
tokens_ref = db.collection('acuarios').document(nemo_id).collection('data').document('tokens')
doc = tokens_ref.get()
token = doc.to_dict().get('token_celular') if doc.exists else None
if token:
msg = messaging.Message(
notification=messaging.Notification(title=titulo, body=mensaje),
token=token,
)
messaging.send(msg)
print(f"Push enviado: {titulo}")
except Exception as e:
print(f"Error push: {e}")
def guardar_historial_evento(titulo, mensaje):
if not firebase_activo: return
try:
db.collection('acuarios').document(nemo_id).collection('historial').add({
'titulo': titulo,
'mensaje': mensaje,
'fecha': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
'timestamp': datetime.now().timestamp()
})
except: pass
# Hardware
def inicializar_hardware():
global alimentador_servo
try:
# Relay
pinMode(PORT_RELAY_CAL, "OUTPUT")
digitalWrite(PORT_RELAY_CAL, 0)
# LCD
try:
setText("INICIANDO\nSISTEMA...")
setRGB(0, 0, 255) # Azul
except: print("LCD no detectado (continuando...)")
# Servo Motor
alimentador_servo = AngularServo(SERVO_PIN, min_angle=0, max_angle=180,
min_pulse_width=0.0006, max_pulse_width=0.0024)
print("Hardware inicializado correctamente.")
except Exception as e:
print(f"Error Hardware: {e}")
alimentador_servo = None
# Sensores
def leer_temperatura():
try:
if not os.path.exists(SENSOR_TEMP_PATH):
print("Sensor de temperatura no encontrado (Cable desconectado?)")
return 25.0
with open(SENSOR_TEMP_PATH, 'r') as f:
lineas = f.readlines()
if lineas[0].strip()[-3:] == 'YES':
posicion_t = lineas[1].find('t=')
if posicion_t != -1:
temp_string = lineas[1][posicion_t+2:]
temp_c = float(temp_string) / 1000.0
return round(temp_c, 2)
except Exception as e:
print(f"Error leyendo temp: {e}")
return 25.0
def leer_ph():
VOLTAJE_NEUTRO = 2.50
FACTOR_CONVERSION = 4.17
try:
valor_raw = grovepi.analogRead(PORT_PH)
voltaje = float(valor_raw) * 5.0 / 1023
ph_actual = 7.0 + ((voltaje - VOLTAJE_NEUTRO) * FACTOR_CONVERSION)
return round(ph_actual, 1)
except: return 7.0
def leer_nivel_agua():
try:
altura_total = 40
dist = ultrasonicRead(PORT_ULTRASONIC)
if dist > altura_total: dist = 40
porc = int(100 - ((dist / altura_total) * 100))
return max(0, min(100, porc))
except: return 80
def leer_luz():
try:
lectura = analogRead(PORT_SENSOR_LUZ)
return int((lectura / 800.0) * 100)
except: return 50
# Actuadores
def gestionar_calefactor(temp_actual):
target = float(config_actual.get('temp_min', 24.0))
if temp_actual < target:
digitalWrite(PORT_RELAY_CAL, 1) # Encender
return True
elif temp_actual > target + 0.5:
digitalWrite(PORT_RELAY_CAL, 0) # Apagar
return False
return False
def mover_servo_alimentar():
if alimentador_servo is None: return False
try:
print("Dispensando comida...")
alimentador_servo.angle = 90
time.sleep(0.8)
alimentador_servo.angle = 110
time.sleep(0.3)
alimentador_servo.angle = 90
time.sleep(0.3)
alimentador_servo.angle = 0
time.sleep(1)
alimentador_servo.value = None
return True
except Exception as e:
print(f"Error Servo: {e}")
return False
def gestionar_comida():
global ultimo_minuto_alimentacion
if not config_actual.get('sistema_alimentacion_on', True):
return False
hora_actual = datetime.now().strftime("%H:%M")
horarios = config_actual.get("horarios_comida", [])
if (hora_actual in horarios) and (hora_actual != ultimo_minuto_alimentacion):
print(f"Intentando alimentar ({hora_actual})...")
exito = mover_servo_alimentar()
if exito:
ultimo_minuto_alimentacion = hora_actual
# EMOJI REMOVIDO AQUI
enviar_notificacion_push("Hora de comer!", f"Tu pez ha sido alimentado a las {hora_actual}")
guardar_historial_evento("Comida Servida", "Dispensador automatico activado con exito.")
return True
return False
def actualizar_lcd(nemo_id, temp, ph, luz, agua, calefactor_on, comida_servida):
try:
if comida_servida:
setText(" DISPENSANDO\n COMIDA...")
time.sleep(3)
return
if calefactor_on: setRGB(255, 60, 0) # Naranja
else: setRGB(0, 255, 0) # Verde
setText(f"ID:{nemo_id}\nT:{temp}C pH:{ph}")
time.sleep(4)
setText(f"ID:{nemo_id}\nLuz:{luz}% Agua:{agua}%")
time.sleep(4)
except: pass
def main():
global nemo_id, ya_se_envio_notificacion
print("\n--- INICIANDO SAVENEMO ---")
inicializar_hardware()
nemo_id = obtener_id_unico()
print(f"ID DISPOSITIVO: {nemo_id}")
esperar_internet()
iniciar_firebase()
print("Loop principal iniciado.\n")
while True:
try:
sincronizar_configuracion()
temp = leer_temperatura()
ph = leer_ph()
agua = leer_nivel_agua()
luz = leer_luz()
alertas = []
cfg = config_actual
if temp < float(cfg.get('temp_min', 20)): alertas.append("Temp Baja")
if temp > float(cfg.get('temp_max', 35)): alertas.append("Temp Alta")
if agua < int(cfg.get('nivel_agua', 80)): alertas.append("Nivel Bajo")
if ph < int(cfg.get('ph_min', 80)): alertas.append("PH Bajo")
if ph > int(cfg.get('ph_max', 80)): alertas.append("PH Alto")
calefactor_on = gestionar_calefactor(temp)
comida_servida = gestionar_comida()
if alertas:
if not ya_se_envio_notificacion:
print(f"Alertas: {alertas}")
# EMOJI REMOVIDO AQUI
enviar_notificacion_push("ALERTA ACUARIO ", f"Atencion: {', '.join(alertas)}")
guardar_historial_evento("Alerta Detectada", ', '.join(alertas))
ya_se_envio_notificacion = True
else:
if ya_se_envio_notificacion: ya_se_envio_notificacion = False
subir_estado(temp, ph, luz, agua, alertas)
cal_status = 'ON' if calefactor_on else 'OFF'
# Solo para ver por terminal
print(f"[{datetime.now().strftime('%H:%M')}] T:{temp}C pH:{ph} L:{luz}% A:{agua}% Cal:{cal_status}")
actualizar_lcd(nemo_id, temp, ph, luz, agua, calefactor_on, comida_servida)
except KeyboardInterrupt:
print("\nApagando sistema...")
try:
digitalWrite(PORT_RELAY_CAL, 0)
setRGB(0, 0, 0)
setText("")
if alimentador_servo: alimentador_servo.detach()
except: pass
break
except Exception as e:
print(f"Error en Loop: {e}")
time.sleep(5)
if __name__ == "__main__":
main()