azure, apache-spark, pyspark, azure-sql-database, azure-dsvm

Retrieve data from Azure HDInsight with PySpark


I have the credentials and the URL for access to an Azure database.

I want to read the data using PySpark, but I don't know how to do it.

Is there a specific syntax to connect to an Azure database?

EDIT

After running the shared code I got the error below. Any suggestions?

A sample I have on the machine uses an ODBC driver. Could that be related?

2018-07-14 11:22:00 WARN  SQLServerConnection:2141 - ConnectionID:1 ClientConnectionId: 7561d3ba-71ac-43b3-a35f-26ababef90cc Prelogin error: host servername.azurehdinsight.net port 443 Error reading prelogin response: An existing connection was forcibly closed by the remote host ClientConnectionId:7561d3ba-71ac-43b3-a35f-26ababef90cc

Traceback (most recent call last):
  File "C:/Users/team2/PycharmProjects/Bridgestone/spark_driver_style.py", line 46, in <module>
    .option("password", "**********")\
  File "C:\dsvm\tools\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\readwriter.py", line 172, in load
    return self._df(self._jreader.load())
  File "C:\Users\team2\PycharmProjects\Bridgestone\venv\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\dsvm\tools\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\utils.py", line 63, in deco
    return f(*a, **kw)
  File "C:\Users\team2\PycharmProjects\Bridgestone\venv\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o29.load.
: com.microsoft.sqlserver.jdbc.SQLServerException: An existing connection was forcibly closed by the remote host ClientConnectionId:7561d3ba-71ac-43b3-a35f-26ababef90cc
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:2400)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:2384)
    at com.microsoft.sqlserver.jdbc.TDSChannel.read(IOBuffer.java:1884)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.Prelogin(SQLServerConnection.java:2137)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.connectHelper(SQLServerConnection.java:1973)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.login(SQLServerConnection.java:1628)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.connectInternal(SQLServerConnection.java:1459)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.connect(SQLServerConnection.java:773)
    at com.microsoft.sqlserver.jdbc.SQLServerDriver.connect(SQLServerDriver.java:1168)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:63)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:54)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:56)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:115)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:52)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:340)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)

Solution

  • If you want to access your HDInsight cluster from a PySpark notebook on a Data Science VM, you can follow the steps described under step 7 of the Tutorial.

    Import needed packages:

    # Import required packages
    import pyodbc
    import time
    import json
    import os
    import urllib
    import warnings
    import re
    import pandas as pd
    

    Set up the connection to Hive (the cluster's user name and password are needed):

    #Create the connection to Hive using ODBC
    SERVER_NAME='xxx.azurehdinsight.net'
    DATABASE_NAME='default'
    USERID='xxx'
    PASSWORD='xxxx'
    DB_DRIVER='Microsoft Hive ODBC Driver'
    driver = 'DRIVER={' + DB_DRIVER + '}'
    server = 'Host=' + SERVER_NAME + ';Port=443'
    database = 'Schema=' + DATABASE_NAME
    hiveserv = 'HiveServerType=2'
    auth = 'AuthMech=6'
    uid = 'UID=' + USERID
    pwd = 'PWD=' + PASSWORD
    CONNECTION_STRING = ';'.join([driver, server, database, hiveserv, auth, uid, pwd])
    connection = pyodbc.connect(CONNECTION_STRING, autocommit=True)
    cursor = connection.cursor()
    

    Query the data:

    queryString = """
        show tables in default;
    """
    pd.read_sql(queryString,connection)
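
The connection-string assembly above can be wrapped in a small helper so the pieces are easier to check before they are passed to pyodbc. This is only a sketch: the function name `build_hive_odbc_connection_string` and its parameters are hypothetical, and the key names (`Host`, `Port`, `Schema`, `HiveServerType`, `AuthMech`, `UID`, `PWD`) are simply the ones used in the snippet above.

```python
def build_hive_odbc_connection_string(host, user, password,
                                      schema="default",
                                      driver="Microsoft Hive ODBC Driver",
                                      port=443):
    """Join the ODBC key/value pairs from the answer into one string.

    Hypothetical helper: it only formats text and does not open a connection.
    """
    parts = [
        "DRIVER={" + driver + "}",   # ODBC driver name in braces
        "Host=" + host,              # cluster endpoint, e.g. xxx.azurehdinsight.net
        "Port=" + str(port),         # port used in the answer above
        "Schema=" + schema,          # Hive database to use
        "HiveServerType=2",          # value taken from the answer above
        "AuthMech=6",                # value taken from the answer above
        "UID=" + user,
        "PWD=" + password,
    ]
    return ";".join(parts)


if __name__ == "__main__":
    # Placeholder credentials, same as in the answer above.
    print(build_hive_odbc_connection_string("xxx.azurehdinsight.net", "xxx", "xxxx"))
```

The returned string can be passed to `pyodbc.connect(..., autocommit=True)` exactly as in the answer. If the result is needed as a Spark DataFrame rather than a pandas one, the pandas DataFrame returned by `pd.read_sql` can usually be converted with `spark.createDataFrame(...)`, assuming a `SparkSession` named `spark` is available in the notebook.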