Section outline

  • 在线服务 (Online Serving)


    简介

    除了回测,检验模型有效性的一种方法是在真实市场条件下进行预测,甚至根据这些预测进行真实交易。在线服务是一套模块,用于使用最新数据进行在线模型预测,包括 Online ManagerOnline StrategyOnline ToolUpdater

    此处有几个示例可供参考,它们展示了在线服务的不同功能。如果您有许多模型或任务需要管理,请考虑任务管理。这些示例基于任务管理中的一些组件,例如 TrainerRMCollector

    注意:用户应该保持其数据源更新,以支持在线服务。例如,Qlib 提供一批脚本来帮助用户更新 Yahoo 每日数据。

    当前已知限制

    • 目前支持下一个交易日的每日更新预测。但由于公共数据的限制,不支持为下一个交易日生成订单。


    在线管理器 (Online Manager)

    OnlineManager 可以管理一组 Online Strategy 并动态运行它们。

    随着时间的变化,决定性的模型也会随之改变。在此模块中,我们将这些贡献模型称为在线模型。在每次例行程序(例如每天或每分钟)中,在线模型可能会发生变化,其预测需要更新。因此,该模块提供了一系列方法来控制此过程。

    该模块还提供了一种在历史中模拟 Online Strategy 的方法。这意味着您可以验证您的策略或找到更好的策略。

    在不同情况下使用不同的训练器总共有 4 种情况:

    情况 描述
    Online + Trainer 当您想执行真实例行程序时,Trainer 将帮助您训练模型。它将逐个任务、逐个策略地训练模型。
    Online + DelayTrainer DelayTrainer 将跳过具体的训练,直到所有任务都由不同的策略准备好。它让用户可以在 routinefirst_train 结束时并行训练所有任务。否则,当每个策略准备任务时,这些函数会卡住。
    Simulation + Trainer 它的行为与 Online + Trainer 相同。唯一的区别是它用于模拟/回测而不是在线交易。
    Simulation + DelayTrainer 当您的模型没有任何时间依赖性时,您可以使用 DelayTrainer 来实现多任务处理。这意味着所有例行程序中的所有任务都可以在模拟结束时进行真实训练。信号将在不同的时间段(基于是否有新模型上线)准备好。

    以下是一些伪代码,展示了每种情况的工作流。

    • 为简单起见

      • 策略中只使用一个策略。

      • update_online_pred 仅在在线模式下调用并被忽略。

    Online + Trainer

    Python
    tasks = first_train()
    models = trainer.train(tasks)
    trainer.end_train(models)
    for day in online_trading_days:
        # OnlineManager.routine
        models = trainer.train(strategy.prepare_tasks())  # for each strategy
        strategy.prepare_online_models(models)  # for each strategy
    
        trainer.end_train(models)
        prepare_signals()  # prepare trading signals daily
    

    Online + DelayTrainer:工作流与 Online + Trainer 相同。

    Simulation + DelayTrainer

    Python
    # simulate
    tasks = first_train()
    models = trainer.train(tasks)
    for day in historical_calendars:
        # OnlineManager.routine
        models = trainer.train(strategy.prepare_tasks())  # for each strategy
        strategy.prepare_online_models(models)  # for each strategy
    # delay_prepare()
    # FIXME: Currently the delay_prepare is not implemented in a proper way.
    trainer.end_train(<for all previous models>)
    prepare_signals()
    

    我们能简化当前的工作流吗?

    • 可以减少任务状态的数量吗?

    • 对于每个任务,我们有三个阶段(即任务、部分训练的任务、最终训练的任务)。

    class qlib.workflow.online.manager.OnlineManager(strategies: OnlineStrategy | List[OnlineStrategy], trainer: Trainer | None = None, begin_time: str | Timestamp | None = None, freq='day')

    OnlineManager 可以通过 Online Strategy 管理在线模型。它还提供了一个历史记录,记录了在什么时间哪些模型在线。

    __init__(strategies: OnlineStrategy | List[OnlineStrategy], trainer: Trainer | None = None, begin_time: str | Timestamp | None = None, freq='day')

    初始化 OnlineManager。一个 OnlineManager 必须至少有一个 OnlineStrategy。

    参数:

    • strategies (Union[OnlineStrategy, List[OnlineStrategy]]) – OnlineStrategy 的一个实例或 OnlineStrategy 的一个列表。

    • begin_time (Union[str, pd.Timestamp], optional) – OnlineManager 将在此时间开始。默认为 None 以使用最新日期。

    • trainer (qlib.model.trainer.Trainer) – 用于训练任务的训练器。None 表示使用 TrainerR

    • freq (str, optional) – 数据频率。默认为“day”。

    first_train(strategies: List[OnlineStrategy] | None = None, model_kwargs: dict = {})

    从每个策略的 first_tasks 方法获取任务并训练它们。如果使用 DelayTrainer,它可以在每个策略的 first_tasks 之后一起完成所有训练。

    参数:

    • strategies (List[OnlineStrategy]) – 策略列表(添加策略时需要此参数)。None 表示使用默认策略。

    • model_kwargs (dict) – prepare_online_models 的参数。

    routine(cur_time: str | Timestamp | None = None, task_kwargs: dict = {}, model_kwargs: dict = {}, signal_kwargs: dict = {})

    每个策略的典型更新过程并记录在线历史。

    例行程序之后的典型更新过程,例如日复一日或月复一月。该过程是:更新预测 -> 准备任务 -> 准备在线模型 -> 准备信号。

    如果使用 DelayTrainer,它可以在每个策略的 prepare_tasks 之后一起完成所有训练。

    参数:

    • cur_time (Union[str, pd.Timestamp], optional) – 在此时间运行例行程序方法。默认为 None

    • task_kwargs (dict) – prepare_tasks 的参数。

    • model_kwargs (dict) – prepare_online_models 的参数。

    • signal_kwargs (dict) – prepare_signals 的参数。

    get_collector(**kwargs) -> MergeCollector

    获取 Collector 的实例,以从每个策略收集结果。此收集器可以作为信号准备的基础。

    参数:

    • **kwargs – get_collector 的参数。

      返回:

      用于合并其他收集器的收集器。

      返回类型:MergeCollector

    add_strategy(strategies: OnlineStrategy | List[OnlineStrategy])

    向 OnlineManager 添加一些新策略。

    参数:

    • strategy (Union[OnlineStrategy, List[OnlineStrategy]]) – OnlineStrategy 的一个列表。

    prepare_signals(prepare_func: ~typing.Callable = <qlib.model.ens.ensemble.AverageEnsemble object>, over_write=False)

    在准备好上次例行程序的数据(箱形图中的一个框)之后,即例行程序结束时,我们可以为下一次例行程序准备交易信号。

    注意:给定一组预测,所有在这些预测结束时间之前的信号都将准备好。即使最新的信号已经存在,最新的计算结果也会被覆盖。

    注意

    • 给定某个时间的预测,所有在此时间之前的信号都将准备好。

      参数:

    • prepare_func (Callable, optional) – 从收集后的字典中获取信号。默认为 AverageEnsemble()MergeCollector 收集的结果必须是 {xxx:pred}

    • over_write (bool, optional) – 如果为 True,新信号将覆盖。如果为 False,新信号将追加到信号末尾。默认为 False。

      返回:

      信号。

      返回类型:pd.DataFrame

    get_signals() -> Series | DataFrame

    获取准备好的在线信号。

    返回:

    pd.Series 用于每个日期时间只有一个信号。pd.DataFrame 用于多个信号,例如,买入和卖出操作使用不同的交易信号。

    返回类型:Union[pd.Series, pd.DataFrame]

    simulate(end_time=None, frequency='day', task_kwargs={}, model_kwargs={}, signal_kwargs={}) -> Series | DataFrame

    从当前时间开始,此方法将模拟 OnlineManager 中的每个例行程序,直到结束时间。

    考虑到并行训练,模型和信号可以在所有例行程序模拟之后准备好。

    延迟训练方式可以是 DelayTrainer,延迟准备信号方式可以是 delay_prepare。

    参数:

    • end_time – 模拟将结束的时间。

    • frequency – 日历频率。

    • task_kwargs (dict) – prepare_tasks 的参数。

    • model_kwargs (dict) – prepare_online_models 的参数。

    • signal_kwargs (dict) – prepare_signals 的参数。

      返回:

      pd.Series 用于每个日期时间只有一个信号。pd.DataFrame 用于多个信号,例如,买入和卖出操作使用不同的交易信号。

      返回类型:Union[pd.Series, pd.DataFrame]

    delay_prepare(model_kwargs={}, signal_kwargs={})

    如果某些东西正在等待准备,则准备所有模型和信号。

    参数:

    • model_kwargsend_train 的参数。

    • signal_kwargsprepare_signals 的参数。


    在线策略 (Online Strategy)

    OnlineStrategy 模块是在线服务的一个元素。

    class qlib.workflow.online.strategy.OnlineStrategy(name_id: str)

    OnlineStrategy 与 Online Manager 协同工作,负责如何生成任务、更新模型和准备信号。

    __init__(name_id: str)

    初始化 OnlineStrategy。此模块必须使用 Trainer 来完成模型训练。

    参数:

    • name_id (str) – 唯一的名称或 ID。

    • trainer (qlib.model.trainer.Trainer, optional) – Trainer 的一个实例。默认为 None

    prepare_tasks(cur_time, **kwargs) -> List[dict]

    在例行程序结束之后,根据 cur_time(None 表示最新)检查我们是否需要准备和训练一些新任务。返回等待训练的新任务。

    您可以通过 OnlineTool.online_models 找到上次在线的模型。

    prepare_online_models(trained_models, cur_time=None) -> List[object]

    从训练好的模型中选择一些模型并将其设置为在线模型。这是一个将所有训练好的模型上线的典型实现,您可以重写它以实现复杂的方法。如果您仍然需要上次在线的模型,可以通过 OnlineTool.online_models 找到它们。

    注意:将所有在线模型重置为训练好的模型。如果没有训练好的模型,则什么也不做。

    注意:

    • 当前的实现非常简单。以下是一个更接近实际场景的更复杂情况。

      1. test_start 前一天(时间戳 T)训练新模型。

      2. 在 test_start(通常是时间戳 T + 1)切换模型。

        参数:

    • models (list) – 模型列表。

    • cur_time (pd.Dataframe) – 来自 OnlineManger 的当前时间。None 表示最新。

      返回:

      一个在线模型列表。

      返回类型:List[object]

    first_tasks() -> List[dict]

    首先生成一系列任务并返回它们。

    返回类型:List[dict]

    get_collector() -> Collector

    获取 Collector 的实例,以收集此策略的不同结果。

    例如:

    • Recorder 中收集预测。

    • 在文本文件中收集信号。

      返回:

      Collector

    class qlib.workflow.online.strategy.RollingStrategy(name_id: str, task_template: dict | List[dict], rolling_gen: RollingGen)

    此示例策略始终使用最新的滚动模型作为在线模型。

    __init__(name_id: str, task_template: dict | List[dict], rolling_gen: RollingGen)

    初始化 RollingStrategy。

    假设:name_id 的字符串、实验名称和训练器的实验名称相同。

    参数:

    • name_id (str) – 唯一的名称或 ID。也将是实验的名称。

    • task_template (Union[dict, List[dict]]) – task_template 的一个列表或单个模板,将用于使用 rolling_gen 生成许多任务。

    • rolling_gen (RollingGen) – RollingGen 的一个实例。

    get_collector(process_list=[<qlib.model.ens.group.RollingGroup object>], rec_key_func=None, rec_filter_func=None, artifacts_key=None)

    获取 Collector 的实例以收集结果。返回的收集器必须区分不同模型中的结果。

    假设:模型可以根据模型名称和滚动测试段进行区分。如果您不希望此假设成立,请实现您的方法或使用另一个 rec_key_func。

    参数:

    • rec_key_func (Callable) – 获取记录器键的函数。如果为 None,则使用记录器 ID。

    • rec_filter_func (Callable, optional) – 通过返回 TrueFalse 来筛选记录器。默认为 None

    • artifacts_key (List[str], optional) – 您想要获取的工件键。如果为 None,则获取所有工件。

    first_tasks() -> List[dict]

    使用 rolling_gen 根据 task_template 生成不同的任务。

    返回:

    任务列表。

    返回类型:List[dict]

    prepare_tasks(cur_time) -> List[dict]

    根据 cur_time(None 表示最新)准备新任务。

    您可以通过 OnlineToolR.online_models 找到上次在线的模型。

    返回:

    一个新任务列表。

    返回类型:List[dict]

    在线工具 (Online Tool)


    OnlineTool 是一个用于设置和取消设置一系列在线模型的模块。在线模型是在某些时间点具有决定性的模型,它们可以随着时间的变化而变化。这使我们能够随着市场风格的变化而使用高效的子模型。

    class qlib.workflow.online.utils.OnlineTool

    OnlineTool 将在一个包含模型记录器的实验中管理在线模型。

    __init__()

    初始化 OnlineTool。

    set_online_tag(tag, recorder: list | object)

    为模型设置标签,以标记是否在线。

    参数:

    • tag (str) – ONLINE_TAGOFFLINE_TAG 中的标签。

    • recorder (Union[list, object]) – 模型的记录器。

    get_online_tag(recorder: object) -> str

    给定一个模型记录器并返回其在线标签。

    参数:

    • recorder (Object) – 模型的记录器。

      返回:

      在线标签。

      返回类型:str

    reset_online_tag(recorder: list | object)

    将所有模型下线,并将记录器设置为“在线”。

    参数:

    • recorder (Union[list, object]) – 您想要重置为“在线”的记录器。

    online_models() -> list

    获取当前在线模型。

    返回:

    一个在线模型列表。

    返回类型:list

    update_online_pred(to_date=None)

    将在线模型的预测更新到 to_date。

    参数:

    • to_date (pd.Timestamp) – 在此日期之前的预测将被更新。None 表示更新到最新。

    class qlib.workflow.online.utils.OnlineToolR(default_exp_name: str | None = None)

    基于 (R)ecorder 的 OnlineTool 实现。

    __init__(default_exp_name: str | None = None)

    初始化 OnlineToolR。

    参数:

    • default_exp_name (str) – 默认实验名称。

    set_online_tag(tag, recorder: Recorder | List)

    为模型的记录器设置标签,以标记是否在线。

    参数:

    • tag (str) – ONLINE_TAGNEXT_ONLINE_TAGOFFLINE_TAG 中的标签。

    • recorder (Union[Recorder, List]) – Recorder 的一个列表或一个实例。

    get_online_tag(recorder: Recorder) -> str

    给定一个模型记录器并返回其在线标签。

    参数:

    • recorder (Recorder) – 记录器的一个实例。

      返回:

      在线标签。

      返回类型:str

    reset_online_tag(recorder: Recorder | List, exp_name: str | None = None)

    将所有模型下线,并将记录器设置为“在线”。

    参数:

    • recorder (Union[Recorder, List]) – 您想要重置为“在线”的记录器。

    • exp_name (str) – 实验名称。如果为 None,则使用 default_exp_name

    online_models(exp_name: str | None = None) -> list

    获取当前在线模型。

    参数:

    • exp_name (str) – 实验名称。如果为 None,则使用 default_exp_name。

      返回:

      一个在线模型列表。

      返回类型:list

    update_online_pred(to_date=None, from_date=None, exp_name: str | None = None)

    将在线模型的预测更新到 to_date。

    参数:

    • to_date (pd.Timestamp) – 在此日期之前的预测将被更新。None 表示更新到 Calendar 中的最新时间。

    • exp_name (str) – 实验名称。如果为 None,则使用 default_exp_name


    更新器 (Updater)

    Updater 是一个模块,用于在股票数据更新时更新预测等工件。

    class qlib.workflow.online.update.RMDLoader(rec: Recorder)

    记录器模型数据集加载器。

    __init__(rec: Recorder)

    get_dataset(start_time, end_time, segments=None, unprepared_dataset: DatasetH | None = None) -> DatasetH

    加载、配置和设置数据集。

    此数据集用于推理。

    参数:

    • start_time – 基础数据的开始时间。

    • end_time – 基础数据的结束时间。

    • segmentsdict 数据集的分段配置。由于时间序列数据集 (TSDatasetH),测试段可能与 start_timeend_time 不同。

    • unprepared_dataset – Optional[DatasetH] 如果用户不想从记录器加载数据集,请指定用户的数据集。

      返回:

      DatasetH 的实例。

      返回类型:DatasetH

    class qlib.workflow.online.update.RecordUpdater(record: Recorder, *args, **kwargs)

    更新特定的记录器。

    __init__(record: Recorder, *args, **kwargs)

    abstract update(*args, **kwargs)

    更新特定记录器的信息。

    class qlib.workflow.online.update.DSBasedUpdater(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)

    基于数据集的更新器。

    为基于 Qlib 数据集更新数据提供更新功能。

    假设

    • 基于 Qlib 数据集。

    • 要更新的数据是多级索引的 pd.DataFrame。例如,标签、预测。

                             LABEL0
    datetime   instrument
    2021-05-10 SH600000    0.006965
               SH600004    0.003407
    ...                         ...
    2021-05-28 SZ300498    0.015748
               SZ300676   -0.001321
    

    __init__(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)

    初始化 PredUpdater。

    在以下情况下的预期行为:

    • 如果 to_date 大于日历中的最大日期,数据将更新到最新日期。

    • 如果在 from_date 之前或 to_date 之后有数据,则只影响 from_date 和 to_date 之间的数据。

      参数:

    • recordRecorder

    • to_date

      • 将预测更新到 to_date

      • 如果 to_dateNone:数据将更新到最新日期。

    • from_date

      • 更新将从 from_date 开始。

      • 如果 from_dateNone:更新将发生在历史数据中最新数据之后的下一个刻度。

    • hist_ref

      • int 有时,数据集会依赖于历史数据。将问题留给用户来设置历史依赖的长度。如果用户未指定此参数,Updater 将尝试加载数据集以自动确定 hist_ref

      • 注意start_time 不包含在 hist_ref 中;因此在大多数情况下,hist_ref 将是 step_len - 1

    • loader_clstype 用于加载模型和数据集的类。

    prepare_data(unprepared_dataset: DatasetH | None = None) -> DatasetH

    加载数据集:

    • 如果指定了 unprepared_dataset,则直接准备数据集。

    • 否则,...

    • 分离此函数将使其更容易重用数据集。

      返回:

      DatasetH 的实例。

      返回类型:DatasetH

    update(dataset: DatasetH | None = None, write: bool = True, ret_new: bool = False) -> object | None

    参数:

    • dataset (DatasetH) – DatasetH 的实例。None 表示再次准备它。

    • write (bool) – 是否执行写入操作。

    • ret_new (bool) – 是否返回更新后的数据。

      返回:

      更新后的数据集。

      返回类型:Optional[object]

    abstract get_update_data(dataset: Dataset) -> DataFrame

    根据给定数据集返回更新后的数据。

    get_update_data 和 update 之间的区别:

    • update_date 只包含一些特定于数据的功能。

    • update 包含一些通用的例行步骤(例如,准备数据集、检查)。

    class qlib.workflow.online.update.PredUpdater(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)

    更新 Recorder 中的预测。

    get_update_data(dataset: Dataset) -> DataFrame

    根据给定数据集返回更新后的数据。

    get_update_data 和 update 之间的区别:

    • update_date 只包含一些特定于数据的功能。

    • update 包含一些通用的例行步骤(例如,准备数据集、检查)。

    class qlib.workflow.online.update.LabelUpdater(record: Recorder, to_date=None, **kwargs)

    更新记录器中的标签。

    假设

    • 标签是从 record_temp.SignalRecord 生成的。

    __init__(record: Recorder, to_date=None, **kwargs)

    初始化 PredUpdater。

    在以下情况下的预期行为:

    • 如果 to_date 大于日历中的最大日期,数据将更新到最新日期。

    • 如果在 from_date 之前或 to_date 之后有数据,则只影响 from_date 和 to_date 之间的数据。

      参数:

    • recordRecorder

    • to_date

      • 将预测更新到 to_date

      • 如果 to_dateNone:数据将更新到最新日期。

    • from_date

      • 更新将从 from_date 开始。

      • 如果 from_dateNone:更新将发生在历史数据中最新数据之后的下一个刻度。

    • hist_ref

      • int 有时,数据集会依赖于历史数据。将问题留给用户来设置历史依赖的长度。如果用户未指定此参数,Updater 将尝试加载数据集以自动确定 hist_ref

      • 注意start_time 不包含在 hist_ref 中;因此在大多数情况下,hist_ref 将是 step_len - 1

    • loader_clstype 用于加载模型和数据集的类。

    get_update_data(dataset: Dataset) -> DataFrame

    根据给定数据集返回更新后的数据。

    get_update_data 和 update 之间的区别:

    • update_date 只包含一些特定于数据的功能。

    • update 包含一些通用的例行步骤(例如,准备数据集、检查)。