本笔记记录针对CTP流控限制的优化方案,核心思路:减少主动查询,用Tick驱动状态更新
更新日志:新增GetFrontInfo接口获取前置信息及流控参数
CTP接口有流控限制(约1秒2-3次查询),频繁调用ReqQryTradingAccount和ReqQryInvestorPosition容易触发流控错误。解决方案:

GetFrontInfo用于获取已连接前置的信息,包括前置地址、查询流控参数和FTD流控参数。连接成功后调用可获取正确的前置地址,登录成功后调用可获取完整的流控参数。
virtualvoidGetFrontInfo(CThostFtdcFrontInfoField* pFrontInfo)= 0;///<summary>
/// GetFrontInfo调用示例 - C#封装版本
///</summary>
publicvoidGetFrontInfoExample()
{
// 声明前置信息结构体
CThostFtdcFrontInfoField frontInfo = new CThostFtdcFrontInfoField();
// 调用接口获取信息
pUserApi.GetFrontInfo(ref frontInfo);
// 输出前置地址
Console.WriteLine($"前置地址: {frontInfo.FrontAddr}");
// 输出FTD流控参数(包/秒)
Console.WriteLine($"FTD流控: {frontInfo.FTDPkgFreq} 包/秒");
// 输出查询流控参数(次/秒)
Console.WriteLine($"查询流控: {frontInfo.QryFreq} 次/秒");
}CTP流控有两层限制:
///<summary>
/// 流控参数动态适配器 - 根据GetFrontInfo返回值自动调整
///</summary>
publicclassFlowControlAdapter
{
privateint _maxQueryPerSecond = 2; // 默认值
privateint _maxPacketsPerSecond = 3; // 默认值
privatedouble _queryInterval = 500; // 毫秒
private DateTime _lastQueryTime;
privateint _queryCountInSecond;
private DateTime _secondStart;
privatereadonlyobject _lock = newobject();
///<summary>
/// 从GetFrontInfo更新流控参数
///</summary>
publicvoidUpdateFlowControl(CThostFtdcFrontInfoField frontInfo)
{
if (frontInfo.QryFreq > 0)
{
_maxQueryPerSecond = frontInfo.QryFreq;
// 查询间隔 = 1000 / QryFreq - 安全余量
_queryInterval = Math.Max(100.0, 1000.0 / _maxQueryPerSecond - 50);
Log($"更新查询流控: {_maxQueryPerSecond}次/秒, 间隔{_queryInterval:F0}ms");
}
if (frontInfo.FTDPkgFreq > 0)
{
_maxPacketsPerSecond = frontInfo.FTDPkgFreq;
Log($"更新FTD流控: {_maxPacketsPerSecond}包/秒");
}
}
///<summary>
/// 检查是否可以发起查询
///</summary>
publicboolCanQuery()
{
lock (_lock)
{
DateTime now = DateTime.Now;
// 重置秒计数器
if ((now - _secondStart).TotalSeconds >= 1)
{
_queryCountInSecond = 0;
_secondStart = now;
}
// 检查间隔限制
if ((now - _lastQueryTime).TotalMilliseconds < _queryInterval)
returnfalse;
// 检查秒级限制
if (_queryCountInSecond >= _maxQueryPerSecond)
returnfalse;
returntrue;
}
}
///<summary>
/// 执行查询前调用,记录查询
///</summary>
publicvoidRecordQuery()
{
lock (_lock)
{
_lastQueryTime = DateTime.Now;
_queryCountInSecond++;
}
}
///<summary>
/// 计算安全等待时间(毫秒)
///</summary>
publicintGetWaitTimeMs()
{
lock (_lock)
{
DateTime now = DateTime.Now;
// 如果超过秒级限制,等待到下一秒
if (_queryCountInSecond >= _maxQueryPerSecond)
{
double waitToNextSecond = 1000 - (now - _secondStart).TotalMilliseconds;
return (int)Math.Max(waitToNextSecond, 0);
}
// 间隔限制
double elapsed = (now - _lastQueryTime).TotalMilliseconds;
if (elapsed < _queryInterval)
return (int)(_queryInterval - elapsed);
return0;
}
}
}安全查询间隔:
其中:
最大包速率控制:
///<summary>
/// 自选合约 - 存储在本地
///</summary>
publicclassWatchlistItem
{
publicstring InstrumentId { get; set; } // 合约代码
publicstring InstrumentName { get; set; } // 合约名称
public DateTime AddedTime { get; set; } // 添加时间
publicstring GroupId { get; set; } // 所属分组
publicbool IsSubscribed { get; set; } // 订阅状态
// 预警设置
publicbool EnablePriceAlert { get; set; } // 价格预警开关
publicdouble AlertPrice { get; set; } // 预警价格
publicbool EnablePercentAlert { get; set; } // 涨跌幅预警开关
publicdouble AlertPercent { get; set; } // 预警涨跌幅
// 实时行情缓存
publicdouble LastPrice { get; set; }
publicdouble Change { get; set; }
publicdouble ChangePercent { get; set; }
public DateTime LastUpdateTime { get; set; }
}成交后自动添加逻辑:

///<summary>
/// 成交回调处理 - 这是流控优化的关键入口
///</summary>
publicvoidOnTradeFilled(TradeRsp tradeInfo)
{
string instId = tradeInfo.InstrumentID;
// 步骤1: 检查本地自选表
if (!_watchlist.ContainsKey(instId))
{
// 添加到"持仓相关"分组
AddToWatchlist(instId, tradeInfo.InstrumentName, "position_group");
Log($"自动添加自选: {instId}");
}
// 步骤2: 订阅行情(仅当未订阅时)
if (!_subscribedSet.Contains(instId))
{
SubscribeQuote(instId);
_subscribedSet.Add(instId);
Log($"订阅行情: {instId}");
}
// 步骤3: 更新本地持仓缓存(不查CTP)
UpdateLocalPosition(tradeInfo);
}position_group | ||
pending_group | ||
watch_group | ||
alert_group |
浮动盈亏计算:
对于多头持仓:
对于空头持仓:
其中:
账户权益计算:
风险率计算:
其中 为合约保证金率

///<summary>
/// Tick收到时的处理 - 不查CTP,直接用Tick更新本地状态
///</summary>
publicvoidOnTickReceived(TickData tick)
{
string instId = tick.InstrumentID;
// 1. 更新自选行情显示
if (_watchlist.TryGetValue(instId, outvar item))
{
item.LastPrice = tick.LastPrice;
item.Change = tick.LastPrice - item.PreSettlement;
item.ChangePercent = (item.Change / item.PreSettlement) * 100;
item.LastUpdateTime = DateTime.Now;
// 检查预警
CheckAlert(item, tick.LastPrice);
}
// 2. 更新持仓盈亏(如果有持仓)
if (_localPositions.TryGetValue(instId, outvar pos))
{
// 用Tick价格计算浮动盈亏,不查CTP
double oldProfit = pos.FloatingProfit;
pos.LastPrice = tick.LastPrice;
pos.FloatingProfit = CalculateFloatingProfit(pos, tick.LastPrice);
// 累加账户权益变化
_localAccount.Equity += (pos.FloatingProfit - oldProfit);
_localAccount.RiskRate = CalculateRiskRate(_localAccount.Equity, _localPositions.Values);
}
}///<summary>
/// 本地持仓结构 - 成交时维护,Tick时更新价格
///</summary>
publicclassLocalPosition
{
publicstring InstrumentId { get; set; }
publicstring Direction { get; set; } // "Long" 或 "Short"
publicint Volume { get; set; } // 净持仓
publicint TodayVolume { get; set; } // 今仓
publicint YdVolume { get; set; } // 昨仓
publicdouble OpenPrice { get; set; } // 开仓均价
publicdouble LastPrice { get; set; } // 最新价(来自Tick)
publicdouble FloatingProfit { get; set; } // 浮动盈亏
publicdouble Margin { get; set; } // 占用保证金
///<summary>
/// 开仓均价更新(加权平均)
///</summary>
publicvoidAddPosition(double price, int volume)
{
double totalCost = OpenPrice * Volume + price * volume;
Volume += volume;
OpenPrice = totalCost / Volume;
Recalculate(price);
}
///<summary>
/// 平仓 - 返回平仓盈亏
///</summary>
publicdoubleClosePosition(double price, int volume)
{
double realizedProfit = (Direction == "Long" ? price - OpenPrice : OpenPrice - price) * volume;
Volume -= volume;
return realizedProfit;
}
///<summary>
/// 重新计算浮动盈亏和保证金
///</summary>
publicvoidRecalculate(double lastPrice)
{
LastPrice = lastPrice;
if (Direction == "Long")
FloatingProfit = (LastPrice - OpenPrice) * Volume * GetMultiplier(InstrumentId);
else
FloatingProfit = (OpenPrice - LastPrice) * Volume * GetMultiplier(InstrumentId);
Margin = LastPrice * Volume * GetMultiplier(InstrumentId) * GetMarginRate(InstrumentId);
}
}///<summary>
/// 定时查询配置 - 平衡流控和数据准确性
/// 动态调整为基于GetFrontInfo的流控参数
///</summary>
publicclassQueryScheduler
{
private Timer _accountTimer; // 账户查询定时器
private Timer _positionTimer; // 持仓查询定时器
private FlowControlAdapter _flowAdapter;
publicQueryScheduler(FlowControlAdapter flowAdapter)
{
_flowAdapter = flowAdapter;
// 动态确定查询间隔(基于流控参数)
int queryInterval = DetermineOptimalInterval();
_accountTimer = new Timer(QueryAccountCallback, null, queryInterval, queryInterval);
_positionTimer = new Timer(QueryPositionCallback, null, queryInterval, queryInterval);
}
///<summary>
/// 确定最优查询间隔
/// QryFreq = 2 → 间隔400-500ms
/// QryFreq = 3 → 间隔280-330ms
/// 默认 → 500ms
///</summary>
privateintDetermineOptimalInterval()
{
int qryFreq = _flowAdapter?.GetQueryFreq() ?? 2;
// 间隔 = 1000 / QryFreq - 安全余量
return Math.Max(300, 1000 / qryFreq - 100);
}
///<summary>
/// 定时查询账户 - 用于校准本地状态
///</summary>
privatevoidQueryAccountCallback(object state)
{
// 流控检查
if (!_flowAdapter.CanQuery())
{
int waitMs = _flowAdapter.GetWaitTimeMs();
Log($"流控限制,延迟{waitMs}ms后查询");
Thread.Sleep(waitMs);
}
// 仅在本地计算超过阈值时查询
if (Math.Abs(_localAccount.Equity - _lastQueriedEquity) > 1000)
{
_flowAdapter.RecordQuery();
CTPQueryAccount();
}
}
}///<summary>
/// 智能查询策略 - 减少不必要的CTP查询
///</summary>
publicclassSmartQueryStrategy
{
// 查询触发条件
privateconstdouble EQUITY_THRESHOLD = 1000; // 权益变化超1000元
privateconstdouble POSITION_THRESHOLD = 0.01; // 持仓变化超1%
privateconstint MAX_QUERY_INTERVAL = 60000; // 最长间隔60秒
privateconstint MIN_QUERY_INTERVAL = 30000; // 最短间隔30秒
private DateTime _lastQueryTime;
privatedouble _lastQueriedEquity;
private FlowControlAdapter _flowAdapter;
publicboolNeedQueryAccount(double currentLocalEquity)
{
// 条件0:流控检查
if (!_flowAdapter.CanQuery())
returnfalse;
// 条件1:间隔时间
if ((DateTime.Now - _lastQueryTime).TotalMilliseconds < MIN_QUERY_INTERVAL)
returnfalse;
// 条件2:变化幅度
double changePercent = Math.Abs(currentLocalEquity - _lastQueriedEquity) /
Math.Max(_lastQueriedEquity, 1.0);
if (changePercent > POSITION_THRESHOLD)
returntrue;
// 条件3:强制查询(超时)
if ((DateTime.Now - _lastQueryTime).TotalMilliseconds > MAX_QUERY_INTERVAL)
returntrue;
returnfalse;
}
}///<summary>
/// 完整初始化流程 - 包含GetFrontInfo调用
///</summary>
publicclassCTPInitializer
{
private CThostFtdcUserApi _userApi;
private FlowControlAdapter _flowAdapter;
private QueryScheduler _scheduler;
publicvoidInitialize(string frontAddr)
{
// 步骤1: 创建API实例
_userApi = CThostFtdcUserApi.CreateFtdcUserApi("./ctp_config/");
_userApi.RegisterFront(frontAddr);
// 步骤2: 注册回调
_userApi.RegisterSpi(new CTPTraderSpi(this));
// 步骤3: 连接
_userApi.Init();
// 等待连接成功...
}
///<summary>
/// 连接成功回调 - 调用GetFrontInfo
///</summary>
publicvoidOnFrontConnected()
{
Log("前置连接成功");
// 关键:调用GetFrontInfo获取流控参数
CThostFtdcFrontInfoField frontInfo = new CThostFtdcFrontInfoField();
_userApi.GetFrontInfo(ref frontInfo);
Log($"前置地址: {frontInfo.FrontAddr}");
Log($"FTD流控: {frontInfo.FTDPkgFreq} 包/秒");
Log($"查询流控: {frontInfo.QryFreq} 次/秒");
// 初始化流控适配器
_flowAdapter = new FlowControlAdapter();
_flowAdapter.UpdateFlowControl(frontInfo);
// 登录
ReqUserLogin();
}
///<summary>
/// 登录成功回调 - 再次调用GetFrontInfo获取完整参数
///</summary>
publicvoidOnRspUserLogin()
{
Log("用户登录成功");
// 登录成功后再次获取,此时流控参数更准确
CThostFtdcFrontInfoField frontInfo = new CThostFtdcFrontInfoField();
_userApi.GetFrontInfo(ref frontInfo);
_flowAdapter.UpdateFlowControl(frontInfo);
// 启动定时查询调度器(基于动态参数)
_scheduler = new QueryScheduler(_flowAdapter);
// 首次全量查询账户和持仓
FullQuery();
}
}///<summary>
/// 流控错误码映射
///</summary>
publicclassFlowControlErrorHandler
{
// CTP流控错误码
privateconstint ERR_FLOW_CONTROL_PACKET = 10003; // 包数超限
privateconstint ERR_FLOW_CONTROL_QUERY = 10004; // 查询频次超限
publicvoidOnRspError(RspInfoField rspInfo)
{
if (rspInfo.ErrorID == ERR_FLOW_CONTROL_PACKET)
{
Log($"FTD流控超限: {rspInfo.ErrorMsg}");
// 降低请求频率
ReduceRequestRate(1.5);
}
elseif (rspInfo.ErrorID == ERR_FLOW_CONTROL_QUERY)
{
Log($"查询流控超限: {rspInfo.ErrorMsg}");
// 增加查询间隔
IncreaseQueryInterval(500);
}
}
privatevoidReduceRequestRate(double factor)
{
// 动态降低请求速率
_maxPacketsPerSecond = (int)(_maxPacketsPerSecond / factor);
_queryInterval *= factor;
}
}///<summary>
/// 流控参数的动态调整
///</summary>
publicvoidUpdateFlowControlParams()
{
// 获取当前流控参数
CThostFtdcFrontInfoField frontInfo = new CThostFtdcFrontInfoField();
// 在连接成功后调用
_userApi.GetFrontInfo(ref frontInfo);
// 更新本地参数
_qryFreq = frontInfo.QryFreq;
_ftdFreq = frontInfo.FTDPkgFreq;
// 动态计算公式
// 查询间隔 = 1000 / QryFreq - 安全余量(100ms)
double safeInterval = 1000.0 / _qryFreq - 100;
_minQueryInterval = Math.Max(200, (int)safeInterval);
// 每秒最大包数 = FTD流控 * 80% (安全系数)
_maxPacketsPerSec = (int)(_ftdFreq * 0.8);
// 订阅、查询各分配比例
// 查询占30%,订阅占20%,预留50%
_maxQueryPerSec = (int)(_maxPacketsPerSec * 0.3);
}///<summary>
/// 开仓时的本地状态更新(不查CTP)
///</summary>
publicvoidOnOpenPosition(TradeRsp trade)
{
string id = trade.InstrumentID;
// 1. 确保在自选和订阅中
EnsureInWatchlistAndSubscribed(id);
// 2. 更新本地持仓
if (!_localPositions.ContainsKey(id))
{
_localPositions[id] = new LocalPosition
{
InstrumentId = id,
Direction = trade.Direction,
Volume = trade.Volume,
TodayVolume = trade.IsToday ? trade.Volume : 0,
YdVolume = trade.IsToday ? 0 : trade.Volume,
OpenPrice = trade.Price,
LastPrice = _latestTicks.ContainsKey(id) ? _latestTicks[id].LastPrice : trade.Price
};
}
else
{
var pos = _localPositions[id];
// 同向:加仓,更新均价
if (pos.Direction == trade.Direction)
{
pos.AddPosition(trade.Price, trade.Volume);
if (trade.IsToday) pos.TodayVolume += trade.Volume;
else pos.YdVolume += trade.Volume;
}
else
{
// 反向:平仓
double profit = pos.ClosePosition(trade.Price, trade.Volume);
_realizedProfit += profit;
}
}
// 3. 更新本地账户
double margin = trade.Price * trade.Volume * GetMultiplier(id) * GetMarginRate(id);
_localAccount.Margin += _localPositions.ContainsKey(id) ?
_localPositions[id].Margin : margin;
_localAccount.Available -= margin;
}安全查询间隔:
最大包速率:
动态查询频率:
开仓均价(加权平均):
浮动盈亏(多头):
浮动盈亏(空头):
平仓盈亏:
总权益:
风险率:
保证金计算:

///<summary>
/// 流控优化配置参数(基于GetFrontInfo动态获取)
///</summary>
publicclassFlowControlConfig
{
// ====== 从GetFrontInfo获取 ======
publicint QryFreq { get; set; } = 2; // 查询流控(次/秒)
publicint FTDPkgFreq { get; set; } = 3; // FTD流控(包/秒)
publicstring FrontAddr { get; set; } // 前置地址
// ====== 动态计算参数 ======
publicint SafeQueryInterval => Math.Max(200, 1000 / Math.Max(1, QryFreq) - 100);
publicint MaxQueryPerSec => (int)(FTDPkgFreq * 0.3);
// ====== 定时查询配置 ======
publicint AccountQueryInterval { get; set; } = 30000; // 账户30秒
publicint PositionQueryInterval { get; set; } = 30000; // 持仓30秒
// ====== 触发查询的变化阈值 ======
publicdouble EquityChangeThreshold { get; set; } = 1000; // 权益变化1000元
publicdouble PositionChangeThreshold { get; set; } = 0.01; // 持仓变化1%
// ====== 自选管理 ======
publicbool AutoAddOnTrade { get; set; } = true; // 成交自动添加
publicint PositionGroupRetention { get; set; } = 30; // 持仓组保留30分钟
// ====== 预警设置 ======
publicbool AlertEnabled { get; set; } = true;
}A: 建议调用两次:
OnFrontConnected回调后立即调用,获取前置地址和基础流控参数OnRspUserLogin登录成功后再次调用,此时流控参数更准确A:
QryFreq:限制主动发起查询的次数(如ReqQryTradingAccount)FTDPkgFreq:限制所有数据包的总数(包括查询、订阅、行情推送、成交回报等)A: 使用公式 ,例如:
A:
GetFrontInfo重新获取当前流控参数A: 会有限制。建议:
A: 定期校验机制会处理:
注意事项: