Skip to main content

takutakahashi.dev

Rook/Ceph のクラスタ作成処理を読む

この記事は Rookと仲間たち、クラウドネイティブなストレージの Advent Calendar 2020 の21日目の記事です。

今回は、kubectl apply -f cephcluster.yaml を実行した際に、
rook がどのように ceph cluster を作成するのかの処理を追ってみました。

rook の Version は v1.5.3 を利用しました。

まず、reconcile loop の始まりを見つけます。

https://github.com/rook/rook/tree/v1.5.3/pkg/operator/ceph/cluster/controller.go#L229 の reconcile で reconcilation loop が実行されます。

この関数から見ていきます。

func (r *ReconcileCephCluster) reconcile

まず CephCluster に Finalizer を付けます。

これは Delete 処理が実行された際に特定の Cleanup 処理を実行するためのものです。

Finalizer がないリソースは、Delete された際に即座に削除されます。

err = opcontroller.AddFinalizerIfNotPresent(r.client, cephCluster)

create, delete で処理が分岐します。まずは createを見てみます。

create は以下の関数により実行されます。

関数は https://github.com/rook/rook/tree/v1.5.3/pkg/operator/ceph/cluster/controller.go#L325 にあります。

func (c *ClusterController) onAdd(clusterObj *cephv1.CephCluster, ref *metav1.OwnerReference) error

controller は Namespace ごとに cluster 構造体の情報を持っているらしかったです。

cluster 構造体は以下のメンバを持っており、ユーザーに直接見せるインターフェースよりも、

クラスタの構築に必要な関数を持つ構造体となっています。

type cluster struct {
	ClusterInfo        *client.ClusterInfo
	context            *clusterd.Context
	Namespace          string
	Spec               *cephv1.ClusterSpec
	crdName            string
	mons               *mon.Cluster
	stopCh             chan struct{}
	closedStopCh       bool
	ownerRef           metav1.OwnerReference
	isUpgrade          bool
	watchersActivated  bool
	monitoringChannels map[string]*clusterHealth
}

Namespace ごとに独立した cluster を定義できるらしい。なるほど?

cluster, ok := c.clusterMap[clusterObj.Namespace]
	if !ok {
		// It's a new cluster so let's populate the struct
		cluster = newCluster(clusterObj, c.context, c.csiConfigMutex, ref)
	}

CSIMutex をロックし、callback を実行後アンロックします。何をやっているかは置いておく。

最後に initalizeCluster を実行し、Ceph クラスタの作成処理が実行されます。

return c.initializeCluster(cluster, clusterObj)

この関数の中で clusterObj は Spec を cluster に引き渡す役割のみを行い、

後続の関数では全て cluster を引き回す形で実装されています。

この関数は https://github.com/rook/rook/tree/v1.5.3/pkg/operator/ceph/cluster/cluster.go#L167 にあります。

func (c *ClusterController) initializeCluster(cluster *cluster, clusterObj *cephv1.CephCluster) error

cluster の初期化処理が続き、その後 ExternalCluster かどうかで分岐が入ります。

if cluster.Spec.External.Enable {
		err := c.configureExternalCephCluster(cluster)
		if err != nil {
			config.ConditionExport(c.context, c.namespacedName, cephv1.ConditionFailure, v1.ConditionTrue, "ClusterFailure", "Failed to configure external ceph cluster")
			return errors.Wrap(err, "failed to configure external ceph cluster")
		}
	} else {
...
}
...
c.configureCephMonitoring(cluster, cluster.ClusterInfo)
return nil

externalCluster の設定は、なんとこの関数だけで実行されます。わかりやすい。

(正確には、後続で monitoring の設定が実行される)

err := c.configureExternalCephCluster(cluster)

externalCluster の処理は置いておいて、localCluster の作成の処理を続けて追っていきます。

mgr の Deployment が存在するか確認します。

app=rook-ceph-mgr というラベルをもつ Deployment を探してくるらしい。

mgr Pod が起動しているかどうかの確認はしていないようでした。

opts := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", k8sutil.AppAttr, mgr.AppName)}
		mgrDeployments, err := c.context.Clientset.AppsV1().Deployments(cluster.Namespace).List(ctx, opts)
		if err == nil && len(mgrDeployments.Items) > 0 && cluster.ClusterInfo != nil {
			c.configureCephMonitoring(cluster, clusterInfo)
		}

		err = c.configureLocalCephCluster(cluster)

localCluster の構築も、このようにひとつの関数で行われます。

err = c.configureLocalCephCluster(cluster)

この関数の中身を見ていきます。

関数は https://github.com/rook/rook/tree/v1.5.3/pkg/operator/ceph/cluster/cluster.go#L220 にあります。

func (c *ClusterController) configureLocalCephCluster(cluster *cluster) error

この関数の中では、すでに構築されている Ceph のバージョンを確認し、

アップグレードをするのかどうかを判定、その後オーケストレーションするという流れで進みます。

まずバージョンチェック。Job が立ち上がります。

cephVersion, isUpgrade, err := c.detectAndValidateCephVersion(cluster)
cluster.isUpgrade = isUpgrade

CephCluster の Condition をみて、Ready であった(すでにクラスタがあり構築完了していた)場合と、そうでない場合(新規クラスタ構築 or 途中までできている) で message の文言を変えます。

構築の状態管理は、Condition で行われるようでした。

こちらに記載のある、レベルドリブントリガーを実装しています。

https://zoetrope.github.io/kubebuilder-training/introduction/basics.html

message := config.CheckConditionReady(c.context, c.namespacedName)
config.ConditionExport(c.context, c.namespacedName, cephv1.ConditionProgressing, v1.ConditionTrue, "ClusterProgressing", message)

その後、orchestration が実行されます。この後はこの関数の中身を見ていきます。

err = cluster.doOrchestration(c.rookImage, *cephVersion, cluster.Spec)

この関数は https://github.com/rook/rook/tree/v1.5.3/pkg/operator/ceph/cluster/cluster.go#L90 にあります。

この関数は cluster の中身を参照できるので、cluster.Spec は引数に与える必要ないのでは?と思えます。

func (c *cluster) doOrchestration(rookImage string, cephVersion cephver.CephVersion, spec *cephv1.ClusterSpec) error

まず、ceph の設定をユーザーが上書きするための rook-config-override configMap を作成します。

err := populateConfigOverrideConfigMap(c.context, c.Namespace, c.ownerRef)

mon を起動させる。中では、起動中の mon の個数よりも期待する mon の個数が少なかったら足す、といった動きをします。

mon を作るところだけでかなり長くなるので、後述します。

clusterInfo, err := c.mons.Start(c.ClusterInfo, rookImage, cephVersion, *c.Spec)

post action を実行して、mon の起動処理が終了します。

err = c.postMonStartupActions()

mgr を起動します。気力があったら書きます。

mgrs := mgr.New(c.context, c.ClusterInfo, *spec, rookImage)
err = mgrs.Start()

osd を起動します。気力があったら書きます。

osd を作るところがおそらくいちばん大変かつ面白いところである気がするので、今度調査して追記します。

osds := osd.New(c.context, c.ClusterInfo, *spec, rookImage)
err = osds.Start()

これで ceph cluster の initalization が終わり、全ての reconcilation が終了します。

mon の起動処理

一旦省略した mons.Start について、中身を見ていきます。

clusterInfo, err := c.mons.Start(c.ClusterInfo, rookImage, cephVersion, *c.Spec)

この関数は、 https://github.com/rook/rook/tree/v1.5.3/pkg/operator/ceph/cluster/mon/mon.go#L186 にあります。

func (c *Cluster) Start(clusterInfo *cephclient.ClusterInfo, rookVersion string, cephVersion cephver.CephVersion, spec cephv1.ClusterSpec) (*cephclient.ClusterInfo, error)

この関数では、以下の内容の処理を実施します。

  • Orchestration の作業に対しロックを取る
  • 起動可能かの Validation を実施する
  • ClusterInfo の初期化をする
  • mon を起動する

この、mon を起動する というところから見ていきます。

この関数は return で実行されます。

return c.ClusterInfo, c.startMons(c.spec.Mon.Count)

この関数は、 https://github.com/rook/rook/tree/v1.5.3/pkg/operator/ceph/cluster/mon/mon.go#L221 にあります。

func (c *Cluster) startMons(targetCount int) error

まず、mon の設定情報を取得します。

existingCount, mons, err := c.initMonConfig(targetCount)

mons*monConfig の配列であり、 monConfig は以下の要素を持つ構造体です。

名前とか IP とか、mon の識別子として利用できそうな値を持っています。

type monConfig struct {
	// ResourceName is the name given to the mon's Kubernetes resources in metadata
	ResourceName string
	// DaemonName is the name given the mon daemon ("a", "b", "c,", etc.)
	DaemonName string
	// PublicIP is the IP of the mon's service that the mon will receive connections on
	PublicIP string
	// Port is the port on which the mon will listen for connections
	Port int32
	// The zone used for a stretch cluster
	Zone string
	// DataPathMap is the mapping relationship between mon data stored on the host and mon data
	// stored in containers.
	DataPathMap *config.DataPathMap
}

そして、mon を配置する node を決定します。

if err := c.assignMons(mons); err != nil {
		return errors.Wrap(err, "failed to assign pods to mons")
	}

この関数は https://github.com/rook/rook/tree/v1.5.3/pkg/operator/ceph/cluster/mon/mon.go#L708 にあります。

func (c *Cluster) assignMons(mons []*monConfig) error

関数の中では、monConfig ごとに node を scheduling する処理が行われています。

deployment, err := scheduleMonitor(c, mon)

scheduleMonitor の中では結構複雑なことをやっているので、自然言語で解説をします。

この中では、 mon とほぼ同じ構造を持ち、/tini /bin/sleep を実行する canary Deployment の作成を kubernetes API に実行します。

そして、作成に成功したら(scheduling に成功したらではない)、成功した *appsv1.Deployment を返却します。

deployment, err で得られる deployment は、 canary deployment の Object であることがわかります。

その得られた deployment を利用して、mon 毎に非同期で以下の作業が行われます。

まず、canary deployment が scheduling されるのを待ちます。

result, err := waitForMonitorScheduling(c, deployment)

この関数は、実態は realWaitForMonitorScheduling であり、

https://github.com/rook/rook/tree/v1.5.3/pkg/operator/ceph/cluster/mon/mon.go#L620 にあります。

func realWaitForMonitorScheduling(c *Cluster, d *apps.Deployment) (SchedulingResult, error)

内部では、LabelSelector: labels.Set(d.Spec.Selector.MatchLabels).String() となる Pod を探し、該当する Pod の Spec.NodeName から、corev1.Node を見つけ出して SchedulingResult に入れて return します。

つまり、mon を起動させるノードを見つけるために、偽物の Deployment を建ててしまってそれが起動できるノードを kube-scheduler に探させるという手法を取っています。

これにより、affinity や topologySpreadConstraints など、Pod で指定できる scheduling method は全て mon の起動先決定に利用できるということになります。賢いな〜

さて、処理は assignMons に戻ります。

schedulingInfo を無事に得ることができると、情報はさらに整形され、map に格納されます。

nodeChoice := result.Node
var schedule *MonScheduleInfo
if c.spec.Network.IsHost() || c.monVolumeClaimTemplate(mon) == nil {
				logger.Infof("assignmon: mon %s assigned to node %s", mon.DaemonName, nodeChoice.Name)
				schedule, err = getNodeInfoFromNode(*nodeChoice)
}
...
c.mapping.Schedule[mon.DaemonName] = schedule

上記の条件に引っかからない mon の schedule は nil が格納されていそうに見えますね。どのように処理されるんだろう。。。

これで assignMons が終わったので、実際に mon を建てていくフェーズに移ります。

処理は startMons に返ってきます。

mons の中を for loop で回し、以下を実行します。

for i := existingCount; i < targetCount; i++ {
...
  if err := c.ensureMonsRunning(mons, i, targetCount, true); err != nil {
	  			return err
		  	}
...
}

この関数は、 https://github.com/rook/rook/tree/v1.5.3/pkg/operator/ceph/cluster/mon/mon.go#L369 にあります。

func (c *Cluster) ensureMonsRunning(mons []*monConfig, i, targetCount int, requireAllInQuorum bool) error

この関数では、以下の処理を実施します。

まず IP を決めます。IP は、hostNetwork なら Node IP が、そうでなければ svc を作り、svc の ClusterIP が利用されます。

if err := c.initMonIPs(mons[0:expectedMonCount]); err != nil {
		return errors.Wrap(err, "failed to init mon services")
	}

configMap と csi に mon の設定を保存します。

rook-ceph-mon-endpoints という名前の cm に保存されるらしいです。

cm だけでなく、dir や csi config にも保存されるらしいですが、そこはよくわからなかった。。。

if err := c.saveMonConfig(); err != nil {
		return errors.Wrap(err, "failed to save mons")
	}

ceph client 用の config を disk に書き込みます。

if err := WriteConnectionConfig(c.context, c.ClusterInfo); err != nil {
		return err
	}

mon の deployment を起動します。

if err := c.startDeployments(mons[0:expectedMonCount], requireAllInQuorum); err != nil {
		return errors.Wrap(err, "failed to start mon pods")
	}

これは、 https://github.com/rook/rook/tree/v1.5.3/pkg/operator/ceph/cluster/mon/mon.go#L825 にあります。

func (c *Cluster) startDeployments(mons []*monConfig, requireAllInQuorum bool) error

内部の処理は、まず現状起動している mon をチェックし、足りない分を起動するというものです。

for i := 0; i < len(mons); i++ {
		schedule := c.mapping.Schedule[mons[i].DaemonName]
		err := c.startMon(mons[i], schedule)

startMon は、 https://github.com/rook/rook/tree/v1.5.3/pkg/operator/ceph/cluster/mon/mon.go#L1033 にあります。

func (c *Cluster) startMon(m *monConfig, schedule *MonScheduleInfo) error

すでに起動している Deployment に対しては、update の処理が走ります。

新規に起動する Deployment について見ていきます。

CephCluster から placement を得ます。

p := cephv1.GetMonPlacement(c.spec.Placement)

pvc が必要であれば作成します。

pvc, err := c.makeDeploymentPVC(m, false)
_, err = c.context.Clientset.CoreV1().PersistentVolumeClaims(c.Namespace).Create(ctx, pvc, metav1.CreateOptions{})

schedulingResult が nil の mon に対しては、 app=rook-ceph-mon なところに配置するように affinity を設定し、

そうでないものは host 指定で建てます。

ここで前述の schedulingResult に nil が入りそうという不安が解消されました。

if schedule == nil {
		k8sutil.SetNodeAntiAffinityForPod(&d.Spec.Template.Spec, p, requiredDuringScheduling(&c.spec),
			map[string]string{k8sutil.AppAttr: AppName}, nil)
	} else {
		p.PodAffinity = nil
		p.PodAntiAffinity = nil
		k8sutil.SetNodeAntiAffinityForPod(&d.Spec.Template.Spec, p, requiredDuringScheduling(&c.spec),
			map[string]string{k8sutil.AppAttr: AppName}, map[string]string{v1.LabelHostname: schedule.Hostname})
	}

そして Deployment を作成しておしまい。

_, err = c.context.Clientset.AppsV1().Deployments(c.Namespace).Create(ctx, d, metav1.CreateOptions{})

これで ensureMonsRunning が終わり、 mon の起動処理がだいたい終了します。

reconcilation loop からわかること

ここまで読んで、以下の特徴があると感じました。

  1. ユーザーが定義する CephCluster と、内部で引き回す cluster が独立して定義されている
  2. Condition を利用したレベルドリブントリガーが実装されている
  3. scheduling のために canary deployment を建ててその情報をそっくり利用する

これらは、今後自分で実装するコントローラでも使っていけるテクニックだと感じました。

また、ClusterProgressing の Phase で実行される処理がとても多いなと感じました。

mgr, mon の起動あたりで Condition 分割しても良さそうだなと思ったのですが、結局アップデートを考慮すると、各処理で適切に no-op させるほうが確実なのかな。。。とも思いました。

個人的に期待する新たな可能性

Namespace ごとに Ceph Cluster が作成できそうという部分を初めて知りました。

クラスタ間での mirroring などの実験もできるかも?と期待しています。