History Collection (SQLite)

mt5cli.history

SQLite storage helpers for the collect-history incremental data pipeline.

DEFAULT_HISTORY_TIMEFRAMES module-attribute

DEFAULT_HISTORY_TIMEFRAMES: tuple[str, ...] = (
    TIMEFRAME_NAMES
)

SqliteConnOrPath module-attribute

SqliteConnOrPath = Connection | Path | str

logger module-attribute

logger = getLogger(__name__)

DedupScope dataclass

DedupScope(
    where: str,
    params: tuple[object, ...],
    required_columns: frozenset[str],
)

Scoped deduplication predicate and the columns it references.

Attributes:

Name Type Description
where str

SQL predicate appended to the duplicate-removal query.

params tuple[object, ...]

Parameters bound to the scope predicate.

required_columns frozenset[str]

Columns that must be present in the written table for the scope to run.

params instance-attribute

params: tuple[object, ...]

required_columns instance-attribute

required_columns: frozenset[str]

where instance-attribute

where: str

RateTarget dataclass

RateTarget(symbol: str | None, timeframe: int | str)

A single rate series identified by symbol and timeframe.

Attributes:

Name Type Description
symbol str | None

MT5 symbol name, or None when the rate series is addressed only by an explicit table (for example a custom SQLite view).

timeframe int | str

MT5 timeframe as an integer or name (for example M1).

symbol instance-attribute

symbol: str | None

timeframe instance-attribute

timeframe: int | str

timeframe_int property

timeframe_int: int

Return the timeframe as its integer MT5 value.

__post_init__

__post_init__() -> None

Normalize accepted timeframe aliases to the stored integer value.

Source code in mt5cli/history.py
def __post_init__(self) -> None:
    """Normalize accepted timeframe aliases to the stored integer value."""
    if not isinstance(self.timeframe, int):
        object.__setattr__(self, "timeframe", parse_timeframe(self.timeframe))

append_dataframe

append_dataframe(
    conn: Connection,
    frame: DataFrame,
    table_name: str,
    if_exists: IfExists,
) -> bool

Append a DataFrame to SQLite when it has a schema.

Returns:

Type Description
bool

True if a table was written, False if the frame had no columns.

Source code in mt5cli/history.py
def append_dataframe(
    conn: sqlite3.Connection,
    frame: pd.DataFrame,
    table_name: str,
    if_exists: IfExists,
) -> bool:
    """Append a DataFrame to SQLite when it has a schema.

    Returns:
        True if a table was written, False if the frame had no columns.
    """
    if len(frame.columns) == 0:
        logger.warning("Skipping %s: dataset returned no columns", table_name)
        return False
    frame.to_sql(  # type: ignore[reportUnknownMemberType]
        table_name,
        conn,
        if_exists=if_exists.value,
        index=False,
        chunksize=50_000,
    )
    return True

augment_written_columns_from_sqlite

augment_written_columns_from_sqlite(
    conn: Connection,
    datasets: set[Dataset],
    written_columns: dict[Dataset, set[str]],
) -> None

Add existing table columns to the written column map.

Source code in mt5cli/history.py
def augment_written_columns_from_sqlite(
    conn: sqlite3.Connection,
    datasets: set[Dataset],
    written_columns: dict[Dataset, set[str]],
) -> None:
    """Add existing table columns to the written column map."""
    for dataset in datasets:
        columns = get_table_columns(conn, dataset.table_name)
        if not columns:
            continue
        if dataset in written_columns:
            written_columns[dataset].update(columns)
        else:
            written_columns[dataset] = columns

build_rate_targets

build_rate_targets(
    symbols: Sequence[str],
    timeframes: Sequence[int | str],
    *,
    allow_missing_symbol: bool = False,
) -> list[RateTarget]

Build rate targets for every symbol and timeframe combination.

Parameters:

Name Type Description Default
symbols Sequence[str]

MT5 symbol names. May be empty when allow_missing_symbol.

required
timeframes Sequence[int | str]

MT5 timeframes as integers or names (for example M1).

required
allow_missing_symbol bool

When True and symbols is empty, build targets with symbol=None for each timeframe instead of raising.

False

Returns:

Type Description
list[RateTarget]

Targets in row-major order: every timeframe for the first symbol, then

list[RateTarget]

every timeframe for the next symbol, and so on.

Raises:

Type Description
ValueError

If timeframes is empty, or symbols is empty and allow_missing_symbol is False.

Source code in mt5cli/history.py
def build_rate_targets(
    symbols: Sequence[str],
    timeframes: Sequence[int | str],
    *,
    allow_missing_symbol: bool = False,
) -> list[RateTarget]:
    """Build rate targets for every symbol and timeframe combination.

    Args:
        symbols: MT5 symbol names. May be empty when ``allow_missing_symbol``.
        timeframes: MT5 timeframes as integers or names (for example ``M1``).
        allow_missing_symbol: When True and ``symbols`` is empty, build targets
            with ``symbol=None`` for each timeframe instead of raising.

    Returns:
        Targets in row-major order: every timeframe for the first symbol, then
        every timeframe for the next symbol, and so on.

    Raises:
        ValueError: If ``timeframes`` is empty, or ``symbols`` is empty and
            ``allow_missing_symbol`` is False.
    """
    if not timeframes:
        msg = "At least one timeframe is required."
        raise ValueError(msg)
    if not symbols:
        if not allow_missing_symbol:
            msg = "At least one symbol is required."
            raise ValueError(msg)
        return [RateTarget(symbol=None, timeframe=tf) for tf in timeframes]
    return [
        RateTarget(symbol=symbol, timeframe=tf)
        for symbol in symbols
        for tf in timeframes
    ]

build_rate_view_name

build_rate_view_name(
    *,
    symbol: str,
    granularity: str,
    granularity_count: int,
    timeframe: int,
) -> str

Return a collision-free offline optimize view name.

View names always include the timeframe integer after a __ separator so a symbol such as EURUSD_M1 cannot collide with EURUSD at timeframe M1.

Source code in mt5cli/history.py
def build_rate_view_name(
    *,
    symbol: str,
    granularity: str,
    granularity_count: int,
    timeframe: int,
) -> str:
    """Return a collision-free offline optimize view name.

    View names always include the timeframe integer after a ``__`` separator so
    a symbol such as ``EURUSD_M1`` cannot collide with ``EURUSD`` at timeframe
    ``M1``.
    """
    if granularity_count == 1:
        return f"rate_{symbol}__{timeframe}"
    return f"rate_{symbol}__{granularity}_{timeframe}"

create_cash_events_view

create_cash_events_view(
    conn: Connection, deals_columns: set[str]
) -> bool

Create the cash_events SQLite view derived from history_deals.

Returns:

Type Description
bool

True if the view was created, False if required columns are missing.

Source code in mt5cli/history.py
def create_cash_events_view(
    conn: sqlite3.Connection,
    deals_columns: set[str],
) -> bool:
    """Create the cash_events SQLite view derived from history_deals.

    Returns:
        True if the view was created, False if required columns are missing.
    """
    if "type" not in deals_columns:
        logger.warning("Skipping cash_events view: history_deals.type is missing")
        return False
    conn.execute("DROP VIEW IF EXISTS cash_events")
    conn.execute(
        "CREATE VIEW cash_events AS"  # noqa: S608
        f" SELECT * FROM history_deals WHERE type NOT IN {_TRADE_DEAL_TYPES_SQL}",
    )
    return True

create_history_indexes

create_history_indexes(
    conn: Connection,
    written_columns: dict[Dataset, set[str]],
) -> None

Create useful indexes for collected history tables when present.

Source code in mt5cli/history.py
def create_history_indexes(
    conn: sqlite3.Connection,
    written_columns: dict[Dataset, set[str]],
) -> None:
    """Create useful indexes for collected history tables when present."""
    if {"symbol", "timeframe", "time"}.issubset(
        written_columns.get(Dataset.rates, set()),
    ):
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_rates_symbol_timeframe_time"
            " ON rates(symbol, timeframe, time)",
        )
    if {"symbol", "time"}.issubset(written_columns.get(Dataset.ticks, set())):
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_ticks_symbol_time ON ticks(symbol, time)",
        )
    if {"position_id", "symbol"}.issubset(
        written_columns.get(Dataset.history_deals, set()),
    ):
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_history_deals_position_symbol"
            " ON history_deals(position_id, symbol)",
        )

create_positions_reconstructed_view

create_positions_reconstructed_view(
    conn: Connection, deals_columns: set[str]
) -> bool

Create the positions_reconstructed SQLite view derived from history_deals.

Returns:

Type Description
bool

True if the view was created, False if required columns are missing.

Source code in mt5cli/history.py
def create_positions_reconstructed_view(
    conn: sqlite3.Connection,
    deals_columns: set[str],
) -> bool:
    """Create the positions_reconstructed SQLite view derived from history_deals.

    Returns:
        True if the view was created, False if required columns are missing.
    """
    if not _POSITIONS_VIEW_REQUIRED_COLUMNS.issubset(deals_columns):
        missing = ", ".join(sorted(_POSITIONS_VIEW_REQUIRED_COLUMNS - deals_columns))
        logger.warning(
            "Skipping positions_reconstructed view: history_deals missing columns: %s",
            missing,
        )
        return False
    conn.execute("DROP VIEW IF EXISTS positions_reconstructed")
    conn.execute(
        "CREATE VIEW positions_reconstructed AS"  # noqa: S608
        " SELECT"
        " position_id,"
        " symbol,"
        " MIN(CASE WHEN entry = 0 THEN time END) AS open_time,"
        " MAX(CASE WHEN entry IN (1, 2, 3) THEN time END) AS close_time,"
        " MIN(CASE WHEN entry = 0 THEN type END) AS direction,"
        " SUM(CASE WHEN entry = 0 THEN volume ELSE 0 END) AS volume_open,"
        " SUM(CASE WHEN entry IN (1, 2, 3) THEN volume ELSE 0 END) AS volume_close,"
        " SUM(CASE WHEN entry = 2 THEN volume ELSE 0 END) AS volume_reversal,"
        " CASE"
        " WHEN SUM(CASE WHEN entry = 0 THEN volume ELSE 0 END) > 0"
        " THEN SUM(CASE WHEN entry = 0 THEN price * volume ELSE 0 END)"
        " / SUM(CASE WHEN entry = 0 THEN volume ELSE 0 END)"
        " END AS open_price,"
        " CASE"
        " WHEN SUM(CASE WHEN entry IN (1, 2, 3) THEN volume ELSE 0 END) > 0"
        " THEN SUM(CASE WHEN entry IN (1, 2, 3) THEN price * volume ELSE 0 END)"
        " / SUM(CASE WHEN entry IN (1, 2, 3) THEN volume ELSE 0 END)"
        " END AS close_price,"
        " SUM(profit) AS total_profit,"
        " SUM(CASE WHEN entry = 2 THEN 1 ELSE 0 END) AS reversal_count,"
        " COUNT(*) AS deals_count"
        " FROM history_deals"
        f" WHERE type IN {_TRADE_DEAL_TYPES_SQL} AND position_id != 0"
        " GROUP BY position_id, symbol"
        " HAVING SUM(CASE WHEN entry IN (1, 2, 3) THEN 1 ELSE 0 END) > 0",
    )
    return True

create_rate_compatibility_views

create_rate_compatibility_views(conn: Connection) -> None

Create rate compatibility views from the normalized rates table.

Source code in mt5cli/history.py
def create_rate_compatibility_views(conn: sqlite3.Connection) -> None:
    """Create rate compatibility views from the normalized rates table."""
    columns = get_table_columns(conn, Dataset.rates.table_name)
    if not {"symbol", "timeframe", "time"}.issubset(columns):
        return
    drop_rate_compatibility_views(conn)
    select_columns = sorted(columns - {"symbol", "timeframe"})
    quoted_columns = ", ".join(f'"{column}"' for column in select_columns)
    rows = conn.execute(
        "SELECT DISTINCT symbol, timeframe FROM rates ORDER BY symbol, timeframe",
    ).fetchall()
    timeframes_by_symbol: dict[str, list[int]] = {}
    for symbol, timeframe in rows:
        timeframes_by_symbol.setdefault(str(symbol), []).append(int(timeframe))
    for symbol, timeframes in timeframes_by_symbol.items():
        for timeframe in timeframes:
            granularity = resolve_granularity_name(timeframe)
            view_name = build_rate_view_name(
                symbol=symbol,
                granularity=granularity,
                granularity_count=len(timeframes),
                timeframe=timeframe,
            )
            quoted_view_name = quote_sqlite_identifier(view_name)
            escaped_symbol = symbol.replace("'", "''")
            conn.execute(
                f"CREATE VIEW {quoted_view_name} AS"  # noqa: S608
                f" SELECT {quoted_columns} FROM rates"
                f" WHERE symbol = '{escaped_symbol}'"
                f" AND timeframe = {timeframe}",
            )

deduplicate_history_tables

deduplicate_history_tables(
    conn: Connection,
    written_columns: dict[Dataset, set[str]],
    written_tables: set[Dataset],
    dedup_scopes: Mapping[Dataset, Sequence[DedupScope]]
    | None = None,
) -> None

Deduplicate appended history tables by stable identifiers.

Scopes whose required columns are not present in the written table are skipped. If all scopes for a dataset are skipped, the table receives one unscoped deduplication pass instead.

Source code in mt5cli/history.py
def deduplicate_history_tables(
    conn: sqlite3.Connection,
    written_columns: dict[Dataset, set[str]],
    written_tables: set[Dataset],
    dedup_scopes: Mapping[Dataset, Sequence[DedupScope]] | None = None,
) -> None:
    """Deduplicate appended history tables by stable identifiers.

    Scopes whose required columns are not present in the written table are
    skipped. If all scopes for a dataset are skipped, the table receives one
    unscoped deduplication pass instead.
    """
    cursor = conn.cursor()
    for dataset in written_tables:
        columns = written_columns.get(dataset, set())
        table = dataset.table_name
        keys = next(
            (
                candidate
                for candidate in _HISTORY_DEDUP_KEYS[dataset]
                if set(candidate).issubset(columns)
            ),
            None,
        )
        if keys is None:
            logger.warning(
                "Skipping %s deduplication: no supported key columns",
                table,
            )
            continue
        raw_scopes: Sequence[DedupScope] = (
            dedup_scopes.get(dataset, ()) if dedup_scopes else ()
        )
        scopes = [scope for scope in raw_scopes if scope.required_columns <= columns]
        if scopes:
            for scope in scopes:
                drop_duplicates_in_table(
                    cursor,
                    table,
                    list(keys),
                    keep="last",
                    scope_where=scope.where,
                    scope_params=scope.params,
                )
            continue
        drop_duplicates_in_table(cursor, table, list(keys), keep="last")

drop_duplicates_in_table

drop_duplicates_in_table(
    cursor: Cursor,
    table: str,
    ids: list[str],
    *,
    keep: Literal["first", "last"] = "last",
    scope_where: str | None = None,
    scope_params: tuple[object, ...] = (),
) -> None

Remove duplicate rows, keeping the first or last ROWID per key group.

Raises:

Type Description
ValueError

If the table or column names are invalid.

Source code in mt5cli/history.py
def drop_duplicates_in_table(
    cursor: sqlite3.Cursor,
    table: str,
    ids: list[str],
    *,
    keep: Literal["first", "last"] = "last",
    scope_where: str | None = None,
    scope_params: tuple[object, ...] = (),
) -> None:
    """Remove duplicate rows, keeping the first or last ROWID per key group.

    Raises:
        ValueError: If the table or column names are invalid.
    """
    if not table.isidentifier():
        msg = f"Invalid table name: {table}"
        raise ValueError(msg)
    if invalid := {column for column in ids if not column.isidentifier()}:
        msg = f"Invalid column names: {', '.join(sorted(invalid))}"
        raise ValueError(msg)
    ids_csv = ", ".join(f'"{column}"' for column in ids)
    rowid_selector = "MIN" if keep == "first" else "MAX"
    if scope_where:
        delete_sql = (
            f"DELETE FROM {table} WHERE {scope_where} AND ROWID NOT IN"  # noqa: S608
            f" (SELECT {rowid_selector}(ROWID) FROM {table} WHERE {scope_where}"
            f" GROUP BY {ids_csv})"
        )
        cursor.execute(delete_sql, scope_params + scope_params)
        return
    cursor.execute(
        f"DELETE FROM {table} WHERE ROWID NOT IN"  # noqa: S608
        f" (SELECT {rowid_selector}(ROWID) FROM {table} GROUP BY {ids_csv})",
    )

drop_forming_rate_bar

drop_forming_rate_bar(df_rate: DataFrame) -> DataFrame

Return closed bars from chronologically ordered MT5 rate data.

MetaTrader 5 copy_rates_from_pos(start_pos=0) includes the still-forming current bar as the last row. Slice it off so downstream logic only sees completed bars. Empty frames and single-row frames return empty results.

Parameters:

Name Type Description Default
df_rate DataFrame

Rate data ordered oldest-to-newest with the forming bar last.

required

Returns:

Type Description
DataFrame

A new DataFrame with all rows except the last. Index and columns are

DataFrame

preserved. The input frame is not modified.

Source code in mt5cli/history.py
def drop_forming_rate_bar(df_rate: pd.DataFrame) -> pd.DataFrame:
    """Return closed bars from chronologically ordered MT5 rate data.

    MetaTrader 5 ``copy_rates_from_pos(start_pos=0)`` includes the still-forming
    current bar as the last row. Slice it off so downstream logic only sees
    completed bars. Empty frames and single-row frames return empty results.

    Args:
        df_rate: Rate data ordered oldest-to-newest with the forming bar last.

    Returns:
        A new DataFrame with all rows except the last. Index and columns are
        preserved. The input frame is not modified.
    """
    return df_rate.iloc[:-1].copy()

drop_rate_compatibility_views

drop_rate_compatibility_views(conn: Connection) -> None

Drop all mt5cli-managed rate_* compatibility views.

Source code in mt5cli/history.py
def drop_rate_compatibility_views(conn: sqlite3.Connection) -> None:
    """Drop all mt5cli-managed ``rate_*`` compatibility views."""
    rows = conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'view' AND name GLOB 'rate_*'",
    ).fetchall()
    for (view_name,) in rows:
        quoted_view_name = quote_sqlite_identifier(str(view_name))
        conn.execute(f"DROP VIEW IF EXISTS {quoted_view_name}")

filter_incremental_history_deals_frame

filter_incremental_history_deals_frame(
    frame: DataFrame,
    symbols: Sequence[str],
    start_by_symbol: dict[str, datetime],
    account_event_start: datetime,
) -> DataFrame

Filter incrementally fetched history_deals by symbol and event start times.

Returns:

Type Description
DataFrame

Rows for selected symbols at or after each symbol start, plus account

DataFrame

events at or after account_event_start.

Source code in mt5cli/history.py
def filter_incremental_history_deals_frame(
    frame: pd.DataFrame,
    symbols: Sequence[str],
    start_by_symbol: dict[str, datetime],
    account_event_start: datetime,
) -> pd.DataFrame:
    """Filter incrementally fetched history_deals by symbol and event start times.

    Returns:
        Rows for selected symbols at or after each symbol start, plus account
        events at or after ``account_event_start``.
    """
    if frame.empty:
        return frame.copy()
    parsed_times = _frame_parsed_times(frame)
    time_valid = parsed_times.notna()
    account_event_mask = _history_deals_account_event_mask(frame)
    account_keep = account_event_mask & (parsed_times >= account_event_start)
    trade_keep = pd.Series(data=False, index=frame.index)
    if "symbol" in frame.columns:
        for symbol in symbols:
            trade_keep |= (
                (frame["symbol"] == symbol)
                & (parsed_times >= start_by_symbol[symbol])
                & ~account_event_mask
            )
    keep = (account_keep | trade_keep) & time_valid
    return frame.loc[keep].copy()

filter_trade_history_frame

filter_trade_history_frame(
    frame: DataFrame,
    symbols: Sequence[str],
    *,
    include_account_events: bool,
) -> DataFrame

Filter trade history rows to selected symbols and account events.

Returns:

Type Description
DataFrame

Filtered history rows.

Source code in mt5cli/history.py
def filter_trade_history_frame(
    frame: pd.DataFrame,
    symbols: Sequence[str],
    *,
    include_account_events: bool,
) -> pd.DataFrame:
    """Filter trade history rows to selected symbols and account events.

    Returns:
        Filtered history rows.
    """
    if "symbol" not in frame.columns:
        return frame
    symbol_mask = frame["symbol"].isin(symbols)
    if not include_account_events:
        return frame.loc[symbol_mask].copy()
    account_event_mask = _history_deals_account_event_mask(frame)
    return frame.loc[symbol_mask | account_event_mask].copy()

get_history_deals_account_event_start_datetime

get_history_deals_account_event_start_datetime(
    conn: Connection, *, fallback_start: datetime
) -> datetime

Return the next update start for account-level history_deals rows.

Source code in mt5cli/history.py
def get_history_deals_account_event_start_datetime(
    conn: sqlite3.Connection,
    *,
    fallback_start: datetime,
) -> datetime:
    """Return the next update start for account-level history_deals rows."""
    table = Dataset.history_deals.table_name
    columns = get_table_columns(conn, table)
    if "time" not in columns:
        return fallback_start
    if "type" in columns:
        where_clause = f"type NOT IN {_TRADE_DEAL_TYPES_SQL}"
    elif "symbol" in columns:
        where_clause = "symbol IS NULL OR symbol = ''"
    else:
        return fallback_start
    row = conn.execute(
        f"SELECT MAX(time) FROM {table} WHERE {where_clause}",  # noqa: S608
    ).fetchone()
    parsed = parse_sqlite_timestamp(row[0] if row else None)
    return parsed if parsed is not None else fallback_start

get_incremental_start_datetime

get_incremental_start_datetime(
    conn: Connection,
    dataset: Dataset,
    *,
    symbol: str,
    timeframe: int | None,
    fallback_start: datetime,
) -> datetime

Return the next update start datetime from existing MAX(time).

Source code in mt5cli/history.py
def get_incremental_start_datetime(
    conn: sqlite3.Connection,
    dataset: Dataset,
    *,
    symbol: str,
    timeframe: int | None,
    fallback_start: datetime,
) -> datetime:
    """Return the next update start datetime from existing MAX(time)."""
    timeframes = [timeframe] if timeframe is not None else None
    starts = load_incremental_start_datetimes(
        conn,
        dataset,
        symbols=[symbol],
        timeframes=timeframes,
        fallback_start=fallback_start,
    )
    return starts[symbol, timeframe]

get_table_columns

get_table_columns(conn: Connection, table: str) -> set[str]

Return existing SQLite columns for a table.

Source code in mt5cli/history.py
def get_table_columns(conn: sqlite3.Connection, table: str) -> set[str]:
    """Return existing SQLite columns for a table."""
    quoted_table = quote_sqlite_identifier(table)
    rows = conn.execute(f"PRAGMA table_info({quoted_table})").fetchall()
    return {str(row[1]) for row in rows}

load_incremental_start_datetimes

load_incremental_start_datetimes(
    conn: Connection,
    dataset: Dataset,
    *,
    symbols: Sequence[str],
    timeframes: Sequence[int] | None = None,
    fallback_start: datetime,
) -> dict[tuple[str, int | None], datetime]

Return next update start datetimes keyed by symbol and optional timeframe.

Source code in mt5cli/history.py
def load_incremental_start_datetimes(
    conn: sqlite3.Connection,
    dataset: Dataset,
    *,
    symbols: Sequence[str],
    timeframes: Sequence[int] | None = None,
    fallback_start: datetime,
) -> dict[tuple[str, int | None], datetime]:
    """Return next update start datetimes keyed by symbol and optional timeframe."""
    table = dataset.table_name
    columns = get_table_columns(conn, table)
    if dataset is Dataset.rates and columns:
        _validate_rates_schema(columns)

    if "time" not in columns:
        if dataset is Dataset.rates and timeframes is not None:
            return {
                (symbol, timeframe): fallback_start
                for symbol in symbols
                for timeframe in timeframes
            }
        return {(symbol, None): fallback_start for symbol in symbols}

    parsed_by_key: dict[tuple[str, int | None], datetime] = {}
    if (
        dataset is Dataset.rates
        and timeframes is not None
        and {"symbol", "timeframe"}.issubset(columns)
    ):
        symbol_placeholders = ", ".join("?" for _ in symbols)
        timeframe_placeholders = ", ".join("?" for _ in timeframes)
        grouped_rates_query = (
            "SELECT symbol, timeframe, MAX(time) FROM "  # noqa: S608
            f"{table} WHERE symbol IN ({symbol_placeholders})"
            f" AND timeframe IN ({timeframe_placeholders})"
            " GROUP BY symbol, timeframe"
        )
        rows = conn.execute(
            grouped_rates_query,
            [*symbols, *timeframes],
        ).fetchall()
        for row_symbol, row_timeframe, max_time in rows:
            parsed = parse_sqlite_timestamp(max_time)
            if parsed is not None:
                parsed_by_key[str(row_symbol), int(row_timeframe)] = parsed
        return {
            (symbol, timeframe): parsed_by_key.get(
                (symbol, timeframe),
                fallback_start,
            )
            for symbol in symbols
            for timeframe in timeframes
        }

    if "symbol" in columns:
        symbol_placeholders = ", ".join("?" for _ in symbols)
        rows = conn.execute(
            f"SELECT symbol, MAX(time) FROM {table}"  # noqa: S608
            f" WHERE symbol IN ({symbol_placeholders}) GROUP BY symbol",
            list(symbols),
        ).fetchall()
        for row_symbol, max_time in rows:
            parsed = parse_sqlite_timestamp(max_time)
            if parsed is not None:
                parsed_by_key[str(row_symbol), None] = parsed
        return {
            (symbol, None): parsed_by_key.get((symbol, None), fallback_start)
            for symbol in symbols
        }

    row = conn.execute(f"SELECT MAX(time) FROM {table}").fetchone()  # noqa: S608
    parsed = parse_sqlite_timestamp(row[0] if row else None)
    shared_start = parsed if parsed is not None else fallback_start
    return {(symbol, None): shared_start for symbol in symbols}

load_rate_data

load_rate_data(
    conn_or_path: SqliteConnOrPath,
    table: str,
    count: int | None = None,
) -> DataFrame

Load rate-like data from a SQLite database path or connection.

Parameters:

Name Type Description Default
conn_or_path SqliteConnOrPath

SQLite database path or open connection.

required
table str

Source table or view name.

required
count int | None

Optional number of most recent rows to load.

None

Returns:

Type Description
DataFrame

DataFrame indexed by ascending time.

Source code in mt5cli/history.py
def load_rate_data(
    conn_or_path: SqliteConnOrPath,
    table: str,
    count: int | None = None,
) -> pd.DataFrame:
    """Load rate-like data from a SQLite database path or connection.

    Args:
        conn_or_path: SQLite database path or open connection.
        table: Source table or view name.
        count: Optional number of most recent rows to load.

    Returns:
        DataFrame indexed by ascending ``time``.

    """
    conn, should_close = _open_existing_sqlite_database(conn_or_path)
    try:
        return load_rate_data_from_connection(conn, table, count=count)
    finally:
        if should_close:
            conn.close()

load_rate_data_from_connection

load_rate_data_from_connection(
    connection: Connection,
    table: str,
    count: int | None = None,
) -> DataFrame

Load rate-like data from a SQLite table or view.

Parameters:

Name Type Description Default
connection Connection

Open SQLite connection.

required
table str

Source table or view name.

required
count int | None

Optional number of most recent rows to load.

None

Returns:

Type Description
DataFrame

DataFrame indexed by ascending time.

Raises:

Type Description
ValueError

If inputs, schema, timestamps are invalid, or the table or view contains no rows.

Source code in mt5cli/history.py
def load_rate_data_from_connection(
    connection: sqlite3.Connection,
    table: str,
    count: int | None = None,
) -> pd.DataFrame:
    """Load rate-like data from a SQLite table or view.

    Args:
        connection: Open SQLite connection.
        table: Source table or view name.
        count: Optional number of most recent rows to load.

    Returns:
        DataFrame indexed by ascending ``time``.

    Raises:
        ValueError: If inputs, schema, timestamps are invalid, or the table
            or view contains no rows.
    """
    table_name = _validate_rate_load_request(table, count)
    columns = get_table_columns(connection, table_name)
    _ensure_rate_columns(columns, table_name)
    quoted_table = quote_sqlite_identifier(table_name)
    if count is None:
        frame = cast(
            "pd.DataFrame",
            pd.read_sql_query(  # type: ignore[reportUnknownMemberType]
                f"SELECT * FROM {quoted_table} ORDER BY time ASC",  # noqa: S608
                connection,
            ),
        )
    else:
        frame = cast(
            "pd.DataFrame",
            pd.read_sql_query(  # type: ignore[reportUnknownMemberType]
                f"SELECT * FROM {quoted_table} ORDER BY time DESC LIMIT ?",  # noqa: S608
                connection,
                params=(count,),
            ),
        )
    if frame.empty:
        msg = f"SQLite table or view {table_name!r} contains no rows."
        raise ValueError(msg)
    return _parse_rate_time_index(frame, table_name)

load_rate_series_by_granularity

load_rate_series_by_granularity(
    conn_or_path: SqliteConnOrPath,
    symbols: Sequence[str],
    granularities: Sequence[int | str],
    count: int,
    *,
    explicit_tables: Sequence[str] | None = None,
    allow_missing_symbol: bool = False,
) -> dict[tuple[str | None, str], DataFrame]

Load rate series keyed by symbol and string granularity name.

Builds targets with :func:build_rate_targets and loads them with :func:load_rate_series_from_sqlite, then rekeys the result by granularity name (for example M1) instead of the integer timeframe to reduce downstream boilerplate.

Parameters:

Name Type Description Default
conn_or_path SqliteConnOrPath

SQLite database path or open connection.

required
symbols Sequence[str]

MT5 symbol names. May be empty when allow_missing_symbol.

required
granularities Sequence[int | str]

MT5 timeframes as integers or names (for example M1).

required
count int

Number of most recent rows to load per series.

required
explicit_tables Sequence[str] | None

Optional explicit table or view names matching the built targets in row-major order. Required when symbols are omitted.

None
allow_missing_symbol bool

When True and symbols is empty, build targets with symbol=None for each granularity instead of raising.

False

Returns:

Type Description
dict[tuple[str | None, str], DataFrame]

Mapping keyed by (symbol | None, granularity_name) to each rate

dict[tuple[str | None, str], DataFrame]

DataFrame. Propagates ValueError (via :func:build_rate_targets and

dict[tuple[str | None, str], DataFrame]

func:load_rate_series_from_sqlite) when inputs are empty or invalid,

dict[tuple[str | None, str], DataFrame]

table resolution fails, or duplicate targets are present.

Source code in mt5cli/history.py
def load_rate_series_by_granularity(
    conn_or_path: SqliteConnOrPath,
    symbols: Sequence[str],
    granularities: Sequence[int | str],
    count: int,
    *,
    explicit_tables: Sequence[str] | None = None,
    allow_missing_symbol: bool = False,
) -> dict[tuple[str | None, str], pd.DataFrame]:
    """Load rate series keyed by symbol and string granularity name.

    Builds targets with :func:`build_rate_targets` and loads them with
    :func:`load_rate_series_from_sqlite`, then rekeys the result by granularity
    name (for example ``M1``) instead of the integer timeframe to reduce
    downstream boilerplate.

    Args:
        conn_or_path: SQLite database path or open connection.
        symbols: MT5 symbol names. May be empty when ``allow_missing_symbol``.
        granularities: MT5 timeframes as integers or names (for example ``M1``).
        count: Number of most recent rows to load per series.
        explicit_tables: Optional explicit table or view names matching the
            built targets in row-major order. Required when symbols are omitted.
        allow_missing_symbol: When True and ``symbols`` is empty, build targets
            with ``symbol=None`` for each granularity instead of raising.

    Returns:
        Mapping keyed by ``(symbol | None, granularity_name)`` to each rate
        DataFrame. Propagates ``ValueError`` (via :func:`build_rate_targets` and
        :func:`load_rate_series_from_sqlite`) when inputs are empty or invalid,
        table resolution fails, or duplicate targets are present.
    """
    targets = build_rate_targets(
        symbols,
        granularities,
        allow_missing_symbol=allow_missing_symbol,
    )
    series = load_rate_series_from_sqlite(
        conn_or_path,
        targets,
        count,
        explicit_tables=explicit_tables,
    )
    return {
        (symbol, resolve_granularity_name(timeframe)): frame
        for (symbol, timeframe), frame in series.items()
    }

load_rate_series_from_sqlite

load_rate_series_from_sqlite(
    conn_or_path: SqliteConnOrPath,
    targets: None = None,
    count: int | None = None,
    explicit_tables: None = None,
    *,
    table: str,
) -> DataFrame
load_rate_series_from_sqlite(
    conn_or_path: SqliteConnOrPath,
    targets: None = None,
    count: int | None = None,
    explicit_tables: Sequence[str] | None = None,
    *,
    table: None = None,
) -> dict[tuple[str | None, int], DataFrame]
load_rate_series_from_sqlite(
    conn_or_path: SqliteConnOrPath,
    targets: Sequence[RateTarget],
    count: int,
    explicit_tables: Sequence[str] | None = None,
    *,
    table: None = None,
) -> dict[tuple[str | None, int], DataFrame]
load_rate_series_from_sqlite(
    conn_or_path: SqliteConnOrPath,
    targets: Sequence[RateTarget] | None = None,
    count: int | None = None,
    explicit_tables: Sequence[str] | None = None,
    *,
    table: str | None = None,
) -> dict[tuple[str | None, int], DataFrame] | DataFrame

Load one table/view or multiple rate series from a SQLite database.

Parameters:

Name Type Description Default
conn_or_path SqliteConnOrPath

SQLite database path or open connection.

required
targets Sequence[RateTarget] | None

Rate targets to load. Each (symbol, timeframe_int) pair must be unique. Omit when loading a single explicit table.

None
count int | None

Optional number of most recent rows to load per series.

None
explicit_tables Sequence[str] | None

Optional explicit table or view names matching targets. When omitted, managed rate_* compatibility views must already exist in the database.

None
table str | None

Optional single table or view name to load directly.

None

Returns:

Type Description
dict[tuple[str | None, int], DataFrame] | DataFrame

A DataFrame when table is provided, otherwise a mapping keyed by

dict[tuple[str | None, int], DataFrame] | DataFrame

(symbol, timeframe_int) to each rate DataFrame.

Raises:

Type Description
ValueError

If count is not positive, targets are empty, duplicate (symbol, timeframe_int) pairs are present, or table resolution fails.

Source code in mt5cli/history.py
def load_rate_series_from_sqlite(
    conn_or_path: SqliteConnOrPath,
    targets: Sequence[RateTarget] | None = None,
    count: int | None = None,
    explicit_tables: Sequence[str] | None = None,
    *,
    table: str | None = None,
) -> dict[tuple[str | None, int], pd.DataFrame] | pd.DataFrame:
    """Load one table/view or multiple rate series from a SQLite database.

    Args:
        conn_or_path: SQLite database path or open connection.
        targets: Rate targets to load. Each ``(symbol, timeframe_int)`` pair must
            be unique. Omit when loading a single explicit ``table``.
        count: Optional number of most recent rows to load per series.
        explicit_tables: Optional explicit table or view names matching targets.
            When omitted, managed ``rate_*`` compatibility views must already
            exist in the database.
        table: Optional single table or view name to load directly.

    Returns:
        A DataFrame when ``table`` is provided, otherwise a mapping keyed by
        ``(symbol, timeframe_int)`` to each rate DataFrame.

    Raises:
        ValueError: If ``count`` is not positive, targets are empty, duplicate
            ``(symbol, timeframe_int)`` pairs are present, or table resolution
            fails.
    """
    if table is not None:
        return load_rate_data(conn_or_path, table, count=count)
    if count is None or count <= 0:
        msg = "count must be positive."
        raise ValueError(msg)
    if targets is None:
        msg = "targets are required when table is not provided."
        raise ValueError(msg)
    target_list = list(targets)
    if not target_list:
        msg = "At least one rate target is required."
        raise ValueError(msg)
    if explicit_tables is None and any(target.symbol is None for target in target_list):
        msg = (
            "Cannot resolve a rate table for a target without a symbol; "
            "provide explicit_tables."
        )
        raise ValueError(msg)
    seen_keys: set[tuple[str | None, int]] = set()
    for target in target_list:
        key = (target.symbol, target.timeframe_int)
        if key in seen_keys:
            symbol_repr = repr(target.symbol)
            msg = f"Duplicate rate target: ({symbol_repr}, {target.timeframe_int})"
            raise ValueError(msg)
        seen_keys.add(key)
    tables = (
        resolve_rate_tables(None, target_list, explicit_tables)
        if explicit_tables is not None
        else None
    )
    conn, should_close = _open_existing_sqlite_database(conn_or_path)
    try:
        resolved_tables = tables or resolve_rate_tables(
            conn,
            target_list,
            require_existing=True,
        )
        return {
            (target.symbol, target.timeframe_int): load_rate_data_from_connection(
                conn,
                table,
                count=count,
            )
            for target, table in zip(target_list, resolved_tables, strict=True)
        }
    finally:
        if should_close:
            conn.close()

parse_sqlite_timestamp

parse_sqlite_timestamp(value: object) -> datetime | None

Parse a SQLite history timestamp value.

Returns:

Type Description
datetime | None

Parsed timezone-aware datetime, or None when parsing fails.

Source code in mt5cli/history.py
def parse_sqlite_timestamp(value: object) -> datetime | None:
    """Parse a SQLite history timestamp value.

    Returns:
        Parsed timezone-aware datetime, or None when parsing fails.
    """
    if value is None:
        return None
    if isinstance(value, datetime):
        return value if value.tzinfo is not None else value.replace(tzinfo=UTC)
    if isinstance(value, int | float):
        return datetime.fromtimestamp(float(value), tz=UTC)
    if isinstance(value, str):
        return _parse_string_sqlite_timestamp(value)
    logger.warning("Ignoring unsupported history timestamp type: %s", type(value))
    return None

quote_sqlite_identifier

quote_sqlite_identifier(identifier: str) -> str

Return a safely quoted SQLite identifier using double quotes.

Source code in mt5cli/history.py
def quote_sqlite_identifier(identifier: str) -> str:
    """Return a safely quoted SQLite identifier using double quotes."""
    return '"' + identifier.replace('"', '""') + '"'

record_written_columns

record_written_columns(
    written_columns: dict[Dataset, set[str]],
    dataset: Dataset,
    frame: DataFrame,
) -> None

Remember columns for datasets written during collection.

Source code in mt5cli/history.py
def record_written_columns(
    written_columns: dict[Dataset, set[str]],
    dataset: Dataset,
    frame: pd.DataFrame,
) -> None:
    """Remember columns for datasets written during collection."""
    columns = set(frame.columns)
    if dataset in written_columns:
        written_columns[dataset].update(columns)
    else:
        written_columns[dataset] = columns

resolve_granularity_name

resolve_granularity_name(timeframe: int) -> str

Return a granularity name for a timeframe integer when known.

Source code in mt5cli/history.py
def resolve_granularity_name(timeframe: int) -> str:
    """Return a granularity name for a timeframe integer when known."""
    try:
        name = _get_timeframe_name(timeframe)
    except ValueError:
        return str(timeframe)
    return name.removeprefix("TIMEFRAME_")

resolve_history_datasets

resolve_history_datasets(
    datasets: set[Dataset] | None,
) -> set[Dataset]

Resolve configured history datasets.

Returns:

Type Description
set[Dataset]

All supported datasets when datasets is None, otherwise the

set[Dataset]

configured selection (which may be empty).

Source code in mt5cli/history.py
def resolve_history_datasets(datasets: set[Dataset] | None) -> set[Dataset]:
    """Resolve configured history datasets.

    Returns:
        All supported datasets when ``datasets`` is None, otherwise the
        configured selection (which may be empty).
    """
    if datasets is None:
        return set(Dataset)
    return set(datasets)

resolve_history_tick_flags

resolve_history_tick_flags(flags: int | str) -> int

Resolve tick copy flags from an integer or name.

Returns:

Type Description
int

Integer tick flag value.

Source code in mt5cli/history.py
def resolve_history_tick_flags(flags: int | str) -> int:
    """Resolve tick copy flags from an integer or name.

    Returns:
        Integer tick flag value.
    """
    return parse_tick_flags(flags)

resolve_history_timeframes

resolve_history_timeframes(
    timeframes: Sequence[int | str] | None,
) -> list[int]

Resolve rate timeframes, deduplicating aliases for the same integer.

Returns:

Type Description
list[int]

Ordered list of unique timeframe integers.

Source code in mt5cli/history.py
def resolve_history_timeframes(
    timeframes: Sequence[int | str] | None,
) -> list[int]:
    """Resolve rate timeframes, deduplicating aliases for the same integer.

    Returns:
        Ordered list of unique timeframe integers.
    """
    raw = timeframes if timeframes is not None else DEFAULT_HISTORY_TIMEFRAMES
    seen: set[int] = set()
    resolved: list[int] = []
    for value in raw:
        tf = parse_timeframe(value)
        if tf not in seen:
            seen.add(tf)
            resolved.append(tf)
    return resolved

resolve_rate_table_name

resolve_rate_table_name(
    symbol: str, granularity: str
) -> str

Return the canonical normalized SQLite rate table name.

The normalized history table stores all symbols and timeframes in rates; use :func:resolve_rate_view_name for per-symbol compatibility view names.

Returns:

Type Description
str

Canonical normalized rates table name.

Raises:

Type Description
ValueError

If symbol or granularity is invalid.

Source code in mt5cli/history.py
def resolve_rate_table_name(symbol: str, granularity: str) -> str:
    """Return the canonical normalized SQLite rate table name.

    The normalized history table stores all symbols and timeframes in
    ``rates``; use :func:`resolve_rate_view_name` for per-symbol compatibility
    view names.

    Returns:
        Canonical normalized rates table name.

    Raises:
        ValueError: If ``symbol`` or ``granularity`` is invalid.
    """
    parse_timeframe(granularity)
    if not symbol.strip():
        msg = "symbol must not be empty."
        raise ValueError(msg)
    return Dataset.rates.table_name

resolve_rate_tables

resolve_rate_tables(
    conn_or_path: SqliteConnOrPath | None,
    targets: Sequence[RateTarget],
    explicit_tables: Sequence[str] | None = None,
    *,
    require_existing: bool = False,
) -> list[str]

Resolve SQLite table or view names for rate targets.

Parameters:

Name Type Description Default
conn_or_path SqliteConnOrPath | None

SQLite database path or open connection. May be None when explicit_tables is provided, or when require_existing is False and deterministic default view names are sufficient.

required
targets Sequence[RateTarget]

Rate targets to resolve.

required
explicit_tables Sequence[str] | None

Optional explicit table or view names. When provided, they are used as-is and must match the number of targets.

None
require_existing bool

When True, require the database and managed views to exist for each symbol target. Ignored when explicit_tables is provided.

False

Returns:

Type Description
list[str]

Table or view names aligned with targets.

Raises:

Type Description
ValueError

If targets is empty, explicit_tables length does not match the target count, a target without a symbol is resolved without an explicit table, or require_existing is True and the database or a managed view is missing.

Source code in mt5cli/history.py
def resolve_rate_tables(
    conn_or_path: SqliteConnOrPath | None,
    targets: Sequence[RateTarget],
    explicit_tables: Sequence[str] | None = None,
    *,
    require_existing: bool = False,
) -> list[str]:
    """Resolve SQLite table or view names for rate targets.

    Args:
        conn_or_path: SQLite database path or open connection. May be None when
            ``explicit_tables`` is provided, or when ``require_existing`` is
            False and deterministic default view names are sufficient.
        targets: Rate targets to resolve.
        explicit_tables: Optional explicit table or view names. When provided,
            they are used as-is and must match the number of targets.
        require_existing: When True, require the database and managed views to
            exist for each symbol target. Ignored when ``explicit_tables`` is
            provided.

    Returns:
        Table or view names aligned with ``targets``.

    Raises:
        ValueError: If ``targets`` is empty, ``explicit_tables`` length does not
            match the target count, a target without a symbol is resolved
            without an explicit table, or ``require_existing`` is True and the
            database or a managed view is missing.
    """
    target_list = list(targets)
    if not target_list:
        msg = "At least one rate target is required."
        raise ValueError(msg)
    if explicit_tables is not None:
        tables = list(explicit_tables)
        if len(tables) != len(target_list):
            msg = (
                f"Expected {len(target_list)} explicit table(s) "
                f"to match the targets, got {len(tables)}."
            )
            raise ValueError(msg)
        return tables
    if any(target.symbol is None for target in target_list):
        msg = (
            "Cannot resolve a rate table for a target without a symbol; "
            "provide explicit_tables."
        )
        raise ValueError(msg)
    conn, should_close = _open_history_connection(conn_or_path)
    try:
        if conn is None:
            if require_existing:
                path = (
                    conn_or_path
                    if isinstance(conn_or_path, (Path, str))
                    else "database"
                )
                msg = f"SQLite database not found: {path}"
                raise ValueError(msg)
            timeframe_counts = None
            existing_views: set[str] = set()
        else:
            timeframe_counts = _load_rates_timeframe_counts(conn)
            existing_views = _load_existing_rate_views(conn)
        resolved: list[str] = []
        for target in target_list:
            symbol = cast("str", target.symbol)
            timeframe = target.timeframe_int
            resolved.append(
                _resolve_rate_view_name_from_context(
                    symbol=symbol,
                    timeframe=timeframe,
                    granularity_name=resolve_granularity_name(timeframe),
                    timeframe_counts=timeframe_counts,
                    existing_views=existing_views,
                    require_existing=require_existing,
                ),
            )
        return resolved
    finally:
        if should_close and conn is not None:
            conn.close()

resolve_rate_view_name

resolve_rate_view_name(
    conn_or_path: SqliteConnOrPath | None,
    symbol: str,
    granularity: str,
    *,
    require_existing: bool = False,
) -> str

Resolve the mt5cli-managed rate compatibility view name.

Parameters:

Name Type Description Default
conn_or_path SqliteConnOrPath | None

SQLite database path or open connection. When None or a non-existing path and require_existing is False, the deterministic default view name is returned without creating a database file.

required
symbol str

Symbol stored in the normalized rates table.

required
granularity str

Timeframe name (for example M1) or integer string.

required
require_existing bool

When True, require the database and a managed view to exist.

False

Returns:

Type Description
str

View name such as rate_EURUSD__1 or rate_EURUSD__M1_1.

Raises:

Type Description
ValueError

If require_existing is True and the database or view is missing.

Source code in mt5cli/history.py
def resolve_rate_view_name(
    conn_or_path: SqliteConnOrPath | None,
    symbol: str,
    granularity: str,
    *,
    require_existing: bool = False,
) -> str:
    """Resolve the mt5cli-managed rate compatibility view name.

    Args:
        conn_or_path: SQLite database path or open connection. When None or a
            non-existing path and ``require_existing`` is False, the deterministic
            default view name is returned without creating a database file.
        symbol: Symbol stored in the normalized ``rates`` table.
        granularity: Timeframe name (for example ``M1``) or integer string.
        require_existing: When True, require the database and a managed view to exist.

    Returns:
        View name such as ``rate_EURUSD__1`` or ``rate_EURUSD__M1_1``.

    Raises:
        ValueError: If ``require_existing`` is True and the database or view is missing.
    """
    timeframe = parse_timeframe(granularity)
    granularity_name = resolve_granularity_name(timeframe)
    conn, should_close = _open_history_connection(conn_or_path)
    try:
        if conn is None:
            if require_existing:
                path = (
                    conn_or_path
                    if isinstance(conn_or_path, (Path, str))
                    else "database"
                )
                msg = f"SQLite database not found: {path}"
                raise ValueError(msg)
            return build_rate_view_name(
                symbol=symbol,
                granularity=granularity_name,
                granularity_count=1,
                timeframe=timeframe,
            )
        return _resolve_rate_view_name_from_context(
            symbol=symbol,
            timeframe=timeframe,
            granularity_name=granularity_name,
            timeframe_counts=_load_rates_timeframe_counts(conn),
            existing_views=_load_existing_rate_views(conn),
            require_existing=require_existing,
        )
    finally:
        if should_close and conn is not None:
            conn.close()

resolve_rate_view_names

resolve_rate_view_names(
    conn_or_path: SqliteConnOrPath | None,
    symbols: Sequence[str],
    granularities: Sequence[str],
    *,
    require_existing: bool = False,
) -> list[str]

Resolve rate compatibility view names for symbol and granularity pairs.

Parameters:

Name Type Description Default
conn_or_path SqliteConnOrPath | None

SQLite database path or open connection. When None or a non-existing path and require_existing is False, deterministic default view names are returned without creating a database file.

required
symbols Sequence[str]

Symbols stored in the normalized rates table.

required
granularities Sequence[str]

Timeframe names (for example M1) or integer strings.

required
require_existing bool

When True, require the database and managed views to exist.

False

Returns:

Type Description
list[str]

View names in row-major order: every granularity for the first

list[str]

symbol, then every granularity for the next symbol, and so on.

Source code in mt5cli/history.py
def resolve_rate_view_names(
    conn_or_path: SqliteConnOrPath | None,
    symbols: Sequence[str],
    granularities: Sequence[str],
    *,
    require_existing: bool = False,
) -> list[str]:
    """Resolve rate compatibility view names for symbol and granularity pairs.

    Args:
        conn_or_path: SQLite database path or open connection. When None or a
            non-existing path and ``require_existing`` is False, deterministic
            default view names are returned without creating a database file.
        symbols: Symbols stored in the normalized ``rates`` table.
        granularities: Timeframe names (for example ``M1``) or integer strings.
        require_existing: When True, require the database and managed views to exist.

    Returns:
        View names in row-major order: every ``granularity`` for the first
        symbol, then every granularity for the next symbol, and so on.
    """
    conn, should_close = _open_history_connection(conn_or_path)
    try:
        if conn is None:
            return [
                resolve_rate_view_name(
                    conn_or_path,
                    symbol,
                    granularity,
                    require_existing=require_existing,
                )
                for symbol in symbols
                for granularity in granularities
            ]
        timeframe_counts = _load_rates_timeframe_counts(conn)
        existing_views = _load_existing_rate_views(conn)
        resolved: list[str] = []
        for symbol in symbols:
            for granularity in granularities:
                timeframe = parse_timeframe(granularity)
                resolved.append(
                    _resolve_rate_view_name_from_context(
                        symbol=symbol,
                        timeframe=timeframe,
                        granularity_name=resolve_granularity_name(timeframe),
                        timeframe_counts=timeframe_counts,
                        existing_views=existing_views,
                        require_existing=require_existing,
                    ),
                )
        return resolved
    finally:
        if should_close and conn is not None:
            conn.close()

write_collected_datasets

write_collected_datasets(
    conn: Connection,
    client: Mt5DataClient,
    symbols: Sequence[str],
    datasets: set[Dataset],
    timeframe: int,
    flags: int,
    date_from: datetime,
    date_to: datetime,
    if_exists: IfExists,
) -> tuple[set[Dataset], dict[Dataset, set[str]]]

Collect selected datasets and stream each symbol frame into SQLite.

Returns:

Type Description
tuple[set[Dataset], dict[Dataset, set[str]]]

Written datasets and their columns.

Source code in mt5cli/history.py
def write_collected_datasets(
    conn: sqlite3.Connection,
    client: Mt5DataClient,
    symbols: Sequence[str],
    datasets: set[Dataset],
    timeframe: int,
    flags: int,
    date_from: datetime,
    date_to: datetime,
    if_exists: IfExists,
) -> tuple[set[Dataset], dict[Dataset, set[str]]]:
    """Collect selected datasets and stream each symbol frame into SQLite.

    Returns:
        Written datasets and their columns.
    """
    written_columns: dict[Dataset, set[str]] = {}
    written_tables: set[Dataset] = set()
    if Dataset.rates in datasets and write_rates_dataset(
        conn,
        client,
        symbols,
        timeframe,
        date_from,
        date_to,
        if_exists,
        written_columns,
    ):
        written_tables.add(Dataset.rates)
    if Dataset.ticks in datasets and write_ticks_dataset(
        conn,
        client,
        symbols,
        flags,
        date_from,
        date_to,
        if_exists,
        written_columns,
    ):
        written_tables.add(Dataset.ticks)
    if Dataset.history_orders in datasets and write_history_dataset(
        conn,
        client.history_orders_get_as_df,
        Dataset.history_orders,
        symbols,
        date_from,
        date_to,
        if_exists,
        written_columns,
        include_account_events=False,
    ):
        written_tables.add(Dataset.history_orders)
    if Dataset.history_deals in datasets and write_history_dataset(
        conn,
        client.history_deals_get_as_df,
        Dataset.history_deals,
        symbols,
        date_from,
        date_to,
        if_exists,
        written_columns,
        include_account_events=False,
    ):
        written_tables.add(Dataset.history_deals)
    return written_tables, written_columns

write_history_dataset

write_history_dataset(
    conn: Connection,
    fetch: Callable[..., DataFrame],
    dataset: Dataset,
    symbols: Sequence[str],
    date_from: datetime,
    date_to: datetime,
    if_exists: IfExists,
    written_columns: dict[Dataset, set[str]],
    *,
    include_account_events: bool = False,
) -> bool

Stream a history dataset into SQLite.

Returns:

Type Description
bool

True if the target table was written.

Source code in mt5cli/history.py
def write_history_dataset(
    conn: sqlite3.Connection,
    fetch: Callable[..., pd.DataFrame],
    dataset: Dataset,
    symbols: Sequence[str],
    date_from: datetime,
    date_to: datetime,
    if_exists: IfExists,
    written_columns: dict[Dataset, set[str]],
    *,
    include_account_events: bool = False,
) -> bool:
    """Stream a history dataset into SQLite.

    Returns:
        True if the target table was written.
    """
    table_exists = False
    if include_account_events:
        frame = filter_trade_history_frame(
            fetch(date_from=date_from, date_to=date_to),
            symbols,
            include_account_events=True,
        )
        return write_streamed_frame(
            conn,
            frame,
            dataset,
            table_exists,
            if_exists,
            written_columns,
        )

    def _fetch_history_frame(sym: str) -> pd.DataFrame:
        return filter_trade_history_frame(
            fetch(date_from=date_from, date_to=date_to, symbol=sym),
            [sym],
            include_account_events=False,
        )

    return _stream_symbol_frames(
        conn,
        symbols,
        dataset,
        if_exists,
        written_columns,
        _fetch_history_frame,
    )

write_incremental_datasets

write_incremental_datasets(
    conn: Connection,
    client: Mt5DataClient,
    symbols: Sequence[str],
    selected_datasets: set[Dataset],
    resolved_timeframes: list[int],
    resolved_tick_flags: int,
    fallback_start: datetime,
    end_date: datetime,
    *,
    deduplicate: bool,
    create_rate_views: bool,
    with_views: bool,
    include_account_events: bool,
) -> tuple[set[Dataset], dict[Dataset, set[str]]]

Append selected datasets incrementally and refresh indexes and views.

Returns:

Type Description
tuple[set[Dataset], dict[Dataset, set[str]]]

Written datasets and their columns.

Source code in mt5cli/history.py
def write_incremental_datasets(  # noqa: PLR0913
    conn: sqlite3.Connection,
    client: Mt5DataClient,
    symbols: Sequence[str],
    selected_datasets: set[Dataset],
    resolved_timeframes: list[int],
    resolved_tick_flags: int,
    fallback_start: datetime,
    end_date: datetime,
    *,
    deduplicate: bool,
    create_rate_views: bool,
    with_views: bool,
    include_account_events: bool,
) -> tuple[set[Dataset], dict[Dataset, set[str]]]:
    """Append selected datasets incrementally and refresh indexes and views.

    Returns:
        Written datasets and their columns.
    """
    written_columns: dict[Dataset, set[str]] = {}
    written_tables: set[Dataset] = set()
    dedup_scopes: dict[Dataset, list[DedupScope]] = {}
    if Dataset.rates in selected_datasets:
        _write_incremental_rates(
            conn,
            client,
            symbols,
            resolved_timeframes,
            fallback_start,
            end_date,
            written_columns,
            written_tables,
            dedup_scopes,
        )
    if Dataset.ticks in selected_datasets:
        _write_incremental_ticks(
            conn,
            client,
            symbols,
            resolved_tick_flags,
            fallback_start,
            end_date,
            written_columns,
            written_tables,
            dedup_scopes,
        )
    if Dataset.history_orders in selected_datasets:
        _write_incremental_history_orders(
            conn,
            client,
            symbols,
            fallback_start,
            end_date,
            written_columns,
            written_tables,
            dedup_scopes,
        )
    if Dataset.history_deals in selected_datasets:
        _write_incremental_history_deals(
            conn,
            client,
            symbols,
            fallback_start,
            end_date,
            written_columns,
            written_tables,
            dedup_scopes,
            include_account_events=include_account_events,
        )
    _finalize_incremental_writes(
        conn,
        selected_datasets,
        written_columns,
        written_tables,
        dedup_scopes,
        deduplicate=deduplicate,
        create_rate_views=create_rate_views,
        with_views=with_views,
    )
    return written_tables, written_columns

write_rates_dataset

write_rates_dataset(
    conn: Connection,
    client: Mt5DataClient,
    symbols: Sequence[str],
    timeframe: int,
    date_from: datetime,
    date_to: datetime,
    if_exists: IfExists,
    written_columns: dict[Dataset, set[str]],
) -> bool

Stream rates frames into SQLite.

Returns:

Type Description
bool

True if the rates table was written.

Source code in mt5cli/history.py
def write_rates_dataset(
    conn: sqlite3.Connection,
    client: Mt5DataClient,
    symbols: Sequence[str],
    timeframe: int,
    date_from: datetime,
    date_to: datetime,
    if_exists: IfExists,
    written_columns: dict[Dataset, set[str]],
) -> bool:
    """Stream rates frames into SQLite.

    Returns:
        True if the rates table was written.
    """

    def _fetch_rates_frame(sym: str) -> pd.DataFrame:
        frame = client.copy_rates_range_as_df(
            symbol=sym,
            timeframe=timeframe,
            date_from=date_from,
            date_to=date_to,
        ).drop(columns=["symbol", "timeframe"], errors="ignore")
        if len(frame.columns) != 0:
            frame.insert(0, "symbol", sym)
            frame.insert(1, "timeframe", timeframe)
        return frame

    return _stream_symbol_frames(
        conn,
        symbols,
        Dataset.rates,
        if_exists,
        written_columns,
        _fetch_rates_frame,
    )

write_streamed_frame

write_streamed_frame(
    conn: Connection,
    frame: DataFrame,
    dataset: Dataset,
    table_exists: bool,
    if_exists: IfExists,
    written_columns: dict[Dataset, set[str]],
) -> bool

Write one streamed dataset frame and track table state.

Returns:

Type Description
bool

True if the dataset table exists after this write attempt.

Source code in mt5cli/history.py
def write_streamed_frame(
    conn: sqlite3.Connection,
    frame: pd.DataFrame,
    dataset: Dataset,
    table_exists: bool,
    if_exists: IfExists,
    written_columns: dict[Dataset, set[str]],
) -> bool:
    """Write one streamed dataset frame and track table state.

    Returns:
        True if the dataset table exists after this write attempt.
    """
    write_mode = IfExists.APPEND if table_exists else if_exists
    if append_dataframe(conn, frame, dataset.table_name, write_mode):
        record_written_columns(written_columns, dataset, frame)
        return True
    return table_exists

write_ticks_dataset

write_ticks_dataset(
    conn: Connection,
    client: Mt5DataClient,
    symbols: Sequence[str],
    flags: int,
    date_from: datetime,
    date_to: datetime,
    if_exists: IfExists,
    written_columns: dict[Dataset, set[str]],
) -> bool

Stream ticks frames into SQLite.

Returns:

Type Description
bool

True if the ticks table was written.

Source code in mt5cli/history.py
def write_ticks_dataset(
    conn: sqlite3.Connection,
    client: Mt5DataClient,
    symbols: Sequence[str],
    flags: int,
    date_from: datetime,
    date_to: datetime,
    if_exists: IfExists,
    written_columns: dict[Dataset, set[str]],
) -> bool:
    """Stream ticks frames into SQLite.

    Returns:
        True if the ticks table was written.
    """

    def _fetch_ticks_frame(sym: str) -> pd.DataFrame:
        frame = client.copy_ticks_range_as_df(
            symbol=sym,
            date_from=date_from,
            date_to=date_to,
            flags=flags,
        ).drop(columns=["symbol"], errors="ignore")
        if len(frame.columns) != 0:
            frame.insert(0, "symbol", sym)
        return frame

    return _stream_symbol_frames(
        conn,
        symbols,
        Dataset.ticks,
        if_exists,
        written_columns,
        _fetch_ticks_frame,
    )

collect-history schema

The collect-history command (and the matching collect_history SDK function) writes selected MT5 datasets into one SQLite database. Each dataset becomes a table; column names and types mirror the pdmt5 DataFrame schema for that export, with two additions:

  • symbol is prepended on every table.
  • timeframe is prepended on rates so appended runs at different bar sizes stay distinguishable.

SQLite does not declare foreign keys. Rows are linked logically by symbol, time windows, and (for deals) position_id / order. Duplicate rows are removed on append using dataset-specific keys (for example ticket on history tables, or (symbol, timeframe, time) on rates).

Optional views are created when --with-views is set and the history-deals dataset was written.

Entity-relationship diagram

Sample layout for a full collection with --with-views:

erDiagram
    rates {
        TEXT symbol "dedup key"
        INTEGER timeframe "dedup key"
        TEXT time "dedup key"
        REAL open
        REAL high
        REAL low
        REAL close
        INTEGER tick_volume
        INTEGER spread
        INTEGER real_volume
    }

    ticks {
        TEXT symbol "dedup key"
        TEXT time "dedup key"
        INTEGER time_msc "dedup key (preferred)"
        REAL bid
        REAL ask
        REAL last
        INTEGER volume
        INTEGER flags
        REAL volume_real
    }

    history_orders {
        INTEGER ticket "dedup key"
        TEXT symbol
        TEXT time
        INTEGER type
        INTEGER state
        REAL volume_initial
        REAL price_open
        REAL price_current
        INTEGER magic
    }

    history_deals {
        INTEGER ticket "dedup key"
        INTEGER order
        INTEGER position_id "groups position view"
        TEXT symbol
        TEXT time
        INTEGER type "0/1 trade, else cash event"
        INTEGER entry "0 IN, 1 OUT, 2 INOUT, 3 OUT_BY"
        REAL volume
        REAL price
        REAL profit
        REAL commission
        REAL swap
        REAL fee
    }

    cash_events {
        INTEGER ticket
        TEXT symbol
        TEXT time
        INTEGER type
        REAL profit
    }

    positions_reconstructed {
        INTEGER position_id
        TEXT symbol
        TEXT open_time
        TEXT close_time
        INTEGER direction
        REAL volume_open
        REAL volume_close
        REAL volume_reversal
        REAL open_price
        REAL close_price
        REAL total_profit
        INTEGER reversal_count
        INTEGER deals_count
    }

    rates ||--o{ history_deals : "symbol (logical)"
    ticks ||--o{ history_deals : "symbol (logical)"
    history_orders ||--o{ history_deals : "order ~ ticket (logical)"
    history_deals ||--|| cash_events : "VIEW: type NOT IN (0,1)"
    history_deals ||--o{ positions_reconstructed : "VIEW: GROUP BY position_id"

Tables and views

Object Kind Source Notes
rates table copy_rates_range Indexed on (symbol, timeframe, time) when columns exist.
ticks table copy_ticks_range Indexed on (symbol, time) when columns exist.
history_orders table history_orders_get Fetched per --symbol, then concatenated.
history_deals table history_deals_get Fetched per --symbol, then concatenated. Indexed on (position_id, symbol) when present.
cash_events view history_deals Non-trade deal types (deposits, balance ops, etc.). Requires type column.
positions_reconstructed view history_deals One row per closed position_id; volume-weighted prices and reversal stats.

Column sets can vary with terminal and pdmt5 version. Views are skipped with a warning when required columns are missing.

Incremental collection

The update_history SDK path uses the same base tables and optional cash_events / positions_reconstructed views. It additionally maintains rate_<symbol>__<timeframe> compatibility views when create_rate_views=True.

Rate view resolution

Downstream tools can resolve mt5cli-managed compatibility view names from an existing SQLite history database without creating files or guessing naming schemes:

from pathlib import Path

from mt5cli.history import resolve_rate_view_name, resolve_rate_view_names

# Single symbol and granularity
view = resolve_rate_view_name(Path("history.db"), "EURUSD", "M1")

# Batch resolution in row-major order
views = resolve_rate_view_names(
    Path("history.db"),
    ["EURUSD", "GBPUSD"],
    ["M1", "H1"],
)

Resolution rules:

  • Returns rate_<symbol>__<timeframe> when a symbol stores one timeframe.
  • Returns rate_<symbol>__<granularity>_<timeframe> when multiple timeframes are stored for the same symbol.
  • When multiple naming candidates apply, prefers an existing managed rate_*__* view from the candidate list.
  • Falls back to single-timeframe naming when the database path is missing or rates metadata is unavailable.
  • Pass require_existing=True to raise ValueError instead of returning a best-guess name when the database or view is missing.
  • Accepts either a SQLite path or an open sqlite3.Connection.

Rate data loading

The canonical normalized rate table is rates; compatibility views are named with rate_<symbol>__<timeframe> for single-timeframe symbols or rate_<symbol>__<granularity>_<timeframe> when a symbol has multiple stored timeframes. resolve_rate_table_name() returns rates, while resolve_rate_view_name() returns the per-symbol compatibility view name.

Use load_rate_data() or load_rate_series_from_sqlite(..., table=...) to load a single table or view from a SQLite path. Use load_rate_series_by_granularity() to load multiple instrument/granularity targets without hard-coding view names:

from pathlib import Path

from mt5cli import (
    load_rate_data,
    load_rate_series_by_granularity,
    load_rate_series_from_sqlite,
    resolve_rate_table_name,
)
from mt5cli.history import resolve_rate_view_name

view = resolve_rate_view_name(Path("history.db"), "EURUSD", "M1", require_existing=True)
rates = load_rate_data(Path("history.db"), view, count=1000)
same_rates = load_rate_series_from_sqlite(Path("history.db"), table=view, count=1000)

table = resolve_rate_table_name("EURUSD", "M1")  # "rates"
series = load_rate_series_by_granularity(
    Path("history.db"),
    symbols=["EURUSD", "GBPUSD"],
    granularities=["M1", "H1"],
    count=500,
)

count returns the latest rows while preserving chronological order. Missing tables/views and mismatched explicit_tables lengths raise ValueError with the requested database target in the message.

The loader accepts close-based OHLC rate data or tick-like bid/ask data. It validates that time exists, parses timestamps with pandas, and returns a DataFrame indexed by ascending DatetimeIndex named time.

Multi-series rate loading

For loading many rate series at once, build neutral RateTarget pairs and load them from SQLite in one call. View names are resolved via the same compatibility-view rules, or you can pass explicit_tables to bypass resolution:

from pathlib import Path

from mt5cli import build_rate_targets, load_rate_series_from_sqlite

targets = build_rate_targets(["EURUSD", "GBPUSD"], ["M1", "H1"])
series = load_rate_series_from_sqlite(Path("history.db"), targets, count=1000)
frame = series["EURUSD", 1]  # keyed by (symbol, integer timeframe)
  • build_rate_targets() returns RateTarget(symbol, timeframe) pairs in row-major order, normalizing timeframe names such as "M1" to their integer values; set allow_missing_symbol=True to address series solely by explicit_tables (targets carry symbol=None).
  • resolve_rate_tables() maps targets to table or view names and validates that any explicit_tables count matches the target count. Pass require_existing=True to raise ValueError instead of returning a best-guess name when the database or managed view is missing. When explicit_tables is provided, names are returned as-is and require_existing is ignored.
  • load_rate_series_from_sqlite() returns a mapping keyed by (symbol, integer timeframe). Unless explicit_tables is supplied, it requires existing managed rate_* compatibility views and raises ValueError when they are missing. Duplicate (symbol, timeframe) targets are rejected.
  • load_rate_series_by_granularity() is a thin wrapper that builds the targets, loads the series, and rekeys the result by granularity name to avoid converting integer timeframes downstream:
from mt5cli import load_rate_series_by_granularity

series = load_rate_series_by_granularity(
    "history.db", ["EURUSD"], ["M1", "H1"], count=1000
)
frame = series["EURUSD", "M1"]  # keyed by (symbol | None, granularity_name)