当前位置:首页>行情>DolphinDB接入币安交易所获取行情数据(一)

DolphinDB接入币安交易所获取行情数据(一)

  • 2026-05-10 21:06:26
DolphinDB接入币安交易所获取行情数据(一)

环境要求

项目
要求
说明
版本
≥ 3.00.4
脚本使用 TSDB engine、fromStdJson 等新特性
插件
WebSocket、httpClient
Linux系统
网络代理
http://192.168.110.5:7890/
科学网络

数据库架构

数据库
用途
包含表
dfs://CryptoKLine
K线数据
kline_1m, kline_daily, continuousKline, indexKline, markKline
dfs://CryptoTick
逐笔数据
trade, depth, bookTicker, depthTradeMerge, tickTrade
dfs://CryptoDaily
低频数据
ticker, fundingRate, openInterest, liquidation, contractInfo, precision, settlement
dfs://CryptoFactor
因子数据
factor_1min
dfs://CryptoOrderBook
订单簿
orderbook, rawOrderbook
dfs://CryptoAccount
账户数据
order, execution, position, balance

启动步骤

  • [x] 启动 DolphinDB节点
  • [x] 加载插件
      loadPlugin("WebSocket")
      loadPlugin("httpClient")
    或者
      # 编辑clusterDemo/config/cluster.cfg 
      persistenceDir=/opt/dolphindb/persistence
      preloadModules=plugins::httpClient,plugins::WebSocket
  • [x] 重新启动需要删除库、删除表、取消订阅
    // 获取所有流表的发布统计信息
    getStreamingStat().pubTables
    // 查看websocket订阅状态
    WebSocket::getSubJobStat()

    // 停止所有订阅:
    WebSocket::cancelSubJob("spot_depth")
    // 取消流表订阅
    unsubscribeTable(tableName="Crypto_depthST", actionName="insertDB")
    unsubscribeTable(tableName="Crypto_minKLineST", actionName="insertDB")
    // 删除共享流表
    dropStreamTable("Crypto_depthST")
    dropStreamTable("Crypto_minKLineST")
    // 只删除持久化流表数据
    clearTablePersistence(Crypto_depthST)
    clearTablePersistence(Crypto_minKLineST)
    // 取消共享流表
    undef("Crypto_depthST", SHARED)
    undef("Crypto_minKLineST", SHARED)
    // 删除数据库
    dropDatabase("dfs://CryptoDaily")
  • [x] 初始化数据库和流表
    run("/opt/dolphindb/server/data/Win-computenode/Cryptocurrency/createDatabase.dos")
  • [x] 启动实时 WebSocket 采集器
    run("/opt/dolphindb/server/data/Win-computenode/Cryptocurrency/Binance/Binance_Depth_ws.dos")
    run("/opt/dolphindb/server/data/Win-computenode/Cryptocurrency/Binance/Binance_Trade_ws.dos")
    run("/opt/dolphindb/server/data/Win-computenode/Cryptocurrency/Binance/Binance_KLine_ws.dos")
    run("/opt/dolphindb/server/data/Win-computenode/Cryptocurrency/Binance/Binance_Ticker_ws.dos")
    run("/opt/dolphindb/server/data/Win-computenode/Cryptocurrency/Binance/Binance_Liquidation_ws.dos")
    run("/opt/dolphindb/server/data/Win-computenode/Cryptocurrency/Binance/Binance_ContractInfo_ws.dos")
  • [x] 启动资金费率定时任务
    run("/opt/dolphindb/server/data/Win-computenode/Cryptocurrency/Binance/Binance_FundingRate_http.dos")
  • [x] 按需回补历史数据(手动执行函数调用)
      run("/opt/dolphindb/server/data/Win-computenode/Cryptocurrency/Binance/Binance_historyAggTrades_http.dos")
    // 然后调用 fetchHistoryAggTrades(...)
      run("/opt/dolphindb/server/data/Win-computenode/Cryptocurrency/Binance/Binance_historyKLine_http.dos")
    // 然后调用 fetchHistoryKLines(...)
  • [x] 导出CSV文件:
    t = select * from loadTable("dfs://binance_spot", "trade") where event_time >= 2026.04.26 and event_time < 2026.04.27
    saveText(t, "/home/firebirds/Documents/spot_trade_20260426.csv")

createDatabase.dos

// ============================================================
// Cryptocurrency Database Schema
// Exchanges: Binance-Spot, Binance-UM, Binance-CM, Binance-Margin, BitMEX, OKX
// Architecture: 6 databases by data frequency & access pattern
// ============================================================


// ============================================================
// 1. dfs://CryptoKLine — K线数据 (OLAP, RANGE by MONTH)
//    kline_1m, kline_daily, continuousKline, indexKline, markKline
// ============================================================
// K-line database. RANGE-partitioned on month values: the scheme
// 2010.01M+(0..30)*12 yields 31 boundaries, i.e. 30 partitions of 12 months each.
dbName = "dfs://CryptoKLine"
if(!existsDatabase(dbName)){
    db = database(dbName, RANGE, 2010.01M+(0..30)*12)
}else{ db=database(dbName)}

// 1.1 One-minute K-lines (same layout for every exchange)
tbName = "kline_1m"
streamTbName = "Crypto_minKLineST"
colNames = `eventTime`collectionTime`exchange`marketType`symbol`open`high`low`close`volume`quoteVolume`trades`takerBuyBase`takerBuyQuote
colTypes = [TIMESTAMP, TIMESTAMP, SYMBOL, SYMBOL, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, DOUBLE, DOUBLE]
if(!existsTable(dbName,tbName)){
    createPartitionedTable(db, table(1:0, colNames, colTypes), tbName, `eventTime)
}
// Stream table keyed on exchange + symbol + eventTime (rows with a repeated key are dropped)
enableTableShareAndPersistence(table=keyedStreamTable(`exchange`symbol`eventTime, 10000:0, colNames, colTypes), tableName=streamTbName, cacheSize=100000, retentionMinutes=2880)
go
// Persist the stream into the DFS table
kline1mTb = loadTable(dbName, tbName)
subscribeTable(tableName=streamTbName, actionName="insertDB", offset=-2, handler=kline1mTb, msgAsTable=true, batchSize=10000, throttle=1, persistOffset=true)


// 1.2 Daily K-lines (DFS table only; no stream table is created here)
tbName = "kline_daily"
colNames = `eventTime`collectionTime`exchange`marketType`symbol`open`high`low`close`volume`quoteVolume`trades`takerBuyBase`takerBuyQuote
colTypes = [TIMESTAMP, TIMESTAMP, SYMBOL, SYMBOL, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, DOUBLE, DOUBLE]
if(!existsTable(dbName,tbName)){
    createPartitionedTable(db, table(1:0, colNames, colTypes), tbName, `eventTime)
}


// 1.3 Continuous-contract K-lines (futures only)
tbName = "continuousKline"
streamTbName = "Crypto_continuousKLineST"
colNames = `eventTime`collectionTime`exchange`marketType`symbol`contractType`open`high`low`close`volume`quoteVolume`trades`takerBuyBase`takerBuyQuote
colTypes = [TIMESTAMP, TIMESTAMP, SYMBOL, SYMBOL, SYMBOL, STRING, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, DOUBLE, DOUBLE]
if(!existsTable(dbName,tbName)){
    createPartitionedTable(db, table(1:0, colNames, colTypes), tbName, `eventTime)
}
enableTableShareAndPersistence(table=keyedStreamTable(`exchange`symbol`eventTime`contractType, 10000:0, colNames, colTypes), tableName=streamTbName, cacheSize=100000, retentionMinutes=2880)
go
continuousKlineTb = loadTable(dbName, tbName)
subscribeTable(tableName=streamTbName, actionName="insertDB", offset=-2, handler=continuousKlineTb, msgAsTable=true, batchSize=10000, throttle=1, persistOffset=true)


// 1.4 Index-price K-lines
tbName = "indexKline"
colNames = `eventTime`exchange`symbol`open`high`low`close
colTypes = [TIMESTAMP, SYMBOL, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE]
if(!existsTable(dbName,tbName)){
    createPartitionedTable(db, table(1:0, colNames, colTypes), tbName, `eventTime)
}


// 1.5 Mark-price K-lines
tbName = "markKline"
colNames = `eventTime`exchange`symbol`open`high`low`close
colTypes = [TIMESTAMP, SYMBOL, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE]
if(!existsTable(dbName,tbName)){
    createPartitionedTable(db, table(1:0, colNames, colTypes), tbName, `eventTime)
}


// ============================================================
// 2. dfs://CryptoTick — 逐笔数据 (TSDB, COMPO DATE + HASH SYMBOL)
//    trade, depth, bookTicker, depthTradeMerge, tickTrade
// ============================================================
// Tick database: TSDB engine, COMPO partitioning = VALUE on date x HASH(SYMBOL, 4).
dbName = "dfs://CryptoTick"
if(!existsDatabase(dbName)){
    dbDate = database("", VALUE, 2012.01.01..2012.01.30)
    dbSym  = database("", HASH, [SYMBOL, 4])
    db = database(dbName, COMPO, [dbDate, dbSym], engine='TSDB')
}else{ db=database(dbName)}


// 2.1 Trades (Binance aggTrade, BitMEX trade, OKX trades)
tbName = "trade"
streamTbName = "Crypto_aggTradeST"
colNames = `eventTime`collectionTime`exchange`marketType`symbol`price`quantity`side`tradeId`firstId`lastId
colTypes = [TIMESTAMP, TIMESTAMP, SYMBOL, SYMBOL, SYMBOL, DOUBLE, DOUBLE, STRING, LONG, LONG, LONG]
if(!existsTable(dbName,tbName)){
    createPartitionedTable(db, table(1:0, colNames, colTypes), tbName, `eventTime`symbol, sortColumns=`exchange`symbol`eventTime)
}
enableTableShareAndPersistence(table=keyedStreamTable(`exchange`symbol`lastId, 10000:0, colNames, colTypes), tableName=streamTbName, cacheSize=100000, retentionMinutes=2880)
go
tradeTb = loadTable(dbName, tbName)
subscribeTable(tableName=streamTbName, actionName="insertDB", offset=-2, handler=tradeTb, msgAsTable=true, batchSize=10000, throttle=1, persistOffset=true)


// 2.2 Incremental depth (Binance depth, BitMEX orderBookL2, OKX books)
tbName = "depth"
streamTbName = "Crypto_depthST"
colNames = `eventTime`collectionTime`exchange`marketType`symbol`bidPrice`bidQty`bidOrders`askPrice`askQty`askOrders`firstId`lastId`prevLastId
colTypes = [TIMESTAMP, TIMESTAMP, SYMBOL, SYMBOL, SYMBOL, DOUBLE[], DOUBLE[], DOUBLE[], DOUBLE[], DOUBLE[], DOUBLE[], LONG, LONG, LONG]
if(!existsTable(dbName,tbName)){
    createPartitionedTable(db, table(1:0, colNames, colTypes), tbName, `eventTime`symbol, sortColumns=`exchange`marketType`symbol`eventTime)
}
enableTableShareAndPersistence(table=keyedStreamTable(`exchange`marketType`symbol`eventTime, 10000:0, colNames, colTypes), tableName=streamTbName, cacheSize=100000, retentionMinutes=2880)
go
depthTb = loadTable(dbName, tbName)
subscribeTable(tableName=streamTbName, actionName="insertDB", offset=-2, handler=depthTb, msgAsTable=true, batchSize=10000, throttle=1, persistOffset=true)


// 2.3 Best bid/ask (Binance bookTicker, OKX books5, BitMEX quote)
tbName = "bookTicker"
streamTbName = "Crypto_bookTickerST"
colNames = `eventTime`collectionTime`exchange`marketType`symbol`bidPrice`bidQty`askPrice`askQty`updateId
colTypes = [TIMESTAMP, TIMESTAMP, SYMBOL, SYMBOL, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG]
if(!existsTable(dbName,tbName)){
    createPartitionedTable(db, table(1:0, colNames, colTypes), tbName, `eventTime`symbol, sortColumns=`exchange`marketType`symbol`eventTime)
}
enableTableShareAndPersistence(table=keyedStreamTable(`exchange`symbol`eventTime, 10000:0, colNames, colTypes), tableName=streamTbName, cacheSize=100000, retentionMinutes=2880)
go
bookTickerTb = loadTable(dbName, tbName)
subscribeTable(tableName=streamTbName, actionName="insertDB", offset=-2, handler=bookTickerTb, msgAsTable=true, batchSize=10000, throttle=1, persistOffset=true)


// 2.4 Merged depth/trade view.
// NOTE(review): only the DFS table and the stream table are created here; no
// subscribeTable call persists Crypto_depthTradeMergeST — confirm this is intended.
tbName = "depthTradeMerge"
streamTbName = "Crypto_depthTradeMergeST"
colNames = `symbol`exchange`timestamp`bidPrice`bidQty`offerPrice`offerQty`lastPrice`totalBidQty`highPrice`lowPrice
colTypes = [SYMBOL, SYMBOL, TIMESTAMP, DOUBLE[], DOUBLE[], DOUBLE[], DOUBLE[], DOUBLE, DOUBLE, DOUBLE, DOUBLE]
if(!existsTable(dbName,tbName)){
    createPartitionedTable(db, table(1:0, colNames, colTypes), tbName, `timestamp`symbol, sortColumns=`exchange`symbol`timestamp)
}
enableTableShareAndPersistence(table=keyedStreamTable(`exchange`symbol`timestamp, 10000:0, colNames, colTypes), tableName=streamTbName, cacheSize=100000, retentionMinutes=2880)
go


// 2.5 Tick trades (with quote quantity)
tbName = "tickTrade"
streamTbName = "Crypto_tickTradeST"
colNames = `eventTime`collectionTime`exchange`symbol`tradeId`price`quantity`quoteQty`side
colTypes = [TIMESTAMP, TIMESTAMP, SYMBOL, SYMBOL, LONG, DOUBLE, DOUBLE, DOUBLE, STRING]
if(!existsTable(dbName,tbName)){
    createPartitionedTable(db, table(1:0, colNames, colTypes), tbName, `eventTime`symbol, sortColumns=`exchange`symbol`eventTime)
}
enableTableShareAndPersistence(table=keyedStreamTable(`exchange`symbol`tradeId, 10000:0, colNames, colTypes), tableName=streamTbName, cacheSize=100000, retentionMinutes=2880)
go
tickTradeTb = loadTable(dbName, tbName)
subscribeTable(tableName=streamTbName, actionName="insertDB", offset=-2, handler=tickTradeTb, msgAsTable=true, batchSize=10000, throttle=1, persistOffset=true)


// ============================================================
// 3. dfs://CryptoDaily — 低频日线级数据 (OLAP, RANGE by MONTH)
//    ticker, fundingRate, openInterest, liquidation, contractInfo, precision, settlement
// ============================================================
// Low-frequency database. RANGE-partitioned on month values: the scheme
// 2010.01M+(0..30)*60 yields 30 partitions of 60 months each.
dbName = "dfs://CryptoDaily"
if(!existsDatabase(dbName)){
    db = database(dbName, RANGE, 2010.01M+(0..30)*60)
}else{ db=database(dbName)}


// 3.1 24h ticker snapshot
tbName = "ticker"
streamTbName = "Crypto_tickerST"
colNames = `eventTime`collectionTime`exchange`marketType`symbol`priceChange`priceChangePct`weightedAvg`lastPrice`lastQty`bestBid`bestBidQty`bestAsk`bestAskQty`openPrice`highPrice`lowPrice`volume`quoteVolume`trades
colTypes = [TIMESTAMP, TIMESTAMP, SYMBOL, SYMBOL, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT]
if(!existsTable(dbName,tbName)){
    createPartitionedTable(db, table(1:0, colNames, colTypes), tbName, `eventTime)
}
enableTableShareAndPersistence(table=keyedStreamTable(`exchange`symbol`eventTime, 10000:0, colNames, colTypes), tableName=streamTbName, cacheSize=100000, retentionMinutes=2880)
go
tickerTb = loadTable(dbName, tbName)
subscribeTable(tableName=streamTbName, actionName="insertDB", offset=-2, handler=tickerTb, msgAsTable=true, batchSize=10000, throttle=1, persistOffset=true)


// 3.2 Funding rate
tbName = "fundingRate"
streamTbName = "Crypto_fundingRateST"
colNames = `eventTime`collectionTime`exchange`symbol`fundingRate`markPrice`nextFundingTime
colTypes = [TIMESTAMP, TIMESTAMP, SYMBOL, SYMBOL, DOUBLE, DOUBLE, TIMESTAMP]
if(!existsTable(dbName,tbName)){
    createPartitionedTable(db, table(1:0, colNames, colTypes), tbName, `eventTime)
}
enableTableShareAndPersistence(table=keyedStreamTable(`exchange`symbol`eventTime, 10000:0, colNames, colTypes), tableName=streamTbName, cacheSize=100000, retentionMinutes=2880)
go
fundingRateTb = loadTable(dbName, tbName)
subscribeTable(tableName=streamTbName, actionName="insertDB", offset=-2, handler=fundingRateTb, msgAsTable=true, batchSize=10000, throttle=1, persistOffset=true)


// 3.3 Open interest (Binance openInterest, OKX open-interest)
tbName = "openInterest"
streamTbName = "Crypto_openInterestST"
colNames = `eventTime`collectionTime`exchange`symbol`openInterest`openInterestValue
colTypes = [TIMESTAMP, TIMESTAMP, SYMBOL, SYMBOL, DOUBLE, DOUBLE]
if(!existsTable(dbName,tbName)){
    createPartitionedTable(db, table(1:0, colNames, colTypes), tbName, `eventTime)
}
enableTableShareAndPersistence(table=keyedStreamTable(`exchange`symbol`eventTime, 10000:0, colNames, colTypes), tableName=streamTbName, cacheSize=100000, retentionMinutes=2880)
go
openInterestTb = loadTable(dbName, tbName)
subscribeTable(tableName=streamTbName, actionName="insertDB", offset=-2, handler=openInterestTb, msgAsTable=true, batchSize=10000, throttle=1, persistOffset=true)


// 3.4 Liquidation orders (stored in a dimension table)
tbName = "liquidation"
streamTbName = "Crypto_liquidationST"
colNames = `eventTime`collectionTime`exchange`marketType`symbol`side`type`timeInForce`quantity`price`avgPrice`status`lastFilledQty`filledAccumQty
colTypes = [TIMESTAMP, TIMESTAMP, SYMBOL, SYMBOL, SYMBOL, STRING, STRING, STRING, DOUBLE, DOUBLE, DOUBLE, STRING, DOUBLE, DOUBLE]
if(!existsTable(dbName,tbName)){
    createDimensionTable(db, table(1:0, colNames, colTypes), tbName)
}
enableTableShareAndPersistence(table=keyedStreamTable(`exchange`symbol`eventTime, 10000:0, colNames, colTypes), tableName=streamTbName, cacheSize=100000, retentionMinutes=2880)
go
liquidationTb = loadTable(dbName, tbName)
subscribeTable(tableName=streamTbName, actionName="insertDB", offset=-2, handler=append!{liquidationTb,}, msgAsTable=true, batchSize=10000, throttle=1, persistOffset=true)


// 3.5 Contract info (stored in a dimension table)
tbName = "contractInfo"
streamTbName = "Crypto_contractInfoST"
colNames = `eventTime`collectionTime`exchange`symbol`contractType`contractDirection`deliveryDatetime`onboardDatetime`contractStatus`notionalBracket`floorNotional`capNotional`maintenanceRatio`auxiliaryNumber`minLeverage`maxLeverage`settleCurrency`tickSize`lotSize`minSize`contractValue`contractMultiplier`contractValueCurrency
colTypes = [TIMESTAMP, TIMESTAMP, SYMBOL, SYMBOL, STRING, STRING, TIMESTAMP, TIMESTAMP, STRING, INT, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, STRING, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, STRING]
if(!existsTable(dbName,tbName)){
    createDimensionTable(db, table(1:0, colNames, colTypes), tbName)
}
// The collector writes one row per notional bracket, all sharing the same
// exchange/symbol/eventTime. notionalBracket must be part of the key, otherwise
// the keyed stream table keeps only the first bracket of each event.
enableTableShareAndPersistence(table=keyedStreamTable(`exchange`symbol`eventTime`notionalBracket, 10000:0, colNames, colTypes), tableName=streamTbName, cacheSize=100000, retentionMinutes=2880)
go
contractInfoTb = loadTable(dbName, tbName)
subscribeTable(tableName=streamTbName, actionName="insertDB", offset=-2, handler=append!{contractInfoTb,}, msgAsTable=true, batchSize=10000, throttle=1, persistOffset=true)


// 3.6 Price/quantity precision
tbName = "precision"
colNames = `symbol`exchange`contractType`tickSize`stepSize
colTypes = [SYMBOL, SYMBOL, STRING, DOUBLE, DOUBLE]
if(!existsTable(dbName,tbName)){
    createDimensionTable(db, table(1:0, colNames, colTypes), tbName)
}


// 3.7 Settlement data (BitMEX settlement)
tbName = "settlement"
colNames = `eventTime`exchange`symbol`settlementType`price
colTypes = [TIMESTAMP, SYMBOL, SYMBOL, STRING, DOUBLE]
if(!existsTable(dbName,tbName)){
    createDimensionTable(db, table(1:0, colNames, colTypes), tbName)
}


// ============================================================
// 4. dfs://CryptoFactor — 因子数据 (TSDB, COMPO MONTH + FACTORNAME)
//    factor_1min
// ============================================================
// Factor database: TSDB engine, COMPO partitioning = VALUE on month x VALUE on factor name.
dbName = "dfs://CryptoFactor"
tbName = "factor_1min"
colNames = `datetime`symbol`exchange`factorname`factorvalue
colTypes = [TIMESTAMP, SYMBOL, SYMBOL, SYMBOL, DOUBLE]
if(!existsDatabase(dbName)){
    // Seed partition values only ("factor1" is a placeholder entry in the scheme)
    db_date = database("", VALUE, 2025.07M..2025.08M)
    db_factor = database("", VALUE, ["factor1"])
    db = database(dbName, COMPO, [db_date, db_factor], engine="TSDB")
}else{ db=database(dbName)}
if(!existsTable(dbName,tbName)){
    createPartitionedTable(db, table(1:0, colNames, colTypes), tbName, partitionColumns=`datetime`factorname,
        compressMethods={"datetime":"delta"}, sortColumns=`exchange`symbol`datetime)
}


// ============================================================
// 5. dfs://CryptoOrderBook — 全量订单簿 (TSDB, COMPO DATEHOUR + SYMBOL)
//    orderbook, rawOrderbook
// ============================================================
// Order-book database: TSDB engine, COMPO partitioning = VALUE on datehour x VALUE on symbol.
dbName = "dfs://CryptoOrderBook"
if(!existsDatabase(dbName)){
    // Seed partition values only ("symbol1" is a placeholder entry in the scheme)
    db1 = database("", VALUE, datehour(2025.12.02T00:00:00)..datehour(2025.12.03T00:00:00))
    db2 = database("", VALUE, ["symbol1"])
    db = database(dbName, COMPO, [db1, db2], engine="TSDB")
}else{ db=database(dbName)}

// 5.1 Order-book snapshots
tbName = "orderbook"
colNames = `isIncremental`exchange`symbol`askPrice`askVolume`askNum`bidPrice`bidVolume`bidNum`checksum`prevSeqId`seqId`updateTime
colTypes = [BOOL, SYMBOL, SYMBOL, DOUBLE[], DOUBLE[], INT[], DOUBLE[], DOUBLE[], INT[], LONG, LONG, LONG, TIMESTAMP]
if(!existsTable(dbName,tbName)){
    createPartitionedTable(db, table(1:0, colNames, colTypes), tbName, partitionColumns=`updateTime`symbol, sortColumns=`symbol`updateTime)
}

// 5.2 Raw order-book data (same schema as the snapshot table)
tbName = "rawOrderbook"
colNames = `isIncremental`exchange`symbol`askPrice`askVolume`askNum`bidPrice`bidVolume`bidNum`checksum`prevSeqId`seqId`updateTime
colTypes = [BOOL, SYMBOL, SYMBOL, DOUBLE[], DOUBLE[], INT[], DOUBLE[], DOUBLE[], INT[], LONG, LONG, LONG, TIMESTAMP]
if(!existsTable(dbName,tbName)){
    createPartitionedTable(db, table(1:0, colNames, colTypes), tbName, partitionColumns=`updateTime`symbol, sortColumns=`symbol`updateTime)
}


// ============================================================
// 6. dfs://CryptoAccount — 账户/交易数据 (TSDB, COMPO DATE + HASH EXCHANGE)
//    order, execution, position, balance
//    Private WebSocket streams from all exchanges
// ============================================================
// Account database: TSDB engine, COMPO partitioning = VALUE on date x HASH(SYMBOL, 2) on exchange.
dbName = "dfs://CryptoAccount"
if(!existsDatabase(dbName)){
    dbDate = database("", VALUE, 2012.01.01..2012.01.30)
    dbExch = database("", HASH, [SYMBOL, 2])
    db = database(dbName, COMPO, [dbDate, dbExch], engine='TSDB')
}else{ db=database(dbName)}

// 6.1 Order updates (Binance ORDER_TRADE_UPDATE, BitMEX order, OKX orders)
tbName = "order"
streamTbName = "Crypto_orderST"
colNames = `eventTime`exchange`marketType`symbol`orderId`clientOrderId`side`orderType`status`price`origQty`executedQty`avgPrice`stopPrice`timeInForce`commission`commissionAsset
colTypes = [TIMESTAMP, SYMBOL, SYMBOL, SYMBOL, LONG, STRING, STRING, STRING, STRING, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, STRING, DOUBLE, SYMBOL]
if(!existsTable(dbName,tbName)){
    createPartitionedTable(db, table(1:0, colNames, colTypes), tbName, `eventTime`exchange, sortColumns=`exchange`symbol`eventTime)
}
enableTableShareAndPersistence(table=keyedStreamTable(`exchange`orderId`eventTime, 10000:0, colNames, colTypes), tableName=streamTbName, cacheSize=100000, retentionMinutes=2880)
go
orderTb = loadTable(dbName, tbName)
subscribeTable(tableName=streamTbName, actionName="insertDB", offset=-2, handler=orderTb, msgAsTable=true, batchSize=10000, throttle=1, persistOffset=true)


// 6.2 Execution reports (Binance executionReport, BitMEX execution, OKX orders fill)
tbName = "execution"
streamTbName = "Crypto_executionST"
colNames = `eventTime`exchange`marketType`symbol`orderId`tradeId`side`price`qty`commission`commissionAsset`isMaker
colTypes = [TIMESTAMP, SYMBOL, SYMBOL, SYMBOL, LONG, LONG, STRING, DOUBLE, DOUBLE, DOUBLE, SYMBOL, BOOL]
if(!existsTable(dbName,tbName)){
    createPartitionedTable(db, table(1:0, colNames, colTypes), tbName, `eventTime`exchange, sortColumns=`exchange`symbol`eventTime)
}
enableTableShareAndPersistence(table=keyedStreamTable(`exchange`tradeId, 10000:0, colNames, colTypes), tableName=streamTbName, cacheSize=100000, retentionMinutes=2880)
go
executionTb = loadTable(dbName, tbName)
subscribeTable(tableName=streamTbName, actionName="insertDB", offset=-2, handler=executionTb, msgAsTable=true, batchSize=10000, throttle=1, persistOffset=true)


// 6.3 Position updates (Binance ACCOUNT_UPDATE, BitMEX position, OKX positions)
tbName = "position"
streamTbName = "Crypto_positionST"
colNames = `eventTime`exchange`marketType`symbol`side`size`avgEntryPrice`unrealizedPnl`leverage`marginType`liquidationPrice
colTypes = [TIMESTAMP, SYMBOL, SYMBOL, SYMBOL, STRING, DOUBLE, DOUBLE, DOUBLE, DOUBLE, STRING, DOUBLE]
if(!existsTable(dbName,tbName)){
    createPartitionedTable(db, table(1:0, colNames, colTypes), tbName, `eventTime`exchange, sortColumns=`exchange`symbol`eventTime)
}
enableTableShareAndPersistence(table=keyedStreamTable(`exchange`symbol`eventTime, 10000:0, colNames, colTypes), tableName=streamTbName, cacheSize=100000, retentionMinutes=2880)
go
positionTb = loadTable(dbName, tbName)
subscribeTable(tableName=streamTbName, actionName="insertDB", offset=-2, handler=positionTb, msgAsTable=true, batchSize=10000, throttle=1, persistOffset=true)


// 6.4 Account balances (Binance outboundAccountPosition, BitMEX margin, OKX account)
tbName = "balance"
streamTbName = "Crypto_balanceST"
colNames = `eventTime`exchange`asset`totalBalance`availableBalance`crossMarginAsset`crossMarginBorrowed`crossMarginInterest`crossMarginLocked
colTypes = [TIMESTAMP, SYMBOL, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE]
if(!existsTable(dbName,tbName)){
    createPartitionedTable(db, table(1:0, colNames, colTypes), tbName, `eventTime`exchange, sortColumns=`exchange`asset`eventTime)
}
enableTableShareAndPersistence(table=keyedStreamTable(`exchange`asset`eventTime, 10000:0, colNames, colTypes), tableName=streamTbName, cacheSize=100000, retentionMinutes=2880)
go
balanceTb = loadTable(dbName, tbName)
subscribeTable(tableName=streamTbName, actionName="insertDB", offset=-2, handler=balanceTb, msgAsTable=true, batchSize=10000, throttle=1, persistOffset=true)

Binance_ContractInfo_ws.dos

/*
 * Binance_ContractInfo_ws.dos
 *
 * DolphinDB WebSocket 插件版 Binance 合约信息(contractInfo)采集器
 *
 * 支持市场:
 *   - USDT-M Futures  U本位合约(永续+交割)  wss://fstream.binance.com
 *   - Coin-M Futures  币本位合约(永续+交割)  wss://dstream.binance.com
 *
 * Note: Spot 无 contractInfo 流, 不支持
 *
 * 数据流: !contractInfo (全市场合约信息, 无需按品种订阅)
 * 每个事件可能包含多个 notional bracket (bks 数组), 每个 bracket 生成一行
 * 通过 ct 字段区分: PERPETUAL / CURRENT_QUARTER / NEXT_QUARTER
 *
 * Ref:
 *   USDs-M: https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams
 *   Coin-M: https://developers.binance.com/docs/derivatives/coin-margined-futures/websocket-market-streams
 *
 * 前置条件:
 *   - DolphinDB Server 2.00.12+
 *   - WebSocket 插件已安装: loadPlugin("WebSocket")
 *
 * 使用方法:
 *   loadPlugin("WebSocket")
 *   run("Binance_ContractInfo_ws.dos")
 */


// ============================================================
// 1. 创建数据库与流表
// ============================================================

// Target DFS table and shared stream table used by this collector.
dbName = "dfs://CryptoDaily"
tbName = "contractInfo"
streamTbName = "Crypto_contractInfoST"

colNames = `eventTime`collectionTime`exchange`symbol`contractType`contractDirection`deliveryDatetime`onboardDatetime`contractStatus`notionalBracket`floorNotional`capNotional`maintenanceRatio`auxiliaryNumber`minLeverage`maxLeverage`settleCurrency`tickSize`lotSize`minSize`contractValue`contractMultiplier`contractValueCurrency
colTypes = [TIMESTAMP, TIMESTAMP, SYMBOL, SYMBOL, STRING, STRING, TIMESTAMP, TIMESTAMP, STRING, INT, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, STRING, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, STRING]

// Create the database / dimension table only when they do not exist yet.
if(!existsDatabase(dbName)){
    db = database(dbName, RANGE, 2010.01M+(0..30)*60)
}else{
    db = database(dbName)
}
if(!existsTable(dbName, tbName)){
    createDimensionTable(db, table(1:0, colNames, colTypes), tbName)
}

// The message handler emits one row per notional bracket, all sharing the same
// exchange/symbol/eventTime. notionalBracket is therefore part of the key;
// without it the keyed stream table would keep only the first bracket of an event.
try{
    enableTableShareAndPersistence(
        table=keyedStreamTable(`exchange`symbol`eventTime`notionalBracket, 10000:0, colNames, colTypes),
        tableName=streamTbName,
        cacheSize=100000,
        retentionMinutes=2880
    )
}catch(ex){
    // Best effort: the call fails when the stream table is already shared
    // (e.g. created earlier by createDatabase.dos). Log instead of hiding it.
    writeLog("[ContractInfoWS] enableTableShareAndPersistence skipped: " + string(ex))
}
go

contractInfoTb = loadTable(dbName, tbName)
// Drop a previous subscription with the same action name, if any, then re-subscribe.
try{ unsubscribeTable(tableName=streamTbName, actionName="insertDB") }catch(ex){}
subscribeTable(tableName=streamTbName, actionName="insertDB", offset=-2,
               handler=append!{contractInfoTb,}, msgAsTable=true, batchSize=10000,
               throttle=1, persistOffset=true)

// ============================================================
// 2. 代理配置
// ============================================================

// Proxy address (leave as an empty string when no proxy is used);
// it is copied into the WebSocket config only when non-empty.
proxy = "http://192.168.110.5:7890/"

// ============================================================
// 3. 工具函数
// ============================================================

// Return true when dictionary d contains the given key, false otherwise.
//   d   - dictionary to inspect
//   key - key to look up
def dictHas(d, key) {
    // Membership test on the key vector replaces the hand-written scan.
    return key in d.keys()
}

// Look up key in dictionary d; yields NULL when the key is absent.
def dictGet(d, key) {
    if(dictHas(d, key)) {
        return d[key]
    }
    return NULL
}

// Convert v to TIMESTAMP via long(); a NULL input yields a NULL TIMESTAMP.
def toTimestamp(v) {
    if(!isNull(v)) {
        return timestamp(long(v))
    }
    return timestamp(NULL)
}

// ============================================================
// 4. WebSocket 回调函数
// ============================================================

// WebSocket onOpen callback: records that the connection was established.
def contractInfoOnOpen(ws) {
    logLine = "[ContractInfoWS] Connected"
    writeLog(logLine)
}

// WebSocket onError callback: writes the error to the server log.
def contractInfoOnError(ws, error) {
    logLine = "[ContractInfoWS] Error: " + string(error)
    writeLog(logLine)
}

// WebSocket onClose callback: logs the close status code and message.
def contractInfoOnClose(ws, statusCode, msg) {
    codePart = "[ContractInfoWS] Closed, code=" + string(statusCode)
    logLine = codePart + ", msg=" + string(msg)
    writeLog(logLine)
}

// 核心 contractInfo 消息处理
// !contractInfo 事件格式 (USDT-M 和 Coin-M 结构相同):
//   {"e":"contractInfo","E":1672531200000,"s":"BTCUSDT","ps":"BTCUSDT",
//    "ct":"PERPETUAL","dt":4133404800000,"ot":1569398400000,"cs":"TRADING",
//    "bks":[{"bs":1,"bnf":0,"bnc":5000,"mmr":0.01,"cf":0,"mi":21,"ma":50},...]}
//
// Note:
//   - ps(pair) 字段为 Binance 官方返回, 但当前 schema 未包含, 如需可扩展
//   - bks 仅在 bracket 变更时出现, 其余事件中 bks 字段可能缺失
//   - dt 对永续合约为远期时间戳(~2101年), 非缺失值
//   - 永续: ct=PERPETUAL; 交割: ct=CURRENT_QUARTER / NEXT_QUARTER 等
// Parse a batch of WebSocket messages and append contractInfo rows to the
// shared stream table Crypto_contractInfoST.
//   ws                - WebSocket handle, used to answer application-level pings
//   data              - message table; the raw text is read from its msg column
//   exchange          - value written to the exchange column
//   contractDirection - value written to the contractDirection column
// One row is written per element of the event's "bks" array; an event without
// brackets produces a single row with NULL bracket fields. Errors on a single
// message are logged and do not stop the rest of the batch.
def processContractInfo(ws, data, exchange, contractDirection) {
    n = data.rows()
    if(n == 0) return

    st = objByName("Crypto_contractInfoST")

    for(i in 0..(n-1)) {
        try {
            s = string(data.msg[i])

            // Answer an application-level ping by echoing it back as a pong
            if(regexFind(s, "\"ping\"") >= 0 && regexFind(s, "\"pong\"") < 0) {
                pong = regexReplace(s, "\"ping\"", "\"pong\"")
                WebSocket::send(ws, blob(pong))
                continue
            }

            // Cheap pre-filter: skip anything that is not a contractInfo event
            // before paying for a full JSON parse
            if(regexFind(s, "\"e\":\"contractInfo\"") < 0) continue

            // The event contains a nested array of objects (bks), so a real JSON parse is needed
            msgDict = fromStdJson(s)

            // Unwrap a combined-stream envelope when present
            payload = msgDict
            if(dictHas(msgDict, "data")) {
                payload = msgDict["data"]
            }

            // Base fields shared by every row of this event
            evt = toTimestamp(dictGet(payload, "E"))
            sym = string(dictGet(payload, "s"))
            ct  = string(dictGet(payload, "ct"))
            dtVal = toTimestamp(dictGet(payload, "dt"))
            otVal = toTimestamp(dictGet(payload, "ot"))
            cs  = string(dictGet(payload, "cs"))

            // Bracket array. Presence is decided by key lookup plus a data-form
            // check (form 1 = vector) instead of isNull(bks): isNull applied to
            // a vector is not a scalar condition (NOTE(review): confirm against
            // the isNull documentation).
            bks = dictGet(payload, "bks")
            bksSize = 0
            if(dictHas(payload, "bks") && form(bks) == 1) {
                bksSize = bks.size()
            }

            collectTime = now()

            // At least one row is written, even when there are no brackets
            rowCount = max(bksSize, 1)
            for(j in 0..(rowCount-1)) {
                // Bracket fields stay NULL for an event without brackets
                bs  = int(NULL)
                bnf = double(NULL)
                bnc = double(NULL)
                mmr = double(NULL)
                cf  = double(NULL)
                mi  = double(NULL)
                ma  = double(NULL)
                if(bksSize > 0) {
                    bk = bks[j]
                    bs  = int(dictGet(bk, "bs"))
                    bnf = double(dictGet(bk, "bnf"))
                    bnc = double(dictGet(bk, "bnc"))
                    mmr = double(dictGet(bk, "mmr"))
                    cf  = double(dictGet(bk, "cf"))
                    mi  = double(dictGet(bk, "mi"))
                    ma  = double(dictGet(bk, "ma"))
                }
                row = table(
                    evt as eventTime,
                    collectTime as collectionTime,
                    exchange as exchange,
                    sym as symbol,
                    ct as contractType,
                    contractDirection as contractDirection,
                    dtVal as deliveryDatetime,
                    otVal as onboardDatetime,
                    cs as contractStatus,
                    bs as notionalBracket,
                    bnf as floorNotional,
                    bnc as capNotional,
                    mmr as maintenanceRatio,
                    cf as auxiliaryNumber,
                    mi as minLeverage,
                    ma as maxLeverage,
                    string(NULL) as settleCurrency,
                    double(NULL) as tickSize,
                    double(NULL) as lotSize,
                    double(NULL) as minSize,
                    double(NULL) as contractValue,
                    double(NULL) as contractMultiplier,
                    string(NULL) as contractValueCurrency
                )
                st.append!(row)
            }
        } catch(ex) {
            writeLog("[ContractInfoWS] Parse error: " + string(ex))
        }
    }
}

// 各市场 onMessage 包装函数
// onMessage wrapper for the USDT-M market: tags rows as "Binance-UM" / "linear".
def umContractInfoOnMsg(ws, data) { processContractInfo(ws, data, "Binance-UM", "linear") }
// onMessage wrapper for the Coin-M market: tags rows as "Binance-CM" / "inverse".
def cmContractInfoOnMsg(ws, data) { processContractInfo(ws, data, "Binance-CM", "inverse") }

// ============================================================
// 5. 构建 WebSocket URL
// ============================================================
//
// !contractInfo 是全市场流, 无需按品种订阅:
//   USDT-M: wss://fstream.binance.com/ws/!contractInfo
//   Coin-M: wss://dstream.binance.com/ws/!contractInfo
// ============================================================

// Endpoints of the !contractInfo stream, one per futures market
// (umUrl: fstream host, cmUrl: dstream host).
umUrl = "wss://fstream.binance.com/ws/!contractInfo"
cmUrl = "wss://dstream.binance.com/ws/!contractInfo"

// ============================================================
// 6. 启动 WebSocket 订阅
// ============================================================

config = dict(STRING, ANY)
if(strlen(proxy) > 0) config["proxy"] = proxy
config["reconnectCount"] = -1// 无限重连

// --- USDT-M Futures (U本位 永续+交割) ---
tryWebSocket::cancelSubJob("um_contractInfo") }catch(ex){}
ws1 = WebSocket::createSubJob(umUrl, contractInfoOnOpen, umContractInfoOnMsg, contractInfoOnError, contractInfoOnClose, "um_contractInfo", config)
writeLog("[ContractInfoWS] USDT-M started: " + umUrl)

// --- Coin-M Futures (币本位 永续+交割) ---
tryWebSocket::cancelSubJob("cm_contractInfo") }catch(ex){}
ws2 = WebSocket::createSubJob(cmUrl, contractInfoOnOpen, cmContractInfoOnMsg, contractInfoOnError, contractInfoOnClose, "cm_contractInfo", config)
writeLog("[ContractInfoWS] Coin-M started: " + cmUrl)

// ============================================================
// 7. 管理命令 (在 DolphinDB GUI 中手动执行)
// ============================================================
//
// 查看订阅状态:
//   WebSocket::getSubJobStat()
//
// 停止所有订阅:
//   WebSocket::cancelSubJob("um_contractInfo")
//   WebSocket::cancelSubJob("cm_contractInfo")
//
// 查看已采集数据:
//   select top 100 * from Crypto_contractInfoST
//   select count(*) from Crypto_contractInfoST group by exchange, contractType

Binance_Depth_ws.dos

/*
 * Binance_Depth_ws.dos
 *
 * DolphinDB WebSocket 插件版 Binance 增量深度(depthUpdate)采集器
 *
 * 支持市场:
 *   - Spot            现货           wss://stream.binance.com:9443
 *   - USDT-M Perpetual U本位永续     wss://fstream.binance.com
 *   - USDT-M Delivery  U本位交割     wss://fstream.binance.com
 *   - Coin-M Perpetual 币本位永续    wss://dstream.binance.com
 *   - Coin-M Delivery  币本位交割    wss://dstream.binance.com
 *
 * 数据流: <symbol>@depth@100ms (depthUpdate events)
 * 所有市场写入同一流表 Crypto_depthST, 通过 exchange + marketType 区分
 *
 * Note:
 *   Spot depthUpdate 没有 pu (prevLastId) 字段, 默认填 0
 *   深度数据含嵌套数组 b/a, 必须用 fromStdJson 解析 (regex 无法处理)
 *
 * Ref:
 *   Spot:   https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams/Partial-Book-Depth-Streams
 *   USDs-M: https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams/Partial-Book-Depth-Streams
 *   Coin-M: https://developers.binance.com/docs/derivatives/coin-margined-futures/websocket-market-streams/Partial-Book-Depth-Streams
 *
 * 前置条件:
 *   - DolphinDB Server 3.00.4+ (与本文"环境要求"一致: 脚本使用 TSDB engine、fromStdJson)
 *   - WebSocket 插件已安装: installPlugin("WebSocket"); loadPlugin("WebSocket")
 *
 * 使用方法:
 *   loadPlugin("WebSocket")
 *   run("Binance_Depth_ws.dos")
 */


// ============================================================
// 1. 创建数据库与流表
// ============================================================

// Names of the DFS database/table and of the shared stream table that feeds it.
dbName = "dfs://CryptoTick"
tbName = "depth"
streamTbName = "Crypto_depthST"

// One schema is used for both the DFS table and the stream table.
colNames = `eventTime`collectionTime`exchange`marketType`symbol`bidPrice`bidQty`bidOrders`askPrice`askQty`askOrders`firstId`lastId`prevLastId
colTypes = [TIMESTAMP, TIMESTAMP, SYMBOL, SYMBOL, SYMBOL, DOUBLE[], DOUBLE[], DOUBLE[], DOUBLE[], DOUBLE[], DOUBLE[], LONG, LONG, LONG]

// Open the database when it already exists; otherwise create it as a TSDB
// COMPO database (VALUE on date x HASH on SYMBOL with 4 buckets).
if(existsDatabase(dbName)){
    db = database(dbName)
}else{
    dbDate = database("", VALUE, 2012.01.01..2012.01.30)
    dbSym  = database("", HASH, [SYMBOL, 4])
    db = database(dbName, COMPO, [dbDate, dbSym], engine='TSDB')
}
if(!existsTable(dbName, tbName)){
    createPartitionedTable(
        db,
        table(1:0, colNames, colTypes),
        tbName,
        `eventTime`symbol,
        sortColumns=`exchange`marketType`symbol`eventTime
    )
}

// Create the shared, persisted keyed stream table; any error raised here is ignored.
try{
    enableTableShareAndPersistence(table=keyedStreamTable(`exchange`marketType`symbol`eventTime, 10000:0, colNames, colTypes), tableName=streamTbName, cacheSize=100000, retentionMinutes=2880)
}catch(ex){}

// Re-create the "insertDB" subscription that writes the stream table into the DFS table.
depthTb = loadTable(dbName, tbName)
try{
    unsubscribeTable(tableName=streamTbName, actionName="insertDB")
}catch(ex){}
subscribeTable(
    tableName=streamTbName,
    actionName="insertDB",
    offset=-2,
    handler=depthTb,
    msgAsTable=true,
    batchSize=10000,
    throttle=1,
    persistOffset=true
)


// ============================================================
// 2. Symbols and proxy configuration
// ============================================================

// Symbols shared by Spot and USDT-M
usdtSymbols = ["btcusdt","ethusdt","solusdt","bnbusdt"]

// Coin-M symbols (USD-quoted)
coinSymbols = ["btcusd_perp","ethusd_perp"]

// USDT-M delivery symbols (fill in when an active quarterly contract exists)
umDeliverySymbols = ["btcusdt_260925","ethusdt_260925"]

// Coin-M delivery symbols (fill in when an active quarterly contract exists)
cmDeliverySymbols = ["btcusd_260925","ethusd_260925"]

// Proxy address (leave as an empty string when no proxy is used)
proxy = "http://192.168.110.5:7890/"


// ============================================================
// 3. 工具函数
// ============================================================

// Build a Binance combined-stream URL with one <symbol>@depth@100ms stream per symbol.
//   baseUrl - WebSocket endpoint prefix, without a trailing slash
//   symbols - STRING vector of stream symbols
// Returns "" when symbols is empty, otherwise
//   baseUrl + "/stream?streams=" + the stream names joined by "/".
def buildDepthUrl(baseUrl, symbols) {
    if(symbols.size() == 0) return ""
    streams = symbols + "@depth@100ms"
    return baseUrl + "/stream?streams=" + concat(streams, "/")
}

// Return true when dictionary d contains the given key, false otherwise.
// Implemented as a linear scan over d.keys().
def dictHas(d, key) {
    keys = d.keys()
    for(k in keys) { if(k == key) return true }
    return false
}

// Wrap a flat DOUBLE vector into a one-row DOUBLE[] array vector.
//   non-empty data: arrayVector([n], data)            (the single row holds all n values)
//   empty data:     arrayVector([1], double([0.0]))   (sentinel row [0.0], so the result
//                                                      is always of type DOUBLE[])
def makeDoubleArrVec(data) {
    n = data.size()
    if(n > 0) return arrayVector([n], data)
    return arrayVector([1], double([0.0]))
}


// ============================================================
// 4. WebSocket 回调函数
// ============================================================

// Connection-lifecycle callbacks of the depth WebSocket jobs.
// Each one only writes a single line to the server log.

// Called when the connection is opened.
def depthOnOpen(ws) {
    writeLog("[DepthWS] Connected")
}

// Called on a connection error; logs the error converted to a string.
def depthOnError(ws, error) {
    errText = string(error)
    writeLog("[DepthWS] Error: " + errText)
}

// Called when the connection is closed; logs the status code and message.
def depthOnClose(ws, statusCode, msg) {
    codeText = string(statusCode)
    msgText = string(msg)
    writeLog("[DepthWS] Closed, code=" + codeText + ", msg=" + msgText)
}

// Core depth-message handler.
// Combined-stream depthUpdate format:
//   {"stream":"btcusdt@depth@100ms","data":{
//     "e":"depthUpdate","E":1672531200000,"s":"BTCUSDT",
//     "U":157,"u":160,"pu":156,
//     "b":[["16500.00","0.500"],["16499.00","1.200"]],
//     "a":[["16501.00","0.300"],["16502.00","2.100"]]
//   }}
//
// Parameters:
//   ws         - WebSocket handle; used to answer application-level pings
//   data       - batch of received messages; the raw text is read from data.msg
//   exchange   - value written to the exchange column
//   marketType - value written to the marketType column
//
// Every depthUpdate message becomes one row appended to the shared stream table
// Crypto_depthST. A failure while handling one message is logged and the loop
// continues with the next message.
def processDepth(ws, data, exchange, marketType) {
    n = data.rows()
    if(n == 0) return

    st = objByName("Crypto_depthST")

    for(i in 0..(n-1)) {
        try {
            s = string(data.msg[i])

            // Answer Binance application-level ping (Spot sends {"ping":<id>}):
            // echo the message back with "ping" replaced by "pong".
            if(regexFind(s, "\"ping\"") >= 0 && regexFind(s, "\"pong\"") < 0) {
                pong = regexReplace(s, "\"ping\"", "\"pong\"")
                WebSocket::send(ws, blob(pong))
                continue
            }

            // Cheap text filter: only depthUpdate events are parsed with fromStdJson.
            if(regexFind(s, "\"e\":\"depthUpdate\"") < 0) continue

            // The payload contains nested arrays, so it is parsed with fromStdJson.
            // (Local is not named "dict" to avoid shadowing the built-in dict function.)
            msgDict = fromStdJson(s)

            // Unwrap the combined-stream envelope when present.
            if(dictHas(msgDict, "stream") && dictHas(msgDict, "data")) {
                payload = msgDict["data"]
            } else {
                payload = msgDict
            }

            bids = payload["b"]
            asks = payload["a"]
            bidCount = bids.size()
            askCount = asks.size()

            // Flatten bid [price, qty] pairs into two DOUBLE vectors.
            bp = array(DOUBLE, bidCount)
            bq = array(DOUBLE, bidCount)
            for(j in 0:bidCount) {
                bp[j] = double(bids[j][0])
                bq[j] = double(bids[j][1])
            }

            // Flatten ask [price, qty] pairs into two DOUBLE vectors.
            ap = array(DOUBLE, askCount)
            aq = array(DOUBLE, askCount)
            for(j in 0:askCount) {
                ap[j] = double(asks[j][0])
                aq[j] = double(asks[j][1])
            }

            // Wrap each vector as a one-row DOUBLE[] array vector.
            // The order-count columns are always given an empty vector here.
            bidPriceCol  = makeDoubleArrVec(bp)
            bidQtyCol    = makeDoubleArrVec(bq)
            bidOrdersCol = makeDoubleArrVec(array(DOUBLE, 0))
            askPriceCol  = makeDoubleArrVec(ap)
            askQtyCol    = makeDoubleArrVec(aq)
            askOrdersCol = makeDoubleArrVec(array(DOUBLE, 0))

            // Spot messages carry no "pu" field; default prevLastId to 0 in that case.
            if(dictHas(payload, "pu")) {
                pu = long(payload["pu"])
            } else {
                pu = 0l
            }

            // Build a single-row table and append it to the stream table.
            row = table(
                timestamp(long(payload["E"])) as eventTime,
                now() as collectionTime,
                exchange as exchange,
                marketType as marketType,
                string(payload["s"]) as symbol,
                bidPriceCol as bidPrice,
                bidQtyCol as bidQty,
                bidOrdersCol as bidOrders,
                askPriceCol as askPrice,
                askQtyCol as askQty,
                askOrdersCol as askOrders,
                long(payload["U"]) as firstId,
                long(payload["u"]) as lastId,
                pu as prevLastId
            )

            st.append!(row)
        } catch(ex) {
            writeLog("[DepthWS] Parse error: " + string(ex))
        }
    }
}

// Per-market onMessage wrappers: each forwards the callback arguments to
// processDepth with a fixed (exchange, marketType) pair.
def spotDepthOnMsg(ws, data)    { processDepth(ws, data, "Binance-Spot", "SPOT") }
def umPerpDepthOnMsg(ws, data)  { processDepth(ws, data, "Binance-UM",  "SWAP") }
def umDelivDepthOnMsg(ws, data) { processDepth(ws, data, "Binance-UM",  "FUTURE") }
def cmPerpDepthOnMsg(ws, data)  { processDepth(ws, data, "Binance-CM",  "SWAP") }
def cmDelivDepthOnMsg(ws, data) { processDepth(ws, data, "Binance-CM",  "FUTURE") }


// ============================================================
// 5. 构建各市场 WebSocket URL
// ============================================================
//
// 币安官方 WebSocket 端点:
//   现货:        wss://stream.binance.com:9443
//   USDT-M 合约: wss://fstream.binance.com/public  (永续+交割共用)
//   Coin-M 合约: wss://dstream.binance.com  (永续+交割共用)
//
// @depth@100ms = depthUpdate 增量深度 (非 @depth20@100ms 快照)
// ============================================================

spotUrl    = buildDepthUrl("wss://stream.binance.com:9443", usdtSymbols)
umPerpUrl  = buildDepthUrl("wss://fstream.binance.com/public", usdtSymbols)
umDelivUrl = buildDepthUrl("wss://fstream.binance.com/public", umDeliverySymbols)
cmPerpUrl  = buildDepthUrl("wss://dstream.binance.com", coinSymbols)
cmDelivUrl = buildDepthUrl("wss://dstream.binance.com", cmDeliverySymbols)


// ============================================================
// 6. Start the WebSocket subscriptions
// ============================================================

// Plugin configuration: the proxy entry is added only when a proxy address is set.
config = dict(STRING, ANY)
if(strlen(proxy) > 0) config["proxy"] = proxy
config["reconnectCount"] = -1  // reconnect indefinitely

// Before each createSubJob, a job with the same tag left over from a previous
// run is cancelled; the cancel error (no such job) is ignored on purpose.

// --- Spot ---
try{ WebSocket::cancelSubJob("spot_depth") }catch(ex){}
ws1 = WebSocket::createSubJob(spotUrl, depthOnOpen, spotDepthOnMsg, depthOnError, depthOnClose, "spot_depth", config)
writeLog("[DepthWS] Spot started: " + spotUrl)

// --- USDT-M Perpetual ---
try{ WebSocket::cancelSubJob("um_perp_depth") }catch(ex){}
ws2 = WebSocket::createSubJob(umPerpUrl, depthOnOpen, umPerpDepthOnMsg, depthOnError, depthOnClose, "um_perp_depth", config)
writeLog("[DepthWS] USDT-M Perp started: " + umPerpUrl)

// --- USDT-M Delivery (skipped when no delivery symbols are configured) ---
if(umDeliverySymbols.size() > 0) {
    try{ WebSocket::cancelSubJob("um_deliv_depth") }catch(ex){}
    ws3 = WebSocket::createSubJob(umDelivUrl, depthOnOpen, umDelivDepthOnMsg, depthOnError, depthOnClose, "um_deliv_depth", config)
    writeLog("[DepthWS] USDT-M Delivery started: " + umDelivUrl)
}else{
    writeLog("[DepthWS] USDT-M Delivery: skipped (no symbols configured)")
}

// --- Coin-M Perpetual ---
try{ WebSocket::cancelSubJob("cm_perp_depth") }catch(ex){}
ws4 = WebSocket::createSubJob(cmPerpUrl, depthOnOpen, cmPerpDepthOnMsg, depthOnError, depthOnClose, "cm_perp_depth", config)
writeLog("[DepthWS] Coin-M Perp started: " + cmPerpUrl)

// --- Coin-M Delivery (skipped when no delivery symbols are configured) ---
if(cmDeliverySymbols.size() > 0) {
    try{ WebSocket::cancelSubJob("cm_deliv_depth") }catch(ex){}
    ws5 = WebSocket::createSubJob(cmDelivUrl, depthOnOpen, cmDelivDepthOnMsg, depthOnError, depthOnClose, "cm_deliv_depth", config)
    writeLog("[DepthWS] Coin-M Delivery started: " + cmDelivUrl)
}else{
    writeLog("[DepthWS] Coin-M Delivery: skipped (no symbols configured)")
}


// ============================================================
// 7. 管理命令 (在 DolphinDB GUI 中手动执行)
// ============================================================
//
// 查看订阅状态:
//   WebSocket::getSubJobStat()
//
// 停止所有订阅:
//   WebSocket::cancelSubJob("spot_depth")
//   WebSocket::cancelSubJob("um_perp_depth")
//   WebSocket::cancelSubJob("um_deliv_depth")
//   WebSocket::cancelSubJob("cm_perp_depth")
//   WebSocket::cancelSubJob("cm_deliv_depth")
//
// 查看已采集数据:
//   select top 100 * from Crypto_depthST
//   select count(*) from Crypto_depthST group by exchange, marketType

Binance_KLine_ws.dos

/*
 * Binance_KLine_ws.dos
 *
 * DolphinDB WebSocket 插件版 Binance 1分钟K线采集器
 *
 * 支持市场:
 *   - Spot            现货           wss://stream.binance.com:9443
 *   - USDT-M Perpetual U本位永续     wss://fstream.binance.com
 *   - USDT-M Delivery  U本位交割     wss://fstream.binance.com
 *   - Coin-M Perpetual 币本位永续    wss://dstream.binance.com
 *   - Coin-M Delivery  币本位交割    wss://dstream.binance.com
 *
 * 数据流: <symbol>@kline_1m
 * Ref: https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams/Kline-Candlestick-Streams
 *
 * 前置条件:
 *   - DolphinDB Server 3.00.4+ (与本文"环境要求"一致)
 *   - WebSocket 插件已安装: installPlugin("WebSocket"); loadPlugin("WebSocket")
 *
 * 使用方法:
 *   loadPlugin("WebSocket")
 *   run("Binance_KLine_ws.dos")
 */


// ============================================================
// 1. 创建数据库与流表
// ============================================================

// Names of the DFS database/table and of the shared stream table that feeds it.
dbName = "dfs://CryptoKLine"
tbName = "kline_1m"
streamTbName = "Crypto_minKLineST"

// One schema is used for both the DFS table and the stream table.
colNames = `eventTime`collectionTime`exchange`marketType`symbol`open`high`low`close`volume`quoteVolume`trades`takerBuyBase`takerBuyQuote
colTypes = [TIMESTAMP, TIMESTAMP, SYMBOL, SYMBOL, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, DOUBLE, DOUBLE]

// Create the RANGE-partitioned database on first run, otherwise open it.
if(!existsDatabase(dbName)){
    db = database(dbName, RANGE, 2010.01M+(0..30)*12)
}else{
    db = database(dbName)
}
if(!existsTable(dbName, tbName)){
    createPartitionedTable(db, table(1:0, colNames, colTypes), tbName, `eventTime)
}

// Create the shared, persisted keyed stream table; any error raised here is ignored.
// The key-column literal must stay on one line: a line break inside it is a syntax error.
try{
    enableTableShareAndPersistence(
        table=keyedStreamTable(`exchange`symbol`eventTime, 10000:0, colNames, colTypes),
        tableName=streamTbName,
        cacheSize=100000,
        retentionMinutes=2880
    )
}catch(ex){}

// Re-create the "insertDB" subscription that writes the stream table into the DFS table.
kline1mTb = loadTable(dbName, tbName)
try{ unsubscribeTable(tableName=streamTbName, actionName="insertDB") }catch(ex){}
subscribeTable(tableName=streamTbName, actionName="insertDB", offset=-2,
               handler=kline1mTb, msgAsTable=true, batchSize=10000,
               throttle=1, persistOffset=true)


// ============================================================
// 2. Symbols and proxy configuration
// ============================================================

// Symbols shared by Spot and USDT-M
usdtSymbols = ["btcusdt","ethusdt","solusdt","bnbusdt"]

// Coin-M symbols (USD-quoted)
coinSymbols = ["btcusd_perp","ethusd_perp"]

// USDT-M delivery symbols (fill in when an active quarterly contract exists)
umDeliverySymbols = ["btcusdt_260925","ethusdt_260925"]

// Coin-M delivery symbols (fill in when an active quarterly contract exists)
cmDeliverySymbols = ["btcusd_260925","ethusd_260925"]

// Proxy address (leave as an empty string when no proxy is used)
proxy = "http://192.168.110.5:7890/"

// ============================================================
// 3. 工具函数
// ============================================================

// Build a Binance combined-stream URL with one <symbol>@kline_1m stream per symbol.
//   baseUrl - WebSocket endpoint prefix, without a trailing slash
//   symbols - STRING vector of stream symbols
// Returns "" when symbols is empty, otherwise
//   baseUrl + "/stream?streams=" + the stream names joined by "/".
def buildKlineUrl(baseUrl, symbols) {
    if(symbols.size() == 0) return ""
    streams = symbols + "@kline_1m"
    return baseUrl + "/stream?streams=" + concat(streams, "/")
}

// Extract a string-valued field from raw JSON text: "key":"value" -> value.
// regexFindStr yields the whole match (tag + value + closing quote), not a capture
// group, so the value part is cut out with substr.
//   json - raw JSON text
//   key  - field name; it is inserted into the regex as-is
// Returns NULL when the pattern is not found.
def extractStr(json, key) {
    tag = "\"" + key + "\":\""
    matched = regexFindStr(json, tag + "[^\"]*\"", true)
    if(matched == "") return NULL
    tagLen = strlen(tag)
    // Drop the leading tag and the trailing quote.
    return substr(matched, tagLen, strlen(matched) - tagLen - 1)
}

// Extract a numeric field from raw JSON text as a string: "key":12345 -> "12345".
// The value is everything after the tag up to the next comma, closing brace or whitespace.
//   json - raw JSON text
//   key  - field name; it is inserted into the regex as-is
// Returns NULL when the pattern is not found.
def extractNum(json, key) {
    tag = "\"" + key + "\":"
    matched = regexFindStr(json, tag + "[^,}\\s]+", true)
    if(matched == "") return NULL
    tagLen = strlen(tag)
    return substr(matched, tagLen, strlen(matched) - tagLen)
}

// ============================================================
// 4. WebSocket 回调函数
// ============================================================

// Connection-lifecycle callbacks of the kline WebSocket jobs.
// Each one only writes a single line to the server log.

// Called when the connection is opened.
def klineOnOpen(ws) {
    writeLog("[KlineWS] Connected")
}

// Called on a connection error; logs the error converted to a string.
def klineOnError(ws, error) {
    errText = string(error)
    writeLog("[KlineWS] Error: " + errText)
}

// Called when the connection is closed; logs the status code and message.
def klineOnClose(ws, statusCode, msg) {
    codeText = string(statusCode)
    msgText = string(msg)
    writeLog("[KlineWS] Closed, code=" + codeText + ", msg=" + msgText)
}

// Private helper: convert an extracted field to DOUBLE; a missing (NULL) field becomes 0.0.
// A plain 0.0 literal is used because a "d" suffix denotes a DURATION, not a DOUBLE.
def klineDoubleOrZero(v) {
    if(isNull(v)) return 0.0
    return double(v)
}

// Private helper: convert an extracted field to INT; a missing (NULL) field becomes 0.
def klineIntOrZero(v) {
    if(isNull(v)) return 0
    return int(v)
}

// Core kline handler: extract fields from each message, keep closed candles only,
// and append the resulting rows to the shared stream table Crypto_minKLineST.
//
// Parameters:
//   ws         - WebSocket handle; used to answer application-level pings
//   data       - batch of received messages; the raw text is read from data.msg
//   exchange   - value written to the exchange column
//   marketType - value written to the marketType column
//
// Rows of one batch are collected in per-column buffers and appended in one call.
// A failure while handling one message is logged and the loop continues.
def processKline(ws, data, exchange, marketType) {
    n = data.rows()
    if(n == 0) return

    st = objByName("Crypto_minKLineST")

    // Per-column buffers for the rows accepted from this batch.
    eventTimes      = array(TIMESTAMP, 0)
    collectionTimes = array(TIMESTAMP, 0)
    exchanges       = array(SYMBOL, 0)
    marketTypes     = array(SYMBOL, 0)
    symbols         = array(SYMBOL, 0)
    opens           = array(DOUBLE, 0)
    highs           = array(DOUBLE, 0)
    lows            = array(DOUBLE, 0)
    closes          = array(DOUBLE, 0)
    volumes         = array(DOUBLE, 0)
    quoteVolumes    = array(DOUBLE, 0)
    tradesList      = array(INT, 0)
    takerBuyBases   = array(DOUBLE, 0)
    takerBuyQuotes  = array(DOUBLE, 0)

    for(i in 0..(n-1)) {
        try {
            s = string(data.msg[i])

            // Answer Binance application-level ping (Spot sends {"ping":<id>}):
            // echo the message back with "ping" replaced by "pong".
            if(regexFind(s, "\"ping\"") >= 0 && regexFind(s, "\"pong\"") < 0) {
                pong = regexReplace(s, "\"ping\"", "\"pong\"")
                WebSocket::send(ws, blob(pong))
                continue
            }

            // Only kline events are handled.
            if(regexFind(s, "\"e\":\"kline\"") < 0) continue

            // Only closed candles (x=true) are handled.
            if(regexFind(s, "\"x\":true") < 0) continue

            // Extract the fields from the raw JSON text with regex helpers.
            evt    = extractNum(s, "E")   // event time (epoch milliseconds)
            sym    = extractStr(s, "s")   // symbol
            o      = extractStr(s, "o")   // open
            h      = extractStr(s, "h")   // high
            l      = extractStr(s, "l")   // low
            c      = extractStr(s, "c")   // close
            v      = extractStr(s, "v")   // volume
            q      = extractStr(s, "q")   // quote volume
            tradeN = extractNum(s, "n")   // number of trades
            tbb    = extractStr(s, "V")   // taker-buy base volume
            tbq    = extractStr(s, "Q")   // taker-buy quote volume

            // Required fields.
            if(isNull(evt) || isNull(sym) || isNull(o)) continue

            // Convert every value before touching the buffers, so that a conversion
            // error cannot leave the column buffers with unequal lengths.
            evtTs   = timestamp(long(evt))
            openV   = double(o)
            highV   = klineDoubleOrZero(h)
            lowV    = klineDoubleOrZero(l)
            closeV  = klineDoubleOrZero(c)
            volV    = klineDoubleOrZero(v)
            quoteV  = klineDoubleOrZero(q)
            tradesV = klineIntOrZero(tradeN)
            tbbV    = klineDoubleOrZero(tbb)
            tbqV    = klineDoubleOrZero(tbq)

            eventTimes.append!(evtTs)
            collectionTimes.append!(now())
            exchanges.append!(exchange)
            marketTypes.append!(marketType)
            symbols.append!(sym)
            opens.append!(openV)
            highs.append!(highV)
            lows.append!(lowV)
            closes.append!(closeV)
            volumes.append!(volV)
            quoteVolumes.append!(quoteV)
            tradesList.append!(tradesV)
            takerBuyBases.append!(tbbV)
            takerBuyQuotes.append!(tbqV)
        } catch(ex) {
            writeLog("[KlineWS] Parse error: " + string(ex))
        }
    }

    // Append the accepted rows, if any, in a single call.
    cnt = eventTimes.size()
    if(cnt > 0) {
        result = table(
            eventTimes as eventTime,
            collectionTimes as collectionTime,
            exchanges as exchange,
            marketTypes as marketType,
            symbols as symbol,
            opens as open,
            highs as high,
            lows as low,
            closes as close,
            volumes as volume,
            quoteVolumes as quoteVolume,
            tradesList as trades,
            takerBuyBases as takerBuyBase,
            takerBuyQuotes as takerBuyQuote
        )
        st.append!(result)
    }
}

// Per-market onMessage wrappers: each forwards the callback arguments to
// processKline with a fixed (exchange, marketType) pair.
def spotOnMsg(ws, data)    { processKline(ws, data, "Binance-Spot", "SPOT") }
def umPerpOnMsg(ws, data)  { processKline(ws, data, "Binance-UM", "SWAP") }
def umDelivOnMsg(ws, data) { processKline(ws, data, "Binance-UM", "FUTURE") }
def cmPerpOnMsg(ws, data)  { processKline(ws, data, "Binance-CM", "SWAP") }
def cmDelivOnMsg(ws, data) { processKline(ws, data, "Binance-CM", "FUTURE") }

// ============================================================
// 5. 构建各市场 WebSocket URL
// ============================================================
//
// 币安官方 WebSocket 端点 (Ref: Binance API Docs):
//   现货:        wss://stream.binance.com:9443
//   USDT-M 合约: wss://fstream.binance.com/market  (永续+交割共用)
//   Coin-M 合约: wss://dstream.binance.com  (永续+交割共用)
//
// 组合流格式: /stream?streams=<stream1>/<stream2>/...
// 单流格式:   /ws/<streamName>
// ============================================================

spotUrl    = buildKlineUrl("wss://stream.binance.com:9443", usdtSymbols)
umPerpUrl  = buildKlineUrl("wss://fstream.binance.com/market", usdtSymbols)
umDelivUrl = buildKlineUrl("wss://fstream.binance.com/market", umDeliverySymbols)
cmPerpUrl  = buildKlineUrl("wss://dstream.binance.com", coinSymbols)
cmDelivUrl = buildKlineUrl("wss://dstream.binance.com", cmDeliverySymbols)

// ============================================================
// 6. Start the WebSocket subscriptions
// ============================================================

// Plugin configuration: the proxy entry is added only when a proxy address is set.
config = dict(STRING, ANY)
if(strlen(proxy) > 0) config["proxy"] = proxy
config["reconnectCount"] = -1  // reconnect indefinitely

// Before each createSubJob, a job with the same tag left over from a previous
// run is cancelled so the script can be re-run; the cancel error (no such job)
// is ignored on purpose.

// --- Spot ---
try{ WebSocket::cancelSubJob("spot_kline") }catch(ex){}
ws1 = WebSocket::createSubJob(spotUrl, klineOnOpen, spotOnMsg, klineOnError, klineOnClose, "spot_kline", config)
writeLog("[KlineWS] Spot started: " + spotUrl)

// --- USDT-M Perpetual ---
try{ WebSocket::cancelSubJob("um_perp_kline") }catch(ex){}
ws2 = WebSocket::createSubJob(umPerpUrl, klineOnOpen, umPerpOnMsg, klineOnError, klineOnClose, "um_perp_kline", config)
writeLog("[KlineWS] USDT-M Perp started: " + umPerpUrl)

// --- USDT-M Delivery (skipped when no delivery symbols are configured) ---
if(umDeliverySymbols.size() > 0) {
    try{ WebSocket::cancelSubJob("um_deliv_kline") }catch(ex){}
    ws3 = WebSocket::createSubJob(umDelivUrl, klineOnOpen, umDelivOnMsg, klineOnError, klineOnClose, "um_deliv_kline", config)
    writeLog("[KlineWS] USDT-M Delivery started: " + umDelivUrl)
}else{
    writeLog("[KlineWS] USDT-M Delivery: skipped (no symbols configured)")
}

// --- Coin-M Perpetual ---
try{ WebSocket::cancelSubJob("cm_perp_kline") }catch(ex){}
ws4 = WebSocket::createSubJob(cmPerpUrl, klineOnOpen, cmPerpOnMsg, klineOnError, klineOnClose, "cm_perp_kline", config)
writeLog("[KlineWS] Coin-M Perp started: " + cmPerpUrl)

// --- Coin-M Delivery (skipped when no delivery symbols are configured) ---
if(cmDeliverySymbols.size() > 0) {
    try{ WebSocket::cancelSubJob("cm_deliv_kline") }catch(ex){}
    ws5 = WebSocket::createSubJob(cmDelivUrl, klineOnOpen, cmDelivOnMsg, klineOnError, klineOnClose, "cm_deliv_kline", config)
    writeLog("[KlineWS] Coin-M Delivery started: " + cmDelivUrl)
}else{
    writeLog("[KlineWS] Coin-M Delivery: skipped (no symbols configured)")
}

// ============================================================
// 7. 管理命令 (在 DolphinDB GUI 中手动执行)
// ============================================================
//
// 查看订阅状态:
//   WebSocket::getSubJobStat()
//
// 停止所有订阅:
//   WebSocket::cancelSubJob("spot_kline")
//   WebSocket::cancelSubJob("um_perp_kline")
//   WebSocket::cancelSubJob("um_deliv_kline")
//   WebSocket::cancelSubJob("cm_perp_kline")
//   WebSocket::cancelSubJob("cm_deliv_kline")
//
// 查看已采集数据:
//   select top 100 * from Crypto_minKLineST
//   select count(*) from Crypto_minKLineST group by exchange, marketType

Binance_Liquidation_ws.dos

/*
 * Binance_Liquidation_ws.dos
 *
 * DolphinDB WebSocket 插件版 Binance 强平订单(forceOrder)采集器
 *
 * 支持市场:
 *   - USDT-M Perpetual  U本位永续     wss://fstream.binance.com
 *   - USDT-M Delivery   U本位交割     wss://fstream.binance.com
 *   - Coin-M Perpetual  币本位永续    wss://dstream.binance.com
 *   - Coin-M Delivery   币本位交割    wss://dstream.binance.com
 *
 * Note: Spot 无 forceOrder 流, 不支持
 *
 * 数据流: <symbol>@forceOrder (按品种订阅)
 * 所有市场写入同一流表 Crypto_liquidationST, 通过 exchange + marketType 区分
 *
 * Ref:
 *   USDs-M: https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams/Liquidation-Order-Streams
 *   Coin-M: https://developers.binance.com/docs/derivatives/coin-margined-futures/websocket-market-streams/Liquidation-Order-Streams
 *
 * 前置条件:
 *   - DolphinDB Server 3.00.4+ (与本文"环境要求"一致: 脚本使用 fromStdJson)
 *   - WebSocket 插件已安装: loadPlugin("WebSocket")
 *
 * 使用方法:
 *   loadPlugin("WebSocket")
 *   run("Binance_Liquidation_ws.dos")
 */


// ============================================================
// 1. 创建数据库与流表
// ============================================================

// Names of the DFS database/table and of the shared stream table that feeds it.
dbName = "dfs://CryptoDaily"
tbName = "liquidation"
streamTbName = "Crypto_liquidationST"

// One schema is used for both the DFS table and the stream table.
colNames = `eventTime`collectionTime`exchange`marketType`symbol`side`type`timeInForce`quantity`price`avgPrice`status`lastFilledQty`filledAccumQty
colTypes = [TIMESTAMP, TIMESTAMP, SYMBOL, SYMBOL, SYMBOL, STRING, STRING, STRING, DOUBLE, DOUBLE, DOUBLE, STRING, DOUBLE, DOUBLE]

// Open the database when it already exists; otherwise create it RANGE-partitioned.
if(existsDatabase(dbName)){
    db = database(dbName)
}else{
    db = database(dbName, RANGE, 2010.01M+(0..30)*60)
}
// The liquidation table is stored as a dimension table.
if(!existsTable(dbName, tbName)){
    createDimensionTable(db, table(1:0, colNames, colTypes), tbName)
}

// Create the shared, persisted keyed stream table; any error raised here is ignored.
try{
    enableTableShareAndPersistence(table=keyedStreamTable(`exchange`symbol`eventTime, 10000:0, colNames, colTypes), tableName=streamTbName, cacheSize=100000, retentionMinutes=2880)
}catch(ex){}
go

// Re-create the "insertDB" subscription that appends stream rows to the DFS table.
liquidationTb = loadTable(dbName, tbName)
try{
    unsubscribeTable(tableName=streamTbName, actionName="insertDB")
}catch(ex){}
subscribeTable(
    tableName=streamTbName,
    actionName="insertDB",
    offset=-2,
    handler=append!{liquidationTb,},
    msgAsTable=true,
    batchSize=10000,
    throttle=1,
    persistOffset=true
)

// ============================================================
// 2. Symbols and proxy configuration
// ============================================================

// USDT-M symbols
usdtSymbols = ["btcusdt","ethusdt","solusdt","bnbusdt"]

// Coin-M symbols (USD-quoted)
coinSymbols = ["btcusd_perp","ethusd_perp"]

// USDT-M delivery symbols (fill in when an active quarterly contract exists)
umDeliverySymbols = ["btcusdt_260925","ethusdt_260925"]

// Coin-M delivery symbols (fill in when an active quarterly contract exists)
cmDeliverySymbols = ["btcusd_260925","ethusd_260925"]

// Proxy address (leave as an empty string when no proxy is used)
proxy = "http://192.168.110.5:7890/"

// ============================================================
// 3. 工具函数
// ============================================================

// Return true when dictionary d contains the given key, false otherwise.
// Implemented as a linear scan over d.keys() that stops at the first hit.
def dictHas(d, key) {
    found = false
    for(name in d.keys()) {
        if(name == key) {
            found = true
            break
        }
    }
    return found
}

// Look up key in dictionary d; returns d[key] when the key is present
// (as reported by dictHas) and NULL otherwise.
def dictGet(d, key) {
    if(dictHas(d, key)) {
        return d[key]
    }
    return NULL
}

// Convert v to TIMESTAMP via long(v); a NULL input yields a NULL TIMESTAMP.
def toTimestamp(v) {
    if(!isNull(v)) {
        return timestamp(long(v))
    }
    return timestamp(NULL)
}

// Build a Binance combined-stream URL with one <symbol>@forceOrder stream per symbol.
//   baseUrl - WebSocket endpoint prefix, without a trailing slash
//   symbols - STRING vector of stream symbols
// Returns "" when symbols is empty.
def buildLiquidationUrl(baseUrl, symbols) {
    if(symbols.size() == 0) {
        return ""
    }
    streamNames = symbols + "@forceOrder"
    joined = concat(streamNames, "/")
    return baseUrl + "/stream?streams=" + joined
}

// ============================================================
// 4. WebSocket 回调函数
// ============================================================

// Connection-lifecycle callbacks of the liquidation WebSocket jobs.
// Each one only writes a single line to the server log.

// Called when the connection is opened.
def liquidationOnOpen(ws) {
    writeLog("[LiquidationWS] Connected")
}

// Called on a connection error; logs the error converted to a string.
def liquidationOnError(ws, error) {
    errText = string(error)
    writeLog("[LiquidationWS] Error: " + errText)
}

// Called when the connection is closed; logs the status code and message.
def liquidationOnClose(ws, statusCode, msg) {
    codeText = string(statusCode)
    msgText = string(msg)
    writeLog("[LiquidationWS] Closed, code=" + codeText + ", msg=" + msgText)
}

// Core forceOrder handler.
// Combined-stream forceOrder format:
//   {"stream":"btcusdt@forceOrder","data":{
//     "e":"forceOrder","E":1672531200000,
//     "o":{
//       "s":"BTCUSDT","S":"SELL","o":"LIMIT","f":"IOC",
//       "q":"0.014","p":"16500.00","ap":"16500.00",
//       "X":"FILLED","l":"0.014","z":"0.014"
//     }
//   }}
//
// Parameters:
//   ws         - WebSocket handle; used to answer application-level pings
//   data       - batch of received messages; the raw text is read from data.msg
//   exchange   - value written to the exchange column
//   marketType - value written to the marketType column
//
// Notes:
//   - The order details live in the nested object "o", hence fromStdJson.
//   - Both the symbol ("s") and the side ("S") are stored lower-cased.
//   - Every accepted message becomes one row appended to Crypto_liquidationST;
//     a failure on one message is logged and the loop continues.
def processLiquidation(ws, data, exchange, marketType) {
    rowCount = data.rows()
    if(rowCount == 0) return

    target = objByName("Crypto_liquidationST")

    for(idx in 0..(rowCount-1)) {
        try {
            raw = string(data.msg[idx])

            // Application-level ping: echo it back with "ping" replaced by "pong".
            isPing = (regexFind(raw, "\"ping\"") >= 0 && regexFind(raw, "\"pong\"") < 0)
            if(isPing) {
                reply = regexReplace(raw, "\"ping\"", "\"pong\"")
                WebSocket::send(ws, blob(reply))
                continue
            }

            // Cheap text filter: anything that is not a forceOrder event is dropped.
            if(regexFind(raw, "\"e\":\"forceOrder\"") < 0) continue

            parsed = fromStdJson(raw)

            // Unwrap the combined-stream envelope when present.
            if(dictHas(parsed, "data")) {
                body = parsed["data"]
            } else {
                body = parsed
            }

            // Nested order object; messages without it are skipped.
            ord = dictGet(body, "o")
            if(isNull(ord)) continue

            // Field extraction (same order of evaluation as the column list below).
            evt            = toTimestamp(dictGet(body, "E"))
            sym            = lower(string(dictGet(ord, "s")))
            side           = lower(string(dictGet(ord, "S")))
            ordType        = string(dictGet(ord, "o"))
            tif            = string(dictGet(ord, "f"))
            qty            = double(dictGet(ord, "q"))
            price          = double(dictGet(ord, "p"))
            avgPrice       = double(dictGet(ord, "ap"))
            status         = string(dictGet(ord, "X"))
            lastFilledQty  = double(dictGet(ord, "l"))
            filledAccumQty = double(dictGet(ord, "z"))

            // Required fields.
            if(isNull(evt) || isNull(sym)) continue

            oneRow = table(
                evt as eventTime,
                now() as collectionTime,
                exchange as exchange,
                marketType as marketType,
                sym as symbol,
                side as side,
                ordType as type,
                tif as timeInForce,
                qty as quantity,
                price as price,
                avgPrice as avgPrice,
                status as status,
                lastFilledQty as lastFilledQty,
                filledAccumQty as filledAccumQty
            )
            target.append!(oneRow)
        } catch(ex) {
            writeLog("[LiquidationWS] Parse error: " + string(ex))
        }
    }
}

// Per-market onMessage wrappers: each forwards the callback arguments to
// processLiquidation with a fixed (exchange, marketType) pair.
def umPerpLiquidationOnMsg(ws, data) {
    processLiquidation(ws, data, "Binance-UM", "SWAP")
}
def umDelivLiquidationOnMsg(ws, data) {
    processLiquidation(ws, data, "Binance-UM", "FUTURE")
}
def cmPerpLiquidationOnMsg(ws, data) {
    processLiquidation(ws, data, "Binance-CM", "SWAP")
}
def cmDelivLiquidationOnMsg(ws, data) {
    processLiquidation(ws, data, "Binance-CM", "FUTURE")
}

// ============================================================
// 5. 构建各市场 WebSocket URL
// ============================================================
//
// 币安官方 WebSocket 端点:
//   USDT-M 合约: wss://fstream.binance.com  (永续+交割共用)
//   Coin-M 合约: wss://dstream.binance.com  (永续+交割共用)
//
// forceOrder 为按品种订阅: <symbol>@forceOrder
// ============================================================

umPerpUrl  = buildLiquidationUrl("wss://fstream.binance.com", usdtSymbols)
umDelivUrl = buildLiquidationUrl("wss://fstream.binance.com", umDeliverySymbols)
cmPerpUrl  = buildLiquidationUrl("wss://dstream.binance.com", coinSymbols)
cmDelivUrl = buildLiquidationUrl("wss://dstream.binance.com", cmDeliverySymbols)

// ============================================================
// 6. Start the WebSocket subscriptions
// ============================================================

// Plugin configuration: the proxy entry is added only when a proxy address is set.
config = dict(STRING, ANY)
if(strlen(proxy) > 0) {
    config["proxy"] = proxy
}
config["reconnectCount"] = -1  // reconnect indefinitely

// Before each createSubJob, a job with the same tag left over from a previous
// run is cancelled; the cancel error is ignored.

// --- USDT-M Perpetual ---
try{
    WebSocket::cancelSubJob("um_perp_liquidation")
}catch(ex){}
ws1 = WebSocket::createSubJob(umPerpUrl, liquidationOnOpen, umPerpLiquidationOnMsg, liquidationOnError, liquidationOnClose, "um_perp_liquidation", config)
writeLog("[LiquidationWS] USDT-M Perp started: " + umPerpUrl)

// --- USDT-M Delivery (skipped when no delivery symbols are configured) ---
if(umDeliverySymbols.size() == 0) {
    writeLog("[LiquidationWS] USDT-M Delivery: skipped (no symbols configured)")
}else{
    try{
        WebSocket::cancelSubJob("um_deliv_liquidation")
    }catch(ex){}
    ws3 = WebSocket::createSubJob(umDelivUrl, liquidationOnOpen, umDelivLiquidationOnMsg, liquidationOnError, liquidationOnClose, "um_deliv_liquidation", config)
    writeLog("[LiquidationWS] USDT-M Delivery started: " + umDelivUrl)
}

// --- Coin-M Perpetual ---
try{
    WebSocket::cancelSubJob("cm_perp_liquidation")
}catch(ex){}
ws4 = WebSocket::createSubJob(cmPerpUrl, liquidationOnOpen, cmPerpLiquidationOnMsg, liquidationOnError, liquidationOnClose, "cm_perp_liquidation", config)
writeLog("[LiquidationWS] Coin-M Perp started: " + cmPerpUrl)

// --- Coin-M Delivery (skipped when no delivery symbols are configured) ---
if(cmDeliverySymbols.size() == 0) {
    writeLog("[LiquidationWS] Coin-M Delivery: skipped (no symbols configured)")
}else{
    try{
        WebSocket::cancelSubJob("cm_deliv_liquidation")
    }catch(ex){}
    ws5 = WebSocket::createSubJob(cmDelivUrl, liquidationOnOpen, cmDelivLiquidationOnMsg, liquidationOnError, liquidationOnClose, "cm_deliv_liquidation", config)
    writeLog("[LiquidationWS] Coin-M Delivery started: " + cmDelivUrl)
}

// ============================================================
// 7. 管理命令 (在 DolphinDB GUI 中手动执行)
// ============================================================
//
// 查看订阅状态:
//   WebSocket::getSubJobStat()
//
// 停止所有订阅:
//   WebSocket::cancelSubJob("um_perp_liquidation")
//   WebSocket::cancelSubJob("um_deliv_liquidation")
//   WebSocket::cancelSubJob("cm_perp_liquidation")
//   WebSocket::cancelSubJob("cm_deliv_liquidation")
//
// 查看已采集数据:
//   select top 100 * from Crypto_liquidationST
//   select count(*) from Crypto_liquidationST group by exchange, marketType

Binance_Ticker_ws.dos

/*
 * Binance_Ticker_ws.dos
 *
 * DolphinDB WebSocket 插件版 Binance 最优挂单(bookTicker)采集器
 *
 * 支持市场:
 *   - Spot            现货           wss://stream.binance.com:9443
 *   - USDT-M Perpetual U本位永续     wss://fstream.binance.com
 *   - USDT-M Delivery  U本位交割     wss://fstream.binance.com
 *   - Coin-M Perpetual 币本位永续    wss://dstream.binance.com
 *   - Coin-M Delivery  币本位交割    wss://dstream.binance.com
 *
 * 数据流: <symbol>@bookTicker
 * Ref:
 *   Spot:   https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams/Individual-Symbol-Book-Ticker-Streams
 *   USDs-M: https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams/Individual-Symbol-Book-Ticker-Streams
 *   Coin-M: https://developers.binance.com/docs/derivatives/coin-margined-futures/websocket-market-streams/Individual-Symbol-Book-Ticker-Streams
 *
 * 前置条件:
 *   - DolphinDB Server 2.00.12+
 *   - WebSocket 插件已安装: installPlugin("WebSocket"); loadPlugin("WebSocket")
 *
 * 使用方法:
 *   loadPlugin("WebSocket")
 *   run("Binance_Ticker_ws.dos")
 */


// ============================================================
// 1. 创建数据库与流表
// ============================================================


// Storage targets: the DFS database/table that keeps the rows and the shared
// stream table that the WebSocket handlers write into.
dbName = "dfs://CryptoTick"
tbName = "bookTicker"
streamTbName = "Crypto_bookTickerST"

// Column layout used for both the DFS table and the stream table.
colNames = ["eventTime","collectionTime","exchange","marketType","symbol","bidPrice","bidQty","askPrice","askQty","updateId"]
colTypes = [TIMESTAMP, TIMESTAMP, SYMBOL, SYMBOL, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG]

// Reuse the database when it already exists; otherwise create a TSDB database
// composed of a monthly RANGE partition and a 4-bucket SYMBOL HASH partition.
if(existsDatabase(dbName)){
    db = database(dbName)
}else{
    dbDate = database("", RANGE, 2020.01M+(0..120))
    dbSym  = database("", HASH, [SYMBOL, 4])
    db = database(dbName, COMPO, [dbDate, dbSym], engine='TSDB')
}

// Create the partitioned table on first run only.
if(!existsTable(dbName, tbName)){
    createPartitionedTable(
        db,
        table(1:0, colNames, colTypes),
        tbName,
        `eventTime`symbol,
        sortColumns=`exchange`marketType`symbol`eventTime
    )
}

// Share and persist the keyed stream table. Any error (for example when the
// table was already shared by an earlier run) is ignored on purpose.
try{
    enableTableShareAndPersistence(
        table=keyedStreamTable(`exchange`symbol`eventTime, 10000:0, colNames, colTypes),
        tableName=streamTbName,
        cacheSize=100000,
        retentionMinutes=2880
    )
}catch(ex){}

// Re-create the stream-table -> DFS-table subscription from scratch.
bookTickerTb = loadTable(dbName, tbName)
try{ unsubscribeTable(tableName=streamTbName, actionName="insertDB") }catch(ex){}
subscribeTable(
    tableName=streamTbName,
    actionName="insertDB",
    offset=-2,
    handler=bookTickerTb,
    msgAsTable=true,
    batchSize=10000,
    throttle=1,
    persistOffset=true
)

// ============================================================
// 2. 品种与代理配置
// ============================================================

// Symbols shared by the Spot and USDT-M perpetual subscriptions
usdtSymbols = ["btcusdt","ethusdt","solusdt","bnbusdt"]

// Coin-M perpetual symbols (USD-quoted)
coinSymbols = ["btcusd_perp","ethusd_perp"]

// USDT-M delivery symbols (fill in while a quarterly contract is active, e.g. "btcusdt_250627","ethusdt_250627")
umDeliverySymbols = ["btcusdt_260925","ethusdt_260925"]

// Coin-M delivery symbols (fill in while a quarterly contract is active)
cmDeliverySymbols = ["btcusd_260925","ethusd_260925"]

// Proxy address (leave as an empty string when no proxy is used)
proxy = "http://192.168.110.5:7890/"

// ============================================================
// 3. 工具函数
// ============================================================

// Assemble a Binance combined-stream URL subscribing to <symbol>@bookTicker
// for every entry of `symbols`.
//   baseUrl : WebSocket endpoint the path is appended to
//   symbols : vector of stream symbol names
// Returns "" when `symbols` is empty.
def buildBookTickerUrl(baseUrl, symbols) {
    if(size(symbols) == 0) return ""
    streamPath = concat(symbols + "@bookTicker", "/")
    return baseUrl + "/stream?streams=" + streamPath
}

// Pull a string-valued field out of raw JSON text: "key":"value" -> value.
// regexFindStr yields the whole match (not a capture group), so the leading
// "key":" prefix and the closing quote are trimmed off with substr.
// Returns NULL when the field is not found.
def extractStr(json, key) {
    prefix = "\"" + key + "\":\""
    hit = regexFindStr(json, prefix + "[^\"]*\"", true)
    if(hit == "") return NULL
    skip = strlen(prefix)
    valueLen = strlen(hit) - skip - 1   // drop the prefix and the closing quote
    return substr(hit, skip, valueLen)
}

// Pull an unquoted (numeric/boolean) field out of raw JSON text and return it
// as text: "key":12345 -> "12345". The value runs up to the next comma,
// closing brace or whitespace. Returns NULL when the field is not found.
def extractNum(json, key) {
    prefix = "\"" + key + "\":"
    hit = regexFindStr(json, prefix + "[^,}\\s]+", true)
    if(hit == "") return NULL
    skip = strlen(prefix)
    return substr(hit, skip, strlen(hit) - skip)
}

// ============================================================
// 4. WebSocket 回调函数
// ============================================================

// Connection-opened callback: writes a log line.
def tickerOnOpen(ws) {
    note = "[TickerWS] Connected"
    writeLog(note)
}

// Error callback: logs the error converted to text.
def tickerOnError(ws, error) {
    detail = string(error)
    writeLog("[TickerWS] Error: " + detail)
}

// Connection-closed callback: logs the status code and close message.
def tickerOnClose(ws, statusCode, msg) {
    codeText = string(statusCode)
    msgText = string(msg)
    writeLog("[TickerWS] Closed, code=" + codeText + ", msg=" + msgText)
}

// Core bookTicker handler: parse each raw JSON message and append the parsed
// rows to the shared stream table Crypto_bookTickerST.
//
// Payload shapes differ by market:
//   Spot (no event type and no event time):
//     {"u":400900217,"s":"BNBUSDT","b":"25.35","B":"31.21","a":"25.36","A":"40.66"}
//   USDT-M / Coin-M futures:
//     {"e":"bookTicker","u":400900217,"E":1568014460893,"T":1568014460891,
//      "s":"BNBUSDT","b":"25.35","B":"31.21","a":"25.36","A":"40.66"}
// Combined-stream wrapper (every URL built by this script uses /stream?streams=):
//   {"stream":"btcusdt@bookTicker","data":{...}}
//
// Parameters:
//   ws         : WebSocket handle passed in by the plugin (used to answer pings)
//   data       : table of received messages; raw text is read from column `msg`
//   exchange   : value written to the `exchange` column
//   marketType : value written to the `marketType` column
//
// Messages that are not bookTicker updates, or that lack symbol / bid price /
// ask price, are skipped. A message that raises during parsing is logged and
// skipped without affecting the rest of the batch.
def processBookTicker(ws, data, exchange, marketType) {
    n = data.rows()
    if(n == 0) return

    st = objByName("Crypto_bookTickerST")

    eventTimes      = array(TIMESTAMP, 0)
    collectionTimes = array(TIMESTAMP, 0)
    exchanges       = array(SYMBOL, 0)
    marketTypes     = array(SYMBOL, 0)
    symbols         = array(SYMBOL, 0)
    bidPrices       = array(DOUBLE, 0)
    bidQtys         = array(DOUBLE, 0)
    askPrices       = array(DOUBLE, 0)
    askQtys         = array(DOUBLE, 0)
    updateIds       = array(LONG, 0)

    for(i in 0..(n-1)) {
        try {
            s = string(data.msg[i])

            // Answer an application-level {"ping":<id>} by echoing it back as "pong"
            if(regexFind(s, "\"ping\"") >= 0 && regexFind(s, "\"pong\"") < 0) {
                pong = regexReplace(s, "\"ping\"", "\"pong\"")
                WebSocket::send(ws, blob(pong))
                continue
            }

            // Futures payloads carry "e":"bookTicker"; Spot payloads have no "e"
            // field at all, so also recognise the combined-stream name.
            hasEventTag   = regexFind(s, "\"e\":\"bookTicker\"") >= 0
            hasStreamName = regexFind(s, "@bookTicker\"") >= 0
            if(!hasEventTag && !hasStreamName) continue

            // Field extraction. Keys are case-sensitive:
            // E=eventTime, b=bidPrice, B=bidQty, a=askPrice, A=askQty, u=updateId
            evt = extractNum(s, "E")   // event time in epoch milliseconds; absent on Spot
            sym = extractStr(s, "s")
            b   = extractStr(s, "b")
            B   = extractStr(s, "B")
            a   = extractStr(s, "a")
            A   = extractStr(s, "A")
            u   = extractNum(s, "u")

            // Symbol and both prices are mandatory; event time is not, because
            // Spot never sends it.
            if(isNull(sym) || isNull(b) || isNull(a)) continue

            // Convert everything first so that a failing conversion cannot leave
            // the column arrays with different lengths.
            recvTime = now()
            evtTime = recvTime          // fallback when the payload has no "E"
            if(!isNull(evt)) evtTime = timestamp(long(evt))
            bidPx = double(b)
            askPx = double(a)
            bidSz = isNull(B) ? 0.0 : double(B)
            askSz = isNull(A) ? 0.0 : double(A)
            updId = isNull(u) ? 0l : long(u)

            eventTimes.append!(evtTime)
            collectionTimes.append!(recvTime)
            exchanges.append!(exchange)
            marketTypes.append!(marketType)
            symbols.append!(sym)
            bidPrices.append!(bidPx)
            bidQtys.append!(bidSz)
            askPrices.append!(askPx)
            askQtys.append!(askSz)
            updateIds.append!(updId)
        } catch(ex) {
            writeLog("[TickerWS] Parse error: " + string(ex))
        }
    }

    cnt = eventTimes.size()
    if(cnt > 0) {
        result = table(
            eventTimes as eventTime,
            collectionTimes as collectionTime,
            exchanges as exchange,
            marketTypes as marketType,
            symbols as symbol,
            bidPrices as bidPrice,
            bidQtys as bidQty,
            askPrices as askPrice,
            askQtys as askQty,
            updateIds as updateId
        )
        st.append!(result)
    }
}

// Per-market onMessage adapters: each forwards to processBookTicker with the
// exchange / marketType labels fixed for that market.
def spotTickerOnMsg(ws, data) {
    processBookTicker(ws, data, "Binance-Spot", "SPOT")
}
def umPerpTickerOnMsg(ws, data) {
    processBookTicker(ws, data, "Binance-UM",  "SWAP")
}
def umDelivTickerOnMsg(ws, data) {
    processBookTicker(ws, data, "Binance-UM",  "FUTURE")
}
def cmPerpTickerOnMsg(ws, data) {
    processBookTicker(ws, data, "Binance-CM",  "SWAP")
}
def cmDelivTickerOnMsg(ws, data) {
    processBookTicker(ws, data, "Binance-CM",  "FUTURE")
}

// ============================================================
// 5. 构建各市场 WebSocket URL
// ============================================================
//
// 币安官方 WebSocket 端点 (Ref: Binance API Docs):
//   现货:        wss://stream.binance.com:9443
//   USDT-M 合约: wss://fstream.binance.com  (永续+交割共用)
//   Coin-M 合约: wss://dstream.binance.com  (永续+交割共用)
//
// 组合流格式: /stream?streams=<stream1>/<stream2>/...
// 单流格式:   /ws/<streamName>
// ============================================================

// One combined-stream URL per market segment. buildBookTickerUrl returns ""
// for an empty symbol list, so a delivery URL is "" when no delivery symbols
// are configured.
spotUrl    = buildBookTickerUrl("wss://stream.binance.com:9443", usdtSymbols)
umPerpUrl  = buildBookTickerUrl("wss://fstream.binance.com", usdtSymbols)
umDelivUrl = buildBookTickerUrl("wss://fstream.binance.com", umDeliverySymbols)
cmPerpUrl  = buildBookTickerUrl("wss://dstream.binance.com", coinSymbols)
cmDelivUrl = buildBookTickerUrl("wss://dstream.binance.com", cmDeliverySymbols)

// ============================================================
// 6. 启动 WebSocket 订阅
// ============================================================

// Options passed to every createSubJob call below.
config = dict(STRING, ANY)
// Only set the proxy option when a non-empty proxy string is configured.
if(strlen(proxy) > 0) config["proxy"] = proxy
config["reconnectCount"] = -1  // reconnect without limit

// Before each createSubJob, cancel any job still registered under the same tag
// (for example from a previous run of this script), as the liquidation
// collector does. Errors from the cancel are ignored because the job usually
// does not exist.

// --- Spot ---
try{ WebSocket::cancelSubJob("spot_ticker") }catch(ex){}
ws1 = WebSocket::createSubJob(spotUrl, tickerOnOpen, spotTickerOnMsg, tickerOnError, tickerOnClose, "spot_ticker", config)
writeLog("[TickerWS] Spot started: " + spotUrl)

// --- USDT-M Perpetual ---
try{ WebSocket::cancelSubJob("um_perp_ticker") }catch(ex){}
ws2 = WebSocket::createSubJob(umPerpUrl, tickerOnOpen, umPerpTickerOnMsg, tickerOnError, tickerOnClose, "um_perp_ticker", config)
writeLog("[TickerWS] USDT-M Perp started: " + umPerpUrl)

// --- USDT-M Delivery ---
// Started only when at least one delivery symbol is configured.
if(umDeliverySymbols.size() > 0) {
    try{ WebSocket::cancelSubJob("um_deliv_ticker") }catch(ex){}
    ws3 = WebSocket::createSubJob(umDelivUrl, tickerOnOpen, umDelivTickerOnMsg, tickerOnError, tickerOnClose, "um_deliv_ticker", config)
    writeLog("[TickerWS] USDT-M Delivery started: " + umDelivUrl)
}else{
    writeLog("[TickerWS] USDT-M Delivery: skipped (no symbols configured)")
}

// --- Coin-M Perpetual ---
try{ WebSocket::cancelSubJob("cm_perp_ticker") }catch(ex){}
ws4 = WebSocket::createSubJob(cmPerpUrl, tickerOnOpen, cmPerpTickerOnMsg, tickerOnError, tickerOnClose, "cm_perp_ticker", config)
writeLog("[TickerWS] Coin-M Perp started: " + cmPerpUrl)

// --- Coin-M Delivery ---
// Started only when at least one delivery symbol is configured.
if(cmDeliverySymbols.size() > 0) {
    try{ WebSocket::cancelSubJob("cm_deliv_ticker") }catch(ex){}
    ws5 = WebSocket::createSubJob(cmDelivUrl, tickerOnOpen, cmDelivTickerOnMsg, tickerOnError, tickerOnClose, "cm_deliv_ticker", config)
    writeLog("[TickerWS] Coin-M Delivery started: " + cmDelivUrl)
}else{
    writeLog("[TickerWS] Coin-M Delivery: skipped (no symbols configured)")
}

// ============================================================
// 7. 管理命令 (在 DolphinDB GUI 中手动执行)
// ============================================================
//
// 查看订阅状态:
//   WebSocket::getSubJobStat()
//
// 停止所有订阅:
//   WebSocket::cancelSubJob("spot_ticker")
//   WebSocket::cancelSubJob("um_perp_ticker")
//   WebSocket::cancelSubJob("um_deliv_ticker")
//   WebSocket::cancelSubJob("cm_perp_ticker")
//   WebSocket::cancelSubJob("cm_deliv_ticker")
//
// 查看已采集数据:
//   select top 100 * from Crypto_bookTickerST
//   select count(*) from Crypto_bookTickerST group by exchange, marketType

Binance_Trade_ws.dos

/*
 * Binance_Trade_ws.dos
 *
 * DolphinDB WebSocket 插件版 Binance 逐笔成交(aggTrade)采集器
 *
 * 支持市场:
 *   - Spot            现货           wss://stream.binance.com:9443
 *   - USDT-M Perpetual U本位永续     wss://fstream.binance.com
 *   - USDT-M Delivery  U本位交割     wss://fstream.binance.com
 *   - Coin-M Perpetual 币本位永续    wss://dstream.binance.com
 *   - Coin-M Delivery  币本位交割    wss://dstream.binance.com
 *
 * 数据流: <symbol>@aggTrade
 * Ref:
 *   Spot:   https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams/Trade-Streams
 *   USDs-M: https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams/Agg-Trade-Streams
 *   Coin-M: https://developers.binance.com/docs/derivatives/coin-margined-futures/websocket-market-streams/Agg-Trade-Streams
 *
 * 前置条件:
 *   - DolphinDB Server 2.00.12+
 *   - WebSocket 插件已安装: installPlugin("WebSocket"); loadPlugin("WebSocket")
 *
 * 使用方法:
 *   loadPlugin("WebSocket")
 *   run("Binance_Trade_ws.dos")
 */


// ============================================================
// 1. 创建数据库与流表
// ============================================================


// Storage targets: the DFS database/table that keeps the rows and the shared
// stream table that the WebSocket handlers write into.
dbName = "dfs://CryptoTick"
tbName = "trade"
streamTbName = "Crypto_aggTradeST"

// Column layout used for both the DFS table and the stream table.
colNames = `eventTime`collectionTime`exchange`marketType`symbol`price`quantity`side`tradeId`firstId`lastId
colTypes = [TIMESTAMP, TIMESTAMP, SYMBOL, SYMBOL, SYMBOL, DOUBLE, DOUBLE, STRING, LONG, LONG, LONG]

// Create the TSDB database on first run (monthly RANGE partition combined
// with a 4-bucket SYMBOL HASH partition); otherwise open the existing one.
if(!existsDatabase(dbName)){
    dbDate = database("", RANGE, 2020.01M+(0..120))
    dbSym  = database("", HASH, [SYMBOL, 4])
    db = database(dbName, COMPO, [dbDate, dbSym], engine='TSDB')
}else{
    db = database(dbName)
}
// Create the partitioned table on first run only. The partition-column
// literal `eventTime`symbol must stay on one line: splitting it after the
// leading backtick breaks the statement.
if(!existsTable(dbName, tbName)){
    createPartitionedTable(db, table(1:0, colNames, colTypes), tbName, `eventTime`symbol, sortColumns=`exchange`symbol`eventTime)
}

// Share and persist the keyed stream table. Any error (for example when the
// table was already shared by an earlier run) is ignored on purpose.
try{
    enableTableShareAndPersistence(
        table=keyedStreamTable(`exchange`symbol`lastId, 10000:0, colNames, colTypes),
        tableName=streamTbName,
        cacheSize=100000,
        retentionMinutes=2880
    )
}catch(ex){}

// Re-create the stream-table -> DFS-table subscription from scratch.
tradeTb = loadTable(dbName, tbName)
try{ unsubscribeTable(tableName=streamTbName, actionName="insertDB") }catch(ex){}
subscribeTable(tableName=streamTbName, actionName="insertDB", offset=-2,
               handler=tradeTb, msgAsTable=true, batchSize=10000,
               throttle=1, persistOffset=true)

// ============================================================
// 2. 品种与代理配置
// ============================================================

// Symbols shared by the Spot and USDT-M perpetual subscriptions
usdtSymbols = ["btcusdt","ethusdt","solusdt","bnbusdt"]

// Coin-M perpetual symbols (USD-quoted)
coinSymbols = ["btcusd_perp","ethusd_perp"]

// USDT-M delivery symbols (fill in while a quarterly contract is active, e.g. "btcusdt_250627","ethusdt_250627")
umDeliverySymbols = ["btcusdt_260925","ethusdt_260925"]

// Coin-M delivery symbols (fill in while a quarterly contract is active)
cmDeliverySymbols = ["btcusd_260925","ethusd_260925"]

// Proxy address (leave as an empty string when no proxy is used)
proxy = "http://192.168.110.5:7890/"

// ============================================================
// 3. 工具函数
// ============================================================

// Assemble a Binance combined-stream URL subscribing to <symbol>@aggTrade
// for every entry of `symbols`.
//   baseUrl : WebSocket endpoint the path is appended to
//   symbols : vector of stream symbol names
// Returns "" when `symbols` is empty.
def buildAggTradeUrl(baseUrl, symbols) {
    if(size(symbols) == 0) return ""
    streamPath = concat(symbols + "@aggTrade", "/")
    return baseUrl + "/stream?streams=" + streamPath
}

// Pull a string-valued field out of raw JSON text: "key":"value" -> value.
// regexFindStr yields the whole match (not a capture group), so the leading
// "key":" prefix and the closing quote are trimmed off with substr.
// Returns NULL when the field is not found.
def extractStr(json, key) {
    prefix = "\"" + key + "\":\""
    hit = regexFindStr(json, prefix + "[^\"]*\"", true)
    if(hit == "") return NULL
    skip = strlen(prefix)
    valueLen = strlen(hit) - skip - 1   // drop the prefix and the closing quote
    return substr(hit, skip, valueLen)
}

// Pull an unquoted (numeric/boolean) field out of raw JSON text and return it
// as text: "key":12345 -> "12345". The value runs up to the next comma,
// closing brace or whitespace. Returns NULL when the field is not found.
def extractNum(json, key) {
    prefix = "\"" + key + "\":"
    hit = regexFindStr(json, prefix + "[^,}\\s]+", true)
    if(hit == "") return NULL
    skip = strlen(prefix)
    return substr(hit, skip, strlen(hit) - skip)
}

// ============================================================
// 4. WebSocket 回调函数
// ============================================================

// Connection-opened callback: writes a log line.
def tradeOnOpen(ws) {
    note = "[TradeWS] Connected"
    writeLog(note)
}

// Error callback: logs the error converted to text.
def tradeOnError(ws, error) {
    detail = string(error)
    writeLog("[TradeWS] Error: " + detail)
}

// Connection-closed callback: logs the status code and close message.
def tradeOnClose(ws, statusCode, msg) {
    codeText = string(statusCode)
    msgText = string(msg)
    writeLog("[TradeWS] Closed, code=" + codeText + ", msg=" + msgText)
}

// Core aggTrade handler: parse each raw JSON message and append the parsed
// rows to the shared stream table Crypto_aggTradeST.
//
// Expected payload:
//   {"e":"aggTrade","E":1672531200000,"s":"BTCUSDT","a":12345,
//    "p":"16500.00","q":"0.500","f":100,"l":105,"T":1672531200000,"m":true}
// Combined-stream wrapper:
//   {"stream":"btcusdt@aggTrade","data":{...}}
//
// Parameters:
//   ws         : WebSocket handle passed in by the plugin (used to answer pings)
//   data       : table of received messages; raw text is read from column `msg`
//   exchange   : value written to the `exchange` column
//   marketType : value written to the `marketType` column
//
// Messages without the aggTrade event tag, or lacking event time / symbol /
// price, are skipped. A message that raises during parsing is logged.
def processAggTrade(ws, data, exchange, marketType) {
    rowCount = rows(data)
    if(rowCount == 0) return

    target = objByName("Crypto_aggTradeST")

    // One growing column per output field
    evtCol    = array(TIMESTAMP, 0)
    recvCol   = array(TIMESTAMP, 0)
    exchCol   = array(SYMBOL, 0)
    mktCol    = array(SYMBOL, 0)
    symCol    = array(SYMBOL, 0)
    priceCol  = array(DOUBLE, 0)
    qtyCol    = array(DOUBLE, 0)
    sideCol   = array(STRING, 0)
    aggIdCol  = array(LONG, 0)
    firstCol  = array(LONG, 0)
    lastCol   = array(LONG, 0)

    rowIds = 0..(rowCount-1)
    for(k in rowIds) {
        try {
            raw = string(data.msg[k])

            // Answer an application-level {"ping":<id>} by echoing it back as "pong"
            if(regexFind(raw, "\"ping\"") >= 0 && regexFind(raw, "\"pong\"") < 0) {
                reply = regexReplace(raw, "\"ping\"", "\"pong\"")
                WebSocket::send(ws, blob(reply))
                continue
            }

            // Anything that is not an aggTrade event is ignored
            if(regexFind(raw, "\"e\":\"aggTrade\"") < 0) continue

            // Field extraction; keys are case-sensitive ("E" = event time, "e" = event type)
            evtText   = extractNum(raw, "E")   // event time, epoch milliseconds
            symText   = extractStr(raw, "s")   // symbol
            priceText = extractStr(raw, "p")   // price
            qtyText   = extractStr(raw, "q")   // quantity
            makerText = extractNum(raw, "m")   // isBuyerMaker: "true" is recorded as sell, otherwise buy
            aggText   = extractNum(raw, "a")   // aggregate trade id
            firstText = extractNum(raw, "f")   // first trade id
            lastText  = extractNum(raw, "l")   // last trade id

            // Event time, symbol and price are mandatory
            if(isNull(evtText) || isNull(symText) || isNull(priceText)) continue

            append!(evtCol, timestamp(long(evtText)))
            append!(recvCol, now())
            append!(exchCol, exchange)
            append!(mktCol, marketType)
            append!(symCol, symText)
            append!(priceCol, double(priceText))
            append!(qtyCol, isNull(qtyText) ? 0d : double(qtyText))
            append!(sideCol, iif(makerText == "true", "sell", "buy"))
            append!(aggIdCol, isNull(aggText) ? 0l : long(aggText))
            append!(firstCol, isNull(firstText) ? 0l : long(firstText))
            append!(lastCol, isNull(lastText) ? 0l : long(lastText))
        } catch(ex) {
            writeLog("[TradeWS] Parse error: " + string(ex))
        }
    }

    // Nothing parsed -> nothing to write
    if(size(evtCol) > 0) {
        batch = table(
            evtCol as eventTime,
            recvCol as collectionTime,
            exchCol as exchange,
            mktCol as marketType,
            symCol as symbol,
            priceCol as price,
            qtyCol as quantity,
            sideCol as side,
            aggIdCol as tradeId,
            firstCol as firstId,
            lastCol as lastId
        )
        append!(target, batch)
    }
}

// Per-market onMessage adapters: each forwards to processAggTrade with the
// exchange / marketType labels fixed for that market.
def spotTradeOnMsg(ws, data) {
    processAggTrade(ws, data, "Binance-Spot", "SPOT")
}
def umPerpTradeOnMsg(ws, data) {
    processAggTrade(ws, data, "Binance-UM",  "SWAP")
}
def umDelivTradeOnMsg(ws, data) {
    processAggTrade(ws, data, "Binance-UM",  "FUTURE")
}
def cmPerpTradeOnMsg(ws, data) {
    processAggTrade(ws, data, "Binance-CM",  "SWAP")
}
def cmDelivTradeOnMsg(ws, data) {
    processAggTrade(ws, data, "Binance-CM",  "FUTURE")
}

// ============================================================
// 5. 构建各市场 WebSocket URL
// ============================================================
//
// 币安官方 WebSocket 端点 (Ref: Binance API Docs):
//   现货:        wss://stream.binance.com:9443
//   USDT-M 合约: wss://fstream.binance.com  (永续+交割共用)
//   Coin-M 合约: wss://dstream.binance.com  (永续+交割共用)
//
// 组合流格式: /stream?streams=<stream1>/<stream2>/...
// 单流格式:   /ws/<streamName>
// ============================================================

// One combined-stream URL per market segment. buildAggTradeUrl returns ""
// for an empty symbol list, so a delivery URL is "" when no delivery symbols
// are configured.
spotUrl    = buildAggTradeUrl("wss://stream.binance.com:9443", usdtSymbols)
umPerpUrl  = buildAggTradeUrl("wss://fstream.binance.com", usdtSymbols)
umDelivUrl = buildAggTradeUrl("wss://fstream.binance.com", umDeliverySymbols)
cmPerpUrl  = buildAggTradeUrl("wss://dstream.binance.com", coinSymbols)
cmDelivUrl = buildAggTradeUrl("wss://dstream.binance.com", cmDeliverySymbols)

// ============================================================
// 6. 启动 WebSocket 订阅
// ============================================================

// Options passed to every createSubJob call below.
config = dict(STRING, ANY)
// Only set the proxy option when a non-empty proxy string is configured.
if(strlen(proxy) > 0) config["proxy"] = proxy
config["reconnectCount"] = -1  // reconnect without limit

// Before each createSubJob, cancel any job still registered under the same tag
// (for example from a previous run of this script), as the liquidation
// collector does. Errors from the cancel are ignored because the job usually
// does not exist.

// --- Spot ---
try{ WebSocket::cancelSubJob("spot_trade") }catch(ex){}
ws1 = WebSocket::createSubJob(spotUrl, tradeOnOpen, spotTradeOnMsg, tradeOnError, tradeOnClose, "spot_trade", config)
writeLog("[TradeWS] Spot started: " + spotUrl)

// --- USDT-M Perpetual ---
try{ WebSocket::cancelSubJob("um_perp_trade") }catch(ex){}
ws2 = WebSocket::createSubJob(umPerpUrl, tradeOnOpen, umPerpTradeOnMsg, tradeOnError, tradeOnClose, "um_perp_trade", config)
writeLog("[TradeWS] USDT-M Perp started: " + umPerpUrl)

// --- USDT-M Delivery ---
// Started only when at least one delivery symbol is configured.
if(umDeliverySymbols.size() > 0) {
    try{ WebSocket::cancelSubJob("um_deliv_trade") }catch(ex){}
    ws3 = WebSocket::createSubJob(umDelivUrl, tradeOnOpen, umDelivTradeOnMsg, tradeOnError, tradeOnClose, "um_deliv_trade", config)
    writeLog("[TradeWS] USDT-M Delivery started: " + umDelivUrl)
}else{
    writeLog("[TradeWS] USDT-M Delivery: skipped (no symbols configured)")
}

// --- Coin-M Perpetual ---
try{ WebSocket::cancelSubJob("cm_perp_trade") }catch(ex){}
ws4 = WebSocket::createSubJob(cmPerpUrl, tradeOnOpen, cmPerpTradeOnMsg, tradeOnError, tradeOnClose, "cm_perp_trade", config)
writeLog("[TradeWS] Coin-M Perp started: " + cmPerpUrl)

// --- Coin-M Delivery ---
// Started only when at least one delivery symbol is configured.
if(cmDeliverySymbols.size() > 0) {
    try{ WebSocket::cancelSubJob("cm_deliv_trade") }catch(ex){}
    ws5 = WebSocket::createSubJob(cmDelivUrl, tradeOnOpen, cmDelivTradeOnMsg, tradeOnError, tradeOnClose, "cm_deliv_trade", config)
    writeLog("[TradeWS] Coin-M Delivery started: " + cmDelivUrl)
}else{
    writeLog("[TradeWS] Coin-M Delivery: skipped (no symbols configured)")
}

// ============================================================
// 7. 管理命令 (在 DolphinDB GUI 中手动执行)
// ============================================================
//
// 查看订阅状态:
//   WebSocket::getSubJobStat()
//
// 停止所有订阅:
//   WebSocket::cancelSubJob("spot_trade")
//   WebSocket::cancelSubJob("um_perp_trade")
//   WebSocket::cancelSubJob("um_deliv_trade")
//   WebSocket::cancelSubJob("cm_perp_trade")
//   WebSocket::cancelSubJob("cm_deliv_trade")
//
// 查看已采集数据:
//   select top 100 * from Crypto_aggTradeST
//   select count(*) from Crypto_aggTradeST group by exchange, marketType

最新文章

随机文章

基本 文件 流程 错误 SQL 调试
  1. 请求信息 : 2026-05-10 23:29:28 HTTP/2.0 GET : https://mffb.com.cn/a/533255.html
  2. 运行时间 : 0.108192s [ 吞吐率:9.24req/s ] 内存消耗:4,630.95kb 文件加载:140
  3. 缓存信息 : 0 reads,0 writes
  4. 会话信息 : SESSION_ID=6d0c4025517b5c098b45487990731acb
  1. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/public/index.php ( 0.79 KB )
  2. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/autoload.php ( 0.17 KB )
  3. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/composer/autoload_real.php ( 2.49 KB )
  4. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/composer/platform_check.php ( 0.90 KB )
  5. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/composer/ClassLoader.php ( 14.03 KB )
  6. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/composer/autoload_static.php ( 4.90 KB )
  7. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-helper/src/helper.php ( 8.34 KB )
  8. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-validate/src/helper.php ( 2.19 KB )
  9. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/helper.php ( 1.47 KB )
  10. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/stubs/load_stubs.php ( 0.16 KB )
  11. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/Exception.php ( 1.69 KB )
  12. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-container/src/Facade.php ( 2.71 KB )
  13. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/symfony/deprecation-contracts/function.php ( 0.99 KB )
  14. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/symfony/polyfill-mbstring/bootstrap.php ( 8.26 KB )
  15. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/symfony/polyfill-mbstring/bootstrap80.php ( 9.78 KB )
  16. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/symfony/var-dumper/Resources/functions/dump.php ( 1.49 KB )
  17. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-dumper/src/helper.php ( 0.18 KB )
  18. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/symfony/var-dumper/VarDumper.php ( 4.30 KB )
  19. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/App.php ( 15.30 KB )
  20. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-container/src/Container.php ( 15.76 KB )
  21. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/psr/container/src/ContainerInterface.php ( 1.02 KB )
  22. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/app/provider.php ( 0.19 KB )
  23. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/Http.php ( 6.04 KB )
  24. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-helper/src/helper/Str.php ( 7.29 KB )
  25. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/Env.php ( 4.68 KB )
  26. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/app/common.php ( 0.03 KB )
  27. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/helper.php ( 18.78 KB )
  28. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/Config.php ( 5.54 KB )
  29. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/config/app.php ( 0.95 KB )
  30. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/config/cache.php ( 0.78 KB )
  31. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/config/console.php ( 0.23 KB )
  32. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/config/cookie.php ( 0.56 KB )
  33. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/config/database.php ( 2.48 KB )
  34. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/facade/Env.php ( 1.67 KB )
  35. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/config/filesystem.php ( 0.61 KB )
  36. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/config/lang.php ( 0.91 KB )
  37. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/config/log.php ( 1.35 KB )
  38. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/config/middleware.php ( 0.19 KB )
  39. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/config/route.php ( 1.89 KB )
  40. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/config/session.php ( 0.57 KB )
  41. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/config/trace.php ( 0.34 KB )
  42. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/config/view.php ( 0.82 KB )
  43. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/app/event.php ( 0.25 KB )
  44. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/Event.php ( 7.67 KB )
  45. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/app/service.php ( 0.13 KB )
  46. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/app/AppService.php ( 0.26 KB )
  47. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/Service.php ( 1.64 KB )
  48. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/Lang.php ( 7.35 KB )
  49. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/lang/zh-cn.php ( 13.70 KB )
  50. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/initializer/Error.php ( 3.31 KB )
  51. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/initializer/RegisterService.php ( 1.33 KB )
  52. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/services.php ( 0.14 KB )
  53. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/service/PaginatorService.php ( 1.52 KB )
  54. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/service/ValidateService.php ( 0.99 KB )
  55. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/service/ModelService.php ( 2.04 KB )
  56. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-trace/src/Service.php ( 0.77 KB )
  57. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/Middleware.php ( 6.72 KB )
  58. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/initializer/BootService.php ( 0.77 KB )
  59. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/Paginator.php ( 11.86 KB )
  60. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-validate/src/Validate.php ( 63.20 KB )
  61. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/Model.php ( 23.55 KB )
  62. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/model/concern/Attribute.php ( 21.05 KB )
  63. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/model/concern/AutoWriteData.php ( 4.21 KB )
  64. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/model/concern/Conversion.php ( 6.44 KB )
  65. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/model/concern/DbConnect.php ( 5.16 KB )
  66. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/model/concern/ModelEvent.php ( 2.33 KB )
  67. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/model/concern/RelationShip.php ( 28.29 KB )
  68. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-helper/src/contract/Arrayable.php ( 0.09 KB )
  69. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-helper/src/contract/Jsonable.php ( 0.13 KB )
  70. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/model/contract/Modelable.php ( 0.09 KB )
  71. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/Db.php ( 2.88 KB )
  72. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/DbManager.php ( 8.52 KB )
  73. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/Log.php ( 6.28 KB )
  74. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/Manager.php ( 3.92 KB )
  75. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/psr/log/src/LoggerTrait.php ( 2.69 KB )
  76. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/psr/log/src/LoggerInterface.php ( 2.71 KB )
  77. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/Cache.php ( 4.92 KB )
  78. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/psr/simple-cache/src/CacheInterface.php ( 4.71 KB )
  79. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-helper/src/helper/Arr.php ( 16.63 KB )
  80. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/cache/driver/File.php ( 7.84 KB )
  81. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/cache/Driver.php ( 9.03 KB )
  82. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/contract/CacheHandlerInterface.php ( 1.99 KB )
  83. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/app/Request.php ( 0.09 KB )
  84. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/Request.php ( 55.78 KB )
  85. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/app/middleware.php ( 0.25 KB )
  86. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/Pipeline.php ( 2.61 KB )
  87. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-trace/src/TraceDebug.php ( 3.40 KB )
  88. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/middleware/SessionInit.php ( 1.94 KB )
  89. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/Session.php ( 1.80 KB )
  90. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/session/driver/File.php ( 6.27 KB )
  91. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/contract/SessionHandlerInterface.php ( 0.87 KB )
  92. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/session/Store.php ( 7.12 KB )
  93. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/Route.php ( 23.73 KB )
  94. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/route/RuleName.php ( 5.75 KB )
  95. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/route/Domain.php ( 2.53 KB )
  96. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/route/RuleGroup.php ( 22.43 KB )
  97. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/route/Rule.php ( 26.95 KB )
  98. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/route/RuleItem.php ( 9.78 KB )
  99. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/route/app.php ( 1.72 KB )
  100. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/facade/Route.php ( 4.70 KB )
  101. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/route/dispatch/Controller.php ( 4.74 KB )
  102. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/route/Dispatch.php ( 10.44 KB )
  103. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/app/controller/Index.php ( 4.81 KB )
  104. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/app/BaseController.php ( 2.05 KB )
  105. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/facade/Db.php ( 0.93 KB )
  106. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/db/connector/Mysql.php ( 5.44 KB )
  107. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/db/PDOConnection.php ( 52.47 KB )
  108. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/db/Connection.php ( 8.39 KB )
  109. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/db/ConnectionInterface.php ( 4.57 KB )
  110. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/db/builder/Mysql.php ( 16.58 KB )
  111. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/db/Builder.php ( 24.06 KB )
  112. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/db/BaseBuilder.php ( 27.50 KB )
  113. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/db/Query.php ( 15.71 KB )
  114. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/db/BaseQuery.php ( 45.13 KB )
  115. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/db/concern/TimeFieldQuery.php ( 7.43 KB )
  116. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/db/concern/AggregateQuery.php ( 3.26 KB )
  117. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ModelRelationQuery.php ( 20.07 KB )
  118. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ParamsBind.php ( 3.66 KB )
  119. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/db/concern/ResultOperation.php ( 7.01 KB )
  120. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/db/concern/WhereQuery.php ( 19.37 KB )
  121. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/db/concern/JoinAndViewQuery.php ( 7.11 KB )
  122. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/db/concern/TableFieldInfo.php ( 2.63 KB )
  123. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-orm/src/db/concern/Transaction.php ( 2.77 KB )
  124. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/log/driver/File.php ( 5.96 KB )
  125. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/contract/LogHandlerInterface.php ( 0.86 KB )
  126. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/log/Channel.php ( 3.89 KB )
  127. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/event/LogRecord.php ( 1.02 KB )
  128. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-helper/src/Collection.php ( 16.47 KB )
  129. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/facade/View.php ( 1.70 KB )
  130. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/View.php ( 4.39 KB )
  131. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/Response.php ( 8.81 KB )
  132. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/response/View.php ( 3.29 KB )
  133. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/Cookie.php ( 6.06 KB )
  134. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-view/src/Think.php ( 8.38 KB )
  135. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/framework/src/think/contract/TemplateHandlerInterface.php ( 1.60 KB )
  136. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-template/src/Template.php ( 46.61 KB )
  137. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-template/src/template/driver/File.php ( 2.41 KB )
  138. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-template/src/template/contract/DriverInterface.php ( 0.86 KB )
  139. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/runtime/temp/49e0cd3e0528281c5c5fae705243dc37.php ( 11.98 KB )
  140. /yingpanguazai/ssd/ssd1/www/mffb.com.cn/vendor/topthink/think-trace/src/Html.php ( 4.42 KB )
  1. CONNECT:[ UseTime:0.000538s ] mysql:host=127.0.0.1;port=3306;dbname=no_mfffb;charset=utf8mb4
  2. SHOW FULL COLUMNS FROM `fenlei` [ RunTime:0.000910s ]
  3. SELECT * FROM `fenlei` WHERE `fid` = 0 [ RunTime:0.000371s ]
  4. SELECT * FROM `fenlei` WHERE `fid` = 63 [ RunTime:0.003331s ]
  5. SHOW FULL COLUMNS FROM `set` [ RunTime:0.000673s ]
  6. SELECT * FROM `set` [ RunTime:0.002008s ]
  7. SHOW FULL COLUMNS FROM `article` [ RunTime:0.000726s ]
  8. SELECT * FROM `article` WHERE `id` = 533255 LIMIT 1 [ RunTime:0.001766s ]
  9. UPDATE `article` SET `lasttime` = 1778426968 WHERE `id` = 533255 [ RunTime:0.015879s ]
  10. SELECT * FROM `fenlei` WHERE `id` = 65 LIMIT 1 [ RunTime:0.005394s ]
  11. SELECT * FROM `article` WHERE `id` < 533255 ORDER BY `id` DESC LIMIT 1 [ RunTime:0.000875s ]
  12. SELECT * FROM `article` WHERE `id` > 533255 ORDER BY `id` ASC LIMIT 1 [ RunTime:0.000467s ]
  13. SELECT * FROM `article` WHERE `id` < 533255 ORDER BY `id` DESC LIMIT 10 [ RunTime:0.000612s ]
  14. SELECT * FROM `article` WHERE `id` < 533255 ORDER BY `id` DESC LIMIT 10,10 [ RunTime:0.007072s ]
  15. SELECT * FROM `article` WHERE `id` < 533255 ORDER BY `id` DESC LIMIT 20,10 [ RunTime:0.000811s ]
0.109725s