Whitepaper
Docs
Sign In
Tool
Tool
v0.4.2
nocodb
Tool ID
nocodb
Creator
@notmmao
Downloads
16+
nocodb管理
Get
README
No README available
Tool Code
Show
""" title: Nocodb author: notmmao author_url: https://notmmao.com description: 提供nocodb api的封装,支持查询、新增、修改、删除、上传文件等操作。 version: 0.4.2 licence: MIT create_at: 2025-02-21 update_at: 2025-03-19 """ import json from typing import List, Dict, Optional from pydantic import BaseModel, Field import requests import logging logger = logging.getLogger("nocodb") class NocodbClient: """NocodbClient is a client for nocodb api. 查询返回的数据格式如下: { 'list': [{'Id': 1}], 'pageInfo': { 'totalRows': 1, 'page': 1, 'pageSize': 25, 'isFirstPage': True, 'isLastPage': True}, 'stats': {'dbQueryTime': '11.463'} } """ def __init__(self, url: str, api_token: str, project_id: str, **kwargs): self.host_url = url self.base_url = url + "/api/v2" self.base_url_v1 = url + "/api/v1" self.orgs = "noco" # v1版本API缺省参数 self.api_token = api_token self.project_id = project_id self.config = kwargs self.session = requests.Session() self.session.headers.update(self._get_headers()) def _get_headers(self): if self.api_token: headers = {"xc-token": self.api_token} else: headers = {} return headers def get_projects(self): url = f"{self.base_url}/meta/bases/" headers = self._get_headers() response = self.session.get(url, headers=headers) return response.json() def get_tables(self, full_info=False, project_id=None): """Get tables in the project""" project_id = project_id or self.project_id url = f"{self.base_url}/meta/bases/{project_id}/tables" headers = self._get_headers() response = self.session.get(url, headers=headers) resp = response.json() tables = {} if "error" in resp: print(resp) return tables for item in resp["list"]: tables[item["title"]] = item if full_info else item["id"] return tables def get_table_metadata(self, table_id): """Get table metadata""" url = f"{self.base_url}/meta/tables/{table_id}" response = self.session.get(url) return response.json() def create_table(self, table_meta, project_id=None): """Create table""" project_id = project_id or self.project_id url = f"{self.base_url}/meta/bases/{project_id}/tables" response = self.session.post(url, json=table_meta).json() if "error" in response: print(response) return response def delete_table(self, table_id): """Delete table""" url = f"{self.base_url}/meta/tables/{table_id}" response = self.session.delete(url) return response.json() def get_table_links(self, table_id, full_info=False): """Get table link fields""" coulmns = self.get_table_metadata(table_id).get("columns", []) links = {} for col in coulmns: if col["uidt"] in ("LinkToAnotherRecord", "Links"): links[col["title"]] = col if full_info else col["id"] return links def get_views(self, table_id): """Get tables in the project""" url = f"{self.base_url}/meta/tables/{table_id}/views" headers = self._get_headers() views = {} response = self.session.get(url, headers=headers) resp = response.json() if "error" in resp: print(resp) return views for item in resp["list"]: views[item["title"]] = item["id"] return views def count(self, table_id, params: dict = None, view_id=None): """Count records in table ref: https://data-apis-v2.nocodb.com/#tag/Table-Records/operation/db-data-table-row-count Args: table_id: 表格id params: 查询参数, dict, 支持 where where: str, like "(field1,eq,value1)~and(field2,eq,value2)" view_id: 视图id Returns dict: response from nocode api int: count """ url = f"{self.base_url}/tables/{table_id}/records/count" _params = { "where": "", } if view_id: _params["viewId"] = view_id if params: _params.update(params) response = self.session.get(url, params=_params) resp = response.json() if "error" in resp: print(resp) return resp def get( self, table_id, params: dict = None, view_id=None, fields=None, sort=None, offset=0, limit=25, ): """Get records from table ref: https://data-apis-v2.nocodb.com/#tag/Table-Records/operation/db-data-table-row-list Args: table_id: 表格id params: 查询参数, dict, 支持 where, sort, offset, limit, filds where: str, like "(field1,eq,value1)~and(field2,eq,value2)" sort: str, like "field1,-field2" filds: str or list, like "field1,field2" offset: int, default 0 limit: int, default 25 view_id: 视图id fields: 要查询的字段, str or list sort: 排序字段, str offset: 查询偏移量, 默认0 limit: 查询条数, 默认25 Returns: dict: response from nocode api list: [{"Id": 1}, {"Id": 2}] pageInfo: dict totalRows: int page: int pageSize: int isFirstPage: bool isLastPage: bool """ url = f"{self.base_url}/tables/{table_id}/records" _params = { "offset": offset, "limit": limit, "where": "", } if view_id: _params["viewId"] = view_id if fields: if isinstance(fields, list): fields = ",".join(fields) # list to str _params["fields"] = fields if sort: _params["sort"] = sort if params: _params.update(params) headers = self._get_headers() response = self.session.get(url, headers=headers, params=_params) resp = response.json() if "error" in resp: print(resp) return resp def get_one(self, table_id, id): """Get one record from table""" url = f"{self.base_url}/tables/{table_id}/records/{id}" headers = self._get_headers() response = self.session.get(url, headers=headers) return response.json() def group_by( self, table_id, group_fields, params=None, view_id=None, offset=0, limit=25, sort=None, ): """Group by field""" url = f"{self.base_url_v1}/db/data/{self.orgs}/{self.project_id}/{table_id}/groupby" if isinstance(group_fields, list): group_fields = ",".join(group_fields) # list to str _params = { "column_name": group_fields, "offset": offset, "limit": limit, "where": "", } print(f"group_by sort: {sort}") if sort: _params["sort"] = sort if view_id: _params["viewId"] = view_id if params: _params.update(params) response = self.session.get(url, params=_params) resp = response.json() if "error" in resp: print(resp) return resp def link(self, table_id, link_field_id, record_id, ids): """Link record to table""" url = f"{self.base_url}/tables/{table_id}/links/{link_field_id}/records/{record_id}" response = self.session.post(url, json=ids) resp = response.json() if type(resp) == "dict" and "error" in resp: print(resp) return resp def unlink(self, table_id, link_field_id, record_id, ids): """Unlink record from table""" url = f"{self.base_url}/tables/{table_id}/links/{link_field_id}/records/{record_id}" response = self.session.delete(url, json=ids) resp = response.json() if type(resp) == "dict" and "error" in resp: print(resp) return resp def add(self, table_id, rows): """Add records to table""" url = f"{self.base_url}/tables/{table_id}/records" headers = self._get_headers() response = self.session.post(url, headers=headers, json=rows) resp = response.json() if type(resp) == "error" in resp: print(resp) return resp def update(self, table_id, rows): """Update records in table""" url = f"{self.base_url}/tables/{table_id}/records" headers = self._get_headers() response = self.session.patch(url, headers=headers, json=rows) return response.json() def delete(self, table_id, ids): """Delete records from table""" url = f"{self.base_url}/tables/{table_id}/records" headers = self._get_headers() response = self.session.delete(url, headers=headers, json=ids) return response.json() def add_one( self, table_id, item, key="Id", attachments=None, update_if_exists=False ): """Add one record to table, if the record exists, skip Args: table_id: table id item: record to add key: key to check if the record exists attachments: list of fields to upload as attachments Returns: dict: response from nocode api """ url = f"{self.base_url}/tables/{table_id}/records" headers = self._get_headers() if key: # 只有指定了key才做排重检查 _params = {"fields": "Id", "where": f"({key},eq,{item[key]})"} r = self.session.get(url, headers=headers, params=_params).json() if r["pageInfo"]["totalRows"] > 0: if update_if_exists: item["Id"] = r["list"][0]["Id"] # 使用第一个元素的Id, 用于更新 else: print(f"{item[key]} exists, skip") return r if attachments: for attachment in attachments: file_to_upload = item[attachment] item[attachment] = self.upload_file(file_to_upload) if "Id" in item: response = self.session.patch(url, headers=headers, json=item) else: response = self.session.post(url, headers=headers, json=item) ret = response.json() if "error" in ret: print(f"error: {ret}") return ret def upload_file(self, file_path): """Upload file to nocodb storage""" url = f"{self.base_url}/storage/upload" headers = self._get_headers() if isinstance(file_path, str): fd = open(file_path, "rb") else: fd = file_path files = {"file": fd} # TODO: file_name, mimetype response = self.session.post(url, headers=headers, files=files) return response.json() def upload_file_tuple(self, file_tuple): """Upload file to nocodb storage Args: file_tuple: (file_path, file_stream, mimetype) Returns: dict: response from nocode api """ url = f"{self.base_url}/storage/upload" headers = self._get_headers() files = {"file": file_tuple} response = self.session.post(url, headers=headers, files=files) return response.json() def upload_files(self, files): """Upload files to nocodb storage Args: files: {"file" : (file_path, file_stream, mimetype)} Returns: dict: response from nocode api """ url = f"{self.base_url}/storage/upload" headers = self._get_headers() response = self.session.post(url, headers=headers, files=files) return response.json() def upload_urls(self, urls): """Upload urls to nocodb storage Args: urls: [{"url":"http://xxxx.xxx/xxx/xxx.jpg"}] Returns: dict: response from nocode api """ if isinstance(urls, str): urls = [{"url": urls}] # 鲁棒性 url = f"{self.base_url}/storage/upload-by-url" response = self.session.post(url, json=urls) return response.json() def get_upload_file(self, path): """Get file from nocodb storage""" url = f"{self.host_url}/{path}" return self.session.get(url, stream=True) def get_db(tool, project_id: Optional[str] = None): tool.target_project_id = ( project_id or tool.target_project_id or tool.valves.project_id ) logger.info(f"{tool.target_project_id=}") nocodb = NocodbClient(tool.valves.url, tool.valves.api_key, tool.target_project_id) return nocodb class Tools: def __init__(self): """Initialize the Tool.""" self.citation = True self.valves = self.Valves() self.target_project_id = None class Valves(BaseModel): url: str = Field(default="", description="NOCODB Baseurl") api_key: str = Field( default="", description="NOCODB API key", ) project_id: str = Field(default="", description="NOCODB PROJECT ID") def my_nocodb_project(self) -> str: """列出我的所有数据库""" db = get_db(self) resp = db.get_projects() return json.dumps(resp, ensure_ascii=False) def my_nocodb_tables(self, project_id: str = None) -> str: """列出我的数据库中的所有表 :param project_id: 数据库id, 可选. 为None时使用默认数据库. """ db = get_db(self, project_id) lines = [] lines.append("表名,表ID") for k, v in db.get_tables().items(): lines.append(f"{k},{v}") resp = "\n".join(lines) return resp def my_nocodb_views(self, table_name: str) -> str: """列出我的数据库指定表的所有视图 :param table_name: 数据表名称 :return: 视图集合,首行为:视图,视图ID """ db = get_db(self) tables = db.get_tables() tables = {k.lower(): v for k, v in tables.items()} _table_id = tables.get(table_name.lower()) if not _table_id: return "数据表不存在" lines = [] lines.append("视图,视图ID") for k, v in db.get_views(_table_id).items(): lines.append(f"{k},{v}") resp = "\n".join(lines) return resp def my_nocodb_records( self, table_name: str, offset: int = 0, limit: int = 25, ) -> str: """从数据表中查询记录 :param table_name: 数据表名称 :param offset: 查询偏移量, 默认0 :param limit: 查询条数, 默认25 """ logger.info(f"{table_name=}") db = get_db(self) _tables = db.get_tables() _tables = {k.lower(): v for k, v in _tables.items()} _table_id = _tables.get(table_name.lower()) if not _table_id: return "数据表不存在" resp = db.get(_table_id, None, None, None, None, int(offset), int(limit)) return json.dumps(resp, ensure_ascii=False)