#!/usr/bin/env python3
# HTTP fetch/download utilities with retry and exponential backoff.
import time
from typing import Optional

import requests
from requests.exceptions import RequestException
from tqdm import tqdm

def download_file(url: str, filename: Optional[str] = None,
                  max_retries: int = 3, initial_timeout: int = 30,
                  backoff_factor: float = 1.0,
                  session: Optional[requests.Session] = None,
                  headers: Optional[dict] = None) -> Optional[bytes]:
    """
    Download a file from URL and return its content as binary data.

    Args:
        url: The URL of the file to download
        filename: Optional filename for display in progress bar (default: extracted from URL)
        max_retries: Maximum number of retry attempts (default: 3)
        initial_timeout: Timeout in seconds for each request attempt (default: 30)
        backoff_factor: Factor for exponential backoff delay between retries (default: 1.0)
        session: Optional requests.Session object to reuse (default: None, creates new session)
        headers: Optional dictionary of headers to include in the request (default: None)

    Returns:
        File content as bytes if successful, None if all retries fail

    Raises:
        ValueError: If URL is invalid
    """
    if not url or not isinstance(url, str):
        raise ValueError("URL must be a non-empty string")

    # Extract a display name from the URL if the caller didn't supply one.
    if not filename:
        filename = url.split('/')[-1] or "downloaded_file"

    last_error = None

    for attempt in range(max_retries):
        try:
            # Create a session lazily so a caller-supplied one is reused
            # (and one we create is kept across retry attempts).
            if session is None:
                session = requests.Session()

            # Browser-like User-Agent avoids trivial bot blocking;
            # caller-supplied headers take precedence over the default.
            default_headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            if headers:
                default_headers.update(headers)
            session.headers.update(default_headers)

            # Stream so large files are read in chunks rather than all at once.
            response = session.get(url, timeout=initial_timeout, stream=True)
            response.raise_for_status()

            # content-length may be absent (chunked transfer); tqdm accepts 0.
            total_size = int(response.headers.get('content-length', 0))

            # BUG FIX: the original printed the literal text "(unknown)"
            # instead of the actual filename.
            print(f"\nDownloading: {filename}")
            if total_size > 0:
                print(f"Total size: {format_size(total_size)}")

            downloaded_data = bytearray()

            # BUG FIX: use the context manager so the progress bar is closed
            # even if iter_content raises mid-stream (original leaked it).
            with tqdm(
                total=total_size,
                unit='B',
                unit_scale=True,
                unit_divisor=1024,
                desc=filename[:50] if len(filename) > 50 else filename,
                ascii=True
            ) as progress_bar:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # Filter out keep-alive chunks
                        downloaded_data.extend(chunk)
                        progress_bar.update(len(chunk))

            print(f"\n✓ Downloaded {len(downloaded_data)} bytes ({format_size(len(downloaded_data))})")

            return bytes(downloaded_data)

        except requests.exceptions.Timeout as e:
            last_error = e
            print(f"\nAttempt {attempt + 1}/{max_retries}: Timeout error. Retrying...")

        except requests.exceptions.ConnectionError as e:
            last_error = e
            print(f"\nAttempt {attempt + 1}/{max_retries}: Connection error. Retrying...")

        except requests.exceptions.HTTPError as e:
            last_error = e
            # BUG FIX: read the status from the exception's own response;
            # the loop-local `response` could be stale from a prior attempt.
            status = e.response.status_code if e.response is not None else "unknown"
            print(f"\nAttempt {attempt + 1}/{max_retries}: HTTP error {status}. Retrying...")

        except requests.exceptions.RequestException as e:
            last_error = e
            print(f"\nAttempt {attempt + 1}/{max_retries}: Request error: {str(e)}. Retrying...")

        except Exception as e:
            last_error = e
            print(f"\nAttempt {attempt + 1}/{max_retries}: Unexpected error: {str(e)}. Retrying...")

        # Exponential backoff (1x, 2x, 4x, ...) before the next attempt;
        # skipped after the final attempt.
        if attempt < max_retries - 1:
            backoff_time = backoff_factor * (2 ** attempt)
            print(f"Waiting {backoff_time} seconds before retry...")
            time.sleep(backoff_time)

    # All retries failed
    print(f"\nFailed to download file after {max_retries} attempts.")
    print(f"Last error: {str(last_error)}")
    return None

def format_size(size_bytes: int) -> str:
    """
    Format file size in human-readable form.

    Args:
        size_bytes: Size in bytes

    Returns:
        Formatted size string (e.g., "1.5 MB")
    """
    units = ('B', 'KB', 'MB', 'GB', 'TB')
    value = float(size_bytes)
    index = 0
    # Step up one unit at a time while the value still overflows it.
    while value >= 1024.0 and index < len(units) - 1:
        value /= 1024.0
        index += 1
    # Anything beyond TB collapses into petabytes.
    if value >= 1024.0:
        value /= 1024.0
        return f"{value:.2f} PB"
    return f"{value:.2f} {units[index]}"

def fetch_web_page(url: str, max_retries: int = 3, initial_timeout: int = 10,
                   backoff_factor: float = 1.0, session: Optional[requests.Session] = None,
                   headers: Optional[dict] = None) -> Optional[requests.Response]:
    """
    Fetch a web page with error handling and retry logic.

    Args:
        url: The URL of the web page to fetch
        max_retries: Maximum number of retry attempts (default: 3)
        initial_timeout: Timeout in seconds for each request attempt (default: 10)
        backoff_factor: Factor for exponential backoff delay between retries (default: 1.0)
        session: Optional requests.Session object to reuse (default: None, creates new session)
        headers: Optional dictionary of headers to include in the request (default: None)

    Returns:
        requests.Response object if successful, None if all retries fail

    Raises:
        ValueError: If URL is invalid
    """
    if not url or not isinstance(url, str):
        raise ValueError("URL must be a non-empty string")

    last_error = None

    for attempt in range(max_retries):
        try:
            # Create a session lazily so a caller-supplied one is reused
            # (and one we create is kept across retry attempts).
            if session is None:
                session = requests.Session()

            # Browser-like User-Agent avoids trivial bot blocking;
            # caller-supplied headers take precedence over the default.
            default_headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            if headers:
                default_headers.update(headers)
            session.headers.update(default_headers)

            # Make the request with timeout
            response = session.get(url, timeout=initial_timeout)

            # Raises HTTPError on 4xx/5xx status codes.
            response.raise_for_status()

            # Return successful response
            return response

        except requests.exceptions.Timeout as e:
            last_error = e
            print(f"Attempt {attempt + 1}/{max_retries}: Timeout error. Retrying...")

        except requests.exceptions.ConnectionError as e:
            last_error = e
            print(f"Attempt {attempt + 1}/{max_retries}: Connection error. Retrying...")

        except requests.exceptions.HTTPError as e:
            last_error = e
            # BUG FIX: read the status from the exception's own response;
            # the loop-local `response` could be stale from a prior attempt.
            status = e.response.status_code if e.response is not None else "unknown"
            print(f"Attempt {attempt + 1}/{max_retries}: HTTP error {status}. Retrying...")

        except requests.exceptions.RequestException as e:
            last_error = e
            print(f"Attempt {attempt + 1}/{max_retries}: Request error: {str(e)}. Retrying...")

        except Exception as e:
            last_error = e
            print(f"Attempt {attempt + 1}/{max_retries}: Unexpected error: {str(e)}. Retrying...")

        # Exponential backoff (1x, 2x, 4x, ...) before the next attempt;
        # skipped after the final attempt.
        if attempt < max_retries - 1:
            backoff_time = backoff_factor * (2 ** attempt)
            print(f"Waiting {backoff_time} seconds before retry...")
            time.sleep(backoff_time)

    # All retries failed
    print(f"Failed to fetch page after {max_retries} attempts.")
    print(f"Last error: {str(last_error)}")
    return None

def fetch_web_page_with_content(url: str, max_retries: int = 3,
                                initial_timeout: int = 10,
                                backoff_factor: float = 1.0,
                                session: Optional[requests.Session] = None,
                                headers: Optional[dict] = None) -> Optional[str]:
    """
    Fetch a web page and return its content as a string.

    Thin convenience wrapper around fetch_web_page that extracts the
    decoded response body.

    Args:
        url: The URL of the web page to fetch
        max_retries: Maximum number of retry attempts (default: 3)
        initial_timeout: Initial timeout in seconds for the request (default: 10)
        backoff_factor: Factor to increase timeout between retries (default: 1.0)
        session: Optional requests.Session object to reuse (default: None, creates new session)
        headers: Optional dictionary of headers to include in the request (default: None)

    Returns:
        Page content as string if successful, None if all retries fail
    """
    page = fetch_web_page(url, max_retries, initial_timeout, backoff_factor, session, headers)
    return page.text if page is not None else None

# Example usage
if __name__ == "__main__":
    # URLs exercising success, client error, slow response, and server error.
    test_urls = [
        "https://httpbin.org/status/200",  # Success
        "https://httpbin.org/status/404",  # Not found
        "https://httpbin.org/delay/5",     # Slow response
        "https://httpbin.org/status/500",  # Server error
    ]

    # Custom headers example
    custom_headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate',
    }

    divider = "=" * 60

    for page_url in test_urls:
        print(f"\nFetching: {page_url}")
        print(divider)
        page_text = fetch_web_page_with_content(page_url, headers=custom_headers)

        if page_text:
            print(f"Success! Fetched {len(page_text)} characters")
        else:
            print("Failed to fetch page")

    # Example: Download a file
    print("\n" + divider)
    print("DOWNLOADING A FILE")
    print(divider)

    file_url = "https://httpbin.org/bytes/1024"  # 1KB test file
    file_data = download_file(file_url, filename="test_file.bin")

    if file_data:
        print(f"✓ Successfully downloaded {len(file_data)} bytes")
        print(f"First 100 bytes: {file_data[:100]}")
    else:
        print("✗ Failed to download file")