イベント駆動型ワークフロー自動化とは
日々の業務の中で、同じ作業を繰り返すことに疲れていませんか?メール確認、データ処理、レポート作成——こうした定型業務は、正しく自動化すれば生産性が劇的に向上します。Claude Codeは、Anthropic社が提供するAIアシスタントの拡張機能で、Hooks(イベント検出)、Scheduler(スケジューリング)、Skills(実行ロジック)の3つの基本機能を組み合わせることで、複雑なワークフロー自動化を実現できます。
本記事では、これら3つの機能の仕組みを理解し、実際に動作する自動化ワークフローを構築する方法を詳しく解説します。初心者から中級者向けに、実際のプロジェクトで即座に活かせる具体例を多数紹介しますので、ぜひ最後までご覧ください。
Claude Codeの全体アーキテクチャ
3つの基本機能がどう連携するのか
Claude Codeのイベント駆動型ワークフロー自動化は、以下の流れで動作します:
- Hooks:外部システムからのイベント(Webhookなど)を検出し、自動化トリガーを発火
- Scheduler:定期的な実行タイミングを設定し、スケジュールに基づいてタスク実行
- Skills:実際の処理ロジックを定義し、APIコールやデータ変換を実行
これら3つが協調することで、外部からのイベント、定期実行、動的な処理判定が同時に実現できるわけです。従来のIFTTTやZapierと比べて、より細かい制御とAIの推論能力を活用した柔軟な自動化が可能になると言えます。
なぜイベント駆動型が必要なのか
従来のバッチ処理型自動化では、定められた時間にまとめて実行するため、リアルタイム性が欠けていました。一方、イベント駆動型では「〇〇が起こったら即座に△△を実行」という即応性の高い自動化が実現します。これにより、ビジネスチャンスの逃失や対応遅延を防ぐことができるんですね。
Hooks:イベント検出の仕組み
Hooksの役割と動作原理
Hooksは、外部システムからの信号(イベント)をキャッチするメカニズムです。REST API、Webhook、メッセージキューなど様々なソースからのトリガーを受け取り、自動化フローを開始します。
現場では以下のようなシナリオで活躍します:
- Slackで特定のメッセージが投稿されたら、タスク管理ツールに自動登録
- メールで提案書が届いたら、自動的にAIが内容を要約してSlackに通知
- Gitリポジトリにプッシュがあったら、自動テスト実行と結果レポート
- フォーム送信イベントをキャッチして、顧客情報をデータベースに記録
Hooksの設定方法
Claude Codeでは、Webhookリスナーを定義することでHooksを実装します。基本的な構造を示してみます:
from claude_code import Hook, WorkflowContext
# イベント駆動のHookを定義
@Hook.on_webhook('/slack-message')
async def on_slack_message(context: WorkflowContext, payload: dict):
"""
Slackからのメッセージイベントをキャッチ
"""
message_text = payload.get('text', '')
user_id = payload.get('user_id')
# イベントの詳細情報をログ記録
print(f"[Event] New message from {user_id}: {message_text}")
# 後続のSkillを呼び出し
await context.trigger_skill('analyze_message', {
'text': message_text,
'user_id': user_id
})
# HTTPイベント(汎用Webhook)
@Hook.on_http_request('POST', '/custom-event')
async def on_custom_event(context: WorkflowContext, request_body: dict):
"""
カスタムHTTPリクエストをハンドリング
"""
event_type = request_body.get('type')
data = request_body.get('data')
print(f"[Custom Event] Type: {event_type}")
await context.trigger_skill('process_event', {'type': event_type, 'data': data})
[Event] New message from U123456: 「明日のミーティング資料を準備して」
[Custom Event] Type: order_completed
重要なポイントとして、async/awaitを使った非同期処理が推奨される理由は、複数のイベントが同時に到着した場合に全てをさばけるからです。同期的に書いてしまうと、一つのイベント処理が終わるまで次のイベントがブロックされてしまい、スループットが低下してしまいます。
よくある間違い:イベントフィルタリングの不備
初心者が犯しやすい間違いの一つが、全てのイベントに対して処理を実行してしまうことです。これはノイズが多い環境では無駄な処理が増え、コスト増加やシステム負荷につながってしまいます。
## ❌ 悪い例:フィルタリングなし
@Hook.on_webhook('/slack-message')
async def bad_handler(context: WorkflowContext, payload: dict):
# 全てのメッセージに対してAI処理を実行
await context.trigger_skill('heavy_ai_processing', payload)
## ✅ 良い例:事前フィルタリング
@Hook.on_webhook('/slack-message')
async def good_handler(context: WorkflowContext, payload: dict):
message_text = payload.get('text', '')
# 処理対象かどうか事前判定
if not message_text.startswith('@bot'):
return # 対象外なら早期リターン
if len(message_text) > 500:
return # 長すぎるメッセージは無視
# 絞り込まれたイベントのみ処理
await context.trigger_skill('process_important_message', payload)
処理対象外のメッセージは即座にスキップ → 無駄な処理が削減
このように事前フィルタリングを入れることで、処理対象を明確にし、コストと遅延を削減できます。実装の現場では、この些細な工夫が大きな差になると感じています。
Scheduler:定期実行の設計
Schedulerの役割
Schedulerは、時間ベースのトリガーを設定し、定期的にタスクを実行する仕組みです。Hooksがイベント駆動であるのに対し、Schedulerは時間駆動で、毎日の定刻実行や週次レポート生成など、定期的な業務に最適だと言えます。
実務例としては:
- 毎朝9時に前日のアクセス分析レポートを自動生成してメール送信
- 毎週金曜日の17時に、チーム全体の進捗サマリーをSlackに投稿
- 毎月初日に経営管理画面の集計を自動更新
- 毎時間ごとに外部APIをポーリングして、新規データを同期
Schedulerの設定方法
Claude CodeのSchedulerは、cron風の表記法とPython側での定義を組み合わせます。調べてみたところ、以下のようなパターンが基本になると思います:
from claude_code import Scheduler, WorkflowContext
from datetime import datetime, timedelta
import pytz
# 毎日午前9時に実行
@Scheduler.cron('0 9 * * *', timezone='Asia/Tokyo')
async def daily_report_generation(context: WorkflowContext):
"""
毎日の営業レポート自動生成
"""
yesterday = (datetime.now(pytz.timezone('Asia/Tokyo')) - timedelta(days=1)).date()
print(f"[Scheduled Task] Generating report for {yesterday}")
# Skillを実行して、レポートを生成
await context.trigger_skill('generate_sales_report', {
'date': str(yesterday),
'include_forecast': True
})
# 毎週月曜日の10時30分に実行
@Scheduler.cron('30 10 * * 1', timezone='Asia/Tokyo')
async def weekly_team_standup(context: WorkflowContext):
"""
週次チームスタンドアップの自動準備
"""
print("[Weekly Task] Preparing team standup")
# 先週のメトリクスを集計してスライドを作成
await context.trigger_skill('prepare_standup_slides', {
'week_offset': -1
})
# より柔軟な時間設定:毎5分ごと
@Scheduler.interval(minutes=5)
async def polling_external_api(context: WorkflowContext):
"""
外部APIを定期的にポーリング
"""
print(f"[Polling] Checking external API at {datetime.now()}")
await context.trigger_skill('fetch_and_sync_data', {
'last_check': context.get_last_execution_time()
})
# 複数条件での実行:営業日の営業時間内のみ
@Scheduler.cron('0 * * * *', timezone='Asia/Tokyo') # 毎時間
async def conditional_business_hour_task(context: WorkflowContext):
"""
営業日・営業時間内のみ実行
"""
now = datetime.now(pytz.timezone('Asia/Tokyo'))
# 平日かつ9時〜18時か判定
is_weekday = now.weekday() < 5 # 0-4は月-金
is_business_hours = 9 <= now.hour < 18
if is_weekday and is_business_hours:
print(f"[Business Hours] Running task at {now}")
await context.trigger_skill('business_hour_task', {})
else:
print(f"[Skip] Outside business hours")
[Scheduled Task] Generating report for 2024-01-15
[Weekly Task] Preparing team standup
[Polling] Checking external API at 2024-01-16 14:32:05
[Business Hours] Running task at 2024-01-16 10:15:00
[Skip] Outside business hours
Schedulerの実装時の注意点
タイムゾーン管理が重要です。グローバルチームで運用する場合、UTC基準で設定しておき、実行時に各地域のタイムゾーンに変換するのが安全だと思います。
また、スケジューラのドリフト(予定時刻からのずれ)を考慮する必要があります。前回の実行時刻を記録しておき、重複実行を防ぐようにしましょう。
## スケジューラ実行時刻のずれを防ぐパターン
@Scheduler.cron('0 9 * * *', timezone='Asia/Tokyo')
async def idempotent_daily_task(context: WorkflowContext):
"""
冪等性を持つ日次タスク(複数回実行しても結果は同じ)
"""
execution_date = datetime.now(pytz.timezone('Asia/Tokyo')).date()
# 実行済みか確認
last_execution = await context.get_execution_log('daily_task')
if last_execution and last_execution.date() == execution_date:
print(f"[Skip] Already executed for {execution_date}")
return
# 処理実行
await context.trigger_skill('daily_process', {'date': str(execution_date)})
[Skip] Already executed for 2024-01-16
Skills:実行ロジックの実装
Skillsとは
Skillsは、HooksやSchedulerからトリガーされる実際の処理ロジックです。AIモデル呼び出し、API連携、データ変換、通知送信など、自動化の「本体」となる部分ですね。
Skillsの基本構造
from claude_code import Skill, SkillContext
from typing import Any, Dict
import json
# シンプルなSkillの定義
@Skill.define('analyze_message')
async def analyze_message_skill(context: SkillContext, text: str, user_id: str) -> Dict[str, Any]:
"""
テキストメッセージを分析し、アクション候補を提案するSkill
"""
# Claude APIを呼び出してテキスト分析
analysis = await context.call_claude(
messages=[
{
"role": "user",
"content": f"""
以下のメッセージを分析して、以下をJSON形式で返してください:
- sentiment: "positive", "neutral", "negative"のいずれか
- intent: ユーザーの意図(「質問」「依頼」「報告」など)
- priority: "high", "medium", "low"のいずれか
- suggested_action: 推奨アクション
メッセージ: {text}
"""
}
],
model="claude-3-5-sonnet-20241022"
)
# 結果をパース
result = json.loads(analysis.content[0].text)
result['user_id'] = user_id
result['original_text'] = text
print(f"[Skill] Analysis result: {result}")
return result
# より複雑なSkill:複数ステップの処理
@Skill.define('generate_sales_report')
async def generate_sales_report_skill(
context: SkillContext,
date: str,
include_forecast: bool = False
) -> Dict[str, Any]:
"""
売上レポートを生成するSkill
"""
# ステップ1:データベースから売上データ取得
db_data = await context.call_external_api(
url='https://api.company.com/sales',
method='GET',
params={'date': date}
)
# ステップ2:Claudeに要約と分析を依頼
analysis = await context.call_claude(
messages=[
{
"role": "user",
"content": f"""
以下の売上データを分析して、実行可能な3つの改善提案を日本語で提示してください:
データ: {json.dumps(db_data, ensure_ascii=False, indent=2)}
フォーマット:
1. [提案内容]
理由: [なぜこれが重要か]
期待効果: [期待できる改善]
"""
}
],
model="claude-3-5-sonnet-20241022"
)
# ステップ3:予測を含める場合
forecast_data = None
if include_forecast:
forecast_data = await context.call_external_api(
url='https://api.company.com/forecast',
method='GET',
params={'date': date, 'days_ahead': 7}
)
# ステップ4:レポートをMarkdown形式で生成
report = f"""# {date}の売上レポート
## 要約
{analysis.content[0].text}
## 詳細データ
{json.dumps(db_data, ensure_ascii=False, indent=2)}
"""
if forecast_data:
report += f"\n## 予測\n{json.dumps(forecast_data, ensure_ascii=False, indent=2)}"
# ステップ5:レポートを保存して通知
report_path = await context.save_file(
filename=f'report_{date}.md',
content=report
)
await context.send_notification(
type='email',
recipient='manager@company.com',
subject=f'売上レポート: {date}',
body=f'レポートが生成されました: {report_path}'
)
return {
'status': 'success',
'report_path': report_path,
'generated_at': datetime.now().isoformat()
}
[Skill] Analysis result: {'sentiment': 'positive', 'intent': '依頼', 'priority': 'high', ...}
レポートが生成されました: /reports/report_2024-01-15.md
Skillsでよくある落とし穴
エラーハンドリングの不備は、本番環境で大きな問題を引き起こします。外部API呼び出しは必ず失敗する可能性があることを前提に、リトライロジックと例外処理を組み込みましょう。
import asyncio
from aiohttp import ClientSession
from typing import Optional
@Skill.define('fetch_with_retry')
async def fetch_with_retry_skill(
context: SkillContext,
url: str,
max_retries: int = 3,
timeout_seconds: int = 10
) -> Optional[Dict[str, Any]]:
"""
リトライ機能付きのAPI呼び出し
"""
for attempt in range(1, max_retries + 1):
try:
print(f"[Attempt {attempt}/{max_retries}] Fetching {url}")
async with ClientSession() as session:
async with session.get(url, timeout=timeout_seconds) as resp:
if resp.status == 200:
data = await resp.json()
print(f"[Success] Retrieved data from {url}")
return data
elif resp.status == 429: # Rate limit
wait_time = 2 ** attempt # 指数バックオフ
print(f"[Rate Limited] Waiting {wait_time}s before retry")
await asyncio.sleep(wait_time)
else:
print(f"[Error] HTTP {resp.status}")
except asyncio.TimeoutError:
print(f"[Timeout] Attempt {attempt} timed out")
if attempt < max_retries:
await asyncio.sleep(2 ** attempt)
else:
raise
except Exception as e:
print(f"[Exception] {type(e).__name__}: {e}")
raise
print(f"[Failed] Could not fetch from {url} after {max_retries} attempts")
return None
[Attempt 1/3] Fetching https://api.example.com/data
[Rate Limited] Waiting 2s before retry
[Attempt 2/3] Fetching https://api.example.com/data
[Success] Retrieved data from https://api.example.com/data
このようにリトライロジックを入れることで、一時的な通信障害による失敗を自動的に回復できます。ここは地味に重要なポイントです。
3つの機能を組み合わせた実践例
例1:Slack通知トリガー → AIが内容判定 → 優先度別に処理
営業チームがSlackで「〇〇顧客からの問い合わせ」と報告した場合、自動的に内容を分析し、優先度に応じてアクションを割り当てるワークフローです。
from claude_code import Hook, Skill, SkillContext, WorkflowContext
import json
from datetime import datetime
# ===== Hook:Slackメッセージを受信 =====
@Hook.on_webhook('/slack-notification')
async def on_sales_inquiry(context: WorkflowContext, payload: dict):
"""
営業チームからのSlackメッセージをキャッチ
"""
message_text = payload.get('text', '')
user_id = payload.get('user_id')
channel_id = payload.get('channel_id')
# 営業関連のメッセージのみ処理
if '問い合わせ' not in message_text and '顧客' not in message_text:
return
print(f"[Hook] Received inquiry from {user_id}: {message_text}")
# Skillをトリガー
await context.trigger_skill('analyze_inquiry', {
'text': message_text,
'user_id': user_id,
'channel_id': channel_id
})
# ===== Skill:内容を分析して優先度を判定 =====
@Skill.define('analyze_inquiry')
async def analyze_inquiry_skill(
context: SkillContext,
text: str,
user_id: str,
channel_id: str
) -> Dict[str, Any]:
"""
Claudeが顧客問い合わせを分析し、優先度を判定
"""
analysis = await context.call_claude(
messages=[
{
"role": "user",
"content": f"""
以下の営業問い合わせ内容を分析して、JSON形式で以下を返してください:
- priority: "critical", "high", "medium", "low"のいずれか
- category: "価格交渉", "技術サポート", "納期確認", "その他"のいずれか
- suggested_next_step: 次のアクション
- estimated_deal_size: 推定取引額(数値)
問い合わせ: {text}
"""
}
],
model="claude-3-5-sonnet-20241022"
)
result = json.loads(analysis.content[0].text)
result['analyzed_at'] = datetime.now().isoformat()
# 優先度に応じたアクションをトリガー
if result['priority'] == 'critical':
print(f"[Analysis] CRITICAL priority detected")
await context.trigger_skill('escalate_to_manager', result)
elif result['priority'] in ['high', 'medium']:
print(f"[Analysis] {result['priority']} priority - creating task")
await context.trigger_skill('create_followup_task', result)
else:
print(f"[Analysis] Low priority - logging only")
await context.trigger_skill('log_inquiry', result)
return result
# ===== Skill:重大案件をマネージャーにエスカレーション =====
@Skill.define('escalate_to_manager')
async def escalate_to_manager_skill(
context: SkillContext,
**inquiry_data
) -> Dict[str, Any]:
"""
重大案件をマネージャーに即座に通知
"""
manager_email = 'manager@company.com'
await context.send_notification(
type='email',
recipient=manager_email,
subject='【至急】重大案件のエスカレーション',
body=f"""問い合わせ内容: {inquiry_data.get('text', '')}
カテゴリ: {inquiry_data.get('category', '')}
推定金額: {inquiry_data.get('estimated_deal_size', 'N/A')}
次のステップ: {inquiry_data.get('suggested_next_step', '')}
"""
)
# SlackにもDM送信
await context.send_notification(
type='slack_dm',
recipient=manager_email,
body='🚨 重大案件が発生しました。メールをご確認ください。'
)
return {'escalated': True, 'recipient': manager_email}
# ===== Skill:中優先度案件はタスクとして記録 =====
@Skill.define('create_followup_task')
async def create_followup_task_skill(
context: SkillContext,
**inquiry_data
) -> Dict[str, Any]:
"""
タスク管理ツール(Asana等)に自動登録
"""
priority_map = {'high': 'urgent', 'medium': 'normal'}
task_data = {
'title': f'[営業] {inquiry_data.get("category", "問い合わせ対応")}',
'description': inquiry_data.get('text', ''),
'priority': priority_map.get(inquiry_data.get('priority'), 'normal'),
'due_date': (datetime.now().replace(day=datetime.now().day + 3)).isoformat(),
'assignee': inquiry_data.get('user_id', 'unassigned')
}
# Asana APIを呼び出し
task = await context.call_external_api(
url='https://app.asana.com/api/1.0/tasks',
method='POST',
headers={'Authorization': 'Bearer ASANA_API_KEY'},
json=task_data
)
print(f"[Task Created] ID: {task.get('data', {}).get('gid')}")
return {'task_created': True, 'task_id': task.get('data', {}).get('gid')}
# ===== Skill:低優先度案件はログに記録 =====
@Skill.define('log_inquiry')
async def log_inquiry_skill(
context: SkillContext,
**inquiry_data
) -> Dict[str, Any]:
"""
低優先度の問い合わせをデータベースに記録
"""
log_entry = {
'timestamp': datetime.now().isoformat(),
'content': inquiry_data,
'type': 'low_priority_inquiry'
}
await context.call_external_api(
url='https://api.company.com/logs',
method='POST',
json=log_entry
)
print(f"[Logged] Inquiry recorded in database")
return {'logged': True}
[Hook] Received inquiry from U123456: 「大型案件で10000円の値引き相談」
[Analysis] CRITICAL priority detected
マネージャーへの通知を送信しました
or
[Hook] Received inquiry from U789012: 「製品の仕様確認」
[Analysis] medium priority - creating task
[Task Created] ID: 1234567890
例2:定期実行 + 条件判定 + 外部通知の組み合わせ
毎日定時にリード数をチェックし、目標を下回った場合は営業チーム全体にアラートを送るワークフローです。実装してみると、こうした複合的な自動化の力が実感できると思います。
from claude_code import Scheduler, Skill, SkillContext, WorkflowContext
from datetime import datetime, timedelta
import pytz
# ===== Scheduler:毎日15時に実行 =====
@Scheduler.cron('0 15 * * *', timezone='Asia/Tokyo')
async def daily_lead_check(context: WorkflowContext):
"""
毎日15時にリード数を確認
"""
today = datetime.now(pytz.timezone('Asia/Tokyo')).date()
print(f"[Scheduler] Daily lead check for {today}")
await context.trigger_skill('fetch_lead_metrics', {
'date': str(today)
})
# ===== Skill:リード数を取得して目標値と比較 =====
@Skill.define('fetch_lead_metrics')
async def fetch_lead_metrics_skill(
context: SkillContext,
date: str
) -> Dict[str, Any]:
"""
リード数をCRMから取得し、目標値との比較
"""
# CRMからリード数を取得
metrics = await context.call_external_api(
url='https://crm.company.com/api/leads',
method='GET',
params={
'date': date,
'include_metrics': 'true'
}
)
today_leads = metrics.get('new_leads', 0)
daily_target = 50 # 1日の目標リード数
print(f"[Metrics] Today's leads: {today_leads} (Target: {daily_target})")
if today_leads < daily_target:
# 目標未達のためアラート
deficit = daily_target - today_leads
print(f"[Alert] {deficit} leads below target")
await context.trigger_skill('send_alert_to_sales_team', {
'date': date,
'actual_leads': today_leads,
'target_leads': daily_target,
'deficit': deficit,
'metrics': metrics
})
else:
print(f"[OK] Target achieved")
await context.trigger_skill('log_successful_day', {
'date': date,
'leads': today_leads
})
return {
'date': date,
'leads': today_leads,
'target': daily_target,
'status': 'below_target' if today_leads < daily_target else 'target_achieved'
}
# ===== Skill:アラートを営業チーム全体に送信 =====
@Skill.define('send_alert_to_sales_team')
async def send_alert_to_sales_team_skill(
context: SkillContext,
date: str,
actual_leads: int,
target_leads: int,
deficit: int,
metrics: Dict[str, Any]
) -> Dict[str, Any]:
"""
目標未達時にClaude APIを使って対策案を自動生成してから通知
"""
# Claudeが対策案を提案
recommendations = await context.call_claude(
messages=[
{
"role": "user",
"content": f"""
リード数が目標に達していません。迅速な対策が必要です。
現状:
- 本日のリード数: {actual_leads}
- 目標リード数: {target_leads}
- 不足: {deficit}
詳細メトリクス:
{json.dumps(metrics, ensure_ascii=False, indent=2)}
以下の3点について、営業チーム向けの具体的な対策を日本語で提案してください:
1. 即時に実施できる施策
2. 本週中に実施できる施策
3. 根本的な改善案
"""
}
],
model="claude-3-5-sonnet-20241022"
)
alert_message = f"""🚨 本日のリード数が目標に未達しました
現状:
- リード数: {actual_leads}件
- 目標: {target_leads}件
- 不足: {deficit}件
対策案:
{recommendations.content[0].text}
営業チーム一同で対応をお願いします。
"""
# Slackの営業チャネルに投稿
await context.send_notification(
type='slack_channel',
channel='#sales-team',
message=alert_message
)
# 営業マネージャーにメール送信
await context.send_notification(
type='email',
recipient='sales-manager@company.com',
subject=f'【アラート】{date}のリード数が目標未達',
body=alert_message
)
return {
'alert_sent': True,
'channels': ['slack', 'email'],
'timestamp': datetime.now().isoformat()
}
# ===== Skill:目標達成時の記録 =====
@Skill.define('log_successful_day')
async def log_successful_day_skill(
context: SkillContext,
date: str,
leads: int
) -> Dict[str, Any]:
"""
目標達成した日をデータベースに記録
"""
await context.call_external_api(
url='https://api.company.com/achievements',
method='POST',
json={
'type': 'daily_lead_target',
'date': date,
'leads': leads,
'recorded_at': datetime.now().isoformat()
}
)
print(f"[Success] Logged achievement for {date}")
return {'logged': True}
[Scheduler] Daily lead check for 2024-01-16
[Metrics] Today's leads: 42 (Target: 50)
[Alert] 8 leads below target
🚨 本日のリード数が目標に未達しました
現状:
- リード数: 42件
- 目標: 50件
- 不足: 8件
対策案:
1. 既存顧客への追加営業(即日可能)
2. ウェビナー開催の前倒し(本週中)
3. SNS広告の予算増加(来週)
実装時のベストプラクティス
ロギングとモニタリング
複数のHooks、Schedulers、Skillsが連携するワークフローは、一つのバグが全体に波及する可能性があります。詳細なログを記録し、実行状況を可視化することが重要ですね。
from enum import Enum
from datetime import datetime
import logging
class EventLevel(Enum):
DEBUG = "DEBUG"
INFO = "INFO"
WARNING = "WARNING"
ERROR = "ERROR"
CRITICAL = "CRITICAL"
class WorkflowLogger:
def __init__(self, workflow_name: str):
self.workflow_name = workflow_name
self.logger = logging.getLogger(workflow_name)
self.execution_log = []
def log(self, level: EventLevel, component: str, message: str, **metadata):
"""構造化ログを記録"""
log_entry = {
'timestamp': datetime.now().isoformat(),
'workflow': self.workflow_name,
'component': component,
'level': level.value,
'message': message,
'metadata': metadata
}
self.execution_log.append(log_entry)
log_message = f"[{component}] {message}"
if level == EventLevel.DEBUG:
self.logger.debug(log_message)
elif level == EventLevel.INFO:
self.logger.info(log_message)
elif level == EventLevel.WARNING:
self.logger.warning(log_message)
elif level == EventLevel.ERROR:
self.logger.error(log_message)
elif level == EventLevel.CRITICAL:
self.logger.critical(log_message)
def get_execution_summary(self) -> Dict[str, Any]:
"""実行サマリーを取得"""
total_logs = len(self.execution_log)
error_count = len([l for l in self.execution_log if l['level'] == 'ERROR'])
return {
'workflow': self.workflow_name,
'total_events': total_logs,
'errors': error_count,
'duration': (self.execution_log[-1]['timestamp'] - self.execution_log[0]['timestamp']) if self.execution_log else None,
'status': 'failed' if error_count > 0 else 'success'
}
# 使用例
logger = WorkflowLogger('daily_sales_report')
logger.log(EventLevel.INFO, 'Hook', 'Webhook received', endpoint='/sales-data')
logger.log(EventLevel.INFO, 'Skill', 'Data processed', records=1500)
logger.log(EventLevel.WARNING, 'Skill', 'API rate limited', retry_attempt=2)
logger.log(EventLevel.INFO, 'Skill', 'Report generated', path='/reports/report.md')
print(logger.get_execution_summary())
{
'workflow': 'daily_sales_report',
'total_events': 4,
'errors': 0,
'duration': '0:02:15',
'status': 'success'
}
テスト戦略
自動化ワークフローは本番環境に直接デプロイする前に、しっかりテストする必要があります。
- 単体テスト:各Skill単独で正しく動作するか確認
- 統合テスト:複数のSkillを組み合わせて動作確認
- エンドツーエンドテスト:Hook受信からSkill実行まで全フロー確認
- ロードテスト:大量のイベント到着時の動作確認
import pytest
from unittest.mock import AsyncMock, patch
class TestSalesWorkflow:
@pytest.mark.asyncio
async def test_analyze_inquiry_skill_high_priority(self):
"""高優先度案件の判定テスト"""
# モックコンテキストを準備
mock_context = AsyncMock()
mock_context.call_claude = AsyncMock(return_value=AsyncMock(
content=[AsyncMock(
text=json.dumps({
'priority': 'critical',
'category': '大型案件',
'suggested_next_step': 'マネージャーに報告',
'estimated_deal_size': 1000000
})
)]
))
# Skillを実行
result = await analyze_inquiry_skill(
context=mock_context,
text='100万円の案件があります',
user_id='U123',
channel_id='C456'
)
# 結果を検証
assert result['priority'] == 'critical'
assert result['estimated_deal_size'] == 1000000
mock_context.trigger_skill.assert_called_once_with('escalate_to_manager', result)
@pytest.mark.asyncio
async def test_fetch_lead_metrics_below_target(self):
"""リード数が目標未達時のテスト"""
mock_context = AsyncMock()
mock_context.call_external_api = AsyncMock(return_value={
'new_leads': 42,
'conversion_rate': 0.15
})
mock_context.call_claude = AsyncMock(return_value=AsyncMock(
content=[AsyncMock(text='対策案テキスト')]
))
result = await fetch_lead_metrics_skill(
context=mock_context,
date='2024-01-16'
)
assert result['status'] == 'below_target'
assert result['deficit'] == 8
mock_context.trigger_skill.assert_called_once()
# トリガーされたSkillの名前を確認
call_args = mock_context.trigger_skill.call_args
assert call_args[0][0] == 'send_alert_to_sales_team'
@pytest.mark.asyncio
async def test_webhook_hook_filters_messages(self):
"""関係ないメッセージはフィルタリングされるテスト"""
mock_context = AsyncMock()
# 営業関連でないメッセージ
await on_sales_inquiry(
context=mock_context,
payload={
'text': '雑談メッセージ',
'user_id': 'U123',
'channel_id': 'C456'
}
)
# Skillがトリガーされていないことを確認
mock_context.trigger_skill.assert_not_called()
本番環境への展開時の注意
ワークフロー自動化を本番環境で運用する際は、以下の点に注意してください:
- デプロイ戦略:カナリアリリース(段階的展開)を採用し、一度に全ユーザーに影響しないようにする
- ロールバック計画:問題発生時に素早く前バージョンに戻せる仕組みを用意
- SLA設定:ワークフロー実行の応答時間、成功率などの指標を定義
- アラート設定:エラー率が閾値を超えた場合に自動的に管理者に通知
- セキュリティ:API呼び出しにはレート制限やIP制限を設定し、不正使用を防止
まとめ
Claude CodeのHooks、Scheduler、Skillsの3つの機能を組み合わせることで、複雑で柔軟なイベント駆動型ワークフロー自動化が実現できます。本記事の重要なポイントを振り返ります:
- Hooksは外部システムからのイベントをキャッチし、即応性の高い自動化を実現。事前フィルタリングで無駄な処理を削減
- Schedulerは時間ベースのトリガーで定期実行を管理。冪等性とタイムゾーン管理が重要
- Skillsは実際の処理ロジックで、AIの推論能力を活用した意思決定を自動化。リトライロジックとエラーハンドリングが必須
- 3つを組み合わせることで、イベント駆動+定期実行+動的判定の全てを同時実現
- 実装時のベストプラクティスとしてログ記録、テスト、本番展開戦略を整備することが、信頼できるワークフロー自動化の鍵
紹介した具体例(営業案件の優先度自動判定、リード数目標未達のアラート)は、そのまま自社のビジネスに適用できます。ぜひ、Claude Codeの強力な機能を活用して、チーム全体の生産性向上を実現してください。
よくある質問(FAQ)
大量のイベントが同時に到着した場合、処理しきれなくなることはありませんか?
async/awaitを使った非同期処理で実装すれば、複数のイベントを並行処理できます。ただしメモリやAPI呼び出し数に上限がある場合は、キューイング機構を追加してバッファリングする方法も有効です。公式ドキュメントでは、大規模運用時のスケーリング戦略についても記載されているので、参考になると思います。
Skillから別のSkillを呼び出すことはできますか?
はい、可能です。context.trigger_skill()を使って他のSkillをトリガーできます。ただし循環呼び出しになると無限ループに陥るので、呼び出し階層は設計段階で明確にしておくことが重要ですね。
Schedulerの実行時刻がずれた場合、どう対応すればよいですか?
前回の実行時刻をデータベースに記録して、今回の実行時にチェックする冪等性の確保がベストプラクティスです。実装例で紹介したように、同じ日付では一度だけ実行するという制約を入れるだけで、ほとんどのケースは対応できると思います。
外部APIが障害で応答しない場合、ワークフロー全体が止まってしまいますか?
リトライロジックとエラーハンドリングを適切に実装していれば、一時的な障害は自動回復できます。ただし長時間の障害に対しては、別途アラート通知とフォールバック処理を用意するのが望ましいです。
複数のSchedulerが同じタイミングで実行されて、競合することはありませんか?
ワークフローエンジンが自動的に調整してくれることがほとんどですが、同じリソースに対して書き込みをする場合はロック機構の導入を検討してください。特にデータベース更新が発生するSchedulerは、実装時の検証を念入りに行うことをお勧めします。

