<?php
namespace App\Jobs;
use App\Rabbitmq;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Maclof\Kubernetes\Models\Secret;
use Maclof\Kubernetes\Models\Service;
use Maclof\Kubernetes\Models\Ingress;
use Maclof\Kubernetes\Models\PersistentVolumeClaim;
/**
 * Queued job that reconciles a Rabbitmq record with its Kubernetes resources.
 *
 * Depending on the record's desired_state the job either creates/patches the
 * secret, deployment and service backing the instance, or deletes those
 * resources together with the database record.
 */
class DeployRabbitmqJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * The number of times the job may be attempted.
     *
     * @var int
     */
    public $tries = 3;

    /**
     * The number of seconds the job can run before timing out.
     *
     * @var int
     */
    public $timeout = 120;

    /**
     * The record being deployed or destroyed.
     *
     * @var Rabbitmq
     */
    protected $rabbitmq;

    /**
     * Kubernetes client of the record's cluster; assigned in handle().
     *
     * @var \Maclof\Kubernetes\Client
     */
    private $client;

    /**
     * DeployRabbitmqJob constructor.
     *
     * @param Rabbitmq $rabbitmq
     * @return void
     */
    public function __construct(Rabbitmq $rabbitmq)
    {
        $this->rabbitmq = $rabbitmq;
    }

    /**
     * Execute the job.
     *
     * Does nothing when the record is already in the failed state or when
     * checkRabbitmq() reports that no work is required. Otherwise dispatches
     * on desired_state: started/restarted deploys, destroyed tears down.
     *
     * @return void
     * @throws \Exception
     */
    public function handle()
    {
        if ($this->rabbitmq->state == config('state.failed')) {
            \Log::warning("rabbitmq " . $this->rabbitmq->name . " is failed.");
            return;
        }

        $this->client = $this->rabbitmq->cluster->client();
        $state = $this->rabbitmq->state;
        $desired_state = $this->rabbitmq->desired_state;

        // Stop if the record changed after this job was queued: another job has
        // already modified it, so this job's view is out of sync with the database.
        if (!$this->checkRabbitmq($state, $desired_state)) {
            return;
        }

        switch ($desired_state) {
            case config('state.started'):
            case config('state.restarted'):
                $this->processStarted();
                break;
            case config('state.destroyed'):
                $this->processDestroyed();
                break;
        }
    }

    /**
     * Decide whether the job should go on to process the record.
     *
     * Returns false when the database row no longer exists or no longer matches
     * the given states, and also when the record has already reached a
     * started/restarted desired state (in that case the state is reset to
     * pending if the Kubernetes resources are not all available).
     *
     * @param string $state         state the job observed on its own model
     * @param string $desired_state desired state the job observed on its own model
     * @return boolean true when processing should continue
     */
    private function checkRabbitmq($state, $desired_state)
    {
        $rabbitmq = Rabbitmq::find($this->rabbitmq->id);
        if (!$rabbitmq || $state != $rabbitmq->state || $desired_state != $rabbitmq->desired_state) {
            // $rabbitmq is null when the row has been deleted, so the name is
            // taken from the job's own model instead of the fresh lookup.
            \Log::warning("rabbitmq " . $this->rabbitmq->name .
                "'s state or desired_state has been changed");
            return false;
        }

        if ($state == $desired_state && ($state == config('state.started') || $state == config('state.restarted'))) {
            // allAvailable() may itself mark the record as failed; do not
            // overwrite that with pending.
            if (!$this->allAvailable() && $this->rabbitmq->state != config('state.failed')) {
                $this->rabbitmq->update(['state' => config('state.pending')]);
            }
            return false;
        }

        return true;
    }

    /**
     * Check that the secret, deployment and service exist and are up to date.
     *
     * A resource is up to date when its 'time_deployment' annotation equals the
     * md5 of the record's updated_at, and the deployment's 'labels' annotation
     * equals the md5 of the record's labels. When the deployment reports zero
     * ready replicas the record is marked failed and the callback is notified.
     *
     * @return boolean true only when every check passes
     */
    private function allAvailable()
    {
        // Fetched once and reused for both the package check and the labels
        // check, so both look at the same deployment object.
        $deployment = getDeployment($this->client, $this->rabbitmq->name);

        // if (!pvcAvailable($this->client,$this->rabbitmq->name) && !$deployment) {
        //     return false;
        // }
        try {
            $deploymentPackage = [
                "secret" => getSecret($this->client, $this->rabbitmq->name),
                "deployment" => $deployment,
                "service" => getService($this->client, $this->rabbitmq->name),
                // "ingress" => $this->getIngress(),
            ];

            foreach ($deploymentPackage as $key => $val) {
                if (!$val || getAnnotationBykey(
                    $val,
                    'time_deployment'
                ) != md5($this->rabbitmq->updated_at->toDateTimeString())) {
                    \Log::info(
                        "$key:" . $this->rabbitmq->name . ' doesn\'t exist or update updated_at '
                    );
                    return false;
                }
            }

            // NOTE(review): the Kubernetes API may omit readyReplicas while no
            // pod is ready; confirm how a missing key should be treated here.
            $deploymentStatus = $deployment->toArray()['status'];
            if ($deploymentStatus['readyReplicas'] == 0) {
                \Log::warning('rabbitmq deployment ' . $this->rabbitmq->name . ' starts failed');
                $this->rabbitmq->update(['state' => config('state.failed')]);
                requestAsync_post(
                    $this->rabbitmq->callback_url,
                    "rabbitmq",
                    ["status" => $deploymentStatus],
                    $this->rabbitmq->attributesToArray()
                );
                return false;
            }

            // Labels were changed on the record since the last deployment.
            // The cast keeps md5() from receiving null when no labels are set.
            if (getAnnotationBykey($deployment, 'labels') !== md5((string) $this->rabbitmq->labels)) {
                \Log::info($this->rabbitmq->name . ' labels ' . 'has changed');
                return false;
            }
        } catch (\Exception $exception) {
            \Log::error("allAvailable:" . $exception->getMessage());
            return false;
        }

        return true;
    }

    /**
     * Fetch the ingress labelled with this instance's name.
     *
     * @return mixed the first matching ingress, or null when it does not exist
     *               or the lookup throws
     */
    private function getIngress()
    {
        try {
            if (!$this->client->ingresses()->exists($this->rabbitmq->name)) {
                return null;
            }
            return $this->client->ingresses()
                ->setLabelSelector(['app' => $this->rabbitmq->name])
                ->first();
        } catch (\Exception $exception) {
            return null;
        }
    }

    /**
     * Create the credentials secret, or patch it when it already exists.
     *
     * If the exists/patch/create attempt throws, a plain create is attempted
     * once more; an exception from that second attempt propagates.
     *
     * @return void
     */
    private function tryCreateSecrets()
    {
        $yaml = [
            'metadata' => [
                'name' => $this->rabbitmq->name,
                'labels' => $this->allLabels(),
                'annotations' => commonAnnotations($this->rabbitmq),
            ],
            'type' => 'Opaque',
            'data' => $this->accountInfo(),
        ];
        $secret = new Secret($yaml);

        try {
            if ($this->client->secrets()->exists($this->rabbitmq->name)) {
                \Log::info('patch secret ' . $this->rabbitmq->name);
                $this->client->secrets()->patch($secret);
            } else {
                \Log::info('try to create secret ' . $this->rabbitmq->name);
                $this->client->secrets()->create($secret);
            }
        } catch (\Exception $exception) {
            // Keep the triggering error visible instead of discarding it.
            \Log::warning(
                'create secret ' . $this->rabbitmq->name,
                ['exception' => $exception->getMessage()]
            );
            $this->client->secrets()->create($secret);
        }
    }

    /**
     * Create the RabbitMQ deployment, or patch it when it already exists.
     *
     * If the exists/patch/create attempt throws, a plain create is attempted
     * once more; an exception from that second attempt propagates.
     *
     * @return void
     */
    private function tryCreateRabbitmq()
    {
        $image = 'harbor.oneitfarm.com/deployv2/rabbitmq:bitnami';
        $yaml = [
            'metadata' => [
                'name' => $this->rabbitmq->name,
                'labels' => $this->allLabels(),
                'annotations' => commonAnnotations($this->rabbitmq),
            ],
            'spec' => [
                'replicas' => 1,
                // Changing the selector would leave orphaned ReplicaSets behind,
                // so the selector is kept fixed.
                'selector' => ['matchLabels' => commonLabels($this->rabbitmq)],
                'template' => [
                    'metadata' => ['labels' => $this->allLabels()],
                    'spec' => [
                        //'imagePullSecrets' => [['name' => 'aliyun-registry-vpc']],
                        'containers' => [
                            [
                                'name' => 'rabbitmq',
                                'imagePullPolicy' => 'IfNotPresent',
                                'image' => $image,
                                'ports' => [
                                    [
                                        'name' => 'http',
                                        'containerPort' => 5672,
                                        'protocol' => 'TCP',
                                    ],
                                    [
                                        'name' => 'web',
                                        'containerPort' => 15672,
                                        'protocol' => 'TCP',
                                    ],
                                ],
                                'env' => [
                                    [
                                        //"name" => "RABBITMQ_DEFAULT_USER",
                                        "name" => "RABBITMQ_USERNAME",
                                        "valueFrom" => [
                                            "secretKeyRef" => [
                                                "name" => $this->rabbitmq->name,
                                                "key" => "RABBITMQ_USERNAME"
                                            ]
                                        ]
                                    ],
                                    [
                                        //"name" => "RABBITMQ_DEFAULT_PASS",
                                        "name" => "RABBITMQ_PASSWORD",
                                        "valueFrom" => [
                                            "secretKeyRef" => [
                                                "name" => $this->rabbitmq->name,
                                                "key" => "RABBITMQ_PASSWORD"
                                            ]
                                        ]
                                    ],
                                    // todo: shared cookie for clustering
                                    // [
                                    //     "name" => "RABBITMQ_ERLANG_COOKIE",
                                    //     "value" => "secret cookie here"
                                    // ],
                                ],
                                'resources' => [
                                    'limits' => ['cpu' => '100m', 'memory' => '512Mi'],
                                ],
                                // 'volumeMounts' => [
                                //     [
                                //         'mountPath' => '/bitnami',
                                //         'name' => 'rabbitmq-data',
                                //     ],
                                // ],
                            ],
                        ],
                        // 'volumes' => [
                        //     [
                        //         'name' => 'rabbitmq-data',
                        //         'persistentVolumeClaim' => [
                        //             'claimName' => $this->rabbitmq->name
                        //         ]
                        //     ],
                        // ],
                    ],
                ],
            ],
        ];
        $deployment = new \Maclof\Kubernetes\Models\Deployment($yaml);

        try {
            if ($this->client->deployments()->exists($this->rabbitmq->name)) {
                \Log::info('patch deployment ' . $this->rabbitmq->name);
                $this->client->deployments()->patch($deployment);
            } else {
                \Log::info('try to craete rabbitmq deployment ' . $this->rabbitmq->name);
                $this->client->deployments()->create($deployment);
            }
        } catch (\Exception $exception) {
            // Keep the triggering error visible instead of discarding it.
            \Log::warning(
                'create rabbitmq deployment ' . $this->rabbitmq->name,
                ['exception' => $exception->getMessage()]
            );
            $this->client->deployments()->create($deployment);
        }
    }

    /**
     * Create the ClusterIP service, or patch it when it already exists.
     *
     * If the exists/patch/create attempt throws, a plain create is attempted
     * once more; an exception from that second attempt propagates.
     *
     * @return void
     */
    private function tryCreateService()
    {
        $service = new Service([
            'metadata' => [
                'name' => $this->rabbitmq->name,
                'labels' => $this->allLabels(),
                'annotations' => commonAnnotations($this->rabbitmq),
            ],
            'spec' => [
                'type' => 'ClusterIP',
                'ports' => [
                    [
                        'port' => 5672,
                        'targetPort' => 'http',
                        'protocol' => 'TCP',
                        'name' => 'http'
                    ],
                    [
                        'port' => 15672,
                        'targetPort' => 'web',
                        'protocol' => 'TCP',
                        'name' => 'web'
                    ],
                ],
                'selector' => commonLabels($this->rabbitmq),
            ],
        ]);

        try {
            if ($this->client->services()->exists($this->rabbitmq->name)) {
                \Log::info('Patch service ' . $this->rabbitmq->name);
                $this->client->services()->patch($service);
            } else {
                \Log::info('try to create Service ' . $this->rabbitmq->name);
                $this->client->services()->create($service);
            }
        } catch (\Exception $exception) {
            // Keep the triggering error visible instead of discarding it.
            \Log::warning(
                'crate service ' . $this->rabbitmq->name,
                ['exception' => $exception->getMessage()]
            );
            $this->client->services()->create($service);
        }
    }

    /**
     * Create the TLS ingress routing "<name>-dev.oneitfarm.com" to the service's
     * 'web' port, or patch it when it already exists.
     *
     * If the exists/patch/create attempt throws, a plain create is attempted
     * once more; an exception from that second attempt propagates.
     *
     * @return void
     */
    private function tryCreateIngress()
    {
        $host = $this->rabbitmq->name . '-dev.oneitfarm.com';
        $ingress = new Ingress([
            'metadata' => [
                'name' => $this->rabbitmq->name,
                'labels' => $this->allLabels(),
                'annotations' => commonAnnotations($this->rabbitmq),
            ],
            'spec' => [
                'tls' => [
                    ['hosts' => [$host], 'secretName' => 'oneitfarm-secret'],
                ],
                'rules' => [
                    [
                        'host' => $host,
                        'http' => [
                            'paths' => [
                                [
                                    'path' => '/',
                                    'backend' => ['serviceName' => $this->rabbitmq->name, 'servicePort' => 'web'],
                                ],
                            ],
                        ],
                    ],
                ],
            ],
        ]);

        try {
            if ($this->client->ingresses()->exists($this->rabbitmq->name)) {
                \Log::info('patch ingress ' . $this->rabbitmq->name);
                $this->client->ingresses()->patch($ingress);
            } else {
                \Log::info('try create ingress ' . $this->rabbitmq->name);
                $this->client->ingresses()->create($ingress);
            }
        } catch (\Exception $exception) {
            // Keep the triggering error visible instead of discarding it.
            \Log::warning(
                'create ingress ' . $this->rabbitmq->name,
                ['exception' => $exception->getMessage()]
            );
            $this->client->ingresses()->create($ingress);
        }
    }

    /**
     * Build the secret payload: base64-encoded username and password.
     *
     * @return array
     */
    private function accountInfo()
    {
        return [
            "RABBITMQ_USERNAME" => base64_encode($this->rabbitmq->username),
            "RABBITMQ_PASSWORD" => base64_encode($this->rabbitmq->password),
        ];
    }

    /**
     * Return all labels to put on the Kubernetes resources.
     *
     * Combines the system labels from commonLabels() with the labels stored on
     * the record (a JSON string), passing the labels of the existing deployment
     * to filterArray() along the way. System labels are merged first, so
     * same-named user labels override them in the result.
     *
     * @return array
     */
    private function allLabels()
    {
        $sysLabels = commonLabels($this->rabbitmq);

        $newLabels = [];
        if ($this->rabbitmq->labels) {
            $decoded = json_decode($this->rabbitmq->labels, true);
            // json_decode yields null for malformed JSON and a scalar for
            // scalar documents; only an array can be iterated as labels.
            if (is_array($decoded)) {
                foreach ($decoded as $k => $v) {
                    // Integer values are converted to strings.
                    $newLabels[$k] = is_int($v) ? (string)$v : $v;
                }
            }
        }

        $oldLabels = [];
        if ($this->client->deployments()->exists($this->rabbitmq->name)) {
            $oldDeployment = getDeployment($this->client, $this->rabbitmq->name);
            if (!is_null($oldDeployment)) {
                $oldDeployment = $oldDeployment->toArray();
                if (isset($oldDeployment['metadata']['labels'])) {
                    $oldLabels = $oldDeployment['metadata']['labels'];
                }
            }
        }

        $newLabels = filterArray($sysLabels, $oldLabels, $newLabels);

        return array_merge($sysLabels, $newLabels);
    }

    /**
     * Handle the started/restarted desired states.
     *
     * When everything is already available the record's state is set to its
     * desired state (without touching updated_at) and the callback is notified;
     * otherwise the secret, deployment and service are created or patched.
     *
     * @return void
     */
    private function processStarted()
    {
        if ($this->allAvailable()) {
            // updated_at feeds the 'time_deployment' comparison in
            // allAvailable(), so it must not change on this state-only update.
            $this->rabbitmq->timestamps = false;
            try {
                $this->rabbitmq->update(['state' => $this->rabbitmq->desired_state]);
            } finally {
                // Re-enable timestamps even when the update throws.
                $this->rabbitmq->timestamps = true;
            }
            requestAsync_post(
                $this->rabbitmq->callback_url,
                "rabbitmq",
                ["logs" => "deployment success"],
                $this->rabbitmq->attributesToArray()
            );
            return;
        }

        // $this->tryCreatePvc();
        $this->tryCreateSecrets();
        $this->tryCreateRabbitmq();
        $this->tryCreateService();
        //$this->tryCreateIngress();
    }

    /**
     * Create the persistent volume claim when it does not exist yet.
     *
     * Best effort: failures are logged and never propagated.
     *
     * @return void
     */
    private function tryCreatePvc()
    {
        $pvc = new PersistentVolumeClaim([
            'metadata' => [
                'name' => $this->rabbitmq->name,
                'labels' => $this->allLabels(),
            ],
            'spec' => [
                'accessModes' => ['ReadWriteMany'],
                'storageClassName' => 'nfs-ssd',
                'resources' => ['requests' => ['storage' => '1Gi'],],
                'selector' => ['matchLabels' => commonLabels($this->rabbitmq),],
            ],
        ]);

        try {
            if (!$this->client->persistentVolumeClaims()->exists($this->rabbitmq->name)) {
                \Log::info("try to create pvc");
                $this->client->persistentVolumeClaims()->create($pvc);
            }
        } catch (\Exception $exception) {
            // Still best effort, but no longer silent.
            \Log::warning(
                'create pvc ' . $this->rabbitmq->name . ' failed',
                ['exception' => $exception->getMessage()]
            );
        }
    }

    /**
     * Handle the destroyed desired state.
     *
     * Deletes the deployment and service first, then the persistent volume
     * claim and secret. The database record is deleted directly as soon as
     * tryDeleteResources() reports success (earlier behaviour, per the original
     * note: the record was removed by a later check instead).
     *
     * @return void
     * @throws \Exception
     */
    private function processDestroyed()
    {
        $name = $this->rabbitmq->name;
        $kubernetesPriorityReps = [
            $this->client->deployments(),
            $this->client->services(),
            //$this->client->ingresses(),
        ];
        $kubernetesAfterReps = [
            $this->client->persistentVolumeClaims(),
            $this->client->secrets(),
        ];

        if (tryDeleteResources($kubernetesPriorityReps, $kubernetesAfterReps, $name)) {
            $this->rabbitmq->delete();
        }
    }
}
// Original article: https://www.cnblogs.com/xiongyungang/p/10812543.html