Funciones en el codigo » History » Version 5
Version 4 (fernanda ventura, 12/29/2023 12:49 PM) → Version 5/7 (fernanda ventura, 12/29/2023 12:49 PM)
h1. Funciones en el codigo
h3. Clase que representa la pantalla del usuario asistido
<pre>
class AsistidoScreen(Screen):
recording = False # @Indica Indica si se está grabando@ grabando
frames = [] # Almacena fragmentos de audio
audio_stream = None # Flujo de entrada de audio
sound = None # Objeto para reproducir sonido
file_name = "grabacion.wav" # Nombre del archivo de audio
# Inicia la grabación de audio en un hilo separado
def iniciar_grabacion(self):
if not self.recording:
self.recording = True
threading.Thread(target=self._grabar_audio).start()
# Detiene la grabación de audio y reproduce el audio grabado en un hilo separado
def detener_grabacion(self):
if self.recording:
self.recording = False
threading.Thread(target=self._reproducir_audio).start()
# Método privado que realiza la grabación de audio
def _grabar_audio(self):
CHUNK = 1024
FORMAT = pyaudio.paInt16
CHANNELS = 2
RATE = 44100
p = pyaudio.PyAudio()
# Crea o sobrescribe el archivo de audio
with open(self.file_name, 'wb'):
pass
# Abre el flujo de entrada de audio
self.audio_stream = p.open(format=FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
frames_per_buffer=CHUNK)
# Bucle de grabación
while self.recording:
data = self.audio_stream.read(CHUNK)
self.frames.append(data)
# Detiene y cierra el flujo de entrada de audio
self.audio_stream.stop_stream()
self.audio_stream.close()
p.terminate()
# Guarda los frames grabados en un archivo WAV
with wave.open(self.file_name, 'wb') as wf:
wf.setnchannels(CHANNELS)
wf.setsampwidth(pyaudio.PyAudio().get_sample_size(FORMAT))
wf.setframerate(RATE)
wf.writeframes(b''.join(self.frames))
# Reproduce el audio grabado
self._reproducir_audio()
# Limpia los frames grabados
self.frames = []
# Método privado que reproduce el audio grabado
def _reproducir_audio(self):
if self.frames:
sound_data = b''.join(self.frames)
# Carga el sonido y lo reproduce
sound = SoundLoader.load(self.file_name)
if sound:
sound.play()
# Se llama al abandonar la pantalla para detener el sonido y cerrar el flujo de audio
def on_leave(self):
if self.sound:
self.sound.stop()
if self.audio_stream:
self.audio_stream.stop_stream()
self.audio_stream.close()
# Muestra un teclado en una ventana emergente
def show_keyboard(self):
self.keyboard_popup = Popup(title='Teclado',
content=self._build_keyboard_layout(),
size_hint=(None, None),
size=(400, 300),
auto_dismiss=True)
self.keyboard_popup.open()
# Construye el diseño del teclado en la ventana emergente
def _build_keyboard_layout(self):
layout = BoxLayout(orientation='vertical', spacing=10, padding=10)
text_input = TextInput(multiline=False, readonly=True, halign='center', font_size=24)
layout.add_widget(text_input)
buttons_layout = GridLayout(cols=10, spacing=5, size_hint_y=None, height=150)
# Diseño del teclado QWERTY
qwerty_layout = [
'Q', 'W', 'E', 'R', 'T', 'Y', 'U', 'I', 'O', 'P',
'A', 'S', 'D', 'F', 'G', 'H', 'J', 'K', 'L',
'Z', 'X', 'C', 'V', 'B', 'N', 'M',
'Space', 'Borrar', 'Enter'
]
# Agrega botones al diseño
for button_text in qwerty_layout:
buttons_layout.add_widget(Button(text=button_text, on_press=self._on_button_press))
layout.add_widget(buttons_layout)
return layout
# Maneja el evento de presionar un botón en el teclado
def _on_button_press(self, instance):
current_text_input = self.find_text_input(self.keyboard_popup.content)
# Acciones según el botón presionado
if instance.text == 'Enter':
self.reproducir_texto(current_text_input.text)
current_text_input.text = ''
elif instance.text == 'Space':
current_text_input.text += ' '
elif instance.text == 'Borrar':
current_text_input.text = ''
else:
current_text_input.text += instance.text
# Encuentra el TextInput entre los hijos de un layout
def find_text_input(self, layout):
for child in layout.children:
if isinstance(child, TextInput):
return child
elif len(child.children) > 0:
text_input = self.find_text_input(child)
if text_input:
return text_input
return None
# Inicia la reproducción de texto en un hilo separado
def reproducir_texto(self, texto):
threading.Thread(target=self._reproducir_voz, args=(texto,)).start()
# Método privado que reproduce el texto utilizando la biblioteca pyttsx3
def _reproducir_voz(self, texto):
engine = pyttsx3.init()
engine.say(texto)
engine.runAndWait()
</pre>
h3. Clase que representa la pantalla del cuidador
<pre>
class CuidadorScreen(Screen):
video = None # Widget de video
motion_label = None # Etiqueta de movimiento
camera_initialized = False # Indica si la cámara ha sido inicializada
# Se llama al entrar en la pantalla para inicializar la cámara y detectar el movimiento
def on_enter(self, *args):
super(CuidadorScreen, self).on_enter(*args)
# Crea el widget de video si no se ha inicializado previamente
if not self.camera_initialized:
self.video = Video(
source="http://172.20.10.3:5000/video_feed",
state="play",
options={"allow_stretch": True},
)
self.video.bind(texture=self.texture_callback)
self.ids.camera_layout.add_widget(self.video)
# Crea la etiqueta para la alerta de movimiento
self.motion_label = Label(
text="No se detecta movimiento", font_size="15sp", color=(1, 0, 0, 1)
)
self.ids.camera_layout.add_widget(self.motion_label)
# Inicia el procesamiento de la cámara en un hilo separado
threading.Thread(target=self.detect_motion).start()
# Marca la cámara como inicializada para evitar la duplicación
self.camera_initialized = True
# Callback para actualizar la textura del widget de video
def texture_callback(self, instance, value):
self.video.texture = value
# Método que detecta el movimiento en el feed de la cámara
def detect_motion(self):
cap = cv2.VideoCapture("http://172.20.10.3:5000/video_feed")
_, prev_frame = cap.read()
while True:
_, frame = cap.read()
diff = cv2.absdiff(prev_frame, frame)
gray = cv2.cvtColor(diff, cv2.COLOR_BGR2GRAY)
_, thresh = cv2.threshold(gray, 30, 255, cv2.THRESH_BINARY)
contours, _ = cv2.findContours(
thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
)
# Verifica si hay algún contorno de movimiento
motion_detected = any(cv2.contourArea(cnt) > 1000 for cnt in contours)
if motion_detected:
# Programa la muestra de la alerta
Clock.schedule_once(self.show_alert, 0)
prev_frame = frame.copy()
# Muestra una alerta cuando se detecta movimiento
def show_alert(self, *args):
# Función asíncrona para enviar un mensaje a través de Telegram
async def send_telegram_message(token, chat_id, message_text):
bot = Client("my_account", api_id="your_api_id", api_hash="your_api_hash", bot_token=token)
await bot.send_message(chat_id=chat_id, text=message_text)
await bot.run()
TOKEN = "6879188437:AAFO-HGke4OIoUZMGN_EAl6_vgcMfDufqhY"
CHAT_ID = "5081858998"
MESSAGE_TEXT = "¡Se detectó movimiento!"
# Llama a la función asíncrona para enviar el mensaje a Telegram
asyncio.run(send_telegram_message(TOKEN, CHAT_ID, MESSAGE_TEXT))
# Actualiza la etiqueta de movimiento
self.motion_label.text = "¡Se detectó movimiento!"
# Programa la reinicialización de la alerta después de 5 segundos
Clock.schedule_once(self.reset_alert, 5)
# Reinicia la etiqueta y la detección de movimiento para la próxima alerta
def reset_alert(self, *args):
self.motion_label.text = "No se detecta movimiento"
threading.Thread(
target=self.detect_motion
).start() # Reiniciar la detección para la próxima alerta
</pre>
h3. # Clase que representa la pantalla de información
<pre>
class InformacionScreen(Screen):
# Abre el archivo PDF
def open_pdf(self):
pdf_file_path = "hola.pdf"
try:
os.startfile(pdf_file_path)
except Exception as e:
print(f"Error al abrir el archivo PDF: {e}")
</pre>
h3. Clase que representa la pantalla del usuario asistido
<pre>
class AsistidoScreen(Screen):
recording = False # @Indica Indica si se está grabando@ grabando
frames = [] # Almacena fragmentos de audio
audio_stream = None # Flujo de entrada de audio
sound = None # Objeto para reproducir sonido
file_name = "grabacion.wav" # Nombre del archivo de audio
# Inicia la grabación de audio en un hilo separado
def iniciar_grabacion(self):
if not self.recording:
self.recording = True
threading.Thread(target=self._grabar_audio).start()
# Detiene la grabación de audio y reproduce el audio grabado en un hilo separado
def detener_grabacion(self):
if self.recording:
self.recording = False
threading.Thread(target=self._reproducir_audio).start()
# Método privado que realiza la grabación de audio
def _grabar_audio(self):
CHUNK = 1024
FORMAT = pyaudio.paInt16
CHANNELS = 2
RATE = 44100
p = pyaudio.PyAudio()
# Crea o sobrescribe el archivo de audio
with open(self.file_name, 'wb'):
pass
# Abre el flujo de entrada de audio
self.audio_stream = p.open(format=FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
frames_per_buffer=CHUNK)
# Bucle de grabación
while self.recording:
data = self.audio_stream.read(CHUNK)
self.frames.append(data)
# Detiene y cierra el flujo de entrada de audio
self.audio_stream.stop_stream()
self.audio_stream.close()
p.terminate()
# Guarda los frames grabados en un archivo WAV
with wave.open(self.file_name, 'wb') as wf:
wf.setnchannels(CHANNELS)
wf.setsampwidth(pyaudio.PyAudio().get_sample_size(FORMAT))
wf.setframerate(RATE)
wf.writeframes(b''.join(self.frames))
# Reproduce el audio grabado
self._reproducir_audio()
# Limpia los frames grabados
self.frames = []
# Método privado que reproduce el audio grabado
def _reproducir_audio(self):
if self.frames:
sound_data = b''.join(self.frames)
# Carga el sonido y lo reproduce
sound = SoundLoader.load(self.file_name)
if sound:
sound.play()
# Se llama al abandonar la pantalla para detener el sonido y cerrar el flujo de audio
def on_leave(self):
if self.sound:
self.sound.stop()
if self.audio_stream:
self.audio_stream.stop_stream()
self.audio_stream.close()
# Muestra un teclado en una ventana emergente
def show_keyboard(self):
self.keyboard_popup = Popup(title='Teclado',
content=self._build_keyboard_layout(),
size_hint=(None, None),
size=(400, 300),
auto_dismiss=True)
self.keyboard_popup.open()
# Construye el diseño del teclado en la ventana emergente
def _build_keyboard_layout(self):
layout = BoxLayout(orientation='vertical', spacing=10, padding=10)
text_input = TextInput(multiline=False, readonly=True, halign='center', font_size=24)
layout.add_widget(text_input)
buttons_layout = GridLayout(cols=10, spacing=5, size_hint_y=None, height=150)
# Diseño del teclado QWERTY
qwerty_layout = [
'Q', 'W', 'E', 'R', 'T', 'Y', 'U', 'I', 'O', 'P',
'A', 'S', 'D', 'F', 'G', 'H', 'J', 'K', 'L',
'Z', 'X', 'C', 'V', 'B', 'N', 'M',
'Space', 'Borrar', 'Enter'
]
# Agrega botones al diseño
for button_text in qwerty_layout:
buttons_layout.add_widget(Button(text=button_text, on_press=self._on_button_press))
layout.add_widget(buttons_layout)
return layout
# Maneja el evento de presionar un botón en el teclado
def _on_button_press(self, instance):
current_text_input = self.find_text_input(self.keyboard_popup.content)
# Acciones según el botón presionado
if instance.text == 'Enter':
self.reproducir_texto(current_text_input.text)
current_text_input.text = ''
elif instance.text == 'Space':
current_text_input.text += ' '
elif instance.text == 'Borrar':
current_text_input.text = ''
else:
current_text_input.text += instance.text
# Encuentra el TextInput entre los hijos de un layout
def find_text_input(self, layout):
for child in layout.children:
if isinstance(child, TextInput):
return child
elif len(child.children) > 0:
text_input = self.find_text_input(child)
if text_input:
return text_input
return None
# Inicia la reproducción de texto en un hilo separado
def reproducir_texto(self, texto):
threading.Thread(target=self._reproducir_voz, args=(texto,)).start()
# Método privado que reproduce el texto utilizando la biblioteca pyttsx3
def _reproducir_voz(self, texto):
engine = pyttsx3.init()
engine.say(texto)
engine.runAndWait()
</pre>
h3. Clase que representa la pantalla del cuidador
<pre>
class CuidadorScreen(Screen):
video = None # Widget de video
motion_label = None # Etiqueta de movimiento
camera_initialized = False # Indica si la cámara ha sido inicializada
# Se llama al entrar en la pantalla para inicializar la cámara y detectar el movimiento
def on_enter(self, *args):
super(CuidadorScreen, self).on_enter(*args)
# Crea el widget de video si no se ha inicializado previamente
if not self.camera_initialized:
self.video = Video(
source="http://172.20.10.3:5000/video_feed",
state="play",
options={"allow_stretch": True},
)
self.video.bind(texture=self.texture_callback)
self.ids.camera_layout.add_widget(self.video)
# Crea la etiqueta para la alerta de movimiento
self.motion_label = Label(
text="No se detecta movimiento", font_size="15sp", color=(1, 0, 0, 1)
)
self.ids.camera_layout.add_widget(self.motion_label)
# Inicia el procesamiento de la cámara en un hilo separado
threading.Thread(target=self.detect_motion).start()
# Marca la cámara como inicializada para evitar la duplicación
self.camera_initialized = True
# Callback para actualizar la textura del widget de video
def texture_callback(self, instance, value):
self.video.texture = value
# Método que detecta el movimiento en el feed de la cámara
def detect_motion(self):
cap = cv2.VideoCapture("http://172.20.10.3:5000/video_feed")
_, prev_frame = cap.read()
while True:
_, frame = cap.read()
diff = cv2.absdiff(prev_frame, frame)
gray = cv2.cvtColor(diff, cv2.COLOR_BGR2GRAY)
_, thresh = cv2.threshold(gray, 30, 255, cv2.THRESH_BINARY)
contours, _ = cv2.findContours(
thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
)
# Verifica si hay algún contorno de movimiento
motion_detected = any(cv2.contourArea(cnt) > 1000 for cnt in contours)
if motion_detected:
# Programa la muestra de la alerta
Clock.schedule_once(self.show_alert, 0)
prev_frame = frame.copy()
# Muestra una alerta cuando se detecta movimiento
def show_alert(self, *args):
# Función asíncrona para enviar un mensaje a través de Telegram
async def send_telegram_message(token, chat_id, message_text):
bot = Client("my_account", api_id="your_api_id", api_hash="your_api_hash", bot_token=token)
await bot.send_message(chat_id=chat_id, text=message_text)
await bot.run()
TOKEN = "6879188437:AAFO-HGke4OIoUZMGN_EAl6_vgcMfDufqhY"
CHAT_ID = "5081858998"
MESSAGE_TEXT = "¡Se detectó movimiento!"
# Llama a la función asíncrona para enviar el mensaje a Telegram
asyncio.run(send_telegram_message(TOKEN, CHAT_ID, MESSAGE_TEXT))
# Actualiza la etiqueta de movimiento
self.motion_label.text = "¡Se detectó movimiento!"
# Programa la reinicialización de la alerta después de 5 segundos
Clock.schedule_once(self.reset_alert, 5)
# Reinicia la etiqueta y la detección de movimiento para la próxima alerta
def reset_alert(self, *args):
self.motion_label.text = "No se detecta movimiento"
threading.Thread(
target=self.detect_motion
).start() # Reiniciar la detección para la próxima alerta
</pre>
h3. # Clase que representa la pantalla de información
<pre>
class InformacionScreen(Screen):
# Abre el archivo PDF
def open_pdf(self):
pdf_file_path = "hola.pdf"
try:
os.startfile(pdf_file_path)
except Exception as e:
print(f"Error al abrir el archivo PDF: {e}")
</pre>