Sistema de Detección de Archivos Duplicados mediante Escaneo Paralelo

En sistemas operativos como Windows, es común encontrar archivos con el mismo nombre en diferentes ubicaciones, generados por descargas repetidas, copias o cachés de aplicaciones. Esto puede desperdiciar espacio en disco y dificultar la gestión de archivos. Este artículo presenta el diseño de una herramienta en Python que escanea múltiples unidades de disco para identificar archivos homónimos, proporcionando una base para la limpieza manual posterior.

Principios de Implementación

El sistema emplea una combinación de escaneo concurrente y un índice basado en hash para localizar archivos con nombres idénticos de manera eficiente. A continuación se detallan los conceptos clave.

1. Estructura de Datos para el Índice

Se utiliza un diccionario con listas como valores para agrupar archivos por nombre. La clave es el nombre del archivo convertido a minúsculas (para manejar la insensibilidad a mayúsculas de Windows), y el valor es una lista de tuplas con la ruta completa, el tamaño y la fecha de modificación de cada ocurrencia.

self.indice_archivos = defaultdict(list)

2. Escaneo Concurrente por Unidad de Disco

Cada unidad (por ejemplo, C:, D:) se asigna a un hilo independiente. Esto permite aprovechar la paralización de operaciones de E/S, acelerando el proceso en entornos con múltiples discos.

3. Control de Hilos con Bloqueos

Dado que varios hilos escriben simultáneamente en el índice compartido, se emplea un bloqueo para garantizar la integridad de los datos.

self.bloqueo = threading.Lock()

4. Optimización del Escaneo y Manejo de Excepciones

Para reducir el tiempo de escaneo, se omiten directorios del sistema como System*, Recovery* y Windows. Además, se capturan excepciones de permisos u otros errores de E/S, permitiendo que el escaneo continúe sin interrupciones.

dirs[:] = [d for d in dirs if not d.startswith(('$', 'System', 'Recovery', 'Windows'))]

5. Obtención Eficiente de Metadatos

Durante el recorrido, se accede una sola vez a los metadatos del archivo mediante os.stat(), obteniendo tamaño y fecha de modificación para evitar operaciones redundantes.

6. Filtrado de Archivos Duplicados

Tras el escaneo, se filtran las entradas del índice donde el mismo nombre corresponde a más de una ubicación, considerándolos como duplicados potenciales basados únicamente en el nombre.

duplicados = {
    nombre: rutas
    for nombre, rutas in self.indice_archivos.items()
    if len(rutas) > 1
}

7. Presentación de Resultados

Los duplicados se ordenan por fecha de modificación descendente dentro de cada grupo, y los tamaños se formatean para una lectura más clara (KB, MB, etc.).

Implementación en Código

La siguiente clase encapsula la lógica del escaneo y la detección de duplicados. Se han renombrado variables y métodos para ilustrar una implementación alternativa, manteniendo la funcionalidad esencial.

import os
import threading
from collections import defaultdict
from datetime import datetime

class EscanerDeDuplicados:
    def __init__(self):
        self.indice = defaultdict(list)
        self.lock = threading.Lock()

    def _formatear_tamano(self, bytes_tamano):
        unidades = ['B', 'KB', 'MB', 'GB', 'TB']
        for unidad in unidades:
            if bytes_tamano < 1024.0:
                return f"{bytes_tamano:.1f} {unidad}"
            bytes_tamano /= 1024.0
        return f"{bytes_tamano:.1f} PB"

    def _escanear_unidad(self, letra_unidad):
        ruta_unidad = f"{letra_unidad}:\\"
        if not os.path.exists(ruta_unidad):
            return
        try:
            for dir_raiz, subdirs, archivos in os.walk(ruta_unidad, topdown=True):
                subdirs[:] = [d for d in subdirs if not d.startswith(('$', 'System', 'Recovery', 'Windows'))]
                for nombre_archivo in archivos:
                    ruta_completa = os.path.join(dir_raiz, nombre_archivo)
                    try:
                        info = os.stat(ruta_completa)
                        tamano = info.st_size
                        modificado = datetime.fromtimestamp(info.st_mtime)
                        clave = nombre_archivo.lower()
                        with self.lock:
                            self.indice[clave].append((ruta_completa, tamano, modificado))
                    except (PermissionError, OSError):
                        continue
        except Exception:
            pass

    def buscar_duplicados(self, lista_unidades=None):
        if lista_unidades is None:
            lista_unidades = ['C', 'D']
        hilos = []
        for letra in lista_unidades:
            hilo = threading.Thread(target=self._escanear_unidad, args=(letra,))
            hilos.append(hilo)
            hilo.start()
        for h in hilos:
            h.join()
        return {nombre: rutas for nombre, rutas in self.indice.items() if len(rutas) > 1}

    def mostrar_resultados(self, duplicados):
        if not duplicados:
            return
        for nombre, lista_info in sorted(duplicados.items()):
            lista_info.sort(key=lambda x: x[2], reverse=True)
            for ruta, tamano, fecha in lista_info:
                tamano_formateado = self._formatear_tamano(tamano)
                fecha_str = fecha.strftime('%Y-%m-%d %H:%M:%S')
                print(f"{ruta} - {tamano_formateado} - {fecha_str}")

if __name__ == "__main__":
    escaner = EscanerDeDuplicados()
    unidades_a_escanear = ['C', 'D', 'E']
    resultados = escaner.buscar_duplicados(unidades_a_escanear)
    escaner.mostrar_resultados(resultados)

Ejemplo de Salida

Al ejecutar el script, se producirá una lista de archivos duplicados con su ruta, tamaño y fecha de última modificación, facilitando la iedntificación de versiones recientes y la toma de decisiones para la limpieza.

Etiquetas: Python Multithreading file scanning duplicate detection os module

Publicado el 6-12 02:44