reactive.poll

reactive.poll(poll_func, interval_secs=1, *, equals=eq, priority=0, session=MISSING)

Create a reactive polling object.

Polling is a technique that approximates "real-time" or streaming updates, as if a data source were pushing notifications each time it is updated. The data source does not actually push notifications; a polling object repeatedly checks for changes in an efficient way at specified intervals. If a change is detected, the polling object runs a function to re-read the data source.

A reactive polling object is constructed using two functions: a polling function, which is a fast-running, inexpensive function that is used to determine whether some data source has changed (such as the timestamp of a file, or a SELECT MAX(updated) FROM table query); and a slower-running reading function that actually loads and returns the data that is desired. The poll() function is intended to be used as a decorator: the poll function is passed as the poll_func arg to @poll(), while the data reading function is the target of the decorator.

Reactive consumers can invoke the resulting polling object to get the current data, and will automatically invalidate when the polling function detects a change. Polling objects also cache the results of the read function; for this reason, apps where all sessions depend on the same data source may want to declare the polling object at the top level of app.py (outside of the server function).

Both poll_func and the decorated (data reading) function can read reactive values and calc objects. Any invalidations triggered by reactive dependencies will apply to the reactive polling object immediately (not waiting for the interval_secs delay to expire).

Parameters

poll_func: Callable[[], Any] | Callable[[], Awaitable[Any]]

A function to be called frequently to determine whether a data source has changed. The return value should be something that can be compared inexpensively using ==. Both regular functions and co-routine functions are allowed.

Note that the poll_func should NOT return a bool that indicates whether the data source has changed. Rather, each poll_func return value will be checked for equality with its preceding poll_func return value (using == semantics by default), and if it differs, the data source will be considered changed.

interval_secs: float = 1

The number of seconds to wait after each poll_func invocation before polling again. Note: depending on what other tasks are executing, the actual wait time may far exceed this value.

equals: Callable[[Any, Any], bool] = eq

The function that will be used to compare each poll_func return value with its immediate predecessor.

priority: int = 0

Reactive polling is implemented using an effect to call poll_func on a timer; use the priority argument to control the order of this Effect’s execution versus other Effects in your app. See effect for more details.

session: MISSING_TYPE | Session | None = MISSING

A Session instance. If not provided, a session is inferred via get_current_session. If there is no current session (i.e. poll is being created outside of the server function), the lifetime of this reactive poll object will not be tied to any specific session.

Returns

Type Description
Callable[[Callable[[], T]], Callable[[], T]] A decorator that should be applied to a no-argument function that (expensively)
reads whatever data is desired. (This function may be a regular function or a
co-routine function.) The result of the decorator is a reactive
func:~shiny.reactive.calc that always returns up-to-date data, and invalidates
callers when changes are detected via polling.

See Also

Examples

#| standalone: true
#| components: [editor, viewer]
#| layout: vertical
#| viewerHeight: 400

## file: app.py
import asyncio
import random
import sqlite3
from datetime import datetime
from typing import Any, Awaitable

import pandas as pd

from shiny import App, Inputs, Outputs, Session, reactive, render, ui

SYMBOLS = ["AAA", "BBB", "CCC", "DDD", "EEE", "FFF"]


def timestamp() -> str:
    return datetime.now().strftime("%x %X")


def rand_price() -> float:
    return round(random.random() * 250, 2)


# === Initialize the database =========================================


def init_db(con: sqlite3.Connection) -> None:
    cur = con.cursor()
    try:
        cur.executescript(
            """
            CREATE TABLE stock_quotes (timestamp text, symbol text, price real);
            CREATE INDEX idx_timestamp ON stock_quotes (timestamp);
            """
        )
        cur.executemany(
            "INSERT INTO stock_quotes (timestamp, symbol, price) VALUES (?, ?, ?)",
            [(timestamp(), symbol, rand_price()) for symbol in SYMBOLS],
        )
        con.commit()
    finally:
        cur.close()


conn = sqlite3.connect(":memory:")
init_db(conn)


# === Randomly update the database with an asyncio.task ==============


def update_db(con: sqlite3.Connection) -> None:
    """Update a single stock price entry at random"""

    cur = con.cursor()
    try:
        sym = SYMBOLS[random.randint(0, len(SYMBOLS) - 1)]
        print(f"Updating {sym}")
        cur.execute(
            "UPDATE stock_quotes SET timestamp = ?, price = ? WHERE symbol = ?",
            (timestamp(), rand_price(), sym),
        )
        con.commit()
    finally:
        cur.close()


async def update_db_task(con: sqlite3.Connection) -> Awaitable[None]:
    """Task that alternates between sleeping and updating prices"""
    while True:
        await asyncio.sleep(random.random() * 1.5)
        update_db(con)


asyncio.create_task(update_db_task(conn))


# === Create the reactive.poll object ===============================


def tbl_last_modified() -> Any:
    df = pd.read_sql_query("SELECT MAX(timestamp) AS timestamp FROM stock_quotes", conn)
    return df["timestamp"].to_list()


@reactive.poll(tbl_last_modified, 0.5)
def stock_quotes() -> pd.DataFrame:
    return pd.read_sql_query("SELECT timestamp, symbol, price FROM stock_quotes", conn)


# === Define the Shiny UI and server ===============================

app_ui = ui.page_fluid(
    ui.card(
        ui.markdown(
            """
            # `shiny.reactive.poll` demo

            This example app shows how to stream results from a database (in this
            case, an in-memory sqlite3) with the help of `shiny.reactive.poll`.
            """
        ),
        ui.input_selectize(
            "symbols", "Filter by symbol", [""] + SYMBOLS, multiple=True
        ),
        ui.output_data_frame("table"),
        fill=False,
    )
)


def server(input: Inputs, output: Outputs, session: Session) -> None:
    def filtered_quotes():
        df = stock_quotes()
        if input.symbols():
            df = df[df["symbol"].isin(input.symbols())]
        return df

    @render.data_frame
    def table():
        return filtered_quotes()


app = App(app_ui, server)