from dagster_airbyte import build_airbyte_assets
from dagster_dbt import load_assets_from_dbt_project
from dagster import asset, Definitions
from .constants import AIRBYTE_CONNECTION_ID, DBT_PROJECT_DIR
airbyte_assets = build_airbyte_assets(
connection_id=AIRBYTE_CONNECTION_ID,
destination_tables=["orders", "users"],
asset_key_prefix=["postgres_replica"],
)
dbt_assets = load_assets_from_dbt_project(project_dir=DBT_PROJECT_DIR)
@asset(compute_kind="python")
def order_forecast_model(daily_order_summary: pd.DataFrame) -> np.ndarray:
train_set = daily_order_summary.to_numpy()
xdata, ydata = train_set[:, 0], train_set[:, 2]
return optimize.curve_fit(f=model_func, xdata=xdata, ydata=ydata, p0=[10, 100])[0]
defs = Definitions(
assets=[*airbyte_assets, *dbt_assets, order_forecast_model]
)