前不久新項目中需要用到ClickHouse,作為一個合格的Python程序員,首先當然是找找有沒有合適的輪子。
都勻ssl適用于網站、小程序/APP、API接口等需要進行數據傳輸應用場景,ssl證書未來市場廣闊!成為創新互聯公司的ssl證書銷售渠道,可以享受市場價格4-6折優惠!如果有意向歡迎電話聯系或者加微信:13518219792(備注:SSL證書合作)期待與您的合作!
翻了一圈,infi.clickhouse_orm在功能和易用性上沒有明顯的短板,其ORM API對后端程序員格外親切。可惜主分支已經八個月沒有更新了,據聞核心開發者已離職,而infi.clickhouse_orm尚不支持一些我需要的新功能如Geo類型和函數,基于這些原因,這篇文章的主角ch-orm也就誕生了。
ch-orm庫fork自infi.clickhouse_orm(v2.1.1)。
與infi相比,ch-orm支持同步和異步兩種方式與ClickHouse服務器交互,它添加了一些新功能:
異步支持(AioDatabase)
類型注解
新的類型支持
新的函數支持
支持創建臨時表(TemporaryModel)
需要提醒的是,ch-orm僅使用ClickHouse的http協議,不支持TCP協議。更多細節參見Github上的文檔。
通過pip安裝ch-orm
pip install ch-orm
雖然pypi的庫名為ch-orm
,但在代碼中需要導入的是clickhouse_orm
。
from clickhouse_orm import Database, Model, MergeTree
from clickhouse_orm.fields import (
StringField, Int32Field, UUIDField, Int8Field
)
from clickhouse_orm.contrib.geo.fields import PointField
class Residence(Model):
uuid = UUIDField()
residence_type = Int8Field()
geo = PointField(db_column='geo_wgs84')
geohash_wgs84 = StringField()
province = StringField()
city = StringField()
district = StringField()
poi_id = Int32Field(default=1000)
poi_name = StringField()
p_geo_bd09 = PointField()
engine = MergeTree(partition_key=('uuid', ), order_by=('uuid', ))
@classmethod
def table_name(cls):
return 'residence'
上面定義了一個Residence
模型,它將會映射到ClickHouse上的residence
表,而Residence
中眾多Field屬性則被映射為表中的列,可以在Python中對Residence實例進行操作進而處理ClickHouse(沒錯,就像Django ORM所做的那樣)
接下來,先假定此時residence
尚不存在,借助Residence
來創建它。
想要對數據庫執行操作,首先必須實例化一個Database對象(或AioDatabase),可以粗淺的理解為它和數據庫連接屬于一類抽象,內部實現對后端數據庫的交互。
from clickhouse_orm.database import Database
from clickhouse_orm.aio.database import AioDatabase
# 以同步方式創建數據庫
sync_db = Database('db-test', db_url='http://localhost:8123/')
sync_db.create_table(Residence)
# 以異步方式創建數據庫
async def main():
async_db = AioDatabase('db-test', db_url='http://localhost:8123/')
# 異步模型下需要主動執行init方法初始化
await async_db.init()
await async_db.create_table(Residence)
此時,db-test庫內應當出現了一個名為residence
的表。
ClickHouse在數據寫入性能表現十分優異,ch-orm能輕易處理寫入數據需求
以寫入100萬條數據為例,使用生成器創建100萬個Residence隨機實例
import uuid
from clickhouse_orm.contrib.geo.fields import Point
# 同步寫入100萬條residence
sync_db.insert(
(Residence(uuid=str(uuid.uuid4()), geo=Point(120, 20)) for _ in range()),
batch_size=
)
# 異步寫入100萬條residence
async def insert():
...
await async_db.insert(
(Residence(uuid=str(uuid.uuid4()), geo=Point(120, 20)) for _ in range()),
batch_size=
)
示例中僅對uuid
和geo
列進行賦值,其他字段會被設置為默認值(而非None值)
可以看看residence
表中有多少條數據
# 同步方式查詢Residence行數
Residence.objects_in(sync_db).count()
# 異步方式查詢Residence行數
async def read_count():
...
await Residence.objects_in(async_db).count()
ch-orm實現了QuerySet,暴露API基本參照Django設計的,如前述的獲取表行數的count()
方法就來自QuerySet
。
與Django不同的是,ch-orm僅將QuerySet作為查詢實例,不具備查詢結果緩存功能,這代表如果對一個QuerySet對象執行兩次迭代,與后端數據庫的交互將變成兩次而非一次。
可以通過Model的類方法objects_in
獲得一個QuerySet
實例,接著來查詢uuid="48d75e4d-8e6f-4acd-a2e9-f4c3059b5b30"
的數據
# 同步API
queryset = Residence.objects_in(sync_db)
queryset = queryset.filter(Residence.uuid == "48d75e4d-8e6f-4acd-a2e9-f4c3059b5b30")
result = list(queryset)
# 對于異步API
queryset = Residence.objects_in(async_db)
queryset = queryset.filter(Residence.uuid == "48d75e4d-8e6f-4acd-a2e9-f4c3059b5b30")
result = [_ async for _ in queryset]
真正的查詢請求是在對queryset迭代時處理的,因此下列兩行代碼不會與數據庫后端進行交互
queryset = Residence.objects_in(sync_db)
queryset = queryset.filter(Residence.uuid == "48d75e4d-8e6f-4acd-a2e9-f4c3059b5b30")
最終得到一個由Residence實例的組成的結果列表result。
ch-orm具備日常使用的大多數場景功能
執行原生查詢并創建動態對象
現有表生成模型類
F函數
Q查詢
Aggregation 聚合查詢
order_by 查詢排序
distinct 結果去重
Pagination 查詢分頁
表引擎
...
這些內容Github倉庫有相應的文檔,限于本文篇幅這里就不再過多介紹。
文章標題:Python使用ch-orm對ClickHouse簡單查詢及寫入
分享地址:http://m.kartarina.com/article10/dsogjgo.html
成都網站建設公司_創新互聯,為您提供網站內鏈、網站維護、Google、網站建設、靜態網站、品牌網站設計
聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯