Spaces:
Running
Running
| import logging | |
| import os | |
| import hashlib | |
| import tarfile | |
| import urllib.request | |
| import zipfile | |
| from tqdm import tqdm | |
| from pathlib import Path | |
| from logger import logger | |
| from py7zr import SevenZipFile | |
class TqdmUpTo(tqdm):
    """A ``tqdm`` bar that can be driven by absolute progress reports."""

    def update_to(self, b=1, bsize=1, tsize=None):
        """Set the bar position to ``b * bsize``.

        Args:
            b: Count multiplied by ``bsize`` to get the absolute position.
            bsize: Size multiplied by ``b`` to get the absolute position.
            tsize: When not ``None``, replaces the bar's ``total``.
        """
        if tsize is not None:
            self.total = tsize
        absolute_position = b * bsize
        # ``update`` expects an increment, so pass the distance from ``self.n``.
        increment = absolute_position - self.n
        self.update(increment)
def _download_file(url, dest_path):
    """Download ``url`` into ``dest_path``, resuming a partial file when possible.

    If ``dest_path`` already holds data, its size is sent as a ``Range`` offset
    so only the missing tail is requested. The body is appended only when the
    server answers ``206 Partial Content``; any other successful reply carries
    the whole resource, so the file is rewritten from the start rather than
    corrupted by appending a full copy to the partial one.

    Args:
        url: Address to fetch.
        dest_path: Local file to create, append to, or overwrite.

    Raises:
        urllib.error.URLError: If the request fails (HTTP errors included).
        OSError: If ``dest_path`` cannot be opened or written.
    """
    logging.info(f"Downloading: {url}")
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
    }

    resume_from = os.path.getsize(dest_path) if os.path.exists(dest_path) else 0
    if resume_from > 0:
        headers['Range'] = f'bytes={resume_from}-'

    request = urllib.request.Request(url, headers=headers)
    # ``urlopen`` follows redirects itself, so the response already belongs to
    # the final URL; it is consumed directly instead of issuing a second
    # request. The ``with`` block guarantees the connection is closed.
    with urllib.request.urlopen(request) as response:
        resumed = resume_from > 0 and getattr(response, 'status', None) == 206
        if not resumed:
            # The server ignored the range (or there was nothing to resume):
            # the body is the complete file, so start over from byte zero.
            resume_from = 0

        # Content-Length covers only the bytes in this response and may be
        # absent (e.g. chunked transfer); tqdm accepts ``total=None``.
        content_length = response.headers.get('Content-Length')
        total_size = None
        if content_length is not None:
            total_size = resume_from + int(content_length)

        final_url = response.geturl() or url
        mode = 'ab' if resumed else 'wb'
        with open(dest_path, mode) as file:
            with tqdm(total=total_size, initial=resume_from, unit='B', unit_scale=True,
                      unit_divisor=1024, miniters=1, desc=final_url.split('/')[-1]) as t:
                chunk_size = 1024 * 1024  # 1MB
                while True:
                    chunk = response.read(chunk_size)
                    if not chunk:
                        break
                    file.write(chunk)
                    t.update(len(chunk))
def verify_md5(file_path, expected_md5):
    """Check that a file's MD5 digest equals ``expected_md5``.

    The file is hashed in fixed-size chunks so a large download is never held
    in memory all at once. Letter case and surrounding whitespace in
    ``expected_md5`` are ignored when comparing.

    Args:
        file_path: File to hash, as a ``str`` or ``pathlib.Path``.
        expected_md5: Expected digest as a hexadecimal string.

    Returns:
        ``(True, "")`` when the digests match, otherwise ``(False, message)``
        where ``message`` shows the computed and the expected digest.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    hasher = hashlib.md5()
    with open(file_path, 'rb') as stream:
        # ``iter`` with a sentinel stops at the empty read that marks EOF.
        for chunk in iter(lambda: stream.read(1024 * 1024), b''):
            hasher.update(chunk)
    md5 = hasher.hexdigest()
    # ``hexdigest`` is lowercase; normalise the expected value to match.
    if md5 != expected_md5.strip().lower():
        return False, f"MD5 mismatch: {md5} != {expected_md5}"
    return True, ""
def verify_sha256(file_path, expected_sha256):
    """Check that a file's SHA-256 digest equals ``expected_sha256``.

    The file is hashed in fixed-size chunks so a large download is never held
    in memory all at once. Letter case and surrounding whitespace in
    ``expected_sha256`` are ignored when comparing.

    Args:
        file_path: File to hash, as a ``str`` or ``pathlib.Path``.
        expected_sha256: Expected digest as a hexadecimal string.

    Returns:
        ``(True, "")`` when the digests match, otherwise ``(False, message)``
        where ``message`` shows the computed and the expected digest.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    hasher = hashlib.sha256()
    with open(file_path, 'rb') as stream:
        # ``iter`` with a sentinel stops at the empty read that marks EOF.
        for chunk in iter(lambda: stream.read(1024 * 1024), b''):
            hasher.update(chunk)
    sha256 = hasher.hexdigest()
    # ``hexdigest`` is lowercase; normalise the expected value to match.
    if sha256 != expected_sha256.strip().lower():
        return False, f"SHA256 mismatch: {sha256} != {expected_sha256}"
    return True, ""
def extract_file(file_path, destination=None):
    """Extract a compressed file based on its extension.

    Supported extensions are ``.zip``, ``.tar.gz``, ``.tar.bz2`` and ``.7z``.
    Any other extension is logged as an error and nothing is extracted.

    Args:
        file_path: Archive to extract, as a ``str`` or ``pathlib.Path``.
        destination: Directory to extract into. If not specified, the archive
            is extracted to its parent directory.
    """
    # Work on the string form so ``Path`` arguments support ``endswith`` too.
    archive = str(file_path)
    if destination is None:
        destination = Path(archive).parent
    logging.info(f"Extracting to {destination}")

    if archive.endswith('.zip'):
        with zipfile.ZipFile(archive, 'r') as zip_ref:
            zip_ref.extractall(destination)
    elif archive.endswith(('.tar.gz', '.tar.bz2')):
        mode = 'r:gz' if archive.endswith('.tar.gz') else 'r:bz2'
        with tarfile.open(archive, mode) as tar_ref:
            # Archives here typically come from a download, so use the "data"
            # extraction filter where the interpreter provides it; it rejects
            # members that would escape ``destination``.
            if hasattr(tarfile, 'data_filter'):
                tar_ref.extractall(destination, filter='data')
            else:
                tar_ref.extractall(destination)
    elif archive.endswith('.7z'):
        with SevenZipFile(archive, mode='r') as z:
            z.extractall(destination)
    else:
        logging.error(f"Unsupported compression format for file {file_path}")
def _is_archive(target_path):
    """Return True when the path ends in an extension that gets extracted."""
    return str(target_path).endswith(('.zip', '.tar.gz', '.tar.bz2', '.7z'))


def _verify_or_remove(target_path, expected_md5, expected_sha256):
    """Run each requested checksum check; delete the file on the first failure."""
    checks = ((verify_md5, expected_md5), (verify_sha256, expected_sha256))
    for verify, expected in checks:
        if expected is None:
            continue
        success, message = verify(Path(target_path), expected)
        if not success:
            os.remove(target_path)
            return False, message
    return True, ""


def download_file(urls, target_path, extract_destination=None, expected_md5=None, expected_sha256=None):
    """Fetch a file from the first working URL, verify it, and extract archives.

    If ``target_path`` already exists it is verified (and, when
    ``extract_destination`` is given and it is an archive, extracted and
    removed) without downloading anything. Otherwise each URL is tried in
    order until one download succeeds; the result is then verified and, if it
    is an archive, extracted and removed.

    Args:
        urls: URLs to try in order. A single URL string is also accepted.
        target_path: Local path of the downloaded file.
        extract_destination: Directory for extracted archives; ``None`` lets
            ``extract_file`` pick its default.
        expected_md5: Expected MD5 hex digest, or ``None`` to skip the check.
        expected_sha256: Expected SHA-256 hex digest, or ``None`` to skip.

    Returns:
        ``(True, message)`` on success, or ``(False, message)`` when a
        checksum does not match (the file is deleted) or every URL failed.
    """
    # A bare string would otherwise be iterated character by character.
    if isinstance(urls, str):
        urls = [urls]

    if os.path.exists(target_path):
        success, message = _verify_or_remove(target_path, expected_md5, expected_sha256)
        if not success:
            return False, message
        # If it's a compressed file and the target_path already exists, skip the download
        if extract_destination and _is_archive(target_path):
            extract_file(str(target_path), extract_destination)
            os.remove(target_path)
        return True, "File already exists and verified successfully!"

    is_download = False
    for url in urls:
        try:
            _download_file(url, target_path)
        except Exception as error:
            # Best-effort fallback: log the failure and move on to the next URL.
            logger.error(f"downloading from URL {url}: {error}")
        else:
            is_download = True
            break

    if not is_download:
        # Drop any partial file; otherwise the next call would take the
        # "already exists" path above and report a truncated file as valid.
        if os.path.exists(target_path):
            os.remove(target_path)
        return False, "Error downloading from all provided URLs."

    success, message = _verify_or_remove(target_path, expected_md5, expected_sha256)
    if not success:
        return False, message

    # If it's a compressed file, extract it
    if _is_archive(target_path):
        extract_file(str(target_path), extract_destination)
        os.remove(target_path)
    return True, "File downloaded, verified, and extracted successfully!"
if __name__ == "__main__":
    # Example invocation; fill in the placeholders below before running.
    URLS = [
        "YOUR_PRIMARY_URL_HERE",
        "YOUR_FIRST_BACKUP_URL_HERE",
        # ... you can add more backup URLs as needed
    ]
    TARGET_PATH = ""
    EXPECTED_MD5 = ""
    EXTRACT_DESTINATION = ""
    # Pass the optional values by keyword: positionally, the third argument is
    # ``extract_destination`` and the fourth is ``expected_md5``, so the two
    # would land in each other's slot. Empty placeholders become ``None`` so
    # an unset checksum skips verification instead of always mismatching.
    success, message = download_file(
        URLS,
        TARGET_PATH,
        extract_destination=EXTRACT_DESTINATION or None,
        expected_md5=EXPECTED_MD5 or None,
    )
    print(message)