Files
ytts/app.py
2025-04-02 21:44:17 -07:00

745 lines
32 KiB
Python

import json
import os
import re
import shutil
import subprocess
import threading
import time
import uuid

import requests
from flask import Flask, render_template, request, jsonify, send_file
from flask_socketio import SocketIO, join_room
app = Flask(__name__)
# Allow any origin so the browser UI can connect from other hosts/ports.
socketio = SocketIO(app, cors_allowed_origins="*")

# Create necessary directories (relative to the process working directory)
os.makedirs('uploads', exist_ok=True)      # downloaded audio files
os.makedirs('transcripts', exist_ok=True)  # Whisper output (.txt/.srt/...)
os.makedirs('thumbnails', exist_ok=True)   # cached video thumbnails

# Path to cookies file — expected one directory above this package
# (i.e. next to the project root, not next to app.py).
COOKIES_FILE = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'cookies.txt')

# Verify cookies file exists
if os.path.exists(COOKIES_FILE):
    print(f"Found cookies file at {COOKIES_FILE}")
    # We'll use both the cookies file and browser cookies
    USE_COOKIES_FILE = True
else:
    print(f"Warning: Cookies file not found at {COOKIES_FILE}")
    USE_COOKIES_FILE = False

# Don't use browser cookies directly as they're not accessible
USE_BROWSER_COOKIES = False
BROWSER_NAME = None

# Global variables to track jobs.
# NOTE(review): shared mutable state accessed from worker threads without a
# lock — relies on CPython's GIL for safety; confirm if moving off CPython.
active_jobs = {}  # job_id -> job-data dict (status, progress, urls, ...)
job_queue = []    # job_ids in FIFO processing order
def get_yt_dlp_base_args():
    """Build the yt-dlp argument list shared by every invocation.

    Combines anti-throttling/retry flags, optional cookie sources, and
    browser-like HTTP headers. Returns a fresh list each call.
    """
    throttle_and_retry = [
        '--no-warnings',
        '--user-agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
        '--sleep-requests', '1',
        '--min-sleep-interval', '0.5',
        '--max-sleep-interval', '2',
        '--geo-bypass',
        '--concurrent-fragments', '3',
        '--force-ipv4',
        '--no-check-certificates',
        '--extractor-retries', '10',
        '--fragment-retries', '10',
        '--retry-sleep', '3',
        '--abort-on-unavailable-fragment',  # skip videos with unavailable fragments
        '--prefer-insecure',  # try insecure connections when secure ones fail
        '--no-playlist',  # always download a single video, even for playlist URLs
    ]

    # Cookie sources, when configured at import time.
    cookie_args = []
    if USE_COOKIES_FILE:
        cookie_args += ['--cookies', COOKIES_FILE]
    if USE_BROWSER_COOKIES:
        cookie_args += ['--cookies-from-browser', BROWSER_NAME]

    # Referer and client-hint headers to appear more like a real browser.
    header_args = [
        '--referer', 'https://www.youtube.com/',
        '--add-header', 'Accept:text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        '--add-header', 'Accept-Language:en-US,en;q=0.5',
        '--add-header', 'Sec-Ch-Ua:"Google Chrome";v="123", "Not:A-Brand";v="99"',
        '--add-header', 'Sec-Ch-Ua-Mobile:?0',
        '--add-header', 'Sec-Ch-Ua-Platform:"Windows"',
    ]

    return throttle_and_retry + cookie_args + header_args
@app.route('/')
def index():
    """Serve the single-page front-end."""
    template_name = 'index.html'
    return render_template(template_name)
def get_video_info(youtube_url):
    """Get video title, thumbnail, and other metadata.

    Returns a dict with string fields 'title', 'thumbnail', 'duration'
    (missing trailing fields default to '' / '0'), or None when nothing
    could be fetched. On timeout, returns a stub so queuing can proceed.
    """
    try:
        print(f"Fetching video info from {youtube_url}")
        cmd = ['yt-dlp', '--skip-download']
        cmd.extend(['--print', '%(title)s', '--print', '%(thumbnail)s', '--print', '%(duration)s'])
        cmd.extend(get_yt_dlp_base_args())
        cmd.append(youtube_url)
        # Set a timeout to prevent hanging
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        # Print the command output for debugging
        print(f"yt-dlp stdout: {result.stdout}")
        print(f"yt-dlp stderr: {result.stderr}")
        # yt-dlp emits one requested field per line. Pad missing trailing
        # fields with defaults instead of branching on the line count
        # (replaces three near-identical dict-building branches).
        if result.returncode == 0 and result.stdout:
            fields = result.stdout.strip().split('\n')
            return {
                'title': fields[0],
                'thumbnail': fields[1] if len(fields) > 1 else '',
                'duration': fields[2] if len(fields) > 2 else '0'
            }
        # If we didn't get the expected output, fall back to fetching
        # just the title.
        print("Trying fallback method to get video info")
        cmd = ['yt-dlp', '--skip-download', '--get-title']
        cmd.extend(get_yt_dlp_base_args())
        cmd.append(youtube_url)
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        if result.returncode == 0 and result.stdout:
            return {
                'title': result.stdout.strip(),
                'thumbnail': '',
                'duration': '0'
            }
        return None
    except subprocess.TimeoutExpired:
        print(f"Timeout getting video info for {youtube_url}")
        # Degrade gracefully so the job can still be queued with a stub title.
        return {
            'title': 'YouTube Video (Timeout)',
            'thumbnail': '',
            'duration': '0'
        }
    except Exception as e:
        print(f"Error getting video info: {e}")
        return None
def extract_playlist_videos(playlist_url):
    """Extract individual video URLs from a playlist.

    Returns a list of canonical watch URLs, or [] on any failure.
    """
    try:
        cmd = ['yt-dlp', '--flat-playlist', '--print', 'https://www.youtube.com/watch?v=%(id)s']
        cmd.extend(get_yt_dlp_base_args())
        cmd.append(playlist_url)
        # Bound the call: this runs inside a Flask request handler, and
        # every other yt-dlp invocation in this module has a timeout too.
        # A timeout raises TimeoutExpired, which falls into the handler below.
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
        if result.returncode != 0:
            # Previously the exit code was ignored; surface it for debugging.
            print(f"Error extracting playlist: yt-dlp exited {result.returncode}: {result.stderr}")
            return []
        video_urls = result.stdout.strip().split('\n')
        return [url for url in video_urls if url.strip()]
    except Exception as e:
        print(f"Error extracting playlist: {e}")
        return []
def save_thumbnail(thumbnail_url, job_id):
    """Download and save the video thumbnail"""
    try:
        destination = os.path.join('thumbnails', f"{job_id}.jpg")
        response = requests.get(thumbnail_url, stream=True, timeout=10)
        if response.status_code != 200:
            print(f"Failed to download thumbnail: {response.status_code}")
            return None
        # Stream the raw body straight to disk instead of buffering it all.
        with open(destination, 'wb') as out:
            response.raw.decode_content = True
            shutil.copyfileobj(response.raw, out)
        return destination
    except Exception as e:
        print(f"Error saving thumbnail: {e}")
        return None
@app.route('/api/transcribe', methods=['POST'])
def transcribe():
    """Queue a YouTube video — or every video of a playlist — for transcription.

    Expects a JSON body {"youtube_url": ...}. Returns the queued job id
    (or the list of job ids for a playlist) with status "queued".
    """
    # get_json(silent=True) avoids an unhandled 400/415 from Flask when the
    # body is missing or not JSON; we return our own 400 below instead.
    payload = request.get_json(silent=True) or {}
    youtube_url = payload.get('youtube_url')
    if not youtube_url:
        return jsonify({"error": "No YouTube URL provided"}), 400
    # Check if this is a playlist
    if 'playlist' in youtube_url or 'list=' in youtube_url:
        # Extract individual video URLs from the playlist
        video_urls = extract_playlist_videos(youtube_url)
        if not video_urls:
            return jsonify({"error": "Could not extract videos from playlist"}), 400
        job_ids = [add_to_queue(video_url) for video_url in video_urls]
        return jsonify({
            "job_ids": job_ids,
            "status": "queued",
            "message": f"Added {len(job_ids)} videos to the queue"
        })
    # Single video
    job_id = add_to_queue(youtube_url)
    return jsonify({"job_id": job_id, "status": "queued"})
def add_to_queue(youtube_url):
    """Register a video for processing and start the worker if the queue was empty.

    Returns the new job id. Side effects: populates active_jobs[job_id],
    appends to job_queue, broadcasts a queue update, and spawns the
    processing thread when this is the only queued job.
    """
    # Generate a unique job ID
    job_id = str(uuid.uuid4())
    # Extract video ID from various possible URL formats so we can build
    # alternative URL formats that sometimes bypass restrictions.
    video_id = None
    if 'youtube.com' in youtube_url and 'watch?v=' in youtube_url:
        video_id = youtube_url.split('watch?v=')[1].split('&')[0]
    elif 'youtu.be/' in youtube_url:
        video_id = youtube_url.split('youtu.be/')[1].split('?')[0]
    elif 'youtube.com/embed/' in youtube_url:
        video_id = youtube_url.split('youtube.com/embed/')[1].split('?')[0]
    alt_urls = []
    if video_id:
        # Use different URL formats that might bypass restrictions
        alt_urls = [
            # Try YouTube music which sometimes has fewer restrictions
            f"https://music.youtube.com/watch?v={video_id}",
            # Try YouTube TV which might have a different rate limit
            f"https://www.youtube.com/tv#/watch?v={video_id}",
            # Try YouTube Kids which often has fewer restrictions
            f"https://www.youtubekids.com/watch?v={video_id}",
            # Try YouTube mobile site
            f"https://m.youtube.com/watch?v={video_id}",
            # Try YouTube embedded player format
            f"https://www.youtube.com/embed/{video_id}",
            # Try regular format with extra params to look more like a browser request
            f"https://www.youtube.com/watch?v={video_id}&app=desktop&persist_app=1&noapp=1",
            # Original YouTube URL as a last resort
            f"https://www.youtube.com/watch?v={video_id}"
        ]
        # We'll try each URL until one works
        youtube_url = alt_urls[0]
    # Get video info
    print(f"Getting video info for {youtube_url}")
    video_info = get_video_info(youtube_url)
    # Create basic job data
    job_data = {
        "job_id": job_id,
        "status": "queued",
        "youtube_url": youtube_url,
        "title": "Unknown Title",
        "thumbnail": "",
        "duration": "0",
        "progress": 0,
        "message": "Waiting in queue...",
        "position": len(job_queue) + 1
    }
    # BUG FIX: alt_urls used to be stored in a temporary dict under
    # active_jobs[job_id] that was immediately overwritten by job_data,
    # so process_transcription's URL fallbacks never saw them. Keep them
    # on job_data so the fallback lookup actually works.
    if alt_urls:
        job_data["alt_urls"] = alt_urls
    # Add video info if available
    if video_info:
        print(f"Video info found: {video_info}")
        job_data["title"] = video_info.get('title', 'Unknown Title')
        job_data["duration"] = video_info.get('duration', '0')
        # Try to save thumbnail if available
        if video_info.get('thumbnail'):
            try:
                thumbnail_url = video_info.get('thumbnail')
                print(f"Saving thumbnail from {thumbnail_url}")
                local_thumbnail = save_thumbnail(thumbnail_url, job_id)
                if local_thumbnail:
                    job_data["thumbnail"] = local_thumbnail
            except Exception as e:
                print(f"Error saving thumbnail: {e}")
    # Add to global tracking
    active_jobs[job_id] = job_data
    job_queue.append(job_id)
    # Broadcast queue update to all clients
    socketio.emit('queue_update', {"queue": [active_jobs[jid] for jid in job_queue]})
    # If this is the first job, start processing in a background thread
    if len(job_queue) == 1:
        print(f"Starting processing for job {job_id}")
        thread = threading.Thread(target=process_next_in_queue)
        thread.daemon = True
        thread.start()
    return job_id
def process_next_in_queue():
    """Start a background worker for the job at the head of the queue."""
    if not job_queue:
        return
    head_id = job_queue[0]
    job = active_jobs[head_id]
    target_url = job["youtube_url"]
    # Mark the job as picked up and notify clients before spawning the worker.
    job["status"] = "processing"
    job["message"] = "Starting processing..."
    socketio.emit('status_update', job, room=head_id)
    socketio.emit('queue_update', {"queue": [active_jobs[jid] for jid in job_queue]})
    worker = threading.Thread(target=process_transcription, args=(target_url, head_id))
    worker.daemon = True
    worker.start()
def process_transcription(youtube_url, job_id):
    """Process the YouTube URL and transcribe it with Whisper"""
    # Pipeline, run on a worker thread:
    #   1. download audio with yt-dlp (progress streamed to clients),
    #   2. on failure, retry via alternative URL formats and one final
    #      aggressive attempt; if everything fails, write placeholder
    #      transcripts so the queue can continue,
    #   3. transcribe the mp3 with the `whisper` CLI,
    #   4. clean up and chain the next queued job — on success or failure.
    print(f"Process transcription started for job {job_id}, URL: {youtube_url}")
    # Update job status
    active_jobs[job_id]["status"] = "downloading"
    active_jobs[job_id]["progress"] = 0
    active_jobs[job_id]["message"] = "Starting download..."
    socketio.emit('status_update', active_jobs[job_id], room=job_id)
    socketio.emit('queue_update', {"queue": [active_jobs[jid] for jid in job_queue]})
    # Create unique filenames for this job
    audio_file = os.path.join('uploads', f"{job_id}.mp3")
    try:
        # Using subprocess to call yt-dlp for downloading
        cmd = [
            'yt-dlp',
            '-f', '140/bestaudio',  # Format 140 is often more reliable for YouTube audio
            '--extract-audio',
            '--audio-format', 'mp3',
            '--audio-quality', '0',
            '--compat-options', 'no-youtube-unavailable-videos',
            '--ignore-errors',
            '--no-playlist',
            # yt-dlp substitutes the real container extension before conversion
            '-o', audio_file.replace('.mp3', '.%(ext)s')
        ]
        cmd.extend(get_yt_dlp_base_args())
        cmd.append(youtube_url)
        print(f"Running download command: {' '.join(cmd)}")
        # stderr merged into stdout so one reader loop sees everything.
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
        # Track if we're seeing progress to help debug stalled downloads
        last_progress_time = time.time()
        progress_seen = False
        while True:
            output = process.stdout.readline()
            # Empty read + finished process means the stream is drained.
            if output == '' and process.poll() is not None:
                print(f"Download process completed with return code: {process.returncode}")
                break
            if output:
                line = output.strip()
                print(f"Download output: {line}")
                # Parse yt-dlp progress output ("[download]  42.3% of ...")
                if '[download]' in line and '%' in line:
                    progress_seen = True
                    last_progress_time = time.time()
                    try:
                        percent = float(line.split('%')[0].split()[-1])
                        active_jobs[job_id]["progress"] = percent
                        active_jobs[job_id]["message"] = f"Downloading: {percent:.1f}%"
                        socketio.emit('status_update', active_jobs[job_id], room=job_id)
                        socketio.emit('queue_update', {"queue": [active_jobs[jid] for jid in job_queue]})
                    except Exception as e:
                        print(f"Error parsing progress: {e}")
            # Check if download is stalled.
            # NOTE(review): readline() blocks, so this only triggers once
            # another output line arrives — a fully silent stall is not
            # detected by this check; confirm whether that is acceptable.
            if progress_seen and time.time() - last_progress_time > 60:
                print("Download appears to be stalled, terminating")
                process.terminate()
                active_jobs[job_id]["status"] = "failed"
                active_jobs[job_id]["message"] = "Download stalled or timed out"
                socketio.emit('status_update', active_jobs[job_id], room=job_id)
                socketio.emit('queue_update', {"queue": [active_jobs[jid] for jid in job_queue]})
                # Remove from queue and process next
                if job_id in job_queue:
                    job_queue.remove(job_id)
                if job_queue:
                    process_next_in_queue()
                return
        # Check if audio file exists
        expected_audio_file = audio_file.replace('.mp3', '') + '.mp3'
        if not os.path.exists(expected_audio_file):
            print(f"Audio file not found at expected path: {expected_audio_file}")
            # Try to find it with a different extension (post-processing to
            # mp3 may have been skipped or failed)
            for ext in ['.webm', '.m4a', '.opus']:
                alt_file = audio_file.replace('.mp3', '') + ext
                if os.path.exists(alt_file):
                    print(f"Found audio with different extension: {alt_file}")
                    # Convert to mp3 ourselves with ffmpeg
                    try:
                        convert_cmd = ['ffmpeg', '-i', alt_file, '-vn', '-ab', '192k', expected_audio_file, '-y']
                        subprocess.run(convert_cmd, check=True, capture_output=True)
                        print(f"Converted {alt_file} to {expected_audio_file}")
                        break
                    except Exception as e:
                        print(f"Error converting audio: {e}")
        # Check again if we have an mp3 file
        if not os.path.exists(expected_audio_file):
            # Try alternative URLs if available (recorded by add_to_queue
            # when a video ID could be extracted; empty list otherwise)
            alt_urls = active_jobs[job_id].get("alt_urls", [])
            if alt_urls and len(alt_urls) > 1:
                # Remove the first URL that just failed
                alt_urls.pop(0)
                # Try each alternative URL
                for alt_url in alt_urls:
                    try:
                        print(f"Trying alternative URL: {alt_url}")
                        active_jobs[job_id]["message"] = f"Trying alternate source..."
                        socketio.emit('status_update', active_jobs[job_id], room=job_id)
                        # Construct an embedded player URL directly if we have a video ID
                        embed_url = alt_url
                        if 'youtube.com' in alt_url and 'watch?v=' in alt_url:
                            video_id = alt_url.split('watch?v=')[1].split('&')[0]
                            embed_url = f"https://www.youtube.com/embed/{video_id}?autoplay=1"
                        alt_cmd = [
                            'yt-dlp',
                            '-f', '140/bestaudio/best',  # Format 140 is more reliable for YouTube
                            '--extract-audio',
                            '--audio-format', 'mp3',
                            '--audio-quality', '0',
                            '--compat-options', 'no-youtube-unavailable-videos',
                            '--ignore-errors',
                            '-o', audio_file.replace('.mp3', '.%(ext)s'),
                            # Use more targeted options for this specific attempt
                            '--force-ipv4',
                            '--geo-bypass-country', 'US',
                            # Stronger browser emulation - pretend to be Chrome
                            '--user-agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36',
                            '--add-header', 'Accept:*/*',
                            '--add-header', 'Accept-Encoding:gzip, deflate, br',
                            '--add-header', 'Connection:keep-alive',
                            '--add-header', 'Sec-Fetch-Dest:empty',
                            '--add-header', 'Sec-Fetch-Mode:cors',
                            '--add-header', 'Sec-Fetch-Site:same-site',
                            '--add-header', 'Referer:https://www.youtube.com/'
                        ]
                        # Add cookies options specifically for this attempt
                        if USE_COOKIES_FILE:
                            alt_cmd.extend(['--cookies', COOKIES_FILE])
                        # Try browser cookies specifically for this attempt
                        if USE_BROWSER_COOKIES:
                            alt_cmd.extend(['--cookies-from-browser', BROWSER_NAME])
                        # Add the URL - try the embed URL first
                        alt_cmd.append(embed_url)
                        print(f"Running alternative download: {' '.join(alt_cmd)}")
                        result = subprocess.run(alt_cmd, capture_output=True, text=True, timeout=240)
                        print(f"Alternative download result: {result.returncode}")
                        if result.returncode == 0 and os.path.exists(expected_audio_file):
                            print("Alternative download succeeded")
                            break
                        else:
                            print(f"Alternative download stderr: {result.stderr}")
                    except Exception as e:
                        print(f"Alternative download failed: {e}")
            # If still no file, try one final method with specific format selection
            if not os.path.exists(expected_audio_file):
                try:
                    print("Attempting final download method...")
                    active_jobs[job_id]["message"] = "Trying final download method..."
                    socketio.emit('status_update', active_jobs[job_id], room=job_id)
                    # Try to extract video ID for direct mp4 URL approach
                    video_id = None
                    if 'youtube.com' in youtube_url and 'watch?v=' in youtube_url:
                        video_id = youtube_url.split('watch?v=')[1].split('&')[0]
                    elif 'youtu.be/' in youtube_url:
                        video_id = youtube_url.split('youtu.be/')[1].split('?')[0]
                    # Try with very specific format selection and options
                    last_cmd = [
                        'yt-dlp',
                        '--verbose',  # Add verbose output to help debug
                        '--format', '140/m4a/mp3/bestaudio',  # Try to get m4a audio specifically
                        '--extract-audio',
                        '--audio-format', 'mp3',
                        '--audio-quality', '0',
                        '-o', audio_file.replace('.mp3', '.%(ext)s'),
                        '--no-check-certificate',
                        '--ignore-config',  # Ignore any config files
                        '--no-playlist',
                        '--referer', 'https://www.youtube.com/',
                        '--add-header', 'Origin:https://www.youtube.com',
                        '--geo-bypass-country', 'US,GB,JP,DE,FR'  # Try multiple countries
                    ]
                    # If we have a video ID, try with the embed format which might bypass restrictions
                    if video_id:
                        last_cmd.append(f"https://www.youtube.com/embed/{video_id}?autoplay=1")
                    else:
                        last_cmd.append(youtube_url)
                    # Add all available cookie options for the final attempt
                    if USE_COOKIES_FILE:
                        last_cmd.extend(['--cookies', COOKIES_FILE])
                    if USE_BROWSER_COOKIES:
                        last_cmd.extend(['--cookies-from-browser', BROWSER_NAME])
                    subprocess.run(last_cmd, check=False, capture_output=True, timeout=240)
                    if os.path.exists(expected_audio_file):
                        print("Final download method succeeded")
                    else:
                        # If all else fails, create a placeholder file with an error message
                        # so at least the queue can continue
                        print("All download methods failed, creating placeholder file")
                        with open(os.path.join('transcripts', f"{job_id}.txt"), 'w') as f:
                            f.write("ERROR: Could not download this video due to YouTube restrictions.\n")
                        with open(os.path.join('transcripts', f"{job_id}.srt"), 'w') as f:
                            f.write("1\n00:00:00,000 --> 00:00:10,000\nERROR: Could not download this video due to YouTube restrictions.\n")
                        # We'll pretend it succeeded so the queue can continue
                        active_jobs[job_id]["status"] = "completed"
                        active_jobs[job_id]["progress"] = 100
                        active_jobs[job_id]["message"] = "Could not download due to YouTube restrictions"
                        active_jobs[job_id]["preview"] = "ERROR: Could not download due to YouTube restrictions"
                        active_jobs[job_id]["txt_file"] = f"/api/download/{job_id}/txt"
                        active_jobs[job_id]["srt_file"] = f"/api/download/{job_id}/srt"
                        socketio.emit('status_update', active_jobs[job_id], room=job_id)
                        socketio.emit('queue_update', {"queue": [active_jobs[jid] for jid in job_queue]})
                        # Remove from queue and process next
                        if job_id in job_queue:
                            job_queue.remove(job_id)
                        # Process next item in queue
                        if job_queue:
                            process_next_in_queue()
                        # Skip further processing
                        return
                except Exception as e:
                    print(f"Final download failed: {e}")
                    # Bubble up to the outer handler, which marks the job failed.
                    raise Exception("Failed to download audio file with all methods")
        # Now transcribe using Whisper
        print(f"Starting Whisper transcription for {job_id}")
        active_jobs[job_id]["status"] = "transcribing"
        active_jobs[job_id]["progress"] = 0
        active_jobs[job_id]["message"] = "Starting transcription with Whisper..."
        socketio.emit('status_update', active_jobs[job_id], room=job_id)
        socketio.emit('queue_update', {"queue": [active_jobs[jid] for jid in job_queue]})
        # Define output files; Whisper writes these (and more, via
        # --output_format all) into the transcripts directory itself.
        txt_output = os.path.join('transcripts', f"{job_id}.txt")
        srt_output = os.path.join('transcripts', f"{job_id}.srt")
        # Run Whisper for transcription
        whisper_cmd = [
            'whisper',
            expected_audio_file,
            '--model', 'medium',
            '--output_dir', 'transcripts',
            '--output_format', 'all'
        ]
        print(f"Running Whisper command: {' '.join(whisper_cmd)}")
        process = subprocess.Popen(whisper_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        # Monitor Whisper progress
        while True:
            output = process.stdout.readline()
            if output == '' and process.poll() is not None:
                print(f"Whisper process completed with return code: {process.returncode}")
                break
            if output:
                line = output.strip()
                print(f"Whisper output: {line}")
                # Surface the raw Whisper line as the live status message.
                active_jobs[job_id]["message"] = line
                # Try to parse progress from whisper output
                if "%" in line:
                    try:
                        match = re.search(r'(\d+)%', line)
                        if match:
                            percent = float(match.group(1))
                            active_jobs[job_id]["progress"] = percent
                    except Exception as e:
                        print(f"Error parsing whisper progress: {e}")
                socketio.emit('status_update', active_jobs[job_id], room=job_id)
                socketio.emit('queue_update', {"queue": [active_jobs[jid] for jid in job_queue]})
        # Check for any error output (read after stdout is drained)
        stderr_output = process.stderr.read()
        if stderr_output:
            print(f"Whisper stderr: {stderr_output}")
        # Check if transcription was successful
        if os.path.exists(txt_output):
            print(f"Transcription successful for {job_id}")
            active_jobs[job_id]["status"] = "completed"
            active_jobs[job_id]["progress"] = 100
            active_jobs[job_id]["message"] = "Transcription completed successfully!"
            # Read the first few lines of transcript for preview
            with open(txt_output, 'r', encoding='utf-8') as f:
                preview = f.read(500)
            if os.path.getsize(txt_output) > 500:
                preview += "..."
            active_jobs[job_id]["preview"] = preview
            active_jobs[job_id]["txt_file"] = f"/api/download/{job_id}/txt"
            active_jobs[job_id]["srt_file"] = f"/api/download/{job_id}/srt"
        else:
            print(f"Transcription failed for {job_id}")
            active_jobs[job_id]["status"] = "failed"
            active_jobs[job_id]["message"] = f"Transcription failed. {stderr_output}"
        socketio.emit('status_update', active_jobs[job_id], room=job_id)
        socketio.emit('queue_update', {"queue": [active_jobs[jid] for jid in job_queue]})
        # Clean up - remove audio file to save space
        try:
            os.remove(expected_audio_file)
            print(f"Removed audio file {expected_audio_file}")
        except Exception as e:
            print(f"Error removing audio file: {e}")
        # Remove from queue and process next item
        if job_id in job_queue:
            job_queue.remove(job_id)
        # Process next item in queue
        if job_queue:
            process_next_in_queue()
    except Exception as e:
        # Any unexpected error: mark the job failed but keep the queue moving.
        print(f"Error in process_transcription: {e}")
        active_jobs[job_id]["status"] = "failed"
        active_jobs[job_id]["message"] = str(e)
        socketio.emit('status_update', active_jobs[job_id], room=job_id)
        socketio.emit('queue_update', {"queue": [active_jobs[jid] for jid in job_queue]})
        # Remove from queue and process next item
        if job_id in job_queue:
            job_queue.remove(job_id)
        # Process next item in queue
        if job_queue:
            process_next_in_queue()
@app.route('/api/job/<job_id>', methods=['GET'])
def get_job_status(job_id):
    """Get the status of a specific job"""
    job = active_jobs.get(job_id)
    if job is None:
        return jsonify({"error": "Job not found"}), 404
    return jsonify(job)
@app.route('/api/queue', methods=['GET'])
def get_queue():
    """Return the jobs currently queued for processing, in order."""
    return jsonify({"queue": [active_jobs[jid] for jid in job_queue]})
@app.route('/api/thumbnail/<job_id>', methods=['GET'])
def get_thumbnail(job_id):
    """Serve the locally cached thumbnail image for a known job."""
    if job_id in active_jobs:
        cached = os.path.join('thumbnails', f"{job_id}.jpg")
        if os.path.exists(cached):
            return send_file(cached, mimetype='image/jpeg')
    # Unknown job or no cached thumbnail on disk.
    return jsonify({"error": "Thumbnail not found"}), 404
@app.route('/api/cancel/<job_id>', methods=['POST'])
def cancel_job(job_id):
    """Cancel a job that is still waiting in the queue."""
    if job_id not in job_queue:
        return jsonify({"error": "Job not found or already processing"}), 404
    job_queue.remove(job_id)
    job = active_jobs[job_id]
    job["status"] = "cancelled"
    job["message"] = "Job cancelled by user"
    # Update all clients
    socketio.emit('status_update', job, room=job_id)
    socketio.emit('queue_update', {"queue": [active_jobs[jid] for jid in job_queue]})
    return jsonify({"status": "success", "message": "Job cancelled successfully"})
@app.route('/api/download/<job_id>/<format>', methods=['GET'])
def download_transcript(job_id, format):
    """Download the transcript for a job in the requested format.

    `format` must be one of txt/srt/vtt; responds 404 when the file
    does not exist (or the id is not a plain filename component).
    """
    if format not in ['txt', 'srt', 'vtt']:
        return jsonify({"error": "Invalid format"}), 400
    # job_id comes from the URL; reject anything that is not a plain
    # path component so it cannot escape the transcripts directory.
    if os.path.basename(job_id) != job_id or job_id in ('', '.', '..'):
        return jsonify({"error": "File not found"}), 404
    file_path = os.path.join('transcripts', f"{job_id}.{format}")
    if not os.path.exists(file_path):
        return jsonify({"error": "File not found"}), 404
    # Per-format MIME type; previously .vtt was mislabelled as text/srt.
    mimetype_by_format = {'txt': 'text/plain', 'srt': 'text/srt', 'vtt': 'text/vtt'}
    return send_file(
        file_path,
        as_attachment=True,
        download_name=f"transcript.{format}",
        mimetype=mimetype_by_format[format]
    )
@socketio.on('connect')
def handle_connect():
    """Log every new Socket.IO connection."""
    print("Client connected")
@socketio.on('join')
def on_join(data):
    """Client joins a room with job_id to receive updates.

    Expects {"job_id": ...} from the client; silently ignores events
    without one.
    """
    room = data.get('job_id')
    if room:
        # BUG FIX: the room was only logged, never joined, so the
        # per-job `socketio.emit(..., room=job_id)` calls throughout
        # this module never reached any client. Actually join it.
        join_room(room)
        print(f"Client joined room: {room}")
if __name__ == '__main__':
    # NOTE(review): binds to all interfaces with debug=True (interactive
    # Werkzeug debugger + reloader) — fine for local development, unsafe
    # to expose on an untrusted network.
    socketio.run(app, host='0.0.0.0', debug=True)