diff --git a/ollama_proxy_server/main.py b/ollama_proxy_server/main.py index 531c2c6..31889c8 100644 --- a/ollama_proxy_server/main.py +++ b/ollama_proxy_server/main.py @@ -17,6 +17,7 @@ from ascii_colors import ASCIIColors from pathlib import Path import csv import datetime +from ascii_colors import ASCIIColors, trace_exception def get_config(filename): config = configparser.ConfigParser() @@ -49,37 +50,58 @@ def get_authorized_users(filename): except: ASCIIColors.red(f"User entry broken: {line.strip()}") return authorized_users +def display_config(args, servers, authorized_users): + print("\n🌟 Current Configuration 🌟") + ASCIIColors.blue(f"šŸ“ Config File: {args.config}") + ASCIIColors.blue(f"šŸ—„ļø Log Path: {args.log_path}") + ASCIIColors.blue(f"šŸ‘¤ Users List: {args.users_list}") + ASCIIColors.blue(f"šŸ”¢ Port Number: {args.port}") + ASCIIColors.yellow(f"āš ļø Deactivate Security: {'Yes 🚫' if args.deactivate_security else 'No āœ…'}") + # Additional config details + if servers: + print("\n🌐 Servers Configuration:") + for server in servers.items(): + ASCIIColors.green(f" {server[0]}: {server[1]}") + + print("\nšŸ”‘ Authorized Users:") + for user in authorized_users: + ASCIIColors.yellow(f" - šŸ‘¤ {user}") + def main(): parser = argparse.ArgumentParser() - parser.add_argument('--config', default="config.ini", help='Path to the authorized users list') + parser.add_argument('--config', default="config.ini", help='Path to the config file') parser.add_argument('--log_path', default="access_log.txt", help='Path to the access log file') - parser.add_argument('--users_list', default="authorized_users.txt", help='Path to the config file') + parser.add_argument('--users_list', default="authorized_users.txt", help='Path to the authorized users list') parser.add_argument('--port', type=int, default=11534, help='Port number for the server (default is 100 + default ollama port number)') parser.add_argument('-d', '--deactivate_security', action='store_true', help='Deactivates security') + args = parser.parse_args() servers = get_config(args.config) authorized_users = get_authorized_users(args.users_list) - deactivate_security = args.deactivate_security - ASCIIColors.red("Ollama Proxy server") - ASCIIColors.red("Author: ParisNeo") + ASCIIColors.red("Ollama Proxy Server") + ASCIIColors.multicolor(["Author:", "ParisNeo"], [ASCIIColors.color_red, ASCIIColors.color_magenta]) + + # Display the current configuration + display_config(args, servers, authorized_users) class RequestHandler(BaseHTTPRequestHandler): def add_access_log_entry(self, event, user, ip_address, access, server, nb_queued_requests_on_server, error=""): log_file_path = Path(args.log_path) + try: + if not log_file_path.exists(): + with open(log_file_path, mode='w', newline='') as csvfile: + fieldnames = ['time_stamp', 'event', 'user_name', 'ip_address', 'access', 'server', 'nb_queued_requests_on_server', 'error'] + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer.writeheader() - if not log_file_path.exists(): - with open(log_file_path, mode='w', newline='') as csvfile: + with open(log_file_path, mode='a', newline='') as csvfile: fieldnames = ['time_stamp', 'event', 'user_name', 'ip_address', 'access', 'server', 'nb_queued_requests_on_server', 'error'] writer = csv.DictWriter(csvfile, fieldnames=fieldnames) - writer.writeheader() - - with open(log_file_path, mode='a', newline='') as csvfile: - fieldnames = ['time_stamp', 'event', 'user_name', 'ip_address', 'access', 'server', 'nb_queued_requests_on_server', 'error'] - writer = csv.DictWriter(csvfile, fieldnames=fieldnames) - row = {'time_stamp': str(datetime.datetime.now()), 'event': event, 'user_name': user, 'ip_address': ip_address, 'access': access, 'server': server, 'nb_queued_requests_on_server': nb_queued_requests_on_server, 'error': error} - writer.writerow(row) - + row = {'time_stamp': str(datetime.datetime.now()), 'event': event, 'user_name': user, 'ip_address': ip_address, 'access': access, 'server': server, 'nb_queued_requests_on_server': nb_queued_requests_on_server, 'error': error} + writer.writerow(row) + except Exception as ex: + trace_exception(ex) def _send_response(self, response): self.send_response(response.status_code) for key, value in response.headers.items():