Rook/Ceph のクラスタ作成処理を読む
この記事は Rookと仲間たち、クラウドネイティブなストレージの Advent Calendar 2020 の21日目の記事です。
今回は、kubectl apply -f cephcluster.yaml
を実行した際に、
rook がどのように ceph cluster を作成するのかの処理を追ってみました。
rook の Version は v1.5.3 を利用しました。
まず、reconcile loop の始まりを見つけます。
https://github.com/rook/rook/tree/v1.5.3/pkg/operator/ceph/cluster/controller.go#L229 の reconcile で reconcilation loop が実行されます。
この関数から見ていきます。
func (r *ReconcileCephCluster) reconcile
まず CephCluster に Finalizer を付けます。
これは Delete 処理が実行された際に特定の Cleanup 処理を実行するためのものです。
Finalizer がないリソースは、Delete された際に即座に削除されます。
err = opcontroller.AddFinalizerIfNotPresent(r.client, cephCluster)
create, delete で処理が分岐します。まずは createを見てみます。
create は以下の関数により実行されます。
関数は https://github.com/rook/rook/tree/v1.5.3/pkg/operator/ceph/cluster/controller.go#L325 にあります。
func (c *ClusterController) onAdd(clusterObj *cephv1.CephCluster, ref *metav1.OwnerReference) error
controller は Namespace ごとに cluster 構造体の情報を持っているらしかったです。
cluster 構造体は以下のメンバを持っており、ユーザーに直接見せるインターフェースよりも、
クラスタの構築に必要な関数を持つ構造体となっています。
type cluster struct {
ClusterInfo *client.ClusterInfo
context *clusterd.Context
Namespace string
Spec *cephv1.ClusterSpec
crdName string
mons *mon.Cluster
stopCh chan struct{}
closedStopCh bool
ownerRef metav1.OwnerReference
isUpgrade bool
watchersActivated bool
monitoringChannels map[string]*clusterHealth
}
Namespace ごとに独立した cluster を定義できるらしい。なるほど?
cluster, ok := c.clusterMap[clusterObj.Namespace]
if !ok {
// It's a new cluster so let's populate the struct
cluster = newCluster(clusterObj, c.context, c.csiConfigMutex, ref)
}
CSIMutex をロックし、callback を実行後アンロックします。何をやっているかは置いておく。
最後に initalizeCluster を実行し、Ceph クラスタの作成処理が実行されます。
return c.initializeCluster(cluster, clusterObj)
この関数の中で clusterObj は Spec を cluster に引き渡す役割のみを行い、
後続の関数では全て cluster を引き回す形で実装されています。
この関数は https://github.com/rook/rook/tree/v1.5.3/pkg/operator/ceph/cluster/cluster.go#L167 にあります。
func (c *ClusterController) initializeCluster(cluster *cluster, clusterObj *cephv1.CephCluster) error
cluster の初期化処理が続き、その後 ExternalCluster かどうかで分岐が入ります。
if cluster.Spec.External.Enable {
err := c.configureExternalCephCluster(cluster)
if err != nil {
config.ConditionExport(c.context, c.namespacedName, cephv1.ConditionFailure, v1.ConditionTrue, "ClusterFailure", "Failed to configure external ceph cluster")
return errors.Wrap(err, "failed to configure external ceph cluster")
}
} else {
...
}
...
c.configureCephMonitoring(cluster, cluster.ClusterInfo)
return nil
externalCluster の設定は、なんとこの関数だけで実行されます。わかりやすい。
(正確には、後続で monitoring の設定が実行される)
err := c.configureExternalCephCluster(cluster)
externalCluster の処理は置いておいて、localCluster の作成の処理を続けて追っていきます。
mgr の Deployment が存在するか確認します。
app=rook-ceph-mgr
というラベルをもつ Deployment を探してくるらしい。
mgr Pod が起動しているかどうかの確認はしていないようでした。
opts := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", k8sutil.AppAttr, mgr.AppName)}
mgrDeployments, err := c.context.Clientset.AppsV1().Deployments(cluster.Namespace).List(ctx, opts)
if err == nil && len(mgrDeployments.Items) > 0 && cluster.ClusterInfo != nil {
c.configureCephMonitoring(cluster, clusterInfo)
}
err = c.configureLocalCephCluster(cluster)
localCluster の構築も、このようにひとつの関数で行われます。
err = c.configureLocalCephCluster(cluster)
この関数の中身を見ていきます。
関数は https://github.com/rook/rook/tree/v1.5.3/pkg/operator/ceph/cluster/cluster.go#L220 にあります。
func (c *ClusterController) configureLocalCephCluster(cluster *cluster) error
この関数の中では、すでに構築されている Ceph のバージョンを確認し、
アップグレードをするのかどうかを判定、その後オーケストレーションするという流れで進みます。
まずバージョンチェック。Job が立ち上がります。
cephVersion, isUpgrade, err := c.detectAndValidateCephVersion(cluster)
cluster.isUpgrade = isUpgrade
CephCluster の Condition をみて、Ready であった(すでにクラスタがあり構築完了していた)場合と、そうでない場合(新規クラスタ構築 or 途中までできている) で message の文言を変えます。
構築の状態管理は、Condition で行われるようでした。
こちらに記載のある、レベルドリブントリガーを実装しています。
https://zoetrope.github.io/kubebuilder-training/introduction/basics.html
message := config.CheckConditionReady(c.context, c.namespacedName)
config.ConditionExport(c.context, c.namespacedName, cephv1.ConditionProgressing, v1.ConditionTrue, "ClusterProgressing", message)
その後、orchestration が実行されます。この後はこの関数の中身を見ていきます。
err = cluster.doOrchestration(c.rookImage, *cephVersion, cluster.Spec)
この関数は https://github.com/rook/rook/tree/v1.5.3/pkg/operator/ceph/cluster/cluster.go#L90 にあります。
この関数は cluster の中身を参照できるので、cluster.Spec は引数に与える必要ないのでは?と思えます。
func (c *cluster) doOrchestration(rookImage string, cephVersion cephver.CephVersion, spec *cephv1.ClusterSpec) error
まず、ceph の設定をユーザーが上書きするための rook-config-override
configMap を作成します。
err := populateConfigOverrideConfigMap(c.context, c.Namespace, c.ownerRef)
mon を起動させる。中では、起動中の mon の個数よりも期待する mon の個数が少なかったら足す、といった動きをします。
mon を作るところだけでかなり長くなるので、後述します。
clusterInfo, err := c.mons.Start(c.ClusterInfo, rookImage, cephVersion, *c.Spec)
post action を実行して、mon の起動処理が終了します。
err = c.postMonStartupActions()
mgr を起動します。気力があったら書きます。
mgrs := mgr.New(c.context, c.ClusterInfo, *spec, rookImage)
err = mgrs.Start()
osd を起動します。気力があったら書きます。
osd を作るところがおそらくいちばん大変かつ面白いところである気がするので、今度調査して追記します。
osds := osd.New(c.context, c.ClusterInfo, *spec, rookImage)
err = osds.Start()
これで ceph cluster の initalization が終わり、全ての reconcilation が終了します。
mon の起動処理
一旦省略した mons.Start
について、中身を見ていきます。
clusterInfo, err := c.mons.Start(c.ClusterInfo, rookImage, cephVersion, *c.Spec)
この関数は、 https://github.com/rook/rook/tree/v1.5.3/pkg/operator/ceph/cluster/mon/mon.go#L186 にあります。
func (c *Cluster) Start(clusterInfo *cephclient.ClusterInfo, rookVersion string, cephVersion cephver.CephVersion, spec cephv1.ClusterSpec) (*cephclient.ClusterInfo, error)
この関数では、以下の内容の処理を実施します。
- Orchestration の作業に対しロックを取る
- 起動可能かの Validation を実施する
- ClusterInfo の初期化をする
- mon を起動する
この、mon を起動する
というところから見ていきます。
この関数は return で実行されます。
return c.ClusterInfo, c.startMons(c.spec.Mon.Count)
この関数は、 https://github.com/rook/rook/tree/v1.5.3/pkg/operator/ceph/cluster/mon/mon.go#L221 にあります。
func (c *Cluster) startMons(targetCount int) error
まず、mon の設定情報を取得します。
existingCount, mons, err := c.initMonConfig(targetCount)
mons
は *monConfig
の配列であり、 monConfig
は以下の要素を持つ構造体です。
名前とか IP とか、mon の識別子として利用できそうな値を持っています。
type monConfig struct {
// ResourceName is the name given to the mon's Kubernetes resources in metadata
ResourceName string
// DaemonName is the name given the mon daemon ("a", "b", "c,", etc.)
DaemonName string
// PublicIP is the IP of the mon's service that the mon will receive connections on
PublicIP string
// Port is the port on which the mon will listen for connections
Port int32
// The zone used for a stretch cluster
Zone string
// DataPathMap is the mapping relationship between mon data stored on the host and mon data
// stored in containers.
DataPathMap *config.DataPathMap
}
そして、mon を配置する node を決定します。
if err := c.assignMons(mons); err != nil {
return errors.Wrap(err, "failed to assign pods to mons")
}
この関数は https://github.com/rook/rook/tree/v1.5.3/pkg/operator/ceph/cluster/mon/mon.go#L708 にあります。
func (c *Cluster) assignMons(mons []*monConfig) error
関数の中では、monConfig ごとに node を scheduling する処理が行われています。
deployment, err := scheduleMonitor(c, mon)
scheduleMonitor
の中では結構複雑なことをやっているので、自然言語で解説をします。
この中では、 mon とほぼ同じ構造を持ち、/tini /bin/sleep
を実行する canary Deployment の作成を kubernetes API に実行します。
そして、作成に成功したら(scheduling に成功したらではない)、成功した *appsv1.Deployment
を返却します。
deployment, err
で得られる deployment は、 canary deployment の Object であることがわかります。
その得られた deployment を利用して、mon 毎に非同期で以下の作業が行われます。
まず、canary deployment が scheduling されるのを待ちます。
result, err := waitForMonitorScheduling(c, deployment)
この関数は、実態は realWaitForMonitorScheduling
であり、
https://github.com/rook/rook/tree/v1.5.3/pkg/operator/ceph/cluster/mon/mon.go#L620 にあります。
func realWaitForMonitorScheduling(c *Cluster, d *apps.Deployment) (SchedulingResult, error)
内部では、LabelSelector: labels.Set(d.Spec.Selector.MatchLabels).String()
となる Pod を探し、該当する Pod の Spec.NodeName
から、corev1.Node
を見つけ出して SchedulingResult
に入れて return します。
つまり、mon を起動させるノードを見つけるために、偽物の Deployment を建ててしまってそれが起動できるノードを kube-scheduler に探させるという手法を取っています。
これにより、affinity や topologySpreadConstraints など、Pod で指定できる scheduling method は全て mon の起動先決定に利用できるということになります。賢いな〜
さて、処理は assignMons に戻ります。
schedulingInfo を無事に得ることができると、情報はさらに整形され、map に格納されます。
nodeChoice := result.Node
var schedule *MonScheduleInfo
if c.spec.Network.IsHost() || c.monVolumeClaimTemplate(mon) == nil {
logger.Infof("assignmon: mon %s assigned to node %s", mon.DaemonName, nodeChoice.Name)
schedule, err = getNodeInfoFromNode(*nodeChoice)
}
...
c.mapping.Schedule[mon.DaemonName] = schedule
上記の条件に引っかからない mon の schedule は nil が格納されていそうに見えますね。どのように処理されるんだろう。。。
これで assignMons が終わったので、実際に mon を建てていくフェーズに移ります。
処理は startMons に返ってきます。
mons の中を for loop で回し、以下を実行します。
for i := existingCount; i < targetCount; i++ {
...
if err := c.ensureMonsRunning(mons, i, targetCount, true); err != nil {
return err
}
...
}
この関数は、 https://github.com/rook/rook/tree/v1.5.3/pkg/operator/ceph/cluster/mon/mon.go#L369 にあります。
func (c *Cluster) ensureMonsRunning(mons []*monConfig, i, targetCount int, requireAllInQuorum bool) error
この関数では、以下の処理を実施します。
まず IP を決めます。IP は、hostNetwork なら Node IP が、そうでなければ svc を作り、svc の ClusterIP が利用されます。
if err := c.initMonIPs(mons[0:expectedMonCount]); err != nil {
return errors.Wrap(err, "failed to init mon services")
}
configMap と csi に mon の設定を保存します。
rook-ceph-mon-endpoints
という名前の cm に保存されるらしいです。
cm だけでなく、dir や csi config にも保存されるらしいですが、そこはよくわからなかった。。。
if err := c.saveMonConfig(); err != nil {
return errors.Wrap(err, "failed to save mons")
}
ceph client 用の config を disk に書き込みます。
if err := WriteConnectionConfig(c.context, c.ClusterInfo); err != nil {
return err
}
mon の deployment を起動します。
if err := c.startDeployments(mons[0:expectedMonCount], requireAllInQuorum); err != nil {
return errors.Wrap(err, "failed to start mon pods")
}
これは、 https://github.com/rook/rook/tree/v1.5.3/pkg/operator/ceph/cluster/mon/mon.go#L825 にあります。
func (c *Cluster) startDeployments(mons []*monConfig, requireAllInQuorum bool) error
内部の処理は、まず現状起動している mon をチェックし、足りない分を起動するというものです。
for i := 0; i < len(mons); i++ {
schedule := c.mapping.Schedule[mons[i].DaemonName]
err := c.startMon(mons[i], schedule)
startMon は、 https://github.com/rook/rook/tree/v1.5.3/pkg/operator/ceph/cluster/mon/mon.go#L1033 にあります。
func (c *Cluster) startMon(m *monConfig, schedule *MonScheduleInfo) error
すでに起動している Deployment に対しては、update の処理が走ります。
新規に起動する Deployment について見ていきます。
CephCluster から placement を得ます。
p := cephv1.GetMonPlacement(c.spec.Placement)
pvc が必要であれば作成します。
pvc, err := c.makeDeploymentPVC(m, false)
_, err = c.context.Clientset.CoreV1().PersistentVolumeClaims(c.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
schedulingResult が nil の mon に対しては、 app=rook-ceph-mon
なところに配置するように affinity を設定し、
そうでないものは host 指定で建てます。
ここで前述の schedulingResult に nil が入りそうという不安が解消されました。
if schedule == nil {
k8sutil.SetNodeAntiAffinityForPod(&d.Spec.Template.Spec, p, requiredDuringScheduling(&c.spec),
map[string]string{k8sutil.AppAttr: AppName}, nil)
} else {
p.PodAffinity = nil
p.PodAntiAffinity = nil
k8sutil.SetNodeAntiAffinityForPod(&d.Spec.Template.Spec, p, requiredDuringScheduling(&c.spec),
map[string]string{k8sutil.AppAttr: AppName}, map[string]string{v1.LabelHostname: schedule.Hostname})
}
そして Deployment を作成しておしまい。
_, err = c.context.Clientset.AppsV1().Deployments(c.Namespace).Create(ctx, d, metav1.CreateOptions{})
これで ensureMonsRunning
が終わり、 mon の起動処理がだいたい終了します。
reconcilation loop からわかること
ここまで読んで、以下の特徴があると感じました。
- ユーザーが定義する
CephCluster
と、内部で引き回すcluster
が独立して定義されている - Condition を利用したレベルドリブントリガーが実装されている
- scheduling のために canary deployment を建ててその情報をそっくり利用する
これらは、今後自分で実装するコントローラでも使っていけるテクニックだと感じました。
また、ClusterProgressing
の Phase で実行される処理がとても多いなと感じました。
mgr, mon の起動あたりで Condition 分割しても良さそうだなと思ったのですが、結局アップデートを考慮すると、各処理で適切に no-op させるほうが確実なのかな。。。とも思いました。
個人的に期待する新たな可能性
Namespace ごとに Ceph Cluster が作成できそうという部分を初めて知りました。
クラスタ間での mirroring などの実験もできるかも?と期待しています。