A couple of weeks ago, I learned about AI Agents from a Microsoft DevBlogs post that walks through building an AI Agent on top of Copilot Studio. So, as a good student, I tried to build my own Agent to learn about Indonesian stocks. In this part, I just want to show the Azure Function that scrapes Yahoo Finance data and stores it in SharePoint. And interestingly enough, it runs on Python!
Code
Here is the main function:
from __future__ import annotations
import json
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List
import azure.functions as func
from trading_scrape.graph_sharepoint import (
    GraphClient,
    SharePointTarget,
    download_bytes,
    load_graph_creds_from_env,
    load_sharepoint_target_from_env,
    upload_bytes_replace,
)
from trading_scrape.yfinance_downloader import (
    DownloadOptions,
    dedupe_keep_order,
    download_one_to_csv_text,
    sanitize_filename,
)
app = func.FunctionApp(http_auth_level=func.AuthLevel.FUNCTION)
def _parse_tickers_text(text: str) -> List[str]:
tickers: List[str] = []
for line in text.splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
parts = [p.strip() for p in line.split(",") if p.strip()]
tickers.extend(parts)
return tickers
def _parse_tickers(req: func.HttpRequest, body: Dict[str, Any]) -> List[str]:
tickers: List[str] = []
# Prefer JSON body
body_tickers = body.get("tickers")
if isinstance(body_tickers, list):
tickers.extend([str(t) for t in body_tickers])
elif isinstance(body_tickers, str) and body_tickers.strip():
tickers.extend([t.strip() for t in body_tickers.split(",") if t.strip()])
# Query param fallback: ?tickers=AAPL,MSFT
qp = req.params.get("tickers")
if qp:
tickers.extend([t.strip() for t in qp.split(",") if t.strip()])
# Env fallback (inline)
if not tickers:
env_tickers = os.environ.get("TICKERS", "").strip()
if env_tickers:
tickers.extend([t.strip() for t in env_tickers.split(",") if t.strip()])
return dedupe_keep_order(tickers)
def _process_ticker(ticker: str, options: DownloadOptions, graph: GraphClient, sp_target: SharePointTarget) -> Dict[str, Any]:
try:
csv_text = download_one_to_csv_text(ticker, options)
file_name = f"{sanitize_filename(ticker)}.csv"
uploaded = upload_bytes_replace(
graph=graph,
target=sp_target,
file_name=file_name,
content=csv_text.encode("utf-8"),
content_type="text/csv",
)
return {
"success": True,
"ticker": ticker,
"file": file_name,
"sharepoint": {
"id": uploaded.get("id"),
"name": uploaded.get("name"),
"webUrl": uploaded.get("webUrl"),
},
}
except Exception as e:
return {"success": False, "ticker": ticker, "error": str(e)}
@app.route(route="download")
def download_to_sharepoint(req: func.HttpRequest) -> func.HttpResponse:
try:
try:
body = req.get_json()
if not isinstance(body, dict):
body = {}
except ValueError:
body = {}
graph = GraphClient(load_graph_creds_from_env())
sp_target = load_sharepoint_target_from_env()
tickers: List[str] = []
# Optional: load tickers from a SharePoint .txt file
tickers_file = (
body.get("tickers_file")
or body.get("tickersFile")
or req.params.get("tickers_file")
or os.environ.get("SP_TICKERS_FILE_PATH", "").strip()
or os.environ.get("TICKERS_FILE", "").strip() # backward-compatible name
)
if isinstance(tickers_file, str) and tickers_file.strip():
raw = download_bytes(graph=graph, target=sp_target, name_or_path=tickers_file.strip())
tickers.extend(_parse_tickers_text(raw.decode("utf-8")))
tickers.extend(_parse_tickers(req, body))
tickers = dedupe_keep_order(tickers)
if not tickers:
return func.HttpResponse(
json.dumps(
{
"error": "No tickers provided. Provide JSON {tickers:[...]} or ?tickers=AAPL,MSFT or set tickers_file (SharePoint path) / SP_TICKERS_FILE_PATH."
}
),
status_code=400,
mimetype="application/json",
)
period = str(body.get("period") or req.params.get("period") or "3y")
interval = str(body.get("interval") or req.params.get("interval") or "1d")
auto_adjust = bool(body.get("auto_adjust", False))
threads = not bool(body.get("no_threads", False))
options = DownloadOptions(
period=period,
interval=interval,
auto_adjust=auto_adjust,
threads=threads,
)
results: List[Dict[str, Any]] = []
failures: List[Dict[str, Any]] = []
with ThreadPoolExecutor(max_workers=min(len(tickers), 10)) as executor:
futures = [executor.submit(_process_ticker, ticker, options, graph, sp_target) for ticker in tickers]
for future in as_completed(futures):
result = future.result()
if result["success"]:
results.append({
"ticker": result["ticker"],
"file": result["file"],
"sharepoint": result["sharepoint"],
})
else:
failures.append({"ticker": result["ticker"], "error": result["error"]})
status = 200 if not failures else 207 # Multi-Status
return func.HttpResponse(
json.dumps(
{
"ok": len(failures) == 0,
"period": period,
"interval": interval,
"uploaded": results,
"failed": failures,
},
ensure_ascii=False,
),
status_code=status,
mimetype="application/json",
)
except Exception as e:
return func.HttpResponse(
json.dumps({"error": str(e)}),
status_code=500,
mimetype="application/json",
)
The code retrieves the tickers file (the list of all stocks that need to be processed, explained later) and then processes the tickers concurrently with a ThreadPoolExecutor, calling _process_ticker for each one. Each call downloads the data using trading_scrape.yfinance_downloader.download_one_to_csv_text and uploads the resulting file to SharePoint using trading_scrape.graph_sharepoint.upload_bytes_replace.
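For reference, here is an illustrative JSON body the download endpoint would accept, based on how the handler parses the request (the symbols are just examples; period defaults to "3y" and interval to "1d" when omitted):
{
  "tickers": ["BBCA.JK", "TLKM.JK"],
  "period": "1y",
  "interval": "1d",
  "auto_adjust": false
}
Alternatively, tickers can come from a ?tickers=... query parameter, a TICKERS app setting, or a tickers_file path pointing at a .txt file in SharePoint.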
Here is the yfinance_downloader module:
from __future__ import annotations
import os
import re
from dataclasses import dataclass
from io import StringIO
from typing import Iterable, List
import yfinance as yf
@dataclass(frozen=True)
class DownloadOptions:
period: str
interval: str
auto_adjust: bool
threads: bool
def sanitize_filename(value: str) -> str:
value = value.strip()
value = re.sub(r"[^A-Za-z0-9\^\.\-=]+", "_", value)
value = value.replace(os.sep, "_")
if os.altsep:
value = value.replace(os.altsep, "_")
return value or "UNKNOWN"
def dedupe_keep_order(items: Iterable[str]) -> List[str]:
seen: set[str] = set()
out: List[str] = []
for item in items:
t = item.strip()
if not t:
continue
if t not in seen:
seen.add(t)
out.append(t)
return out
def download_one_to_csv_text(ticker: str, options: DownloadOptions) -> str:
df = yf.download(
tickers=ticker,
period=options.period,
interval=options.interval,
auto_adjust=options.auto_adjust,
threads=options.threads,
progress=False,
)
if df is None or df.empty:
raise RuntimeError(f"No data returned for ticker '{ticker}'.")
buf = StringIO()
df.to_csv(buf, index=True)
return buf.getvalue()
As you can see, this module is just a thin wrapper around the yfinance library that scrapes the Yahoo Finance data into CSV text.
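To show how it is meant to be used, here is a minimal local sketch (the ticker symbol is just an example):
from trading_scrape.yfinance_downloader import DownloadOptions, download_one_to_csv_text

# Three years of daily candles, unadjusted prices, no yfinance-internal threading.
options = DownloadOptions(period="3y", interval="1d", auto_adjust=False, threads=False)

# Raises RuntimeError if Yahoo Finance returns no rows for the symbol.
csv_text = download_one_to_csv_text("BBCA.JK", options)
print(csv_text.splitlines()[0])  # the CSV header row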
Here is the implementation of graph_sharepoint:
from __future__ import annotations
import json
import os
import time
from dataclasses import dataclass
from typing import Any, Dict, Optional
from urllib.parse import quote
import base64
import requests
# Override for sovereign clouds (e.g., https://graph.microsoft.us/v1.0)
GRAPH_BASE = os.environ.get("GRAPH_BASE", "https://graph.microsoft.com/v1.0").strip() or "https://graph.microsoft.com/v1.0"
# OAuth scope for client-credentials tokens. For sovereign clouds, set this to match the Graph host
# (e.g., https://graph.microsoft.us/.default). Keep this without /v1.0.
GRAPH_SCOPE = os.environ.get("GRAPH_SCOPE", "https://graph.microsoft.com/.default").strip() or "https://graph.microsoft.com/.default"
# When enabled, include a small, non-secret token claim summary in auth-related errors.
GRAPH_DEBUG_AUTH = os.environ.get("GRAPH_DEBUG_AUTH", "").strip().lower() in {"1", "true", "yes", "y"}
def _jwt_claims_summary(jwt_token: str) -> Dict[str, Any]:
"""Decode a JWT payload without verification and return a small safe summary.
This is for diagnostics only; do not log the token itself.
"""
try:
parts = (jwt_token or "").split(".")
if len(parts) < 2:
return {}
payload_b64 = parts[1]
# base64url padding
payload_b64 += "=" * (-len(payload_b64) % 4)
payload = base64.urlsafe_b64decode(payload_b64.encode("utf-8"))
claims = json.loads(payload.decode("utf-8"))
if not isinstance(claims, dict):
return {}
roles = claims.get("roles")
roles_count = len(roles) if isinstance(roles, list) else 0
return {
"aud": claims.get("aud"),
"tid": claims.get("tid"),
"appid": claims.get("appid") or claims.get("azp"),
"iss": claims.get("iss"),
"roles_count": roles_count,
}
except Exception:
return {}
def _format_graph_error(resp: requests.Response) -> str:
"""Best-effort formatter for Microsoft Graph error responses."""
request_id = resp.headers.get("request-id") or resp.headers.get("client-request-id")
date = resp.headers.get("Date")
www_auth = resp.headers.get("WWW-Authenticate")
content_type = (resp.headers.get("Content-Type") or "").lower()
details: Dict[str, Any] = {}
if "application/json" in content_type:
try:
details = resp.json() or {}
except Exception:
details = {}
# Graph commonly returns: {"error": {"code": "...", "message": "...", "innerError": {...}}}
error_obj = details.get("error") if isinstance(details, dict) else None
if isinstance(error_obj, dict):
code = error_obj.get("code")
message = error_obj.get("message")
inner = error_obj.get("innerError")
inner_request_id = inner.get("request-id") if isinstance(inner, dict) else None
inner_date = inner.get("date") if isinstance(inner, dict) else None
rid = inner_request_id or request_id
when = inner_date or date
parts = [p for p in [code, message] if p]
base = " - ".join(parts) if parts else (resp.text or "<no body>")
meta = []
if rid:
meta.append(f"request-id={rid}")
if when:
meta.append(f"date={when}")
if www_auth:
meta.append(f"www-authenticate={www_auth}")
if meta:
return f"{base} ({', '.join(meta)})"
return base
# Fallback for non-standard/HTML/text responses
meta = []
if request_id:
meta.append(f"request-id={request_id}")
if date:
meta.append(f"date={date}")
if www_auth:
meta.append(f"www-authenticate={www_auth}")
suffix = f" ({', '.join(meta)})" if meta else ""
body = resp.text.strip() if resp.text else "<no body>"
return f"{body}{suffix}"
@dataclass(frozen=True)
class GraphAppCredentials:
tenant_id: str
client_id: str
client_secret: str
@dataclass(frozen=True)
class SharePointTarget:
site_id: str | None
drive_id: str
folder_path: str
class GraphClient:
def __init__(self, creds: GraphAppCredentials, session: Optional[requests.Session] = None) -> None:
self._creds = creds
self._session = session or requests.Session()
self._token: Optional[str] = None
self._token_expires_at: float = 0.0
self._token_claims_summary: Dict[str, Any] = {}
def _get_token(self) -> str:
now = time.time()
if self._token and now < (self._token_expires_at - 60):
return self._token
url = f"https://login.microsoftonline.com/{self._creds.tenant_id}/oauth2/v2.0/token"
data = {
"client_id": self._creds.client_id,
"client_secret": self._creds.client_secret,
"grant_type": "client_credentials",
"scope": GRAPH_SCOPE,
}
resp = self._session.post(url, data=data, timeout=30)
if resp.status_code >= 400:
raise RuntimeError(f"Failed to acquire Graph token: {resp.status_code} {_format_graph_error(resp)}")
payload = resp.json()
access_token = payload.get("access_token")
expires_in = float(payload.get("expires_in", 3600))
if not access_token:
raise RuntimeError("Graph token response missing access_token")
self._token = access_token
self._token_expires_at = now + expires_in
if GRAPH_DEBUG_AUTH:
self._token_claims_summary = _jwt_claims_summary(access_token)
# For client-credentials tokens, Graph authorization is conveyed via the `roles` claim.
# If no roles are present, the app likely has no *Application* permissions configured
# (or admin consent hasn't been granted), and Graph will respond 401/403.
if self._token_claims_summary.get("roles_count") == 0:
raise RuntimeError(
"Graph access token has 0 application roles. "
"Add Microsoft Graph *Application* permissions to the app registration "
"(e.g., Sites.Read.All for download; Sites.ReadWrite.All for upload) and grant admin consent. "
f"token_claims={self._token_claims_summary}"
)
return access_token
def token_claims_summary(self) -> Dict[str, Any]:
return dict(self._token_claims_summary) if self._token_claims_summary else {}
def request(self, method: str, url: str, **kwargs: Any) -> requests.Response:
headers_in = kwargs.pop("headers", {})
def _do_request() -> requests.Response:
token = self._get_token()
headers = {**headers_in, "Authorization": f"Bearer {token}"}
return self._session.request(method, url, headers=headers, **kwargs)
resp = _do_request()
# If the cached token is stale/revoked, refresh once and retry.
if resp.status_code == 401:
self._token = None
self._token_expires_at = 0.0
resp = _do_request()
return resp
def _normalize_folder_path(folder_path: str) -> str:
fp = (folder_path or "").strip().strip("/")
return fp
def _join_item_path(folder_path: str, name_or_path: str) -> str:
folder = _normalize_folder_path(folder_path)
p = (name_or_path or "").strip().lstrip("/")
if not p:
raise RuntimeError("Empty SharePoint file path")
# If caller passed a path (contains /), treat as drive-root relative.
if "/" in p:
return p
# Otherwise, treat as file name inside configured folder.
if folder:
return f"{folder}/{p}"
return p
def download_bytes(
graph: GraphClient,
target: SharePointTarget,
name_or_path: str,
) -> bytes:
"""Download a file from SharePoint drive by path.
If name_or_path includes '/', it is treated as a path relative to the drive root.
Otherwise it is treated as a file name inside target.folder_path.
"""
item_path = _join_item_path(target.folder_path, name_or_path)
encoded = quote(item_path, safe="/")
meta_url = f"{GRAPH_BASE}/drives/{target.drive_id}/root:/{encoded}"
# Prefer @microsoft.graph.downloadUrl from driveItem metadata.
# This URL is pre-authenticated and avoids redirect/auth header edge cases.
meta = graph.request("GET", meta_url, timeout=60)
if meta.status_code >= 400:
extra = ""
if GRAPH_DEBUG_AUTH and meta.status_code in (401, 403):
extra = f" token_claims={graph.token_claims_summary()}"
raise RuntimeError(f"Download meta failed ({meta_url}): {meta.status_code} {_format_graph_error(meta)}{extra}")
try:
meta_json = meta.json() or {}
except Exception:
meta_json = {}
download_url = meta_json.get("@microsoft.graph.downloadUrl") if isinstance(meta_json, dict) else None
if isinstance(download_url, str) and download_url.strip():
dl = requests.get(download_url, timeout=60)
if dl.status_code >= 400:
raise RuntimeError(f"Download failed (downloadUrl): {dl.status_code} {_format_graph_error(dl)}")
return dl.content
# Fallback to /content endpoint with manual redirects.
url = f"{GRAPH_BASE}/drives/{target.drive_id}/root:/{encoded}:/content"
max_redirects = 5
current_url = url
resp: requests.Response
for _ in range(max_redirects + 1):
resp = graph.request("GET", current_url, timeout=60, allow_redirects=False)
if resp.status_code in (301, 302, 303, 307, 308):
loc = resp.headers.get("Location")
if not loc:
break
current_url = loc
continue
break
if resp.status_code >= 400:
extra = ""
if GRAPH_DEBUG_AUTH and resp.status_code in (401, 403):
extra = f" token_claims={graph.token_claims_summary()}"
raise RuntimeError(f"Download failed ({url}): {resp.status_code} {_format_graph_error(resp)}{extra}")
return resp.content
def upload_bytes_replace(
graph: GraphClient,
target: SharePointTarget,
file_name: str,
content: bytes,
content_type: str = "text/csv",
) -> Dict[str, Any]:
"""Upload file into SharePoint folder, replacing if exists.
Uses an upload session with conflictBehavior=replace (works for both small and large files).
"""
folder_path = _normalize_folder_path(target.folder_path)
if folder_path:
item_path = f"{folder_path}/{file_name}"
else:
item_path = file_name
encoded_item_path = quote(item_path, safe="/")
create_session_url = f"{GRAPH_BASE}/drives/{target.drive_id}/root:/{encoded_item_path}:/createUploadSession"
body = {
"item": {
"@microsoft.graph.conflictBehavior": "replace",
"name": file_name,
}
}
resp = graph.request(
"POST",
create_session_url,
json=body,
timeout=30,
)
if resp.status_code >= 400:
raise RuntimeError(f"Create upload session failed ({create_session_url}): {resp.status_code} {_format_graph_error(resp)}")
upload_url = resp.json().get("uploadUrl")
if not upload_url:
raise RuntimeError("Upload session response missing uploadUrl")
# Upload in chunks (recommended by Graph); 320 KiB aligns with Graph examples.
chunk_size = 320 * 1024
total = len(content)
start = 0
while start < total:
end = min(start + chunk_size, total) - 1
chunk = content[start : end + 1]
headers = {
"Content-Length": str(len(chunk)),
"Content-Range": f"bytes {start}-{end}/{total}",
"Content-Type": content_type,
}
up = graph.request(
"PUT",
upload_url,
data=chunk,
headers=headers,
timeout=120,
)
# 202 = accepted (more chunks), 201/200 = completed
if up.status_code in (200, 201):
return up.json()
if up.status_code == 202:
start = end + 1
continue
raise RuntimeError(f"Upload failed: {up.status_code} {_format_graph_error(up)}")
raise RuntimeError("Upload did not complete")
def load_sharepoint_target_from_env() -> SharePointTarget:
site_id = os.environ.get("SP_SITE_ID", "").strip()
drive_id = os.environ.get("SP_DRIVE_ID", "").strip()
folder_path = os.environ.get("SP_FOLDER_PATH", "").strip()
if not drive_id:
raise RuntimeError("Missing SP_DRIVE_ID app setting")
return SharePointTarget(site_id=site_id or None, drive_id=drive_id, folder_path=folder_path)
def load_graph_creds_from_env() -> GraphAppCredentials:
tenant_id = os.environ.get("SP_TENANT_ID", "").strip()
client_id = os.environ.get("SP_CLIENT_ID", "").strip()
client_secret = os.environ.get("SP_CLIENT_SECRET", "").strip()
if not tenant_id:
raise RuntimeError("Missing SP_TENANT_ID app setting")
if not client_id:
raise RuntimeError("Missing SP_CLIENT_ID app setting")
if not client_secret:
raise RuntimeError("Missing SP_CLIENT_SECRET app setting")
return GraphAppCredentials(tenant_id=tenant_id, client_id=client_id, client_secret=client_secret)
Basically, for uploading data to SharePoint, we are using the Microsoft Graph API: the client authenticates with client credentials, creates an upload session, and uploads the file in chunks, replacing any existing copy.
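If you want to smoke-test this module outside the Function App, a minimal sketch (assuming the same SP_* settings are exported as environment variables) could look like this:
from trading_scrape.graph_sharepoint import (
    GraphClient,
    load_graph_creds_from_env,
    load_sharepoint_target_from_env,
    upload_bytes_replace,
)

# Requires SP_TENANT_ID, SP_CLIENT_ID, SP_CLIENT_SECRET, and SP_DRIVE_ID
# (plus optional SP_SITE_ID / SP_FOLDER_PATH) in the environment.
graph = GraphClient(load_graph_creds_from_env())
target = load_sharepoint_target_from_env()

uploaded = upload_bytes_replace(
    graph=graph,
    target=target,
    file_name="smoke-test.csv",  # illustrative file name
    content=b"Date,Close\n2024-01-02,100\n",
    content_type="text/csv",
)
print(uploaded.get("webUrl"))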
App User
Of course, we need to grant the app registration permissions for these specific tasks. Go to Azure Portal > App Registrations > API Permissions. Here are all the permissions granted for this specific app (you can probably set stricter permissions):

Set API Permissions
Deploying the Code and setting up the App Settings
To deploy the code, I installed the Azure Functions extension in VS Code. Then you just right-click an empty area of the Explorer tab and click Deploy to Function App:

Deploying to Azure Function App
From there, it will guide you to sign in to your environment, and you can select an existing function app or create a new one. If you choose New function app, it will ask for a name, subscription, region, and so on. Once all the necessary components have been created, it automatically deploys the function app, which you can verify in the Azure Portal (portal.azure.com).
Here is the Overview of the Function App:

Trading Scrape overview page in Azure Function App
As you can see, it detected one function named “download_to_sharepoint”, listed at the bottom of the page.
Lastly, I needed to set several app settings (some of them are generated automatically after deployment):

App Settings
SharePoint Settings
On the SharePoint side, I created a new folder named data (this is the value of the SP_FOLDER_PATH app setting) and added a tickers.txt file in the format below:

Tickers.txt
Basically, those are all the Indonesian stocks that I wanted to monitor.
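For illustration, the parser accepts one ticker per line or several comma-separated tickers on a line, skips blank lines, and ignores lines starting with #, so a tickers.txt could look like this (the symbols are examples only):
# Banks
BBCA.JK, BBRI.JK

# Telco
TLKM.JK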
To execute the Azure Function, go to the Azure Portal, select your Function App, click the “download_to_sharepoint” function, and you will see the “Get function URL” button:

Get function URL button
Clicking that button opens a side pane where you can copy the default (Function key) URL. This is the URL you use to invoke the Azure Function for this specific logic.
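As a sketch, calling it from Python could look like this (the host name and function key below are placeholders for your own Function App):
import requests

# Placeholder URL copied from "Get function URL"; the code query parameter carries the function key.
url = "https://<your-function-app>.azurewebsites.net/api/download?code=<function-key>"

resp = requests.post(url, json={"tickers": ["BBCA.JK", "TLKM.JK"], "period": "1y"}, timeout=600)
print(resp.status_code)         # 200 if everything uploaded, 207 if some tickers failed
print(resp.json()["uploaded"])  # SharePoint items that were created or replaced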
In my SharePoint folder, here is the result after I executed the URL:

Result
Because I want the data to stay up to date daily, I created a Power Automate flow that calls the Azure Function on a recurrence (time) trigger:

Power Automate Flow to call the scrape trigger
And finally, in my Agent, I can add the SharePoint folder as Knowledge:

Add my SharePoint as the Knowledge
With that, the data is ready for my AI Agent to consume!
Happy learning!

