Module: Web API Details¶
Tasks¶
Background task management for long-running operations
CancelToken
¶
Source code in webapi/api/tasks.py
class CancelToken:
def __init__(self):
""" Thread-safe cancellation token. This basically just has a lock and a status
indicating whether the job should be cancelled or not. Jobs use this to figure
out whether the web cancel button has been clicked.
"""
self._cancelled = False
self._lock = threading.Lock()
def cancel(self):
with self._lock:
self._cancelled = True
def is_cancelled(self):
with self._lock:
return self._cancelled
__init__()
¶
Thread-safe cancellation token. This basically just has a lock and a status indicating whether the job should be cancelled or not. Jobs use this to figure out whether the web cancel button has been clicked.
Source code in webapi/api/tasks.py
def __init__(self):
""" Thread-safe cancellation token. This basically just has a lock and a status
indicating whether the job should be cancelled or not. Jobs use this to figure
out whether the web cancel button has been clicked.
"""
self._cancelled = False
self._lock = threading.Lock()
cancel_job(job_id, timeout=10.0)
¶
Cancel a running job
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
job_id
|
str
|
ID of the job to cancel |
required |
timeout
|
float
|
Maximum time to wait for cancellation (seconds) |
10.0
|
Returns:
| Type | Description |
|---|---|
Dict[str, Any]
|
Dict with success status and message |
Source code in webapi/api/tasks.py
def cancel_job(job_id: str, timeout: float = 10.0) -> Dict[str, Any]:
"""
Cancel a running job
Args:
job_id: ID of the job to cancel
timeout: Maximum time to wait for cancellation (seconds)
Returns:
Dict with success status and message
"""
job = jobs.get(job_id)
if not job:
return {'success': False, 'error': 'Job not found'}
if job['status'] != 'running':
return {'success': False, 'error': f"Job is not running (status: {job['status']})"}
# Signal cancellation
cancel_token = job.get('cancel_token')
if cancel_token:
cancel_token.cancel()
# Wait for thread to finish
thread = job.get('thread')
if thread and thread.is_alive():
thread.join(timeout=timeout)
if thread.is_alive():
# Thread didn't stop in time
return {
'success': False,
'error': f'Job did not stop within {timeout} seconds',
'partial_results': job.get('result')
}
# Get partial results from progress
partial_results = {}
if job.get('progress_obj'):
partial_results = job['progress_obj'].get_snapshot()
return {
'success': True,
'message': 'Job cancelled successfully',
'partial_results': partial_results
}
get_job_status(job_id)
¶
Get the status of a job
Source code in webapi/api/tasks.py
def get_job_status(job_id: str) -> Dict[str, Any]:
"""Get the status of a job"""
job = jobs.get(job_id)
if not job:
return {'status': 'not_found'}
result = {
'job_id': job_id,
'status': job['status'],
'started_at': job.get('started_at')
}
# Add progress if running or complete (for final status)
if job.get('progress_obj'):
result['progress'] = job['progress_obj'].get_snapshot()
# Add result if complete or cancelled
if job['status'] in ['complete', 'cancelled']:
result['result'] = job.get('result')
# Add error if failed
if job['status'] == 'error':
result['error'] = job.get('error')
return result
run_async(func, *args, progress_obj=FetchProgress(), **kwargs)
¶
Run a function asynchronously and return a job ID
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
func
|
Callable
|
Function to execute. |
required |
*args
|
list
|
Positional arguments for the function |
()
|
progress_obj
|
FetchProgress
|
Optional progress tracking object |
FetchProgress()
|
**kwargs
|
list
|
Keyword arguments for the function |
{}
|
Returns:
| Type | Description |
|---|---|
str
|
Job ID string |
Source code in webapi/api/tasks.py
def run_async(func: Callable, *args,
progress_obj:FetchProgress=FetchProgress(),
**kwargs) -> str:
"""
Run a function asynchronously and return a job ID
Args:
func: Function to execute.
*args (list): Positional arguments for the function
progress_obj: Optional progress tracking object
**kwargs (list): Keyword arguments for the function
Returns:
Job ID string
"""
job_id = str(uuid4())
cancel_token = CancelToken()
jobs[job_id] = {
'status': 'running',
'progress_obj': progress_obj,
'cancel_token': cancel_token,
'thread': None,
'result': None,
'error': None,
'started_at': datetime.now().isoformat()
}
def worker():
""" The code that a
"""
try:
sig = inspect.signature(func)
params = sig.parameters
# Build kwargs based on what function accepts
call_kwargs = dict(kwargs)
if 'cancel_token' in params:
call_kwargs['cancel_token'] = cancel_token
if 'progress' in params and progress_obj is not None:
call_kwargs['progress'] = progress_obj
result = func(*args, **call_kwargs)
# Check if cancelled during execution
if cancel_token.is_cancelled():
jobs[job_id]['status'] = 'cancelled'
jobs[job_id]['result'] = result # Partial results
else:
jobs[job_id]['status'] = 'complete'
jobs[job_id]['result'] = result
jobs[job_id]['error'] = None
jobs[job_id]['started_at'] = jobs[job_id]['started_at']
except Exception as e:
jobs[job_id] = {
'status': 'error',
'progress_obj': progress_obj,
'cancel_token': cancel_token,
'thread': None,
'result': None,
'error': str(e),
'started_at': jobs[job_id]['started_at']
}
thread = threading.Thread(target=worker, daemon=True)
jobs[job_id]['thread'] = thread
thread.start()
return job_id
Progress¶
Progress tracking for long-running fetch operations
FetchProgress
¶
Thread-safe progress tracker for fetch operations
Source code in webapi/api/progress.py
class FetchProgress:
"""Thread-safe progress tracker for fetch operations"""
def __init__(self, max_toots: int = 2000):
self.servers_todo = 0
self.servers_done = 0
self.servers_fail = 0
self.total_toots = 0
self.max_toots = max_toots
self.current_server: Optional[str] = None
self.server_status: Dict[str, dict] = {} # uri -> {'state': str, 'toots': int}
self._lock = threading.Lock()
def update(self, todo: int, done: int, fail: int, toots: int, current: Optional[str] = None):
"""Update progress metrics (thread-safe)"""
with self._lock:
self.servers_todo = todo
self.servers_done = done
self.servers_fail = fail
self.total_toots = toots
self.current_server = current
def start_server(self, uri: str):
"""Mark a server as started (thread-safe)"""
with self._lock:
self.server_status[uri] = {'state': 'running', 'toots': 0}
def set_pending(self, uri: str):
"""Mark a server as pending (thread-safe)"""
with self._lock:
self.server_status[uri] = {'state': 'pending', 'toots': 0}
def update_server_progress(self, uri: str, toot_count: int):
"""Update toot count for a running server (thread-safe)"""
with self._lock:
if uri in self.server_status:
self.server_status[uri]['toots'] = toot_count
def complete_server(self, uri: str, final_count: int, failed: bool = False):
"""Mark a server as completed or failed (thread-safe)"""
with self._lock:
self.server_status[uri] = {
'state': 'failed' if failed else 'done',
'toots': final_count if not failed else 0
}
def get_snapshot(self) -> dict:
"""Get current progress as a dict (thread-safe)"""
with self._lock:
total = self.servers_done + self.servers_fail + self.servers_todo
completed = self.servers_done + self.servers_fail
progress_percent = int((completed / total * 100)) if total > 0 else 0
return {
'servers_todo': self.servers_todo,
'servers_done': self.servers_done,
'servers_fail': self.servers_fail,
'total_servers': total,
'completed_servers': completed,
'progress_percent': progress_percent,
'total_toots': self.total_toots,
'max_toots': self.max_toots,
'current_server': self.current_server,
'server_status': dict(self.server_status) # Copy for safety
}
complete_server(uri, final_count, failed=False)
¶
Mark a server as completed or failed (thread-safe)
Source code in webapi/api/progress.py
def complete_server(self, uri: str, final_count: int, failed: bool = False):
"""Mark a server as completed or failed (thread-safe)"""
with self._lock:
self.server_status[uri] = {
'state': 'failed' if failed else 'done',
'toots': final_count if not failed else 0
}
get_snapshot()
¶
Get current progress as a dict (thread-safe)
Source code in webapi/api/progress.py
def get_snapshot(self) -> dict:
"""Get current progress as a dict (thread-safe)"""
with self._lock:
total = self.servers_done + self.servers_fail + self.servers_todo
completed = self.servers_done + self.servers_fail
progress_percent = int((completed / total * 100)) if total > 0 else 0
return {
'servers_todo': self.servers_todo,
'servers_done': self.servers_done,
'servers_fail': self.servers_fail,
'total_servers': total,
'completed_servers': completed,
'progress_percent': progress_percent,
'total_toots': self.total_toots,
'max_toots': self.max_toots,
'current_server': self.current_server,
'server_status': dict(self.server_status) # Copy for safety
}
set_pending(uri)
¶
Mark a server as pending (thread-safe)
Source code in webapi/api/progress.py
def set_pending(self, uri: str):
"""Mark a server as pending (thread-safe)"""
with self._lock:
self.server_status[uri] = {'state': 'pending', 'toots': 0}
start_server(uri)
¶
Mark a server as started (thread-safe)
Source code in webapi/api/progress.py
def start_server(self, uri: str):
"""Mark a server as started (thread-safe)"""
with self._lock:
self.server_status[uri] = {'state': 'running', 'toots': 0}
update(todo, done, fail, toots, current=None)
¶
Update progress metrics (thread-safe)
Source code in webapi/api/progress.py
def update(self, todo: int, done: int, fail: int, toots: int, current: Optional[str] = None):
"""Update progress metrics (thread-safe)"""
with self._lock:
self.servers_todo = todo
self.servers_done = done
self.servers_fail = fail
self.total_toots = toots
self.current_server = current
update_server_progress(uri, toot_count)
¶
Update toot count for a running server (thread-safe)
Source code in webapi/api/progress.py
def update_server_progress(self, uri: str, toot_count: int):
"""Update toot count for a running server (thread-safe)"""
with self._lock:
if uri in self.server_status:
self.server_status[uri]['toots'] = toot_count
Routes¶
API routes for Mastoscore Web API
cancel_fetch_job()
¶
Cancel a running fetch job
Source code in webapi/api/routes.py
@api_bp.route('/fetch/cancel', methods=['POST'])
def cancel_fetch_job():
"""Cancel a running fetch job"""
data = request.get_json()
job_id = data.get('job_id') if data else None
if not job_id:
return jsonify({'error': 'job_id parameter required'}), 400
try:
result = cancel_job(job_id, timeout=10.0)
if result['success']:
return jsonify({
'success': True,
'job_id': job_id,
'message': result['message'],
'partial_results': result.get('partial_results', {})
}), 200
else:
return jsonify({
'success': False,
'job_id': job_id,
'error': result['error'],
'partial_results': result.get('partial_results', {})
}), 400
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 500
check_blog_ready()
¶
Check if blog post has been generated
Source code in webapi/api/routes.py
@api_bp.route('/blog/ready', methods=['POST'])
def check_blog_ready():
"""Check if blog post has been generated"""
data = request.get_json()
config_file = data.get('config_file') if data else None
if not config_file:
return jsonify({'error': 'config_file parameter required'}), 400
try:
config_path = PROJECT_ROOT / config_file
if not config_path.exists():
return jsonify({'error': 'Config file not found'}), 404
config = ConfigParser()
config.read(config_path)
year = config.get('mastoscore', 'event_year')
month = config.get('mastoscore', 'event_month')
day = config.get('mastoscore', 'event_day')
blog_file = DATA_DIR / year / month / day / 'index.md'
if blog_file.exists():
return jsonify({
'ready': True,
'config_file': config_file,
'blog_file': str(blog_file.relative_to(PROJECT_ROOT)),
'message': 'Blog post is ready'
}), 200
else:
return jsonify({
'ready': False,
'config_file': config_file,
'message': 'Blog post not found'
}), 404
except Exception as e:
return jsonify({
'error': str(e)
}), 500
check_fetch_ready()
¶
Check if fetched data is ready for analysis
Source code in webapi/api/routes.py
@api_bp.route('/fetch/ready', methods=['GET'])
def check_fetch_ready():
"""Check if fetched data is ready for analysis"""
config_file = request.args.get('config_file')
if not config_file:
return jsonify({'error': 'config_file parameter required'}), 400
try:
# Read config to get hashtag and date
config_path = PROJECT_ROOT / config_file
if not config_path.exists():
return jsonify({'error': 'Config file not found'}), 404
config = ConfigParser()
config.read(config_path)
hashtag = config.get('mastoscore', 'hashtag')
year = config.get('mastoscore', 'event_year')
month = config.get('mastoscore', 'event_month')
day = config.get('mastoscore', 'event_day')
# Check for data-HASHTAG-fetch.json in data/YYYY/MM/DD/
data_file = DATA_DIR / year / month / day / f'data-{hashtag}-fetch.json'
if data_file.exists():
return jsonify({
'ready': True,
'config_file': config_file,
'data_file': str(data_file.relative_to(PROJECT_ROOT)),
'message': 'Fetch data is ready'
}), 200
else:
return jsonify({
'ready': False,
'config_file': config_file,
'expected_file': str(data_file.relative_to(PROJECT_ROOT)),
'message': 'Fetch data not found'
}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
check_poster_exists()
¶
Check if thumb.jpg exists for the configuration
Source code in webapi/api/routes.py
@api_bp.route('/blog/poster', methods=['POST'])
def check_poster_exists():
"""Check if thumb.jpg exists for the configuration"""
data = request.get_json()
config_file = data.get('config_file') if data else None
if not config_file:
return jsonify({'error': 'config_file parameter required'}), 400
try:
config_path = PROJECT_ROOT / config_file
if not config_path.exists():
return jsonify({'error': 'Config file not found'}), 404
config = ConfigParser()
config.read(config_path)
year = config.get('mastoscore', 'event_year')
month = config.get('mastoscore', 'event_month')
day = config.get('mastoscore', 'event_day')
thumb_path = DATA_DIR / year / month / day / 'thumb.jpg'
if thumb_path.exists():
return jsonify({
'exists': True,
'config_file': config_file,
'poster_path': str(thumb_path.relative_to(PROJECT_ROOT)),
'message': 'Poster exists'
}), 200
else:
return jsonify({
'exists': False,
'config_file': config_file,
'message': 'Poster not found'
}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
create_config(name)
¶
Create new configuration file
Source code in webapi/api/routes.py
@api_bp.route('/config/<path:name>', methods=['POST'])
def create_config(name):
"""Create new configuration file"""
try:
config_path = PROJECT_ROOT / name
# Get config data from request
data = request.get_json()
if not data or 'config' not in data:
return jsonify({'error': 'No configuration data provided'}), 400
overwrite = data.get('overwrite', False)
if config_path.exists() and not overwrite:
return jsonify({'error': 'Configuration already exists'}), 409
# Create parent directories if needed
config_path.parent.mkdir(parents=True, exist_ok=True)
# Write config file
config = ConfigParser()
for section, values in data['config'].items():
config.add_section(section)
for key, value in values.items():
config.set(section, key, str(value))
with open(config_path, 'w') as f:
config.write(f)
return jsonify({
'success': True,
'path': str(config_path.relative_to(PROJECT_ROOT))
}), 201
except PermissionError:
return jsonify({'error': 'Permission denied'}), 403
except Exception as e:
return jsonify({'error': str(e)}), 500
delete_config(name)
¶
Delete configuration file
Source code in webapi/api/routes.py
@api_bp.route('/config/<path:name>', methods=['DELETE'])
def delete_config(name):
"""Delete configuration file"""
try:
config_path = PROJECT_ROOT / name
if not config_path.exists():
return jsonify({'error': 'Configuration not found'}), 404
if not config_path.is_file():
return jsonify({'error': 'Path is not a file'}), 400
if not os.access(config_path, os.W_OK):
return jsonify({'error': 'Permission denied'}), 403
config_path.unlink()
return jsonify({'success': True})
except PermissionError:
return jsonify({'error': 'Permission denied'}), 403
except Exception as e:
return jsonify({'error': str(e)}), 500
get_blog_content()
¶
Get blog post content
Source code in webapi/api/routes.py
@api_bp.route('/blog/content', methods=['POST'])
def get_blog_content():
"""Get blog post content"""
data = request.get_json()
config_file = data.get('config_file') if data else None
if not config_file:
return jsonify({'error': 'config_file parameter required'}), 400
try:
config_path = PROJECT_ROOT / config_file
if not config_path.exists():
return jsonify({'error': 'Config file not found'}), 404
config = ConfigParser()
config.read(config_path)
year = config.get('mastoscore', 'event_year')
month = config.get('mastoscore', 'event_month')
day = config.get('mastoscore', 'event_day')
blog_file = DATA_DIR / year / month / day / 'index.md'
if not blog_file.exists():
return jsonify({'error': 'Blog post not found'}), 404
content = blog_file.read_text(encoding='utf-8')
modified_time = blog_file.stat().st_mtime
return jsonify({
'content': content,
'modified': modified_time,
'config_file': config_file,
'blog_file': str(blog_file.relative_to(PROJECT_ROOT))
}), 200
except Exception as e:
return jsonify({
'error': str(e)
}), 500
get_blog_posts()
¶
Get posted URLs from posts.json
Source code in webapi/api/routes.py
@api_bp.route('/blog/posts', methods=['POST'])
def get_blog_posts():
"""Get posted URLs from posts.json"""
logger = logging.getLogger(__name__)
data = request.get_json()
config_file = data.get('config_file') if data else None
if not config_file:
return jsonify({'error': 'config_file parameter required'}), 400
try:
config_path = PROJECT_ROOT / config_file
if not config_path.exists():
return jsonify({'error': 'Config file not found'}), 404
config = ConfigParser()
config.read(config_path)
hashtag = config.get('mastoscore', 'hashtag')
year = config.get('mastoscore', 'event_year')
month = config.get('mastoscore', 'event_month')
day = config.get('mastoscore', 'event_day')
posts_file = DATA_DIR / year / month / day / f'data-{hashtag}-posts.json'
if not posts_file.exists():
return jsonify({
'exists': False,
'config_file': config_file,
'message': 'Posts not found'
}), 404
with open(posts_file, 'r') as f:
posts_data = json.load(f)
return jsonify({
'exists': True,
'config_file': config_file,
'posts': posts_data
}), 200
except Exception as e:
logger.error(f"Error getting poster image: {e}", exc_info=True)
return jsonify({'error': str(e)}), 500
get_config(name)
¶
Get specific configuration file
Source code in webapi/api/routes.py
@api_bp.route('/config/<path:name>', methods=['GET'])
def get_config(name):
"""Get specific configuration file"""
try:
config_path = PROJECT_ROOT / name
if not config_path.exists():
return jsonify({'error': 'Configuration not found'}), 404
if not config_path.is_file():
return jsonify({'error': 'Path is not a file'}), 400
if not os.access(config_path, os.R_OK):
return jsonify({'error': 'Permission denied'}), 403
# Read the config file
config = ConfigParser()
config.read(config_path)
# Convert to dict
config_dict = {}
for section in config.sections():
config_dict[section] = dict(config.items(section))
return jsonify({'config': config_dict})
except PermissionError:
return jsonify({'error': 'Permission denied'}), 403
except Exception as e:
return jsonify({'error': str(e)}), 500
get_graph_description()
¶
Get generated graph description text
Source code in webapi/api/routes.py
@api_bp.route('/graph/description', methods=['GET'])
def get_graph_description():
"""Get generated graph description text"""
config_file = request.args.get('config_file')
if not config_file:
return jsonify({'error': 'config_file parameter required'}), 400
try:
config_path = PROJECT_ROOT / config_file
if not config_path.exists():
return jsonify({'error': 'Config file not found'}), 404
config = ConfigParser()
config.read(config_path)
hashtag = config.get('mastoscore', 'hashtag')
year = config.get('mastoscore', 'event_year')
month = config.get('mastoscore', 'event_month')
day = config.get('mastoscore', 'event_day')
filename = f'{hashtag}-{year}{month}{day}.txt'
desc_file = DATA_DIR / year / month / day / filename
if not desc_file.exists():
return jsonify({'error': 'Graph description not found'}), 404
with open(desc_file, 'r') as f:
description = f.read()
return jsonify({
'description': description,
'config_file': config_file
}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
get_graph_image()
¶
Get generated graph image
Source code in webapi/api/routes.py
@api_bp.route('/graph/image', methods=['GET'])
def get_graph_image():
"""Get generated graph image"""
logger = logging.getLogger(__name__)
config_file = request.args.get('config_file')
if not config_file:
return jsonify({'error': 'config_file parameter required'}), 400
try:
config_path = PROJECT_ROOT / config_file
if not config_path.exists():
return jsonify({'error': 'Config file not found'}), 404
config = ConfigParser()
config.read(config_path)
hashtag = config.get('mastoscore', 'hashtag')
year = config.get('mastoscore', 'event_year')
month = config.get('mastoscore', 'event_month')
day = config.get('mastoscore', 'event_day')
filename = f'{hashtag}-{year}{month}{day}.png'
graph_file = DATA_DIR / year / month / day / filename
if not graph_file.exists():
return jsonify({'error': 'Graph image not found'}), 404
return send_file(graph_file, mimetype='image/png')
except Exception as e:
logger.error(f"Error getting graph image: {e}", exc_info=True)
return jsonify({'error': str(e)}), 500
get_graph_status()
¶
Check if graph image and description are available
Source code in webapi/api/routes.py
@api_bp.route('/graph/status', methods=['GET'])
def get_graph_status():
"""Check if graph image and description are available"""
config_file = request.args.get('config_file')
if not config_file:
return jsonify({'error': 'config_file parameter required'}), 400
try:
config_path = PROJECT_ROOT / config_file
if not config_path.exists():
return jsonify({'error': 'Config file not found'}), 404
config = ConfigParser()
config.read(config_path)
hashtag = config.get('mastoscore', 'hashtag')
year = config.get('mastoscore', 'event_year')
month = config.get('mastoscore', 'event_month')
day = config.get('mastoscore', 'event_day')
filename = f'{hashtag}-{year}{month}{day}.png'
graph_file = DATA_DIR / year / month / day / filename
desc_filename = f'{hashtag}-{year}{month}{day}.txt'
desc_file = DATA_DIR / year / month / day / desc_filename
return jsonify({
'image_ready': graph_file.exists(),
'description_ready': desc_file.exists()
}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
get_job_status_route(job_id)
¶
Get status of a running job
Source code in webapi/api/routes.py
@api_bp.route('/jobs/<job_id>', methods=['GET'])
def get_job_status_route(job_id):
"""Get status of a running job"""
status = get_job_status(job_id)
return jsonify(status)
get_poster_image()
¶
Get poster image file
Source code in webapi/api/routes.py
@api_bp.route('/blog/poster/image', methods=['POST'])
def get_poster_image():
"""Get poster image file"""
logger = logging.getLogger(__name__)
data = request.get_json()
config_file = data.get('config_file') if data else None
if not config_file:
return jsonify({'error': 'config_file parameter required'}), 400
try:
config_path = PROJECT_ROOT / config_file
if not config_path.exists():
return jsonify({'error': 'Config file not found'}), 404
config = ConfigParser()
config.read(config_path)
year = config.get('mastoscore', 'event_year')
month = config.get('mastoscore', 'event_month')
day = config.get('mastoscore', 'event_day')
thumb_path = DATA_DIR / year / month / day / 'thumb.jpg'
if not thumb_path.exists():
return jsonify({'error': 'Poster image not found'}), 404
return send_file(thumb_path, mimetype='image/jpeg')
except Exception as e:
logger.error(f"Error getting poster image: {e}", exc_info=True)
return jsonify({'error': str(e)}), 500
get_results(config_name)
¶
Get analysis results for a configuration
Source code in webapi/api/routes.py
@api_bp.route('/results/<config_name>', methods=['GET'])
def get_results(config_name):
"""Get analysis results for a configuration"""
return jsonify({
'results': {
'preamble': 'Analysis results',
'num_toots': 'Found 100 toots',
'most_toots': 'User posted 10 toots',
'unique_ids': 50,
'top_n': 3,
'hashtag': 'example',
'generated': '2026-02-02T10:00:00Z'
}
})
get_wordcloud_description()
¶
Get generated wordcloud description text
Source code in webapi/api/routes.py
@api_bp.route('/wordcloud/description', methods=['GET'])
def get_wordcloud_description():
"""Get generated wordcloud description text"""
config_file = request.args.get('config_file')
if not config_file:
return jsonify({'error': 'config_file parameter required'}), 400
try:
config_path = PROJECT_ROOT / config_file
if not config_path.exists():
return jsonify({'error': 'Config file not found'}), 404
config = ConfigParser()
config.read(config_path)
hashtag = config.get('mastoscore', 'hashtag')
year = config.get('mastoscore', 'event_year')
month = config.get('mastoscore', 'event_month')
day = config.get('mastoscore', 'event_day')
hashtag_fix = config.get('wordcloud', 'hashtag_fix', fallback='remove')
filename = f'wordcloud-{hashtag}-{year}{month}{day}-{hashtag_fix}.txt'
desc_file = DATA_DIR / year / month / day / filename
if not desc_file.exists():
return jsonify({'error': 'Wordcloud description not found'}), 404
with open(desc_file, 'r') as f:
description = f.read()
return jsonify({
'description': description,
'config_file': config_file
}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
get_wordcloud_image()
¶
Get generated wordcloud image
Source code in webapi/api/routes.py
@api_bp.route('/wordcloud/image', methods=['GET'])
def get_wordcloud_image():
"""Get generated wordcloud image"""
logger = logging.getLogger(__name__)
config_file = request.args.get('config_file')
if not config_file:
return jsonify({'error': 'config_file parameter required'}), 400
try:
config_path = PROJECT_ROOT / config_file
if not config_path.exists():
return jsonify({'error': 'Config file not found'}), 404
config = ConfigParser()
config.read(config_path)
hashtag = config.get('mastoscore', 'hashtag')
year = config.get('mastoscore', 'event_year')
month = config.get('mastoscore', 'event_month')
day = config.get('mastoscore', 'event_day')
hashtag_fix = config.get('wordcloud', 'hashtag_fix', fallback='remove')
filename = f'wordcloud-{hashtag}-{year}{month}{day}-{hashtag_fix}.png'
wordcloud_file = DATA_DIR / year / month / day / filename
if not wordcloud_file.exists():
return jsonify({'error': 'Wordcloud image not found'}), 404
return send_file(wordcloud_file, mimetype='image/png')
except Exception as e:
logger.error(f"Error getting wordcloud image: {e}", exc_info=True)
return jsonify({'error': str(e)}), 500
get_wordcloud_status()
¶
Check if wordcloud image and description are available
Source code in webapi/api/routes.py
@api_bp.route('/wordcloud/status', methods=['GET'])
def get_wordcloud_status():
"""Check if wordcloud image and description are available"""
config_file = request.args.get('config_file')
if not config_file:
return jsonify({'error': 'config_file parameter required'}), 400
try:
config_path = PROJECT_ROOT / config_file
if not config_path.exists():
return jsonify({'error': 'Config file not found'}), 404
config = ConfigParser()
config.read(config_path)
hashtag = config.get('mastoscore', 'hashtag')
year = config.get('mastoscore', 'event_year')
month = config.get('mastoscore', 'event_month')
day = config.get('mastoscore', 'event_day')
hashtag_fix = config.get('wordcloud', 'hashtag_fix', fallback='remove')
filename = f'wordcloud-{hashtag}-{year}{month}{day}-{hashtag_fix}.png'
wordcloud_file = DATA_DIR / year / month / day / filename
desc_filename = f'wordcloud-{hashtag}-{year}{month}{day}-{hashtag_fix}.txt'
desc_file = DATA_DIR / year / month / day / desc_filename
return jsonify({
'image_ready': wordcloud_file.exists(),
'description_ready': desc_file.exists()
}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
list_configs()
¶
List available configuration files
Source code in webapi/api/routes.py
@api_bp.route('/config/list', methods=['GET'])
def list_configs():
"""List available configuration files"""
try:
if not INI_DIR.exists():
return jsonify({
'configs': [],
'error': 'Configuration directory does not exist'
}), 404
if not os.access(INI_DIR, os.R_OK):
return jsonify({
'configs': [],
'error': 'Permission denied reading configuration directory'
}), 403
configs = []
# Recursively find all .ini files
for ini_file in INI_DIR.rglob('*.ini'):
try:
# Get relative path from ini directory
rel_path = ini_file.relative_to(PROJECT_ROOT)
# Try to read basic info from the config
config = ConfigParser()
config.read(ini_file)
hashtag = config.get('mastoscore', 'hashtag', fallback=None) if config.has_section('mastoscore') else None
configs.append({
'name': ini_file.stem,
'path': str(rel_path),
'hashtag': hashtag,
'modified': ini_file.stat().st_mtime
})
except PermissionError:
# Skip files we can't read
continue
except Exception:
# Skip malformed files
continue
return jsonify({'configs': configs})
except PermissionError:
return jsonify({
'configs': [],
'error': 'Permission denied accessing configuration directory'
}), 403
except Exception as e:
return jsonify({
'configs': [],
'error': str(e)
}), 500
reset_fetch_data()
¶
Delete all fetch data files for a configuration
Source code in webapi/api/routes.py
@api_bp.route('/fetch/reset', methods=['POST'])
def reset_fetch_data():
"""Delete all fetch data files for a configuration"""
data = request.get_json()
config_file = data.get('config_file') if data else None
if not config_file:
return jsonify({'error': 'config_file parameter required'}), 400
try:
config_path = PROJECT_ROOT / config_file
if not config_path.exists():
return jsonify({'error': 'Config file not found'}), 404
config = read_config(str(config_path))
if config is None:
return jsonify({'error': 'Failed to read config file'}), 400
result = reset_fetch(config)
if result['count'] == 0:
return jsonify({
'success': True,
'config_file': config_file,
'files_deleted': [],
'total_size': 0,
'message': 'No fetch data found to delete'
}), 200
# Convert absolute paths to relative and format response
files_info = [
{
'path': str(Path(f['path']).relative_to(PROJECT_ROOT)),
'size': f['size']
}
for f in result['files_deleted']
]
return jsonify({
'success': True,
'config_file': config_file,
'files_deleted': files_info,
'total_size': result['total_size'],
'message': f"Deleted {result['count']} files ({result['total_size']:,} bytes)"
}), 200
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 500
run_analyse()
¶
Execute analyse operation
Source code in webapi/api/routes.py
@api_bp.route('/analyse', methods=['POST'])
def run_analyse():
"""Execute analyse operation"""
data = request.get_json()
config_file = data.get('config_file') if data else None
if not config_file:
return jsonify({'error': 'config_file parameter required'}), 400
try:
# Read config using mastoscore's config module
config_path = PROJECT_ROOT / config_file
if not config_path.exists():
return jsonify({'error': 'Config file not found'}), 404
config = read_config(str(config_path))
if config is None:
return jsonify({'error': 'Failed to read config file'}), 400
hashtag = config.get('mastoscore', 'hashtag')
year = config.get('mastoscore', 'event_year')
month = config.get('mastoscore', 'event_month')
day = config.get('mastoscore', 'event_day')
# Check if fetch data exists
fetch_file = DATA_DIR / year / month / day / f'data-{hashtag}-fetch.json'
if not fetch_file.exists():
return jsonify({
'success': False,
'error': 'Fetch data not found',
'expected_file': str(fetch_file.relative_to(PROJECT_ROOT))
}), 404
# Check if analysis already exists
analysis_file = DATA_DIR / year / month / day / f'data-{hashtag}-analysis.json'
if analysis_file.exists():
with open(analysis_file, 'r') as f:
results = json.load(f)
return jsonify({
'success': True,
'cached': True,
'results': results,
'message': 'Analysis results loaded from cache'
}), 200
# Run analysis
result = analyse(config)
return jsonify({
'success': True,
'cached': False,
'results': result,
'message': 'Analysis completed successfully'
}), 200
except Exception as e:
error_details = traceback.format_exc()
print(f"Error in analyse: {error_details}")
return jsonify({
'success': False,
'error': str(e),
'traceback': error_details
}), 500
run_blog()
¶
Execute blog generation
Source code in webapi/api/routes.py
@api_bp.route('/blog', methods=['POST'])
def run_blog():
"""Execute blog generation"""
data = request.get_json()
config_file = data.get('config_file') if data else None
copy_files = data.get('copyFiles', False)
tags = data.get('tags', [])
if not config_file:
return jsonify({'error': 'config_file parameter required'}), 400
try:
config_path = str(PROJECT_ROOT / config_file)
config = read_config(config_path)
if config is None:
return jsonify({
'success': False,
'error': 'Failed to read config file'
}), 400
result = blog(config, copy_files=copy_files, additional_tags=tags)
return jsonify({
'success': result,
'config_file': config_file,
'message': 'Blog post generated successfully' if result else 'Blog generation failed'
}), 200 if result else 500
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 500
run_fetch()
¶
Execute fetch operation asynchronously
Source code in webapi/api/routes.py
@api_bp.route('/fetch', methods=['POST'])
def run_fetch():
"""Execute fetch operation asynchronously"""
data = request.get_json()
config_file = data.get('config_file') if data else None
use_mock = data.get('mock', False)
if not config_file:
return jsonify({'error': 'config_file parameter required'}), 400
try:
config_path = PROJECT_ROOT / config_file
if not config_path.exists():
return jsonify({'error': 'Config file not found'}), 404
config = read_config(str(config_path))
if config is None:
return jsonify({'error': 'Failed to read config file'}), 400
# Get max toots from config
max_toots = config.getint('fetch', 'max')
progress = FetchProgress(max_toots=max_toots)
cancel = CancelToken()
job_id = run_async(fetch, config, progress_obj=progress, cancel_token=cancel, mock=use_mock)
return jsonify({
'job_id': job_id,
'status': 'running',
'config_file': config_file
}), 202
except Exception as e:
return jsonify({'error': str(e)}), 500
run_graph()
¶
Execute graph generation
Source code in webapi/api/routes.py
@api_bp.route('/graph', methods=['POST'])
def run_graph():
"""Execute graph generation"""
logger = logging.getLogger(__name__)
data = request.get_json()
config_file = data.get('config_file') if data else None
if not config_file:
return jsonify({'error': 'config_file parameter required'}), 400
try:
config_path = str(PROJECT_ROOT / config_file)
config = read_config(config_path)
if config is None:
return jsonify({
'success': False,
'error': 'Failed to read config file'
}), 400
graph(config)
return jsonify({
'success': True,
'config_file': config_file,
'message': 'Graph generated successfully'
}), 200
except Exception as e:
logger.error(f"Error generating graph: {e}", exc_info=True)
return jsonify({
'success': False,
'error': str(e)
}), 500
run_post()
¶
Execute post operation (posts text and graphs)
Source code in webapi/api/routes.py
@api_bp.route('/post', methods=['POST'])
def run_post():
"""Execute post operation (posts text and graphs)"""
data = request.get_json()
config_file = data.get('config_file') if data else None
if not config_file:
return jsonify({'error': 'config_file parameter required'}), 400
try:
config_path = str(PROJECT_ROOT / config_file)
config = read_config(config_path)
if config is None:
return jsonify({'error': 'Failed to read config file'}), 400
job_id = run_async(post_all, config)
return jsonify({
'job_id': job_id,
'status': 'running',
'config_file': config_file
}), 202
except Exception as e:
return jsonify({'error': str(e)}), 500
run_wordcloud()
¶
Execute wordcloud generation
Source code in webapi/api/routes.py
@api_bp.route('/wordcloud', methods=['POST'])
def run_wordcloud():
"""Execute wordcloud generation"""
data = request.get_json()
config_file = data.get('config_file') if data else None
if not config_file:
return jsonify({'error': 'config_file parameter required'}), 400
try:
config_path = str(PROJECT_ROOT / config_file)
config = read_config(config_path)
if config is None:
return jsonify({
'success': False,
'error': 'Failed to read config file'
}), 400
write_wordcloud(config)
return jsonify({
'success': True,
'config_file': config_file,
'message': 'Wordcloud generated successfully'
}), 200
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 500
search_movie_posters()
¶
Search for movie posters using TMDb API
Source code in webapi/api/routes.py
@api_bp.route('/blog/search-posters', methods=['POST'])
def search_movie_posters():
"""
Search for movie posters using TMDb API
"""
data = request.get_json()
config_file = data.get('config_file') if data else None
search_query = data.get('query') if data else None
if not config_file:
return jsonify({'error': 'config_file parameter required'}), 400
# Check for TMDb API key
tmdb_api_key = os.getenv('TMDB_API_KEY')
if not tmdb_api_key or tmdb_api_key == 'your_tmdb_api_key_here':
return jsonify({'error': 'TMDb API key not configured in .env file'}), 500
try:
# If no query provided, get episode title from config
if not search_query:
config_path = PROJECT_ROOT / config_file
if not config_path.exists():
return jsonify({'error': 'Config file not found'}), 404
config = ConfigParser()
config.read(config_path)
# Get episode title (e.g., "Planet Earth (1974)")
search_query = config.get('mastoscore', 'episode_title', fallback=None)
if not search_query:
return jsonify({'error': 'episode_title not found in config'}), 400
# Parse title and year from search_query
match = re.search(r'^(.+?)\s*\((\d{4})\)\s*$', search_query)
if match:
title = match.group(1).strip()
year = match.group(2)
else:
# No year found, use entire string as title
title = search_query.strip()
year = None
# Call TMDb API to search for movie
search_url = 'https://api.themoviedb.org/3/search/movie'
params = {
'api_key': tmdb_api_key,
'query': title
}
if year:
params['year'] = year
response = requests.get(search_url, params=params, timeout=10)
response.raise_for_status()
search_results = response.json()
# Format poster results (limit to top 5)
posters = []
for result in search_results.get('results', [])[:5]:
if result.get('poster_path'):
movie_id = result['id']
# Fetch detailed movie info including keywords
details_url = f'https://api.themoviedb.org/3/movie/{movie_id}'
details_params = {
'api_key': tmdb_api_key,
'append_to_response': 'keywords'
}
try:
details_response = requests.get(details_url, params=details_params, timeout=10)
details_response.raise_for_status()
details = details_response.json()
# Extract keywords
keywords = []
if 'keywords' in details and 'keywords' in details['keywords']:
keywords = [kw['name'] for kw in details['keywords']['keywords'][:10]] # Limit to 10
except Exception as e:
# If keywords fetch fails, continue without them
keywords = []
posters.append({
'id': str(movie_id),
'title': result['title'],
'poster_path': result['poster_path'],
'poster_url': f"https://image.tmdb.org/t/p/w342{result['poster_path']}",
'year': result.get('release_date', '')[:4] if result.get('release_date') else '',
'overview': result.get('overview', ''),
'keywords': keywords
})
return jsonify({
'posters': posters,
'config_file': config_file,
'search_title': title,
'search_year': year
}), 200
except requests.RequestException as e:
return jsonify({
'error': f'TMDb API request failed: {str(e)}'
}), 500
except Exception as e:
return jsonify({
'error': str(e)
}), 500
select_movie_poster()
¶
Download and save selected movie poster as thumb.jpg
Source code in webapi/api/routes.py
@api_bp.route('/blog/select-poster', methods=['POST'])
def select_movie_poster():
"""Download and save selected movie poster as thumb.jpg"""
data = request.get_json()
config_file = data.get('config_file') if data else None
poster_url = data.get('poster_url') if data else None
if not config_file:
return jsonify({'error': 'config_file parameter required'}), 400
if not poster_url:
return jsonify({'error': 'poster_url parameter required'}), 400
try:
config_path = PROJECT_ROOT / config_file
if not config_path.exists():
return jsonify({'error': 'Config file not found'}), 404
config = ConfigParser()
config.read(config_path)
year = config.get('mastoscore', 'event_year')
month = config.get('mastoscore', 'event_month')
day = config.get('mastoscore', 'event_day')
thumb_path = DATA_DIR / year / month / day / 'thumb.jpg'
# Download image from poster_url
response = requests.get(poster_url, timeout=30)
response.raise_for_status()
# Save as thumb.jpg
with open(thumb_path, 'wb') as f:
f.write(response.content)
return jsonify({
'success': True,
'config_file': config_file,
'thumb_file': str(thumb_path.relative_to(PROJECT_ROOT)),
'message': 'Poster saved successfully'
}), 200
except requests.RequestException as e:
return jsonify({
'error': f'Failed to download poster: {str(e)}'
}), 500
except Exception as e:
return jsonify({
'error': str(e)
}), 500
serve_graph(filename)
¶
Serve generated graph images
Source code in webapi/api/routes.py
@api_bp.route('/graphs/<path:filename>', methods=['GET'])
def serve_graph(filename):
"""Serve generated graph images"""
return jsonify({'error': 'Not implemented'}), 404
update_config(name)
¶
Update existing configuration file
Source code in webapi/api/routes.py
@api_bp.route('/config/<path:name>', methods=['PUT'])
def update_config(name):
"""Update existing configuration file"""
try:
config_path = PROJECT_ROOT / name
if not config_path.exists():
return jsonify({'error': 'Configuration not found'}), 404
if not os.access(config_path, os.W_OK):
return jsonify({'error': 'Permission denied'}), 403
# Get config data from request
data = request.get_json()
if not data or 'config' not in data:
return jsonify({'error': 'No configuration data provided'}), 400
# Write config file
config = ConfigParser()
for section, values in data['config'].items():
config.add_section(section)
for key, value in values.items():
config.set(section, key, str(value))
with open(config_path, 'w') as f:
config.write(f)
return jsonify({
'success': True,
'path': str(config_path.relative_to(PROJECT_ROOT))
})
except PermissionError:
return jsonify({'error': 'Permission denied'}), 403
except Exception as e:
return jsonify({'error': str(e)}), 500