レシピ・実装例¶
anchor.fm REST API と GraphQL API を使った Python 実装の実践的なフロー。
エピソード更新(タイトル・概要欄・公開日時)¶
最も基本的なユースケース。既存エピソードを更新する標準フロー。
import requests
from spotifyconnector import SpotifyConnector
# 事前に .env から sp_dc, sp_key, show_id を読み込む想定
# (auth.md の .env テンプレートを参照)
# 1. Bearer トークン取得
connector = SpotifyConnector(
base_url="https://generic.wg.spotify.com/podcasters/v0",
client_id="05a1371ee5194c27860b3ff3ff3979d2",
podcast_id=show_id,
sp_dc=sp_dc,
sp_key=sp_key,
)
connector._authenticate()
bearer = connector._bearer
headers_get = {"Authorization": f"Bearer {bearer}", "Accept": "application/json"}
headers_post = {
**headers_get,
"Content-Type": "application/json",
"Origin": "https://creators.spotify.com",
"Referer": "https://creators.spotify.com/",
}
# 2. Spotify Episode ID → Anchor Numeric ID
r = requests.get(
f"https://api-v5.anchor.fm/v3/episodes/spotify:episode:{spotify_ep_id}/episodeId?isMumsCompatible=true",
headers=headers_get,
)
anchor_id = r.json()["episodeId"]
# 3. エピソード現状取得(scheduled/published/draft の判定に必要)
r = requests.get(
f"https://api-v5.anchor.fm/v3/episodes/{anchor_id}/overview?isMumsCompatible=true&returnWebIds=true",
headers=headers_get,
)
overview = r.json()
# 4. エピソード更新
payload = {
"userId": overview["userId"],
"title": "新しいタイトル",
"description": "<p>新しい概要欄</p>",
"episodeType": overview.get("podcastEpisodeType", "full"),
"isPublished": overview.get("isPublished", False),
"podcastEpisodeIsExplicit": overview.get("podcastEpisodeIsExplicit", False),
"publishOn": "2026-06-01T21:00:00.000Z", # UTC で指定
"wizardDraftedToPublishOn": "2026-06-01T21:00:00.000Z",
}
r = requests.post(
f"https://api-v5.anchor.fm/v3/episodes/{anchor_id}/update?isMumsCompatible=true",
json=payload,
headers=headers_post,
)
print("成功" if r.status_code == 200 else f"失敗: {r.status_code}")
# 5. 予約状態の確認
r = requests.get(
f"https://api-v5.anchor.fm/v3/episodes/{anchor_id}/overview?isMumsCompatible=true",
headers=headers_get,
)
ep = r.json()
if not ep["isPublished"] and ep.get("publishOn"):
print(f"予約済み: {ep['publishOn']}")
JST → UTC 変換ユーティリティ¶
publishOn は UTC で指定する必要がある。JST で考えて UTC に変換する場合。
from datetime import datetime, timezone, timedelta
JST = timezone(timedelta(hours=9))
def jst_to_utc_str(year, month, day, hour, minute=0, second=0):
"""JST の日時を UTC ISO8601 文字列に変換する"""
publish_jst = datetime(year, month, day, hour, minute, second, tzinfo=JST)
return publish_jst.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")
# JST 06:00 → UTC 前日 21:00
utc_str = jst_to_utc_str(2026, 6, 1, 6, 0, 0)
# → "2026-05-31T21:00:00.000Z"
エピソード一覧の取得¶
spotifyconnector のネイティブ機能を使う。
connector = SpotifyConnector(
base_url="https://generic.wg.spotify.com/podcasters/v0",
client_id="05a1371ee5194c27860b3ff3ff3979d2",
podcast_id=show_id,
sp_dc=sp_dc,
sp_key=sp_key,
)
connector._authenticate()
episodes = connector.episodes() # ページネーション対応
for ep in episodes:
print(ep["id"], ep["title"]) # id が Spotify Episode ID
Spotify Show ID → Anchor numeric IDs 変換¶
ショーレベルの操作に必要な stationId を取得する。
r = requests.get(
f"https://api-v5.anchor.fm/v3/shows/{spotify_show_id}/legacyIds?isMumsCompatible=true",
headers=headers_get,
)
ids = r.json()
station_id = ids["stationId"] # エピソード一覧・メタデータ等で使う
user_id = ids["userId"] # エピソード作成等で使う
音声ファイルアップロード(完全フロー)¶
import requests
import os
# 事前準備:bearer, anchor_id, station_id が必要
filename = "episode.mp3"
mime_type = "audio/mpeg"
file_path = "/path/to/episode.mp3"
# Step 1: 署名付きURL取得
r = requests.get(
f"https://api-v5.anchor.fm/v3/episodes/{anchor_id}/upload/signedUrl"
f"?filename={filename}&type={mime_type}&isMumsCompatible=true",
headers={"Authorization": f"Bearer {bearer}", "Accept": "application/json"},
)
signed = r.json()
upload_id = signed["uploadId"]
signed_url = signed["url"]
# Step 2: GCS への直接 PUT
with open(file_path, "rb") as f:
audio_data = f.read()
r = requests.put(
signed_url,
data=audio_data,
headers={"Content-Type": mime_type},
)
r.raise_for_status()
etag = r.headers["ETag"].strip('"') # ダブルクォートを除去
# Step 3: process_upload
payload = {
"userId": user_id,
"uploadType": "default", # 音声ファイルは "default"("audio" は誤り)
"origin": "episode-media:upload",
"caption": filename,
"isExtractedFromVideo": False,
"isMultipartUpload": True,
"parts": [{"partNumber": 1, "etag": etag}],
"uploadId": upload_id,
"episodeId": anchor_id,
"stationId": station_id,
}
r = requests.post(
f"https://api-v5.anchor.fm/v3/upload/{upload_id}/process_upload?isMumsCompatible=true",
json=payload,
headers={
"Authorization": f"Bearer {bearer}",
"Content-Type": "application/json",
"Origin": "https://creators.spotify.com",
"Referer": "https://creators.spotify.com/",
},
)
r.raise_for_status()
# Step 4: 処理完了待ち(ポーリング)
import time
for _ in range(30):
r = requests.get(
f"https://api-v5.anchor.fm/v3/upload/media/{upload_id}"
f"?includeMediaValidation=true&isMumsCompatible=true",
headers={"Authorization": f"Bearer {bearer}"},
)
status = r.json().get("status")
print(f"status: {status}")
if status == "completed":
print("アップロード完了")
break
elif status == "failed":
raise RuntimeError("アップロード失敗")
time.sleep(5)
コメント一覧取得と返信(GraphQL)¶
import requests
graphql_url = "https://creators-graph.spotify.com/v2/graph-pq"
headers = {
"Authorization": f"Bearer {bearer}",
"Content-Type": "application/json",
}
episode_uri = f"spotify:episode:{spotify_ep_id}"
# コメント一覧取得
query_payload = {
"operationName": "getCommentsForEpisode",
"variables": {
"episodeUri": episode_uri,
"primaryFilters": [
"LIST_COMMENT_PRIMARY_FILTER_PUBLISHED",
"LIST_COMMENT_PRIMARY_FILTER_NEEDS_REVIEW",
],
"commentTypesFilters": [
"LIST_COMMENT_TYPE_FILTER_ROOT",
],
"secondaryFilters": [],
"repliesFilter": [
"LIST_COMMENT_PRIMARY_FILTER_PUBLISHED",
],
"pageSize": 15,
},
"query": """
query getCommentsForEpisode(
$episodeUri: String!
$primaryFilters: [String!]!
$commentTypesFilters: [String!]!
$secondaryFilters: [String!]!
$repliesFilter: [String!]!
$pageSize: Int!
) {
getCommentsForEpisode(
episodeUri: $episodeUri
primaryFilters: $primaryFilters
commentTypesFilters: $commentTypesFilters
secondaryFilters: $secondaryFilters
repliesFilter: $repliesFilter
pageSize: $pageSize
) {
items {
uri
text
status
author {
username
}
}
}
}
""",
}
r = requests.post(graphql_url, json=query_payload, headers=headers)
comments = r.json()["data"]["getCommentsForEpisode"]["items"]
# 最初の審査中コメントを承認して返信する
for comment in comments:
if comment["status"] == "NEEDS_REVIEW":
comment_uri = comment["uri"]
# 承認
approve_payload = {
"operationName": "publishCommentByCommentUri",
"variables": {"commentUri": comment_uri},
"query": """
mutation publishCommentByCommentUri($commentUri: String!) {
publishCommentByCommentUri(commentUri: $commentUri)
}
""",
}
requests.post(graphql_url, json=approve_payload, headers=headers)
# 返信
reply_payload = {
"operationName": "createCommentReplyByCommentUri",
"variables": {
"parentCommentUri": comment_uri,
"replyStr": "ご視聴ありがとうございます!",
},
"query": """
mutation createCommentReplyByCommentUri(
$parentCommentUri: String!
$replyStr: String!
) {
createCommentReplyByCommentUri(
parentCommentUri: $parentCommentUri
replyStr: $replyStr
)
}
""",
}
requests.post(graphql_url, json=reply_payload, headers=headers)
print(f"承認・返信完了: {comment_uri}")
break