Spaces:
Sleeping
Sleeping
import os | |
import logging | |
from elasticsearch import Elasticsearch, ConnectionError, AuthenticationException | |
# Configure logging at the application level | |
logger = logging.getLogger(__name__) | |
logging.basicConfig(level=logging.INFO) | |
# Load environment variables | |
ES_CLIENT_URL = os.getenv("ELASTICSEARCH_HOSTS", "http://localhost:9200") | |
class ElasticsearchClientError(Exception): | |
"""Custom exception for Elasticsearch client errors.""" | |
pass | |
def get_es_client() -> Elasticsearch: | |
""" | |
Establish connection to Elasticsearch and return the client instance. | |
Raises ElasticsearchClientError if the connection cannot be established. | |
""" | |
try: | |
print("es client", ES_CLIENT_URL) | |
# Initialize Elasticsearch client | |
es_client = Elasticsearch( | |
hosts=[ES_CLIENT_URL], | |
) | |
# Verify connection | |
if not es_client.ping(): | |
error_message = "Elasticsearch cluster is not reachable!" | |
logger.error(error_message) | |
raise ElasticsearchClientError(error_message) | |
logger.info("Successfully connected to Elasticsearch") | |
return es_client | |
except (ConnectionError, AuthenticationException) as e: | |
error_message = f"Elasticsearch connection error: {e}" | |
logger.error(error_message) | |
raise ElasticsearchClientError(error_message) from e |