最近hakoの話ばかりしてる @hatappi
業務でとある分散バッチシステムを作成する時にhako oneshotを使用しています
hakoのYAMLファイルにはAutoScalingGroupを設定していてリソースが足りない時は追加してパラレルで実行していきます
検証した時は最大で約60個のhako oneshot
で生み出されたタスクが存在していました
60個並列して走らせた結果何が起きたのか?
Hakoではhako oneshotしたタスクが終わったかどうかの判定をECSのDescribeTasksをpollingしていてタスクのスタータスがSTOPPED
になるまでwaitをかけつつloopさせます
コードだとこのへん
1つ1つのタスクが自分が終わったかをpollingし続けるので60個も並列すると1秒間に数十リクエストは飛ぶ
これによりAPI制限がかかっていたようです
一部はのっているようですが今回に該当する明確な制限のしきい値までは明記はされていないようでした
どのよに対応したのか
おそらくHakoの作者のeagletmtさんもこの壁にあたったのか現在のタスクの状態をS3にはいてそれを見に行くようにするメソッドが生えてました
コードはこちら
実装の大まなかな流れ
Amazon CloudWatch EventsからECSのタスクのイベントを取り出してS3におく
HakoのYAMLファイルにイベントが吐かれたファイルのS3のパスを指定する
Amazon CloudWatch EventsからECSのタスクのイベントを取り出してS3におく
CloudWatch Eventsでルールを作成していく
まずイベントパターンには下記を用いる
これを行うことで特定のクラスターのタスクのステータスが変更された時に発火するようになる
{ "source": [ "aws.ecs" ], "detail-type": [ "ECS Task State Change" ], "detail": { "clusterArn": [ "arn:aws:ecs:ap-northeast-1:111111111:cluster/test-cluster" ] } }
次にターゲットを設定するのだが今回はLambda Functionを使用することにしました
今回はPython3を使用しています
import json import boto3 BUCKET_NAME='hoge_bucket' def lambda_handler(event, context): task_arn = event['detail']['taskArn'] if event['detail'].get('stoppedAt') is not None: filename = 'stopped.json' elif event['detail'].get('startedAt') is not None: filename = 'started.json' else: return 'not running' s3 = boto3.resource('s3') bucket = s3.Bucket(BUCKET_NAME) ret = bucket.put_object( ACL='private', Body=json.dumps(event), Key="ecs-status/" + task_arn + "/" + filename, ContentType='application/json' ) return str(ret)
こんな感じに行うと s3://hoge_bucket/ecs-status
配下にタスクごとのステータスが開始した時のものが started.json
, 終了した時のものが stopped.json
が出力されるようになる
後はHakoのYAMLファイルにて下記のようにoneshot_notification_prefix
にS3のパスを設定する
scheduler: type: ecs region: ap-northeast-1 cluster: test-cluster oneshot_notification_prefix: s3://hoge_bucket/ecs-status autoscaling_group_for_oneshot: test-auto-scaling-group app: image: busybox memory: 1000
後は実行するだけでポーリングの処理をECSのAPIではなくS3にはかれたファイルを見に行くようになる
コード的にはここで分岐している