using Newtonsoft.Json.Linq;
using Npgsql;
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SQLite;
using System.Linq;

namespace Ramitta
{
    /// <summary>
    /// Convenience wrapper around a single open <see cref="NpgsqlConnection"/> that offers
    /// table creation, simple CRUD helpers, batch inserts and a transaction helper.
    /// </summary>
    /// <remarks>
    /// All commands run on the one connection owned by this instance. Nothing in this class
    /// synchronizes access to that connection.
    /// </remarks>
    public class PostgreSQL : IDisposable
    {
        // Upper bound on rows sent in one multi-row INSERT statement.
        private const int MaxBatchRows = 500;

        // The PostgreSQL wire protocol (and therefore Npgsql) caps the number of bind
        // parameters of a single statement at 65535.
        private const int MaxParametersPerStatement = 65535;

        private readonly NpgsqlConnection db;
        private bool disposed = false;

        /// <summary>
        /// Creates the wrapper and opens the underlying connection immediately.
        /// </summary>
        /// <param name="connectionString">Npgsql connection string.</param>
        /// <param name="readOnly">
        /// When true and the connection string does not already set a command timeout,
        /// <c>;CommandTimeout=0</c> is appended (Npgsql documents 0 as "no timeout").
        /// The flag does not make the session read-only in any other way.
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="connectionString"/> is null.</exception>
        public PostgreSQL(string connectionString, bool readOnly = false)
        {
            if (connectionString == null)
            {
                throw new ArgumentNullException(nameof(connectionString));
            }

            var connString = connectionString;

            // Connection-string keywords are case-insensitive and the documented spelling is
            // "Command Timeout", so look for both forms before appending our own value.
            if (readOnly
                && connString.IndexOf("CommandTimeout", StringComparison.OrdinalIgnoreCase) < 0
                && connString.IndexOf("Command Timeout", StringComparison.OrdinalIgnoreCase) < 0)
            {
                connString += ";CommandTimeout=0";
            }

            var connection = new NpgsqlConnection(connString);
            try
            {
                connection.Open();
            }
            catch
            {
                // Do not leak the connection object when the server cannot be reached.
                connection.Dispose();
                throw;
            }

            db = connection;
        }

        /// <summary>
        /// Wraps an identifier in double quotes and doubles any embedded double quote so the
        /// name cannot terminate the quoted identifier early.
        /// </summary>
        private static string QuoteIdentifier(string identifier)
        {
            if (identifier == null)
            {
                throw new ArgumentNullException(nameof(identifier));
            }

            return "\"" + identifier.Replace("\"", "\"\"") + "\"";
        }

        /// <summary>True when the value is a Json.NET object or array.</summary>
        private static bool IsJson(object value)
        {
            return value is JObject || value is JArray;
        }

        /// <summary>
        /// Converts a caller-supplied value into what is bound to the command:
        /// null becomes <see cref="DBNull.Value"/>, Json.NET objects/arrays become their
        /// string form, everything else is passed through unchanged.
        /// </summary>
        private static object ToDbValue(object value)
        {
            if (value == null)
            {
                return DBNull.Value;
            }

            return IsJson(value) ? value.ToString() : value;
        }

        /// <summary>
        /// Returns the SQL placeholder for a parameter, adding a <c>::json</c> cast when the
        /// value is bound as JSON text.
        /// </summary>
        private static string Placeholder(string parameterName, object value)
        {
            return IsJson(value) ? parameterName + "::json" : parameterName;
        }

        /// <summary>
        /// Creates a table if it does not exist yet.
        /// </summary>
        /// <param name="tableName">Table name; it is quoted, so it is case-sensitive.</param>
        /// <param name="columns">Column name to SQLite-style type name, see <c>MapDataType</c>.</param>
        public void CreateTable(string tableName, Dictionary<string, string> columns)
        {
            if (columns == null)
            {
                throw new ArgumentNullException(nameof(columns));
            }

            // Build the column definition list: "name" TYPE, "name" TYPE, ...
            var columnsDefinition = string.Join(
                ", ",
                columns.Select(c => $"{QuoteIdentifier(c.Key)} {MapDataType(c.Value)}"));

            string createTableQuery =
                $"CREATE TABLE IF NOT EXISTS {QuoteIdentifier(tableName)} ({columnsDefinition});";

            using (var cmd = new NpgsqlCommand(createTableQuery, db))
            {
                cmd.ExecuteNonQuery();
            }
        }

        /// <summary>
        /// Maps a SQLite-style type name to the PostgreSQL type used in DDL.
        /// Unknown names (and null) map to TEXT.
        /// </summary>
        private static string MapDataType(string sqliteType)
        {
            if (sqliteType == null)
            {
                return "TEXT";
            }

            // Invariant upper-casing: the culture-sensitive ToUpper() turns "integer" into
            // "İNTEGER" under Turkish cultures, which would silently fall through to TEXT.
            return sqliteType.ToUpperInvariant() switch
            {
                "INTEGER" => "INTEGER",
                "TEXT" => "TEXT",
                "REAL" => "REAL",
                "BLOB" => "BYTEA",
                "NUMERIC" => "NUMERIC",
                "BOOLEAN" => "BOOLEAN",
                "DATE" => "DATE",
                "DATETIME" => "TIMESTAMP",
                "TIMESTAMP" => "TIMESTAMP",
                _ => "TEXT" // default mapping
            };
        }

        /// <summary>
        /// Returns every <c>table_name</c> listed in <c>information_schema.tables</c> for the
        /// <c>public</c> schema. No <c>table_type</c> filter is applied, so views and foreign
        /// tables visible in that catalog view are included as well.
        /// </summary>
        public List<string> GetAllTableNames()
        {
            var tableNames = new List<string>();
            string query = "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public';";

            using (var cmd = new NpgsqlCommand(query, db))
            using (var reader = cmd.ExecuteReader())
            {
                while (reader.Read())
                {
                    tableNames.Add(reader.GetString(0));
                }
            }

            return tableNames;
        }

        /// <summary>
        /// Adds a column to an existing table. If a column with the same name (compared
        /// case-insensitively) already exists, a message is written to the console and
        /// nothing is changed.
        /// </summary>
        /// <exception cref="ArgumentException">The table does not exist in the public schema.</exception>
        public void AddColumn(string tableName, string columnName, string columnType)
        {
            // The table must exist.
            if (!TableExists(tableName))
            {
                throw new ArgumentException($"表 '{tableName}' 不存在");
            }

            // Skip silently (apart from the console message) when the column is already there.
            if (ColumnExists(tableName, columnName))
            {
                Console.WriteLine($"列 '{columnName}' 在表 '{tableName}' 中已存在");
                return;
            }

            string addColumnQuery =
                $"ALTER TABLE {QuoteIdentifier(tableName)} ADD COLUMN {QuoteIdentifier(columnName)} {MapDataType(columnType)};";

            using (var cmd = new NpgsqlCommand(addColumnQuery, db))
            {
                cmd.ExecuteNonQuery();
            }

            Console.WriteLine($"已向表 '{tableName}' 添加列 '{columnName}'");
        }

        /// <summary>Checks whether a table with exactly this name exists in the public schema.</summary>
        private bool TableExists(string tableName)
        {
            string query = "SELECT count(*) FROM information_schema.tables WHERE table_schema = 'public' AND table_name = @tableName;";

            using (var cmd = new NpgsqlCommand(query, db))
            {
                cmd.Parameters.AddWithValue("@tableName", tableName);
                var result = cmd.ExecuteScalar();
                return Convert.ToInt64(result) > 0;
            }
        }

        /// <summary>
        /// Checks whether the table has a column whose name equals <paramref name="columnName"/>
        /// ignoring case (ordinal comparison).
        /// </summary>
        private bool ColumnExists(string tableName, string columnName)
        {
            string query = "SELECT column_name FROM information_schema.columns WHERE table_schema = 'public' AND table_name = @tableName;";

            using (var cmd = new NpgsqlCommand(query, db))
            {
                cmd.Parameters.AddWithValue("@tableName", tableName);

                using (var reader = cmd.ExecuteReader())
                {
                    while (reader.Read())
                    {
                        if (reader["column_name"].ToString().Equals(columnName, StringComparison.OrdinalIgnoreCase))
                        {
                            return true;
                        }
                    }
                }
            }

            return false;
        }

        /// <summary>Adds several columns by calling <see cref="AddColumn"/> for each entry.</summary>
        /// <param name="tableName">Existing table.</param>
        /// <param name="columns">Column name to SQLite-style type name.</param>
        public void AddColumns(string tableName, Dictionary<string, string> columns)
        {
            if (columns == null)
            {
                throw new ArgumentNullException(nameof(columns));
            }

            foreach (var column in columns)
            {
                AddColumn(tableName, column.Key, column.Value);
            }
        }

        /// <summary>
        /// Inserts one row.
        /// Example: <c>InsertData("Users", new Dictionary&lt;string, object&gt; { {"Name", "John"}, {"Age", 30} });</c>
        /// </summary>
        /// <param name="tableName">Target table.</param>
        /// <param name="columnValues">
        /// Column name to value. Null values are stored as NULL; <see cref="JObject"/> and
        /// <see cref="JArray"/> values are sent as text with a <c>::json</c> cast.
        /// </param>
        public void InsertData(string tableName, Dictionary<string, object> columnValues)
        {
            if (columnValues == null)
            {
                throw new ArgumentNullException(nameof(columnValues));
            }

            var columnNames = new List<string>(columnValues.Count);
            var placeholders = new List<string>(columnValues.Count);

            using (var cmd = new NpgsqlCommand())
            {
                cmd.Connection = db;

                int index = 0;
                foreach (var kvp in columnValues)
                {
                    // Index-based parameter names: a column name may contain characters
                    // (spaces, dashes, ...) that are not valid inside a parameter name.
                    string parameterName = "@p" + index;
                    index++;

                    columnNames.Add(QuoteIdentifier(kvp.Key));
                    placeholders.Add(Placeholder(parameterName, kvp.Value));
                    cmd.Parameters.AddWithValue(parameterName, ToDbValue(kvp.Value));
                }

                cmd.CommandText =
                    $"INSERT INTO {QuoteIdentifier(tableName)} ({string.Join(", ", columnNames)}) VALUES ({string.Join(", ", placeholders)})";
                cmd.ExecuteNonQuery();
            }
        }

        /// <summary>
        /// Runs an arbitrary query and returns all rows, each as a column-name to value map.
        /// </summary>
        /// <param name="query">SQL text, executed as given.</param>
        /// <param name="parameters">
        /// Optional parameter name (without the leading <c>@</c>) to value; null values are
        /// bound as <see cref="DBNull.Value"/>.
        /// </param>
        /// <returns>
        /// One dictionary per row. If the result set has duplicate column names the later
        /// column overwrites the earlier one.
        /// </returns>
        public List<Dictionary<string, object>> SelectData(string query, Dictionary<string, object> parameters = null)
        {
            var result = new List<Dictionary<string, object>>();

            using (var cmd = new NpgsqlCommand(query, db))
            {
                if (parameters != null)
                {
                    foreach (var kvp in parameters)
                    {
                        cmd.Parameters.AddWithValue("@" + kvp.Key, kvp.Value ?? DBNull.Value);
                    }
                }

                using (var reader = cmd.ExecuteReader())
                {
                    while (reader.Read())
                    {
                        var row = new Dictionary<string, object>(reader.FieldCount);
                        for (int i = 0; i < reader.FieldCount; i++)
                        {
                            row[reader.GetName(i)] = reader.GetValue(i);
                        }

                        result.Add(row);
                    }
                }
            }

            return result;
        }

        /// <summary>
        /// Updates the rows matching <paramref name="condition"/>.
        /// </summary>
        /// <param name="tableName">Target table.</param>
        /// <param name="columnValues">
        /// Column name to new value. Each value is bound to a parameter named after its
        /// column (<c>@ColumnName</c>), so column names used here must be valid parameter
        /// names. JSON values are handled as in <see cref="InsertData"/>.
        /// </param>
        /// <param name="condition">
        /// WHERE clause body. It is inserted into the statement verbatim, so it must never be
        /// built from untrusted input; use <paramref name="conditionParameters"/> for values.
        /// </param>
        /// <param name="conditionParameters">
        /// Optional parameter name (without <c>@</c>) to value for placeholders used in
        /// <paramref name="condition"/>. Names must not collide with column names in
        /// <paramref name="columnValues"/>.
        /// </param>
        /// <exception cref="ArgumentException">A condition parameter name equals a column name.</exception>
        public void UpdateData(
            string tableName,
            Dictionary<string, object> columnValues,
            string condition,
            Dictionary<string, object> conditionParameters = null)
        {
            if (columnValues == null)
            {
                throw new ArgumentNullException(nameof(columnValues));
            }

            // Build the SET clause: "col" = @col, ...
            var assignments = new List<string>(columnValues.Count);
            foreach (var kvp in columnValues)
            {
                string placeholder = Placeholder("@" + kvp.Key, kvp.Value);
                assignments.Add($"{QuoteIdentifier(kvp.Key)} = {placeholder}");
            }

            string updateQuery =
                $"UPDATE {QuoteIdentifier(tableName)} SET {string.Join(", ", assignments)} WHERE {condition}";

            using (var cmd = new NpgsqlCommand(updateQuery, db))
            {
                foreach (var kvp in columnValues)
                {
                    cmd.Parameters.AddWithValue("@" + kvp.Key, ToDbValue(kvp.Value));
                }

                if (conditionParameters != null)
                {
                    foreach (var kvp in conditionParameters)
                    {
                        // Two parameters with the same name would make the binding ambiguous.
                        if (columnValues.ContainsKey(kvp.Key))
                        {
                            throw new ArgumentException(
                                $"Condition parameter '{kvp.Key}' collides with a column parameter of the same name.",
                                nameof(conditionParameters));
                        }

                        cmd.Parameters.AddWithValue("@" + kvp.Key, kvp.Value ?? DBNull.Value);
                    }
                }

                cmd.ExecuteNonQuery();
            }
        }

        /// <summary>
        /// Deletes the rows matching <paramref name="condition"/>.
        /// </summary>
        /// <param name="tableName">Target table.</param>
        /// <param name="condition">
        /// WHERE clause body. It is inserted into the statement verbatim, so it must never be
        /// built from untrusted input; use <paramref name="parameters"/> for values.
        /// </param>
        /// <param name="parameters">
        /// Optional parameter name (without <c>@</c>) to value for placeholders used in
        /// <paramref name="condition"/>.
        /// </param>
        public void DeleteData(string tableName, string condition, Dictionary<string, object> parameters = null)
        {
            string deleteQuery = $"DELETE FROM {QuoteIdentifier(tableName)} WHERE {condition}";

            using (var cmd = new NpgsqlCommand(deleteQuery, db))
            {
                if (parameters != null)
                {
                    foreach (var kvp in parameters)
                    {
                        cmd.Parameters.AddWithValue("@" + kvp.Key, kvp.Value ?? DBNull.Value);
                    }
                }

                cmd.ExecuteNonQuery();
            }
        }

        /// <summary>
        /// Runs <paramref name="transactionActions"/> inside one transaction on this
        /// connection: commit when the delegate returns, roll back and rethrow when it throws.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="transactionActions"/> is null.</exception>
        public void ExecuteTransaction(Action transactionActions)
        {
            if (transactionActions == null)
            {
                throw new ArgumentNullException(nameof(transactionActions));
            }

            using (var transaction = db.BeginTransaction())
            {
                try
                {
                    transactionActions();
                    transaction.Commit();
                }
                catch
                {
                    try
                    {
                        transaction.Rollback();
                    }
                    catch (Exception)
                    {
                        // Deliberately ignored: a failing rollback (broken connection, or a
                        // transaction already completed by a failed commit) must not replace
                        // the exception that caused it. Disposing the transaction cleans up.
                    }

                    throw;
                }
            }
        }

        /// <summary>
        /// Drops every base table of the <c>public</c> schema with <c>CASCADE</c>.
        /// </summary>
        public void DropAllTables()
        {
            // Restrict to BASE TABLE: information_schema.tables also lists views and foreign
            // tables, and DROP TABLE fails on those. Dependent views go away through CASCADE.
            string getTablesQuery =
                "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' AND table_type = 'BASE TABLE';";

            // SelectData materializes the list first, so no reader is open while dropping.
            var tables = SelectData(getTablesQuery);

            foreach (var table in tables)
            {
                string tableName = table["table_name"].ToString();
                string dropTableQuery = $"DROP TABLE IF EXISTS {QuoteIdentifier(tableName)} CASCADE;";

                using (var cmd = new NpgsqlCommand(dropTableQuery, db))
                {
                    cmd.ExecuteNonQuery();
                }
            }
        }

        /// <summary>
        /// Inserts one batch of rows with a single multi-row INSERT statement.
        /// </summary>
        /// <param name="tableName">Target table.</param>
        /// <param name="columns">Keys to read from every row, in column order.</param>
        /// <param name="columnsStr">
        /// Ready-made column list placed between the parentheses after the table name; it is
        /// used verbatim and must match <paramref name="columns"/> in order.
        /// </param>
        /// <param name="batch">Rows to insert; every row must contain every key in <paramref name="columns"/>.</param>
        /// <param name="batchIndex">Sequence number of the batch; only used inside parameter names.</param>
        /// <returns>Rows affected as reported by the command; 0 for a null or empty batch.</returns>
        /// <exception cref="KeyNotFoundException">A row lacks one of the columns.</exception>
        public int ExecuteBatchInsert(
            string tableName,
            List<string> columns,
            string columnsStr,
            List<Dictionary<string, object>> batch,
            int batchIndex)
        {
            if (columns == null)
            {
                throw new ArgumentNullException(nameof(columns));
            }

            // "INSERT ... VALUES" followed by nothing is a syntax error.
            if (batch == null || batch.Count == 0)
            {
                return 0;
            }

            var rowTuples = new List<string>(batch.Count);

            // No explicit transaction here: a single INSERT statement is atomic on its own,
            // and opening one would fail when the caller already runs inside
            // ExecuteTransaction (Npgsql does not support nested transactions).
            using (var cmd = new NpgsqlCommand())
            {
                cmd.Connection = db;

                for (int rowIndex = 0; rowIndex < batch.Count; rowIndex++)
                {
                    var row = batch[rowIndex];
                    var placeholders = new List<string>(columns.Count);

                    for (int columnIndex = 0; columnIndex < columns.Count; columnIndex++)
                    {
                        object value = row[columns[columnIndex]];

                        // The column position, not its name, goes into the parameter name so
                        // that arbitrary column names are supported.
                        string parameterName = $"@p{batchIndex}_{rowIndex}_{columnIndex}";

                        placeholders.Add(Placeholder(parameterName, value));
                        cmd.Parameters.AddWithValue(parameterName, ToDbValue(value));
                    }

                    rowTuples.Add($"({string.Join(", ", placeholders)})");
                }

                cmd.CommandText =
                    $"INSERT INTO {QuoteIdentifier(tableName)} ({columnsStr}) VALUES {string.Join(", ", rowTuples)}";

                return cmd.ExecuteNonQuery();
            }
        }

        /// <summary>
        /// Inserts many rows in batches. The column set is taken from the keys of the first
        /// row. Each batch is its own statement, so a failure leaves earlier batches in place
        /// unless the call is wrapped in <see cref="ExecuteTransaction"/>.
        /// </summary>
        /// <param name="tableName">Target table.</param>
        /// <param name="dataList">Rows to insert.</param>
        /// <returns>Sum of the affected-row counts of all batches; 0 for null or empty input.</returns>
        public int BulkInsert(string tableName, List<Dictionary<string, object>> dataList)
        {
            if (dataList == null || dataList.Count == 0)
            {
                return 0;
            }

            var columns = dataList[0].Keys.ToList();
            var columnsStr = string.Join(", ", columns.Select(QuoteIdentifier));

            // At most MaxBatchRows rows per statement, fewer for wide tables so the statement
            // stays within the bind-parameter limit.
            int batchSize = columns.Count == 0
                ? MaxBatchRows
                : Math.Max(1, Math.Min(MaxBatchRows, MaxParametersPerStatement / columns.Count));

            int insertedCount = 0;
            int batchIndex = 0;

            for (int offset = 0; offset < dataList.Count; offset += batchSize)
            {
                int count = Math.Min(batchSize, dataList.Count - offset);
                var batch = dataList.GetRange(offset, count);

                insertedCount += ExecuteBatchInsert(tableName, columns, columnsStr, batch, batchIndex);
                batchIndex++;
            }

            return insertedCount;
        }

        /// <summary>Closes and disposes the underlying connection. Safe to call repeatedly.</summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases the connection. There is intentionally no finalizer: the class owns only a
        /// managed object, which must not be touched from a finalizer thread.
        /// </summary>
        /// <param name="disposing">True when called from <see cref="Dispose()"/>.</param>
        protected virtual void Dispose(bool disposing)
        {
            if (disposed)
            {
                return;
            }

            if (disposing && db != null)
            {
                // Dispose regardless of the current state, so closed or broken connections
                // are released as well.
                db.Dispose();
            }

            disposed = true;
        }

        /// <summary>
        /// Builds a connection string of the form
        /// <c>Host=..;Port=..;Database=..;Username=..;Password=..</c>.
        /// </summary>
        /// <remarks>
        /// Values are inserted as given, without escaping; a value containing <c>;</c> will
        /// therefore not round-trip.
        /// </remarks>
        public static string BuildConnectionString(string host, string database, string username, string password, int port = 5432)
        {
            return $"Host={host};Port={port};Database={database};Username={username};Password={password}";
        }
    }
}