Search code examples
pythonsql-parser

python SQL parser to get the column name and data type


I'm using the Python SQL parser (`sqlparse`) to get table information. I'm able to get the table name and schema name.

import sqlparse
line = '''
CREATE TABLE public.actor (
    actor_id integer DEFAULT nextval('public.actor_actor_id_seq'::regclass) NOT NULL,
    first_name character varying(45) NOT NULL,
    last_name character varying(45) NOT NULL,
    last_update timestamp without time zone DEFAULT now() NOT NULL
);

CREATE TABLE public.category (
    category_id integer DEFAULT nextval('public.category_category_id_seq'::regclass) NOT NULL,
    name character varying(25) NOT NULL,
    last_update timestamp without time zone DEFAULT now() NOT NULL
);

CREATE TABLE IF NOT EXISTS "sample_schema"."sample_table"
(
    "div_cd" VARCHAR(2) NOT NULL
    ,"div_name" VARCHAR(30) NOT NULL
    ,"org_cd" VARCHAR(8) NOT NULL
    ,"org_name" VARCHAR(60) NOT NULL
    ,"team_cd" VARCHAR(2) NOT NULL
    ,"team_name" VARCHAR(120) NOT NULL
    ,"personal_cd" VARCHAR(7) NOT NULL
    ,"personal_name" VARCHAR(300) NOT NULL
    ,"username" VARCHAR(6) NOT NULL
    ,"staff_flg" CHAR(1)  DEFAULT '0'::bpchar ENCODE lzo
    ,"leader_flg" CHAR(1)  DEFAULT '0'::bpchar ENCODE lzo
)
DISTSTYLE EVEN
;

CREATE TABLE IF NOT EXISTS "sample_schema"."ref_table"
(
     "staff_flg" CHAR(1)  DEFAULT '0'::bpchar SORTKEY ENCODE lzo 
    ,"leader_flg" CHAR(1)  DEFAULT '0'::bpchar ENCODE lzo
)
DISTSTYLE EVEN
;
'''
# Parse the DDL text, then print the first token of the first statement
# whose ttype is None.
parse = sqlparse.parse(line)
first_statement = parse[0]
untyped_tokens = [str(tok) for tok in first_statement.tokens if tok.ttype is None]
print(untyped_tokens[0])

Output: 
public.actor

But if I want to return the column names and their data types, which token can I use to print them for each of the DDL statements?

The output should be something like this (not necessarily exactly the same):

table: public.actor

Print the column name and data type one by one (maybe in a for loop):

column: actor_id
data type: integer
column: first_name
data type: character varying

Solution

  • Code documented inline

    import sqlparse
    
    line = '''
    CREATE TABLE public.actor (
        actor_id integer DEFAULT nextval('public.actor_actor_id_seq'::regclass) NOT NULL,
        first_name character varying(45) NOT NULL,
        last_name character varying(45) NOT NULL,
        last_update timestamp without time zone DEFAULT now() NOT NULL
    );
    
    CREATE TABLE public.category (
        category_id integer DEFAULT nextval('public.category_category_id_seq'::regclass) NOT NULL,
        name character varying(25) NOT NULL,
        last_update timestamp without time zone DEFAULT now() NOT NULL
    );
    
    CREATE TABLE IF NOT EXISTS "sample_schema"."sample_table"
    (
        "div_cd" VARCHAR(2) NOT NULL
        ,"div_name" VARCHAR(30) NOT NULL
        ,"org_cd" VARCHAR(8) NOT NULL
        ,"org_name" VARCHAR(60) NOT NULL
        ,"team_cd" VARCHAR(2) NOT NULL
        ,"team_name" VARCHAR(120) NOT NULL
        ,"personal_cd" VARCHAR(7) NOT NULL
        ,"personal_name" VARCHAR(300) NOT NULL
        ,"username" VARCHAR(6) NOT NULL
        ,"staff_flg" CHAR(1)  DEFAULT '0'::bpchar ENCODE lzo
        ,"leader_flg" CHAR(1)  DEFAULT '0'::bpchar ENCODE lzo
    )
    DISTSTYLE EVEN
    ;
    
    CREATE TABLE IF NOT EXISTS "sample_schema"."ref_table"
    (
         "staff_flg" CHAR(1)  DEFAULT '0'::bpchar SORTKEY ENCODE lzo 
        ,"leader_flg" CHAR(1)  DEFAULT '0'::bpchar ENCODE lzo
    )
    DISTSTYLE EVEN
    ;
    '''
    
    def get_table_name(tokens):
        """Return the value of the last token in *tokens* whose ttype is None.

        The sequence is scanned from the end towards the start. If no token
        has a ttype of None, a single space (" ") is returned.
        """
        untyped_values = (tok.value for tok in reversed(tokens) if tok.ttype is None)
        return next(untyped_values, " ")
    
    def _split_top_level(text, sep=","):
        """Split *text* on *sep*, skipping separators nested in parentheses or single quotes.

        A plain ``text.split(",")`` would cut a definition such as
        ``price numeric(10,2)`` or ``DEFAULT coalesce(a, b)`` in the middle.
        """
        parts = []
        current = []
        depth = 0
        in_quote = False
        for ch in text:
            if ch == "'":
                in_quote = not in_quote
            elif not in_quote:
                if ch == "(":
                    depth += 1
                elif ch == ")":
                    depth -= 1
                elif ch == sep and depth == 0:
                    # Top-level separator: close the current fragment.
                    parts.append("".join(current))
                    current = []
                    continue
            current.append(ch)
        parts.append("".join(current))
        return parts

    # For every statement in the DDL text, print the table name followed by
    # the name and (condensed) type of each column.
    parse = sqlparse.parse(line)
    for stmt in parse:
        # Get all the tokens except whitespaces
        tokens = [t for t in sqlparse.sql.TokenList(stmt.tokens) if t.ttype != sqlparse.tokens.Whitespace]
        is_create_stmt = False
        for i, token in enumerate(tokens):
            # Is it a CREATE statement?
            if token.match(sqlparse.tokens.DDL, 'CREATE'):
                is_create_stmt = True
                continue

            # If it was a create statement and the current token starts with "("
            if is_create_stmt and token.value.startswith("("):
                # Get the table name by looking at the tokens in reverse order till you find
                # a token with None type
                print (f"table: {get_table_name(tokens[:i])}")

                # Now parse the columns: take the text between the outer
                # parentheses and split it on top-level commas only.
                txt = token.value
                body = txt[1:txt.rfind(")")]
                for column in _split_top_level(body):
                    # str.split() with no argument also collapses newlines.
                    c = column.split()
                    if not c:
                        # Empty fragment (e.g. trailing comma or "()"): nothing to report.
                        continue
                    c_name = c[0].replace('"', "")
                    # Condensed type information: only the first word after the name.
                    # Use " ".join(c[1:]) instead for the detailed definition.
                    c_type = c[1] if len(c) > 1 else ""
                    print (f"column: {c_name}")
                    print (f"date type: {c_type}")
                print ("---"*20)
                break
    

    Output:

    table: public.actor
    column: actor_id
    date type: integer
    column: first_name
    date type: character
    column: last_name
    date type: character
    column: last_update
    date type: timestamp
    ------------------------------------------------------------
    table: public.category
    column: category_id
    date type: integer
    column: name
    date type: character
    column: last_update
    date type: timestamp
    ------------------------------------------------------------
    table: "sample_schema"."sample_table"
    column: div_cd
    date type: VARCHAR(2)
    column: div_name
    date type: VARCHAR(30)
    column: org_cd
    date type: VARCHAR(8)
    column: org_name
    date type: VARCHAR(60)
    column: team_cd
    date type: VARCHAR(2)
    column: team_name
    date type: VARCHAR(120)
    column: personal_cd
    date type: VARCHAR(7)
    column: personal_name
    date type: VARCHAR(300)
    column: username
    date type: VARCHAR(6)
    column: staff_flg
    date type: CHAR(1)
    column: leader_flg
    date type: CHAR(1)
    ------------------------------------------------------------
    table: "sample_schema"."ref_table"
    column: staff_flg
    date type: CHAR(1)
    column: leader_flg
    date type: CHAR(1)
    ------------------------------------------------------------