I have the below data frame and want to save it as a CSV file in Azure Data Lake. My data frame is called 'df'. I am using an Azure Synapse Notebook.
df.to_csv('abfss://jobsdata@strxxxuei.dfs.core.windows.net/Jobs_newdata/data.csv', sep=',', encoding='utf-8', index=False)
I get the below error message when I try to run the above code:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
/tmp/ipykernel_6713/3472604753.py in <module>
----> 1 df.to_csv('abfss://jobsdata@strxxxuei.dfs.core.windows.net/Jobs_newdata/jobs.csv', sep=',', encoding='utf-8', index=False)
~/cluster-env/clonedenv/lib/python3.8/site-packages/pandas/core/generic.py in to_csv(self, path_or_buf, sep, na_rep, float_format, columns, header, index, index_label, mode, encoding, compression, quoting, quotechar, line_terminator, chunksize, date_format, doublequote, escapechar, decimal, errors, storage_options)
3385 )
3386
-> 3387 return DataFrameRenderer(formatter).to_csv(
3388 path_or_buf,
3389 line_terminator=line_terminator,
~/cluster-env/clonedenv/lib/python3.8/site-packages/pandas/io/formats/format.py in to_csv(self, path_or_buf, encoding, sep, columns, index_label, mode, compression, quoting, quotechar, line_terminator, chunksize, date_format, doublequote, escapechar, errors, storage_options)
1081 formatter=self.fmt,
1082 )
-> 1083 csv_formatter.save()
1084
1085 if created_buffer:
~/cluster-env/clonedenv/lib/python3.8/site-packages/pandas/io/formats/csvs.py in save(self)
226 """
227 # apply compression and byte/text conversion
--> 228 with get_handle(
229 self.filepath_or_buffer,
230 self.mode,
~/cluster-env/clonedenv/lib/python3.8/site-packages/pandas/io/common.py in get_handle(path_or_buf, mode, encoding, compression, memory_map, is_text, errors, storage_options)
556
557 # open URLs
--> 558 ioargs = _get_filepath_or_buffer(
559 path_or_buf,
560 encoding=encoding,
~/cluster-env/clonedenv/lib/python3.8/site-packages/pandas/io/common.py in _get_filepath_or_buffer(filepath_or_buffer, encoding, compression, mode, storage_options)
331
332 try:
--> 333 file_obj = fsspec.open(
334 filepath_or_buffer, mode=fsspec_mode, **(storage_options or {})
335 ).open()
~/cluster-env/clonedenv/lib/python3.8/site-packages/fsspec/core.py in open(urlpath, mode, compression, encoding, errors, protocol, newline, **kwargs)
427 ``OpenFile`` object.
428 """
--> 429 return open_files(
430 urlpath=[urlpath],
431 mode=mode,
~/cluster-env/clonedenv/lib/python3.8/site-packages/fsspec/core.py in open_files(urlpath, mode, compression, encoding, errors, name_function, num, protocol, newline, auto_mkdir, expand, **kwargs)
279 be used as a single context
280 """
--> 281 fs, fs_token, paths = get_fs_token_paths(
282 urlpath,
283 mode,
~/cluster-env/clonedenv/lib/python3.8/site-packages/fsspec/core.py in get_fs_token_paths(urlpath, mode, num, name_function, storage_options, protocol, expand)
597 "share the same protocol"
598 )
--> 599 cls = get_filesystem_class(protocol)
600 optionss = list(map(cls._get_kwargs_from_urls, urlpath))
601 paths = [cls._strip_protocol(u) for u in urlpath]
~/cluster-env/clonedenv/lib/python3.8/site-packages/fsspec/registry.py in get_filesystem_class(protocol)
209 bit = known_implementations[protocol]
210 try:
--> 211 register_implementation(protocol, _import_class(bit["class"]))
212 except ImportError as e:
213 raise ImportError(bit["err"]) from e
~/cluster-env/clonedenv/lib/python3.8/site-packages/fsspec/registry.py in _import_class(cls, minv)
232 else:
233 mod, name = cls.rsplit(".", 1)
--> 234 mod = importlib.import_module(mod)
235 return getattr(mod, name)
236
~/cluster-env/clonedenv/lib/python3.8/importlib/__init__.py in import_module(name, package)
125 break
126 level += 1
--> 127 return _bootstrap._gcd_import(name[level:], package, level)
128
129
~/cluster-env/clonedenv/lib/python3.8/importlib/_bootstrap.py in _gcd_import(name, package, level)
~/cluster-env/clonedenv/lib/python3.8/importlib/_bootstrap.py in _find_and_load(name, import_)
~/cluster-env/clonedenv/lib/python3.8/importlib/_bootstrap.py in _find_and_load_unlocked(name, import_)
~/cluster-env/clonedenv/lib/python3.8/importlib/_bootstrap.py in _load_unlocked(spec)
~/cluster-env/clonedenv/lib/python3.8/importlib/_bootstrap_external.py in exec_module(self, module)
~/cluster-env/clonedenv/lib/python3.8/importlib/_bootstrap.py in _call_with_frames_removed(f, *args, **kwds)
~/cluster-env/clonedenv/lib/python3.8/site-packages/fsspec_wrapper/__init__.py in <module>
----> 1 from .core import (
2 AzureBlobFileSystem
3 )
4
5 __all__ = [
~/cluster-env/clonedenv/lib/python3.8/site-packages/fsspec_wrapper/core.py in <module>
3 from .utils import logger as synapseml_pandas_logger
4 from .utils.common import SynapseCredential
----> 5 import adlfs
6 import time
7 import re
~/cluster-env/clonedenv/lib/python3.8/site-packages/adlfs/__init__.py in <module>
----> 1 from .spec import AzureDatalakeFileSystem
2 from .spec import AzureBlobFileSystem, AzureBlobFile
3 from ._version import get_versions
4
5 __all__ = ["AzureBlobFileSystem", "AzureBlobFile", "AzureDatalakeFileSystem"]
~/cluster-env/clonedenv/lib/python3.8/site-packages/adlfs/spec.py in <module>
16 ResourceExistsError,
17 )
---> 18 from azure.storage.blob._shared.base_client import create_configuration
19 from azure.datalake.store import AzureDLFileSystem, lib
20 from azure.datalake.store.core import AzureDLFile, AzureDLPath
~/cluster-env/clonedenv/lib/python3.8/site-packages/azure/storage/blob/__init__.py in <module>
8 from typing import Union, Iterable, AnyStr, IO, Any, Dict # pylint: disable=unused-import
9 from ._version import VERSION
---> 10 from ._blob_client import BlobClient
11 from ._container_client import ContainerClient
12 from ._blob_service_client import BlobServiceClient
~/cluster-env/clonedenv/lib/python3.8/site-packages/azure/storage/blob/_blob_client.py in <module>
24
25 from ._shared import encode_base64
---> 26 from ._shared.base_client import StorageAccountHostsMixin, parse_connection_str, parse_query, TransportWrapper
27 from ._shared.encryption import generate_blob_encryption_data
28 from ._shared.uploads import IterStreamer
~/cluster-env/clonedenv/lib/python3.8/site-packages/azure/storage/blob/_shared/base_client.py in <module>
38 from .constants import STORAGE_OAUTH_SCOPE, SERVICE_HOST_BASE, CONNECTION_TIMEOUT, READ_TIMEOUT
39 from .models import LocationMode
---> 40 from .authentication import SharedKeyCredentialPolicy
41 from .shared_access_signature import QueryStringConstants
42 from .request_handlers import serialize_batch_body, _get_batch_request_delimiter
~/cluster-env/clonedenv/lib/python3.8/site-packages/azure/storage/blob/_shared/authentication.py in <module>
20
21 try:
---> 22 from azure.core.pipeline.transport import AioHttpTransport
23 except ImportError:
24 AioHttpTransport = None
~/cluster-env/clonedenv/lib/python3.8/importlib/_bootstrap.py in _handle_fromlist(module, fromlist, import_, recursive)
~/cluster-env/clonedenv/lib/python3.8/site-packages/azure/core/pipeline/transport/__init__.py in __getattr__(name)
66 if name == 'AioHttpTransport':
67 try:
---> 68 from ._aiohttp import AioHttpTransport
69 return AioHttpTransport
70 except ImportError:
~/cluster-env/clonedenv/lib/python3.8/site-packages/azure/core/pipeline/transport/_aiohttp.py in <module>
33 import asyncio
34 import codecs
---> 35 import aiohttp
36 from multidict import CIMultiDict
37
~/cluster-env/clonedenv/lib/python3.8/site-packages/aiohttp/__init__.py in <module>
4
5 from . import hdrs as hdrs
----> 6 from .client import (
7 BaseConnector as BaseConnector,
8 ClientConnectionError as ClientConnectionError,
~/cluster-env/clonedenv/lib/python3.8/site-packages/aiohttp/client.py in <module>
33 from yarl import URL
34
---> 35 from . import hdrs, http, payload
36 from .abc import AbstractCookieJar
37 from .client_exceptions import (
~/cluster-env/clonedenv/lib/python3.8/site-packages/aiohttp/http.py in <module>
5 from . import __version__
6 from .http_exceptions import HttpProcessingError as HttpProcessingError
----> 7 from .http_parser import (
8 HeadersParser as HeadersParser,
9 HttpParser as HttpParser,
~/cluster-env/clonedenv/lib/python3.8/site-packages/aiohttp/http_parser.py in <module>
13 from . import hdrs
14 from .base_protocol import BaseProtocol
---> 15 from .helpers import NO_EXTENSIONS, BaseTimerContext
16 from .http_exceptions import (
17 BadStatusLine,
~/cluster-env/clonedenv/lib/python3.8/site-packages/aiohttp/helpers.py in <module>
665
666
--> 667 class CeilTimeout(async_timeout.timeout):
668 def __enter__(self) -> async_timeout.timeout:
669 if self._timeout is not None:
TypeError: function() argument 'code' must be code, not str
I am getting the above error message and am not sure how to rectify it.
Can anyone advise what the issue in my code is?
This could be due to invalid permissions to access the container, or you may not have write permission on the container. (Note: the final `TypeError: function() argument 'code' must be code, not str` in your traceback is raised while importing `aiohttp`, which suggests an incompatibility between the installed `aiohttp` and `async-timeout` package versions in the cluster environment — worth verifying as well.)
I have reproduced the scenario with your code and was able to write the data to a CSV file successfully.
df.to_csv('abfss://<container name>@<storage account name>.dfs.core.windows.net/source/sample2.csv', sep=',', encoding='utf-8', index=False)
Output: