Search code examples
pythondatabase-connectionlarge-language-modelmilvushaystack

Milvus ConnectionRefusedError: how to connect locally


I am trying to run a RAG pipeline using haystack & Milvus.

Its my first time using Milvus, and I seem to have an issue with it.

I'm following this tutorial, with some basic changes: https://milvus.io/docs/integrate_with_haystack.md

Here is my code:

import os
import urllib.request

from haystack import Pipeline
from haystack.components.converters import MarkdownToDocument
from haystack_integrations.components.embedders.ollama import OllamaDocumentEmbedder, OllamaTextEmbedder

from haystack.components.preprocessors import DocumentSplitter
from haystack.components.writers import DocumentWriter

from milvus_haystack import MilvusDocumentStore
from milvus_haystack.milvus_embedding_retriever import MilvusEmbeddingRetriever

url = "https://www.gutenberg.org/cache/epub/7785/pg7785.txt"
file_path = "./davinci.txt"

if not os.path.exists(file_path):
    urllib.request.urlretrieve(url, file_path) 

document_store = MilvusDocumentStore(
    connection_args={"uri": "./milvus.db"},
    drop_old=True,
)

indexing_pipeline = Pipeline()
indexing_pipeline.add_component("converter", MarkdownToDocument())
indexing_pipeline.add_component(
    "splitter", DocumentSplitter(split_by="sentence", split_length=2)
)
indexing_pipeline.add_component("embedder", OllamaDocumentEmbedder())
indexing_pipeline.add_component("writer", DocumentWriter(document_store))
indexing_pipeline.connect("converter", "splitter")
indexing_pipeline.connect("splitter", "embedder")
indexing_pipeline.connect("embedder", "writer")

indexing_pipeline.draw('./pipeline_diagram.png')

indexing_pipeline.run({"converter": {"sources": [file_path]}})

It all works well until the last line, where I get a ConnectionRefusedError. First the conversion (from markdown to document) runs well, but then the code fails.

I am not sure why it happens, as I see the milvus.db and milvus.db.lock files created as expected.

The full error is:

---------------------------------------------------------------------------
ConnectionRefusedError                    Traceback (most recent call last)
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/connection.py:203, in HTTPConnection._new_conn(self)
    202 try:
--> 203     sock = connection.create_connection(
    204         (self._dns_host, self.port),
    205         self.timeout,
    206         source_address=self.source_address,
    207         socket_options=self.socket_options,
    208     )
    209 except socket.gaierror as e:

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/util/connection.py:85, in create_connection(address, timeout, source_address, socket_options)
     84 try:
---> 85     raise err
     86 finally:
     87     # Break explicitly a reference cycle

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/util/connection.py:73, in create_connection(address, timeout, source_address, socket_options)
     72     sock.bind(source_address)
---> 73 sock.connect(sa)
     74 # Break explicitly a reference cycle

ConnectionRefusedError: [Errno 61] Connection refused

The above exception was the direct cause of the following exception:

NewConnectionError                        Traceback (most recent call last)
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/connectionpool.py:791, in HTTPConnectionPool.urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, preload_content, decode_content, **response_kw)
    790 # Make the request on the HTTPConnection object
--> 791 response = self._make_request(
    792     conn,
    793     method,
    794     url,
    795     timeout=timeout_obj,
    796     body=body,
    797     headers=headers,
    798     chunked=chunked,
    799     retries=retries,
    800     response_conn=response_conn,
    801     preload_content=preload_content,
    802     decode_content=decode_content,
    803     **response_kw,
    804 )
    806 # Everything went great!

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/connectionpool.py:497, in HTTPConnectionPool._make_request(self, conn, method, url, body, headers, retries, timeout, chunked, response_conn, preload_content, decode_content, enforce_content_length)
    496 try:
--> 497     conn.request(
    498         method,
    499         url,
    500         body=body,
    501         headers=headers,
    502         chunked=chunked,
    503         preload_content=preload_content,
    504         decode_content=decode_content,
    505         enforce_content_length=enforce_content_length,
    506     )
    508 # We are swallowing BrokenPipeError (errno.EPIPE) since the server is
    509 # legitimately able to close the connection after sending a valid response.
    510 # With this behaviour, the received response is still readable.

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/connection.py:395, in HTTPConnection.request(self, method, url, body, headers, chunked, preload_content, decode_content, enforce_content_length)
    394     self.putheader(header, value)
--> 395 self.endheaders()
    397 # If we're given a body we start sending that in chunks.

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/http/client.py:1289, in HTTPConnection.endheaders(self, message_body, encode_chunked)
   1288     raise CannotSendHeader()
-> 1289 self._send_output(message_body, encode_chunked=encode_chunked)

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/http/client.py:1048, in HTTPConnection._send_output(self, message_body, encode_chunked)
   1047 del self._buffer[:]
-> 1048 self.send(msg)
   1050 if message_body is not None:
   1051 
   1052     # create a consistent interface to message_body

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/http/client.py:986, in HTTPConnection.send(self, data)
    985 if self.auto_open:
--> 986     self.connect()
    987 else:

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/connection.py:243, in HTTPConnection.connect(self)
    242 def connect(self) -> None:
--> 243     self.sock = self._new_conn()
    244     if self._tunnel_host:
    245         # If we're tunneling it means we're connected to our proxy.

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/connection.py:218, in HTTPConnection._new_conn(self)
    217 except OSError as e:
--> 218     raise NewConnectionError(
    219         self, f"Failed to establish a new connection: {e}"
    220     ) from e
    222 # Audit hooks are only available in Python 3.8+

NewConnectionError: <urllib3.connection.HTTPConnection object at 0x30ca49690>: Failed to establish a new connection: [Errno 61] Connection refused

The above exception was the direct cause of the following exception:

MaxRetryError                             Traceback (most recent call last)
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/requests/adapters.py:486, in HTTPAdapter.send(self, request, stream, timeout, verify, cert, proxies)
    485 try:
--> 486     resp = conn.urlopen(
    487         method=request.method,
    488         url=url,
    489         body=request.body,
    490         headers=request.headers,
    491         redirect=False,
    492         assert_same_host=False,
    493         preload_content=False,
    494         decode_content=False,
    495         retries=self.max_retries,
    496         timeout=timeout,
    497         chunked=chunked,
    498     )
    500 except (ProtocolError, OSError) as err:

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/connectionpool.py:845, in HTTPConnectionPool.urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, preload_content, decode_content, **response_kw)
    843     new_e = ProtocolError("Connection aborted.", new_e)
--> 845 retries = retries.increment(
    846     method, url, error=new_e, _pool=self, _stacktrace=sys.exc_info()[2]
    847 )
    848 retries.sleep()

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/util/retry.py:515, in Retry.increment(self, method, url, response, error, _pool, _stacktrace)
    514     reason = error or ResponseError(cause)
--> 515     raise MaxRetryError(_pool, url, reason) from reason  # type: ignore[arg-type]
    517 log.debug("Incremented Retry for (url='%s'): %r", url, new_retry)

MaxRetryError: HTTPConnectionPool(host='localhost', port=11434): Max retries exceeded with url: /api/embeddings (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x30ca49690>: Failed to establish a new connection: [Errno 61] Connection refused'))

During handling of the above exception, another exception occurred:

ConnectionError                           Traceback (most recent call last)
Cell In[15], line 1
----> 1 indexing_pipeline.run({"converter": {"sources": [file_path]}})
      3 print("Number of documents:", document_store.count_documents())

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/haystack/core/pipeline/pipeline.py:197, in Pipeline.run(self, data, debug, include_outputs_from)
    195 span.set_content_tag("haystack.component.input", last_inputs[name])
    196 logger.info("Running component {component_name}", component_name=name)
--> 197 res = comp.run(**last_inputs[name])
    198 self.graph.nodes[name]["visits"] += 1
    200 if not isinstance(res, Mapping):

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/haystack_integrations/components/embedders/ollama/document_embedder.py:139, in OllamaDocumentEmbedder.run(self, documents, generation_kwargs)
    136     raise TypeError(msg)
    138 texts_to_embed = self._prepare_texts_to_embed(documents=documents)
--> 139 embeddings, meta = self._embed_batch(
    140     texts_to_embed=texts_to_embed, batch_size=self.batch_size, generation_kwargs=generation_kwargs
    141 )
    143 for doc, emb in zip(documents, embeddings):
    144     doc.embedding = emb

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/haystack_integrations/components/embedders/ollama/document_embedder.py:107, in OllamaDocumentEmbedder._embed_batch(self, texts_to_embed, batch_size, generation_kwargs)
    105 batch = texts_to_embed[i]  # Single batch only
    106 payload = self._create_json_payload(batch, generation_kwargs)
--> 107 response = requests.post(url=self.url, json=payload, timeout=self.timeout)
    108 response.raise_for_status()
    109 result = response.json()

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/requests/api.py:115, in post(url, data, json, **kwargs)
    103 def post(url, data=None, json=None, **kwargs):
    104     r"""Sends a POST request.
    105 
    106     :param url: URL for the new :class:`Request` object.
   (...)
    112     :rtype: requests.Response
    113     """
--> 115     return request("post", url, data=data, json=json, **kwargs)

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/requests/api.py:59, in request(method, url, **kwargs)
     55 # By using the 'with' statement we are sure the session is closed, thus we
     56 # avoid leaving sockets open which can trigger a ResourceWarning in some
     57 # cases, and look like a memory leak in others.
     58 with sessions.Session() as session:
---> 59     return session.request(method=method, url=url, **kwargs)

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/requests/sessions.py:589, in Session.request(self, method, url, params, data, headers, cookies, files, auth, timeout, allow_redirects, proxies, hooks, stream, verify, cert, json)
    584 send_kwargs = {
    585     "timeout": timeout,
    586     "allow_redirects": allow_redirects,
    587 }
    588 send_kwargs.update(settings)
--> 589 resp = self.send(prep, **send_kwargs)
    591 return resp

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/requests/sessions.py:703, in Session.send(self, request, **kwargs)
    700 start = preferred_clock()
    702 # Send the request
--> 703 r = adapter.send(request, **kwargs)
    705 # Total elapsed time of the request (approximately)
    706 elapsed = preferred_clock() - start

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/requests/adapters.py:519, in HTTPAdapter.send(self, request, stream, timeout, verify, cert, proxies)
    515     if isinstance(e.reason, _SSLError):
    516         # This branch is for urllib3 v1.22 and later.
    517         raise SSLError(e, request=request)
--> 519     raise ConnectionError(e, request=request)
    521 except ClosedPoolError as e:
    522     raise ConnectionError(e, request=request)

ConnectionError: HTTPConnectionPool(host='localhost', port=11434): Max retries exceeded with url: /api/embeddings (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x30ca49690>: Failed to establish a new connection: [Errno 61] Connection refused'))

any help resolving this would be appreciated. My assumption is that it is something very simple in creating the milvus database local connection, but I dont know where it is.


Solution

  • As suggested in earlier replies, Ollama was not running locally.

    To resolve this I needed to:
    1 - download and install Ollama
    2 - pull (or run) in the terminal ollama pull nomic-embed-text for the embedders. This is not clear enough in the haystack documentation for the ollama integration but once this is done it should run.

    Also I would suggest to change from:
    indexing_pipeline.add_component("embedder", OllamaDocumentEmbedder())
    to:
    indexing_pipeline.add_component("embedder", OllamaDocumentEmbedder(model="nomic-embed-text", url="http://localhost:11434/api/embeddings"))