Go 两数相除
1年前
通过go语言将ES搜索的服务做成一个扩展性高的微服务。然后通过PHP进行操作调用,主要为 建立mapping索引、新增、更新、查询 数据。
一、业务场景:
公司产品SKU超过100多万,产品表是个大表,搜索标题之类的,只用数据库来搜不现实,所以用ES来搜索,主要是需要搜索的字段进入ES,然后通过ES搜索出SKU主键,再去数据查一遍,从而实现快速搜索的目的。
二、实现目标:
通过go语言将ES搜索的服务做成一个扩展性高的微服务。
然后通过PHP进行操作调用,主要为 建立mapping索引、新增、更新、查询 数据。
三、实现步骤:
1、建立mapping映射
2、用PHP+Mysql 把数据查询出来
3、通过PHP 的ES服务类将数据塞入RabbitMq队列
4、go服务开多线程监听rabbitmq队列
5、go服务消费rabbitmq队列的数据,塞入ES
6、PHP 通过 ES服务类 搜索需要搜索的字段
7、将搜索出的主键放入Mysql中in查询数据,组装成列表返回
四、实现代码:
1、配置
$urlEnv,
'index' => $indexEnv,
'mqType' => $env, //mq使用的配置 默认 []
'rabbitmq' => [
'ex_name' => 'GO_ES_SEARCH',
'qu_name' => 'dcm_product_es_search',
're_key' => 'dcm_product_es_search',
'ex_type' => 'direct',
'num' => 2, //线程数量 建议最大不超过 20
], //初始化映射
'primaryKey' => 'sku', //将sku字段作为主键
'addMapping' => [ //后续添加的映射
1 => [
'mapping' => '{
"properties":{
"avgPrice": {
"type": "float"
}
}
}',
'status' => 1, // 1=需要执行 0=已经执行完毕
],
2 => [
'mapping' => '{
"properties":{
"shipCost": {
"type": "float"
}
}
}',
'status' => 1, // 1=需要执行 0=已经执行完毕
],
3 => [
'mapping' => '{
"properties":{
"fee": {
"type": "float"
},
"isFba": {
"type": "integer"
},
"least_order_num": {
"type": "integer"
},
"distributorPlatforms": {
"type": "text"
},
"endTime": {
"type": "date",
"format" : "yyyy-MM-dd HH:mm:ss"
},
"updatedAt": {
"type": "date",
"format" : "yyyy-MM-dd HH:mm:ss"
},
"weightOutStorage": {
"type": "float"
},
"netWeight": {
"type": "float"
},
"grossWight": {
"type": "float"
},
"resourceType": {
"type": "float"
},
"isOversea": {
"type": "integer"
},
"warehousePrice": {
"type": "text"
},
"purchasePrice": {
"type": "float"
},
"attrValueIds": {
"type": "float"
},
"createdAt": {
"type": "date",
"format" : "yyyy-MM-dd HH:mm:ss"
},
"sku": {
"type": "keyword",
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword"
}
}
},
"status": {
"type": "integer"
},
"resourceStatus": {
"type": "integer"
},
"productStatus": {
"type": "integer"
}
}
}',
'status' => 1, // 1=需要执行 0=已经执行完毕
],
4 => [
'mapping' => '{
"properties":{
"dcmLevel": {
"type": "integer"
},
"attrIds": {
"type": "nested",
"properties" : {
"id" : {
"type" : "integer"
}
}
},
"tagIds": {
"type": "nested",
"properties" : {
"id" : {
"type" : "integer"
}
}
}
}
}',
'status' => 1, // 1=需要执行 0=已经执行完毕
],
5 => [
'mapping' => '{
"properties": {
"titleEn": {
"type": "text",
"fields": {
"keyword": {
"ignore_above": 500,
"type": "keyword"
}
}
},
"titleCn": {
"type": "text",
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword"
}
}
},
"categoryStr": {
"type": "text",
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword"
}
}
},
"distributorNumbers": {
"type": "text"
},
"infringementRemarks": {
"type": "keyword",
"fields": {
"keyword": {
"ignore_above": 2000,
"type": "keyword"
}
}
},
"newPrice": {
"type": "float"
},
"categoryId": {
"type": "text",
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword"
}
}
},
"primaryKey": {
"type": "keyword",
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword"
}
}
},
"isDel": {
"type": "integer"
}
}
}',
'status' => 1, // 1=需要执行 0=已经执行完毕
],
]
];
$config["essearch"]["product"] = $product;2、ES服务类 【rabbitmq的使用需要注意一下,我这里用的是自研的Mq的连接操作,各个主流的框架应该都有MQ插件,装一个就行,主要是让数据到MQ就行】
config = $config['essearch'][$confDefault];
}
/**
* 监听队列
*/
public function listenEsMq()
{
$path = $this->config['url'] . '/MqConn';
return $this->getRes($path, $this->config['rabbitmq']);
}
/**
* 添加索引
*/
public function addIndex()
{
$path = $this->config['url'] . '/CreateMapping';
$data = [
'index' => $this->config['index'],
'mapping' => '{}'
];
print_r(curl_request($path, json_encode($data)));
}
/**
* 添加映射
*/
public function addMapping()
{
$path = $this->config['url'] . '/CreateMapping';
foreach ($this->config['addMapping'] as $value) {
if ($value['status'] == 1) {
$data = [
'index' => $this->config['index'],
'mapping' => $value['mapping'],
];
print_r(curl_post($path, json_encode($data)));
}
}
}
/*
* 搜索数据
* @param array $search 搜索配置数组 【注意所有传值都应该是字符串,int,float都会被转成字符串】
* condition:
* [
[
'field' => 'titleCn',
'search' => 'like',
'value' => '11'
], [
'field' => 'newPrice',
'search' => 'between',
'value_l' => '0',
'value_r' => '1111'
],
[
'field' => 'categoryId',
'search' => '=',
'value' => '1765',
],
[
'path' => 'warehouse_price.stock', //json串子搜索
'field' => 'warehouse_price',
'search' => 'json_search', //嵌套搜索
'path_search' => 'between', //子搜索类型 支持以下所有搜索类型
'value_l' => '1',
'value_r' => '100',
]
];
支持的搜索类型 search: = , > , >= , < , <=, != , like , not_like , in , not_in, between_left, between_right , between,json_search
between_left : 左开区间 <= value <
between_right :右开区间 < value <=
between: 全开区间 <= value <=
json_search : 嵌套搜索 [{"warehouse_code":"72","stock":405,"head_price":"2.95","tail_price":"58.37","total_price":"63.48","duty_cost":"2.16","extra_cost":"0.00"}]
*/
public function search()
{
$path = $this->config['url'] . '/SizeSearch';
$data = $tmp = [];
//字符串转换
foreach ($this->condition as $item) {
foreach ($item as $k => $value) {
if (is_array($value)) {
$value = json_encode($value);
}
$tmp[$k] = (string)$value;
}
$data[] = $tmp;
}
$search = [
'name' => (string)$this->config['index'], (string)
'condition' => $data,
'type' => $this->type,
'limit' => $this->limit > 100000 ? 100000 : intval($this->limit),
'page' => intval($this->page),
'order_field' => $this->orderField,
'order_type' => intval($this->orderType),
];
if ($this->test) {
print_r($search);
exit;
}
return $this->getRes($path, $search);
}
/**
* 搜索字段校验
* @param $searchList
* @param $request
* @return array
*/
public function checkSearch($searchList, $request)
{
$searchArr = [];
foreach ($request as $key => $value) {
foreach ($searchList as $search) {
//当填写了别名,则用别名匹配,否则用原本字段值匹配
if ((isset($search['field_alias']) && $key == $search['field_alias']) || (!isset($search['field_alias']) && $key == $search['field'])) {
if ($value !== '') {
//主键转换
if ($key == $this->config['primaryKey']) {
$search['field'] = 'primaryKey';
if (!is_array($value)) {
$value = explode(',', $value);
}
}
if ($search['search'] == 'json_search') { //表示嵌套搜索
if (!is_array($value)) {
$value = explode(',', $value);
}
$searchArr[] = [
'field' => $search['field'],
'search' => $search['search'],
'value' => $value,
'path' => $search['path'],
'path_search' => $search['path_search'],
];
} else {
if (strpos($search['search'], 'between') !== false) {
$v = explode(',', $value);
if (isset($v[0]) && isset($v[1])) {
//将日期格式化到ES接受的格式
if (isset($search['format']) && $search['format'] == 'time') {
//适配前端只传日期
if (mb_strlen($v[0]) < 11 && date('Y-m-d', strtotime($v[0])) == $v[0]) {
$v[0] = $v[0] . ' 00:00:00';
}
if (mb_strlen($v[1]) < 11 && date('Y-m-d', strtotime($v[1])) == $v[1]) {
$v[1] = $v[1] . ' 23:59:59';
}
}
$searchArr[] = [
'field' => $search['field'],
'search' => $search['search'],
'value_l' => $v[0],
'value_r' => $v[1],
];
}
} elseif (strpos($search['search'], 'in') !== false) {
//当有逗号时,则转换为数组,如果已经是数组则不转换
if (!is_array($value)) {
$value = explode(',', $value);
}
$searchArr[] = [
'field' => $search['field'],
'search' => $search['search'],
'value' => $value,
];
} else {
$searchArr[] = [
'field' => $search['field'],
'search' => $search['search'],
'value' => $value,
];
}
}
}
}
}
}
return $searchArr;
}
/**
* 创建或更新数据
* @param array[][] $data 二维数组
* @return array
*/
public function createOrUpdate($data)
{
$config = $this->config;
$type = 1;
$rabbitmq = load_library('rabbitmq', $config['mqType']);
$arr = $mapping = [];
$keys = array_unique(array_keys($data[0]));
$addMapping = [];
if (isset($config['addMapping']) && !empty($config['addMapping'])) {
foreach ($config['addMapping'] as $k1 => $value) {
$re = json_decode($value['mapping'], true)['properties'];
foreach ($re as $k => $item) {
$addMapping[$k] = $item;
}
}
}
//获取所有的映射字段
$mapKeys = array_keys($addMapping);
$nestedField = [];
//查询出所有的映射字段,以及是否是嵌套字段
foreach ($addMapping as $key => $value) {
if ($value['type'] == 'nested') {
$nestedField[] = $key;
}
}
$flag = 0;
foreach ($keys as $str) {
if (strpos($str, '_')) {
$uncamelizedWords = '_' . str_replace('_', " ", strtolower($str));
$mapping[$str] = ltrim(str_replace(" ", "", ucwords($uncamelizedWords)), '_');
if (!in_array($mapping[$str], $mapKeys)) {
return [-1, '存在没有映射的字段' . $mapping[$str] . ',请添加映射后再推送'];
}
} else {
$mapping[$str] = $str;
}
if (strpos($str, $config['primaryKey']) !== false) {
$mapping[$str] = 'primaryKey';
$flag = 1;
}
}
if (empty($flag)) {
return [-1, '必须填写主键字段' . $config['primaryKey']];
}
//将字段名称转成单驼峰
foreach ($data as $k => $re) {
$t = [];
foreach ($re as $key => $value) {
if (in_array($key, array_keys($mapping))) {
if (in_array($mapping[$key], $nestedField)) { //嵌套字段,进行修改
if (($value !== '' && $value !== null) || $value === 0) {
if ($value === 0) {
$value = "0";
}
$jsonStr = json_decode((string)$value);
if (!empty($jsonStr)) {
$t[$mapping[$key] . 'NestedEs'] = [$mapping[$key] => $jsonStr];
} else {
return [-1, $mapping[$key] . ' 字段json解析错误!' . $jsonStr];
}
}
} else {
if (($value !== '' && $value !== null) || $value === 0) {
if ($value === 0) {
$value = "0";
}
//加上对应的类型
$t[$mapping[$key]] = $value;
}
}
}
}
$arr[] = $t;
}
if ($this->test == 1) {
return [1, $arr];
}
//分批推送到队列
$allNum = count($arr);
foreach ($arr as $key => $value) {
$saveData[] = $value;
//当循环到不够一次封装的量时,则单条推送出去
if ($allNum - $key <= $this->singlePushNum + 1) {
$newArr = [
'type' => $type,
'name' => $config['index'],
'content' => $saveData
];
$rabbitmq->sendMsg(json_encode($newArr), $config['rabbitmq']['ex_name'], $config['rabbitmq']['qu_name']);
$saveData = [];
continue;
}
if ($key % $this->singlePushNum == 0) {
$newArr = [
'type' => $type,
'name' => $config['index'],
'content' => $saveData
];
$rabbitmq->sendMsg(json_encode($newArr), $config['rabbitmq']['ex_name'], $config['rabbitmq']['qu_name']);
$saveData = [];
}
}
return [1, '推送成功!'];
}
public function getRes($path, $data)
{
return curl_request($path, json_encode($data));
}
} 3、调用示例
'06WFS64900',
'title_cn' => '手机',
'category_id' => 1567,
];
$es = new EsActionService('product');
$es->singlePushNum = 200;
$result = $es->createOrUpdate($res);
//查询
$request = [
'sku' => '00000198,00EOT00708'
];
$arg = [
'condition' => [
[
'field' => 'sku',
'search' => 'in',
],
[
'field' => 'titleEn',
'search' => 'like',
],
[
'field' => 'titleCn',
'search' => 'like',
],
[
'field' => 'createdAt',
'search' => 'between',
'format' => 'time'
],
[
'field' => 'endTime',
'search' => 'between_left',
'format' => 'time' //格式化时间 2020-11-01 22:01:00 兼容前端只传日期ES搜索不到,后端自动加上时分秒
],
[
'path' => 'attrIds.id', //json串子搜索
'field' => 'attrIds',
'search' => 'json_search', //嵌套搜索
'path_search' => 'in', //子搜索类型
],
],
];
$es = new EsActionService('product');
$es->test = isset($request['testEs']) ? 1 : 0;
$es->condition = $es->checkSearch($arg['condition'], $request);
$res = $es->search(); 4、go服务代码【估计你们不会想看,所以我就传到了自己的GitHub上,开箱即用】
https://github.com/oyym123/go_es_search
留言簿