I want to recreate one of my tables on a different server programmatically. From the web console I see I can ask for the table schema and it gets me the CREATE TABLE
statement, but I cannot see on the docs any reference to SHOW CREATE TABLE
or any other command that would work.
I've searched stackoverflow and I see this 3-year old question asking for the same , but the provided solution is not ideal, as it gives me only information about the tables and columns, but not the CREATE TABLE
statement.
For example, if I do table_columns('kafka_cluster');
I get this output:
But what I would need is this:
CREATE TABLE 'kafka_cluster' (
system SYMBOL capacity 256 CACHE,
controller_GlobalTopicCount_Value DOUBLE,
controller_MigratingZkBrokerCount_Value DOUBLE,
controller_ActiveControllerCount_Value DOUBLE,
topics_ReassignmentBytesInPerSec_Count DOUBLE,
controller_GlobalPartitionCount_Value DOUBLE,
request_MessageConversionsTimeMs_Count DOUBLE,
controller_ZkMigrationState_Value DOUBLE,
request_ResponseQueueTimeMs_Count DOUBLE,
topics_BytesOutPerSec_Count DOUBLE,
request_TemporaryMemoryBytes_Count DOUBLE,
topics_TotalProduceRequestsPerSec_Count DOUBLE,
replica_manager_IsrShrinksPerSec_Count DOUBLE,
request_RemoteTimeMs_Count DOUBLE,
controller_MetadataErrorCount_Value DOUBLE,
replica_manager_IsrExpandsPerSec_Count DOUBLE,
controller_EventQueueProcessingTimeMs_Count DOUBLE,
topics_TotalFetchRequestsPerSec_Count DOUBLE,
controller_LastAppliedRecordLagMs_Value DOUBLE,
controller_OfflinePartitionsCount_Value DOUBLE,
controller_LastAppliedRecordOffset_Value DOUBLE,
topics_InvalidOffsetOrSequenceRecordsPerSec_Count DOUBLE,
replica_manager_PartitionsWithLateTransactionsCount_Value DOUBLE,
request_RequestBytes_Count DOUBLE,
topics_ReassignmentBytesOutPerSec_Count DOUBLE,
controller_EventQueueOperationsTimedOutCount_Value DOUBLE,
replica_manager_LeaderCount_Value DOUBLE,
replica_manager_AtMinIsrPartitionCount_Value DOUBLE,
topics_BytesRejectedPerSec_Count DOUBLE,
topics_NoKeyCompactedTopicRecordsPerSec_Count DOUBLE,
topics_FailedProduceRequestsPerSec_Count DOUBLE,
controller_FencedBrokerCount_Value DOUBLE,
topics_ReplicationBytesInPerSec_Count DOUBLE,
controller_LastCommittedRecordOffset_Value DOUBLE,
request_ThrottleTimeMs_Count DOUBLE,
controller_EventQueueTimeMs_Count DOUBLE,
replica_manager_ProducerIdCount_Value DOUBLE,
request_RequestQueueTimeMs_Count DOUBLE,
replica_manager_PartitionCount_Value DOUBLE,
topics_MessagesInPerSec_Count DOUBLE,
controller_TimedOutBrokerHeartbeatCount_Value DOUBLE,
controller_NewActiveControllersCount_Value DOUBLE,
request_TotalTimeMs_Count DOUBLE,
replica_manager_FailedIsrUpdatesPerSec_Count DOUBLE,
replica_manager_UnderMinIsrPartitionCount_Value DOUBLE,
controller_PreferredReplicaImbalanceCount_Value DOUBLE,
topics_FetchMessageConversionsPerSec_Count DOUBLE,
topics_BytesInPerSec_Count DOUBLE,
replica_manager_ReassigningPartitions_Value DOUBLE,
topics_ReplicationBytesOutPerSec_Count DOUBLE,
topics_InvalidMessageCrcRecordsPerSec_Count DOUBLE,
controller_EventQueueOperationsStartedCount_Value DOUBLE,
topics_ProduceMessageConversionsPerSec_Count DOUBLE,
controller_LastAppliedRecordTimestamp_Value DOUBLE,
replica_manager_OfflineReplicaCount_Value DOUBLE,
request_ResponseSendTimeMs_Count DOUBLE,
topics_InvalidMagicNumberRecordsPerSec_Count DOUBLE,
replica_manager_UnderReplicatedPartitions_Value DOUBLE,
topics_FailedFetchRequestsPerSec_Count DOUBLE,
request_LocalTimeMs_Count DOUBLE,
controller_ActiveBrokerCount_Value DOUBLE,
timestamp TIMESTAMP
) timestamp (timestamp) PARTITION BY DAY WAL;
At the moment QuestDB does not expose any command to get the CREATE TABLE
statement. The web console sends several queries to the database asking for metadata (like the table_columns
one), and then it composes the statement dynamically.
This python snippet could be used to recreate the statement based on the metadata queries, and then to send the SQL to another server to create the table there:
import psycopg2
from psycopg2 import extras
import sys
def connect_qdb(host: str = '127.0.0.1', port: int = 8812, user: str = 'admin', pwd: str = 'quest', dbname: str = 'qdb'):
try:
conn = psycopg2.connect(f'user={user} password={pwd} host={host} port={port} dbname={dbname}')
conn.autocommit = False
return conn
except psycopg2.Error as e:
print(f'Had problem connecting with error {e}.')
def get_table_meta(conn, table_name):
meta = { "table_name": table_name, "partition" : None, "wal": False, "dedup" : None, "upsertKeys" : [],
"columns" : {}, "symbols": [], "designated" : None, "columns_sql" : [], "with" : [] }
with conn.cursor(cursor_factory = psycopg2.extras.RealDictCursor) as cur:
cur.execute(f"""
SELECT * FROM tables WHERE name = '{table_name}';
"""
)
row = cur.fetchone()
meta["dedup"] = row.get("dedup")
meta["designated"] = row.get("designatedTimestamp")
meta["partition"] = row.get("partitionBy")
if meta["partition"] == "NONE":
meta["partition"] = None
meta["wal"] = row.get("walEnabled")
if row.get("maxUncommittedRows"):
meta["with"].append(f' maxUncommittedRows={row["maxUncommittedRows"]} ')
cur.execute(f"""
SELECT * FROM table_columns('{table_name}');
"""
)
records = cur.fetchall()
for row in records:
column_name = row["column"]
column_type = row["type"]
meta["columns"][column_name]={"type": column_type}
if row["upsertKey"]:
meta["upsertKeys"].append(column_name)
if column_type == "SYMBOL":
meta["symbols"].append(column_name)
if row["symbolCached"]:
cached_sql = "CACHE"
else:
cached_sql = "NOCACHE"
if row["indexed"]:
cached_sql = 'INDEX CAPACITY {row["indexBlockCapacity"]}'
else:
index_sql = ""
meta["columns_sql"].append(f'{column_name} SYMBOL CAPACITY {row["symbolCapacity"]} {cached_sql} {index_sql}')
else:
meta["columns_sql"].append(f"{column_name} {column_type}")
return meta
def get_create_statement(conn, table_meta):
columns_sql = ",\n\t".join(table_meta["columns_sql"])
if table_meta["designated"]:
designated_sql = f' TIMESTAMP({table_meta["designated"]}) '
else:
designated_sql = ""
if table_meta["partition"]:
partition_sql = f' PARTITION BY {table_meta["partition"]} '
else:
partition_sql = ""
if table_meta["wal"]:
wal_sql = f' WAL '
else:
wal_sql = ""
if table_meta["dedup"]:
upsert_sql = ", ".join(table_meta["upsertKeys"])
dedup_sql = f" DEDUP UPSERT KEYS({upsert_sql}) "
else:
dedup_sql = ""
if table_meta["with"]:
with_sql = f' WITH {", ".join(table_meta["with"])}'
else:
with_sql = ""
sql = f"""\
CREATE TABLE IF NOT EXISTS {table_meta['table_name']} (
\t{columns_sql}
) {designated_sql} {partition_sql} {wal_sql} {with_sql} {dedup_sql};
"""
return sql
def create_dest_table(conn, sql):
with conn.cursor() as cur:
cur.execute(sql)
origin_conn = connect_qdb()
table_meta = get_table_meta(origin_conn, "table_name")
sql = get_create_statement(origin_conn, table_meta)
destination_conn = connect_qdb("destination_host_ip")
create_dest_table(destination_conn, sql)
origin_conn.close()
destination_conn.close()